package com.addc.commons.queue;

import com.addc.test.AssertWaiter;
import com.addc.test.Asserter;
import javax.management.ObjectName;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:com/addc/commons/queue/PersistingQueueTest.class */
public class PersistingQueueTest {
    private static final String LINE_SEPARATOR;
    PersistingQueue<String> queue;
    private DefaultPersistingQueueStatistics<String> stats;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:com/addc/commons/queue/PersistingQueueTest$ReaderThread.class */
    private static class ReaderThread extends Thread {
        private int target;
        private int itemsRead;
        private final PersistingQueue<String> queue;

        ReaderThread(PersistingQueue<String> persistingQueue) {
            this.queue = persistingQueue;
        }

        public void setTarget(int i) {
            this.target = i;
            this.itemsRead = 0;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            do {
                if (((String) this.queue.take()) != null) {
                    this.itemsRead++;
                }
            } while (this.itemsRead != this.target);
        }

        public int getItemsRead() {
            return this.itemsRead;
        }
    }

    @BeforeClass
    public static void beforeClass() {
        System.setProperty("derby.stream.error.file", "target/derby.log");
    }

    @AfterClass
    public static void afterClass() {
        System.getProperties().remove("derby.stream.error.file");
    }

    @Before
    public void before() throws Exception {
        this.stats = new DefaultPersistingQueueStatistics<>(new ObjectName("addc:test=test"));
    }

    @After
    public void after() throws Exception {
        if (this.queue != null) {
            try {
                this.queue.clear();
            } catch (Exception e) {
                if (!$assertionsDisabled && e == null) {
                    throw new AssertionError();
                }
            }
            try {
                this.queue.shutdown((Object) null);
            } catch (Exception e2) {
                if (!$assertionsDisabled && e2 == null) {
                    throw new AssertionError();
                }
            }
        }
    }

    @Test
    public void checkPutWithoutPersistence() throws Exception {
        PersistenceConfig persistenceConfig = new PersistenceConfig();
        persistenceConfig.setPersistent(false);
        this.queue = new PersistingQueue<>(persistenceConfig, this.stats, 10);
        this.queue.addTransitionListener(new MockPersistingQueueListener());
        int i = 10 + 10;
        for (int i2 = 0; i2 < i; i2++) {
            this.queue.put("Hallo");
        }
        Assert.assertEquals(i, this.stats.getItemsCreated());
        Assert.assertEquals(0L, this.stats.getItemsWrittenToPersistence());
        Assert.assertEquals(10, this.stats.getItemsDropped());
        Assert.assertNull(this.queue.getPersistentQueue());
    }

    @Test
    public void checkPutAfterShutdown() throws Exception {
        PersistenceConfig persistenceConfig = new PersistenceConfig();
        persistenceConfig.setPersistent(false);
        this.queue = new PersistingQueue<>(persistenceConfig, this.stats, 10);
        this.queue.put("Hallo");
        this.queue.shutdown((Object) null);
        try {
            this.queue.put("Hallo");
            Assert.fail();
        } catch (IllegalStateException e) {
            Assert.assertEquals("Queue is shutdown", e.getMessage());
        }
    }

    @Test
    public void checkPullTakeAfterShutdown() throws Exception {
        PersistenceConfig persistenceConfig = new PersistenceConfig();
        persistenceConfig.setPersistent(false);
        this.queue = new PersistingQueue<>(persistenceConfig, this.stats, 10);
        this.queue.put("Hallo");
        this.queue.shutdown((Object) null);
        try {
            this.queue.take();
            Assert.fail();
        } catch (IllegalStateException e) {
            Assert.assertEquals("Queue is shutdown", e.getMessage());
        }
        this.queue = new PersistingQueue<>(persistenceConfig, this.stats, 10);
        this.queue.put("Hallo");
        this.queue.shutdown("Hallo");
        try {
            this.queue.poll();
            Assert.fail();
        } catch (IllegalStateException e2) {
            Assert.assertEquals("Queue is shutdown", e2.getMessage());
        }
    }

    @Test
    public void checkInterruptTake() throws Exception {
        PersistenceConfig persistenceConfig = new PersistenceConfig();
        persistenceConfig.setPersistent(false);
        this.queue = new PersistingQueue<>(persistenceConfig, this.stats, 10);
        this.queue.put("Hallo");
        Assert.assertEquals("Hallo", this.queue.take());
        Thread thread = new Thread() { // from class: com.addc.commons.queue.PersistingQueueTest.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    sleep(300L);
                } catch (InterruptedException e) {
                }
                PersistingQueueTest.this.queue.interruptTake();
                try {
                    sleep(100L);
                } catch (InterruptedException e2) {
                }
                PersistingQueueTest.this.queue.put("Hallo");
            }
        };
        long currentTimeMillis = System.currentTimeMillis() + 1000;
        thread.start();
        String str = (String) this.queue.take();
        Assert.assertTrue(System.currentTimeMillis() < currentTimeMillis);
        Assert.assertNull(str);
        AssertWaiter.retryAssert(10, 20L, new Asserter() { // from class: com.addc.commons.queue.PersistingQueueTest.2
            public void test() throws Exception {
                Assert.assertEquals("Hallo", PersistingQueueTest.this.queue.poll());
            }
        });
    }

    @Test
    public void checkTakeInterrupted() throws Exception {
        PersistenceConfig persistenceConfig = new PersistenceConfig();
        persistenceConfig.setPersistent(false);
        this.queue = new PersistingQueue<>(persistenceConfig, this.stats, 10);
        new Thread() { // from class: com.addc.commons.queue.PersistingQueueTest.3
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    sleep(200L);
                } catch (InterruptedException e) {
                }
                PersistingQueueTest.this.queue.put("Hallo");
            }
        }.start();
        long currentTimeMillis = System.currentTimeMillis();
        String str = (String) this.queue.take();
        long currentTimeMillis2 = System.currentTimeMillis();
        Assert.assertNotNull(str);
        Assert.assertEquals("Hallo", str);
        Assert.assertTrue(currentTimeMillis2 - currentTimeMillis < 210);
    }

    @Test
    public void checkTakeWithoutPersistence() throws Exception {
        PersistenceConfig persistenceConfig = new PersistenceConfig();
        persistenceConfig.setPersistent(false);
        this.queue = new PersistingQueue<>(persistenceConfig, this.stats, 10);
        for (int i = 0; i < 10; i++) {
            this.queue.put("Hallo");
        }
        Thread thread = new Thread() { // from class: com.addc.commons.queue.PersistingQueueTest.4
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                for (int i2 = 9; i2 >= 0; i2--) {
                    Assert.assertNotNull((String) PersistingQueueTest.this.queue.take());
                }
            }
        };
        thread.start();
        AssertWaiter.retryAssert(5, 30L, new Asserter() { // from class: com.addc.commons.queue.PersistingQueueTest.5
            public void test() throws Exception {
                Assert.assertTrue(PersistingQueueTest.this.queue.getPayloadBuffer().isEmpty());
            }
        });
        this.queue.interruptTake();
        thread.join();
        Assert.assertEquals(0L, this.stats.getItemsReadFromPersistence());
    }

    @Test
    public void checkWithPersistence() throws Exception {
        PersistenceConfig persistenceConfig = new PersistenceConfig();
        persistenceConfig.setPersistent(true);
        persistenceConfig.setDataBaseLocation("memory:");
        persistenceConfig.setDataBaseName("test");
        this.queue = new PersistingQueue<>(persistenceConfig, this.stats, true, 10, (ElementSerializer) null);
        this.queue.addListener(new MockPersistingQueueStateListener());
        int i = 10 + 10;
        Assert.assertNotNull(this.queue.getPersistentQueue());
        for (int i2 = 0; i2 < i; i2++) {
            this.queue.put("Hallo");
        }
        Assert.assertEquals(0L, this.queue.getPayloadBuffer().remainingCapacity());
        Assert.assertEquals(10, this.stats.getItemsWrittenToPersistence());
        ReaderThread readerThread = new ReaderThread(this.queue);
        readerThread.setTarget(i);
        readerThread.start();
        AssertWaiter.retryAssert(5, 30L, new Asserter() { // from class: com.addc.commons.queue.PersistingQueueTest.6
            public void test() throws Exception {
                Assert.assertTrue(PersistingQueueTest.this.queue.getPayloadBuffer().isEmpty());
            }
        });
        this.queue.interruptTake();
        readerThread.join();
        Assert.assertEquals(i, readerThread.getItemsRead());
        Assert.assertEquals("PersistingQueue statistics:" + LINE_SEPARATOR + "Elements received   : 20" + LINE_SEPARATOR + "Elements dropped    : 0" + LINE_SEPARATOR + "Elements to queue   : 10" + LINE_SEPARATOR + "Elements from queue : 10" + LINE_SEPARATOR, this.stats.getQueueStatistics());
    }

    @Test
    public void checkPersistenceWithStartup() throws Exception {
        PersistenceConfig persistenceConfig = new PersistenceConfig();
        persistenceConfig.setPersistent(true);
        persistenceConfig.setDataBaseLocation("target/data");
        persistenceConfig.setDataBaseName("test2");
        this.queue = new PersistingQueue<>(persistenceConfig, this.stats, false, 10, new MockElementSerializer());
        Assert.assertTrue(this.queue.isPersistent());
        int i = 10 + 10;
        for (int i2 = 0; i2 < i; i2++) {
            this.queue.put("Hallo");
        }
        Assert.assertEquals(0L, this.queue.getPayloadBuffer().remainingCapacity());
        Assert.assertEquals(10, this.stats.getItemsWrittenToPersistence());
        this.queue.shutdown((Object) null);
        Assert.assertTrue(this.queue.isShutdown());
        Assert.assertEquals(i, this.stats.getItemsWrittenToPersistence());
        this.queue = new PersistingQueue<>(persistenceConfig, this.stats, false, 10, new MockElementSerializer());
        Assert.assertTrue(this.queue.isPersistent());
        ReaderThread readerThread = new ReaderThread(this.queue);
        readerThread.setTarget(i);
        readerThread.start();
        AssertWaiter.retryAssert(10, 30L, new Asserter() { // from class: com.addc.commons.queue.PersistingQueueTest.7
            public void test() throws Exception {
                Assert.assertTrue(PersistingQueueTest.this.queue.getPayloadBuffer().isEmpty());
            }
        });
        this.queue.interruptTake();
        readerThread.join();
        Assert.assertEquals(i, readerThread.getItemsRead());
        Assert.assertEquals(i, this.stats.getItemsReadFromPersistence());
    }

    static {
        $assertionsDisabled = !PersistingQueueTest.class.desiredAssertionStatus();
        LINE_SEPARATOR = System.getProperty("line.separator");
    }
}
