/*
 * Decompiled with CFR 0.152.
 */
package org.onosproject.store.primitives.resources.impl;

import com.google.common.util.concurrent.Uninterruptibles;
import io.atomix.AtomixClient;
import io.atomix.resource.ResourceType;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.onlab.util.Tools;
import org.onosproject.store.primitives.resources.impl.AtomixTestBase;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueue;
import org.onosproject.store.service.Task;
import org.onosproject.store.service.WorkQueueStats;

/**
 * Unit tests for {@link AtomixWorkQueue}.
 *
 * <p>Each test works on a queue with a freshly generated random name, accessed
 * through one or two {@link AtomixClient}s obtained from the base class, and
 * verifies the pending / in-progress / completed counters reported by
 * {@link WorkQueueStats} after each operation.
 */
public class AtomixWorkQueueTest extends AtomixTestBase {
    private static final Duration DEFAULT_PROCESSING_TIME = Duration.ofMillis(100L);
    // Explicit charset so the payload bytes do not depend on the platform default.
    private static final byte[] DEFAULT_PAYLOAD = "hello world".getBytes(StandardCharsets.UTF_8);
    /** Upper bound, in milliseconds, on how long a test waits for the task processor. */
    private static final long PROCESSING_TIMEOUT_MILLIS = 500L;

    /** Starts a single Copycat server shared by every test in this class. */
    @BeforeClass
    public static void preTestSetup() throws Throwable {
        createCopycatServers(1);
    }

    /** Delegates to the base class cleanup once all tests have run. */
    @AfterClass
    public static void postTestCleanup() throws Exception {
        clearTests();
    }

    @Override
    protected ResourceType resourceType() {
        return new ResourceType(AtomixWorkQueue.class);
    }

    /** Items added through two different clients are both reported as pending. */
    @Test
    public void testAdd() throws Throwable {
        String queueName = UUID.randomUUID().toString();
        AtomixWorkQueue queue1 = openQueue(this.createAtomixClient(), queueName);
        queue1.addOne(DEFAULT_PAYLOAD).join();

        AtomixWorkQueue queue2 = openQueue(this.createAtomixClient(), queueName);
        queue2.addOne(DEFAULT_PAYLOAD).join();

        assertStats(queue1, 2L, 0L, 0L);
    }

    /** A batch add of two items results in two pending items. */
    @Test
    public void testAddMultiple() throws Throwable {
        String queueName = UUID.randomUUID().toString();
        AtomixWorkQueue queue1 = openQueue(this.createAtomixClient(), queueName);
        queue1.addMultiple(Arrays.asList(DEFAULT_PAYLOAD, DEFAULT_PAYLOAD)).join();

        assertStats(queue1, 2L, 0L, 0L);
    }

    /**
     * A task taken by a second client moves from pending to in-progress, and to
     * completed once that client completes it; a further take yields {@code null}.
     */
    @Test
    public void testTakeAndComplete() throws Throwable {
        String queueName = UUID.randomUUID().toString();
        AtomixWorkQueue queue1 = openQueue(this.createAtomixClient(), queueName);
        queue1.addOne(DEFAULT_PAYLOAD).join();

        AtomixWorkQueue queue2 = openQueue(this.createAtomixClient(), queueName);
        Task removedTask = (Task) queue2.take().join();

        assertStats(queue2, 0L, 1L, 0L);
        Assert.assertArrayEquals(DEFAULT_PAYLOAD, (byte[]) removedTask.payload());

        queue2.complete(Arrays.asList(removedTask.taskId())).join();

        assertStats(queue1, 0L, 0L, 1L);
        // The queue is drained, so another take must not return a task.
        Assert.assertNull(queue2.take().join());
    }

    /** Closing the client that holds an in-progress task returns the task to pending. */
    @Test
    public void testUnexpectedClientClose() throws Throwable {
        String queueName = UUID.randomUUID().toString();
        AtomixWorkQueue queue1 = openQueue(this.createAtomixClient(), queueName);
        queue1.addOne(DEFAULT_PAYLOAD).join();

        AtomixClient atomix2 = this.createAtomixClient();
        AtomixWorkQueue queue2 = openQueue(atomix2, queueName);
        queue2.take().join();

        assertStats(queue1, 0L, 1L, 0L);

        atomix2.close().join();

        assertStats(queue1, 1L, 0L, 0L);
    }

    /**
     * A registered task processor consumes added items; after
     * {@code stopProcessing()} new items stay pending until a processor is
     * registered again.
     */
    @Test
    public void testAutomaticTaskProcessing() throws Throwable {
        String queueName = UUID.randomUUID().toString();
        AtomixWorkQueue queue1 = openQueue(this.createAtomixClient(), queueName);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            CountDownLatch latch1 = new CountDownLatch(1);
            queue1.registerTaskProcessor(s -> latch1.countDown(), 2, executor);

            AtomixWorkQueue queue2 = openQueue(this.createAtomixClient(), queueName);
            queue2.addOne(DEFAULT_PAYLOAD).join();

            // Fail explicitly on timeout instead of silently continuing.
            Assert.assertTrue("first task was not processed in time",
                    Uninterruptibles.awaitUninterruptibly(
                            latch1, PROCESSING_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
            queue1.stopProcessing();

            // Pause before adding more work so the stats below are read after the
            // first task had time to be accounted as completed.
            Tools.delay((int) DEFAULT_PROCESSING_TIME.toMillis());
            queue2.addMultiple(Arrays.asList(DEFAULT_PAYLOAD, DEFAULT_PAYLOAD)).join();

            assertStats(queue1, 2L, 0L, 1L);

            CountDownLatch latch2 = new CountDownLatch(2);
            queue1.registerTaskProcessor(s -> latch2.countDown(), 2, executor);

            Assert.assertTrue("pending tasks were not processed after re-registering",
                    Uninterruptibles.awaitUninterruptibly(
                            latch2, PROCESSING_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
        } finally {
            // Graceful shutdown: already-submitted callbacks may finish, but the
            // worker thread is no longer leaked past the end of the test.
            executor.shutdown();
        }
    }

    /** Destroying the queue through one client resets the counters seen by the other. */
    @Test
    public void testDestroy() {
        String queueName = UUID.randomUUID().toString();
        AtomixWorkQueue queue1 = openQueue(this.createAtomixClient(), queueName);
        queue1.addOne(DEFAULT_PAYLOAD).join();

        AtomixWorkQueue queue2 = openQueue(this.createAtomixClient(), queueName);
        queue2.addOne(DEFAULT_PAYLOAD).join();

        assertStats(queue1, 2L, 0L, 0L);

        queue2.destroy().join();

        assertStats(queue1, 0L, 0L, 0L);
    }

    /** Completing a task from a client other than the one that took it leaves it in progress. */
    @Test
    public void testCompleteAttemptWithIncorrectSession() {
        String queueName = UUID.randomUUID().toString();
        AtomixWorkQueue queue1 = openQueue(this.createAtomixClient(), queueName);
        queue1.addOne(DEFAULT_PAYLOAD).join();

        Task task = (Task) queue1.take().join();
        String taskId = task.taskId();

        // Attempt the completion through a different client than the one holding the task.
        AtomixWorkQueue queue2 = openQueue(this.createAtomixClient(), queueName);
        queue2.complete(new String[]{taskId}).join();

        assertStats(queue1, 0L, 1L, 0L);
    }

    /**
     * Obtains the work queue resource named {@code queueName} through {@code client},
     * blocking until the lookup completes.
     *
     * @param client    client used to look up the resource
     * @param queueName name of the queue resource
     * @return the queue handle bound to {@code client}
     */
    private AtomixWorkQueue openQueue(AtomixClient client, String queueName) {
        return (AtomixWorkQueue) client.getResource(queueName, AtomixWorkQueue.class).join();
    }

    /**
     * Fetches the current stats from {@code queue} and checks all three counters.
     *
     * @param queue      queue whose stats are read
     * @param pending    expected {@code totalPending()}
     * @param inProgress expected {@code totalInProgress()}
     * @param completed  expected {@code totalCompleted()}
     */
    private static void assertStats(AtomixWorkQueue queue, long pending, long inProgress, long completed) {
        WorkQueueStats stats = (WorkQueueStats) queue.stats().join();
        Assert.assertEquals("totalPending", pending, stats.totalPending());
        Assert.assertEquals("totalInProgress", inProgress, stats.totalInProgress());
        Assert.assertEquals("totalCompleted", completed, stats.totalCompleted());
    }
}

