package io.questdb.mp;

import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.std.Rnd;
import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.locks.LockSupport;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:io/questdb/mp/ConcurrentTest.class */
public class ConcurrentTest {
    private static final Log LOG = LogFactory.getLog(ConcurrentTest.class);

    /* loaded from: input_file:io/questdb/mp/ConcurrentTest$BusyConsumer.class */
    private static class BusyConsumer extends Thread {
        private final Sequence sequence;
        private final int[] buf;
        private final RingQueue<Event> queue;
        private final CyclicBarrier barrier;
        private final CountDownLatch latch;
        private volatile int finalIndex = 0;

        BusyConsumer(int i, Sequence sequence, RingQueue<Event> ringQueue, CyclicBarrier cyclicBarrier, CountDownLatch countDownLatch) {
            this.sequence = sequence;
            this.buf = new int[i];
            this.queue = ringQueue;
            this.barrier = cyclicBarrier;
            this.latch = countDownLatch;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                this.barrier.await();
                int i = 0;
                while (true) {
                    long next = this.sequence.next();
                    if (next < 0) {
                        LockSupport.parkNanos(1L);
                    } else {
                        int i2 = ((Event) this.queue.get(next)).value;
                        this.sequence.done(next);
                        if (i2 == Integer.MIN_VALUE) {
                            this.finalIndex = i;
                            this.latch.countDown();
                            return;
                        } else {
                            int i3 = i;
                            i++;
                            this.buf[i3] = i2;
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /* loaded from: input_file:io/questdb/mp/ConcurrentTest$BusySubscriber.class */
    private static class BusySubscriber extends Thread {
        private final int[] buf = new int[20];
        private final RingQueue<Event> queue;
        private final CyclicBarrier barrier;
        private final CountDownLatch latch;
        private final FanOut fanOut;
        private final Sequence publisher;

        BusySubscriber(RingQueue<Event> ringQueue, CyclicBarrier cyclicBarrier, CountDownLatch countDownLatch, FanOut fanOut, Sequence sequence) {
            this.queue = ringQueue;
            this.barrier = cyclicBarrier;
            this.latch = countDownLatch;
            this.fanOut = fanOut;
            this.publisher = sequence;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                this.barrier.await();
                Thread.sleep(10L);
                SCSequence sCSequence = new SCSequence(this.publisher.current());
                this.fanOut.and(sCSequence);
                int i = 0;
                while (i < this.buf.length) {
                    long next = sCSequence.next();
                    if (next < 0) {
                        LockSupport.parkNanos(1L);
                    } else {
                        int i2 = ((Event) this.queue.get(next)).value;
                        sCSequence.done(next);
                        if (i2 == Integer.MIN_VALUE) {
                            break;
                        }
                        int i3 = i;
                        i++;
                        this.buf[i3] = i2;
                    }
                }
                this.fanOut.remove(sCSequence);
                this.latch.countDown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /* loaded from: input_file:io/questdb/mp/ConcurrentTest$WaitingConsumer.class */
    private static class WaitingConsumer extends Thread {
        private final Sequence sequence;
        private final int[] buf;
        private final RingQueue<Event> queue;
        private final CyclicBarrier barrier;
        private final SOCountDownLatch latch;
        private volatile int finalIndex = 0;

        WaitingConsumer(int i, Sequence sequence, RingQueue<Event> ringQueue, CyclicBarrier cyclicBarrier, SOCountDownLatch sOCountDownLatch) {
            this.sequence = sequence;
            this.buf = new int[i];
            this.queue = ringQueue;
            this.barrier = cyclicBarrier;
            this.latch = sOCountDownLatch;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                try {
                    this.barrier.await();
                    int i = 0;
                    while (true) {
                        long waitForNext = this.sequence.waitForNext();
                        int i2 = ((Event) this.queue.get(waitForNext)).value;
                        this.sequence.done(waitForNext);
                        if (i2 == Integer.MIN_VALUE) {
                            this.finalIndex = i;
                            this.latch.countDown();
                            return;
                        } else {
                            int i3 = i;
                            i++;
                            this.buf[i3] = i2;
                        }
                    }
                } catch (Throwable th) {
                    th.printStackTrace();
                    this.latch.countDown();
                }
            } catch (Throwable th2) {
                this.latch.countDown();
                throw th2;
            }
        }
    }

    @Test
    public void testFanOutChain() {
        LOG.info().$("testFanOutChain").$();
        SPSequence sPSequence = new SPSequence(1024);
        SCSequence sCSequence = new SCSequence();
        SCSequence sCSequence2 = new SCSequence();
        SCSequence sCSequence3 = new SCSequence();
        sPSequence.then(FanOut.to(FanOut.to(sCSequence3).and(new SCSequence())).and(sCSequence.then(sCSequence2))).then(sPSequence);
    }

    @Test
    public void testOneToManyBusy() throws Exception {
        LOG.info().$("testOneToManyBusy").$();
        int i = 1024 * 1024;
        RingQueue ringQueue = new RingQueue(Event.FACTORY, 1024);
        SPSequence sPSequence = new SPSequence(1024);
        MCSequence mCSequence = new MCSequence(1024);
        sPSequence.then(mCSequence).then(sPSequence);
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
        CountDownLatch countDownLatch = new CountDownLatch(2);
        BusyConsumer[] busyConsumerArr = {new BusyConsumer(i, mCSequence, ringQueue, cyclicBarrier, countDownLatch), new BusyConsumer(i, mCSequence, ringQueue, cyclicBarrier, countDownLatch)};
        busyConsumerArr[0].start();
        busyConsumerArr[1].start();
        cyclicBarrier.await();
        int i2 = 0;
        while (true) {
            long next = sPSequence.next();
            if (next >= 0) {
                int i3 = i2;
                i2++;
                ((Event) ringQueue.get(next)).value = i3;
                sPSequence.done(next);
                if (i2 == i) {
                    break;
                }
            }
        }
        publishEOE(ringQueue, sPSequence);
        publishEOE(ringQueue, sPSequence);
        countDownLatch.await();
        int[] iArr = new int[i];
        System.arraycopy(busyConsumerArr[0].buf, 0, iArr, 0, busyConsumerArr[0].finalIndex);
        System.arraycopy(busyConsumerArr[1].buf, 0, iArr, busyConsumerArr[0].finalIndex, busyConsumerArr[1].finalIndex);
        Arrays.sort(iArr);
        for (int i4 = 0; i4 < iArr.length; i4++) {
            Assert.assertEquals(i4, iArr[i4]);
        }
    }

    @Test
    public void testOneToManyWaiting() throws Exception {
        int i = 1024 * 1024;
        RingQueue ringQueue = new RingQueue(Event.FACTORY, 1024);
        SPSequence sPSequence = new SPSequence(1024);
        MCSequence mCSequence = new MCSequence(1024, new YieldingWaitStrategy());
        sPSequence.then(mCSequence).then(sPSequence);
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
        SOCountDownLatch sOCountDownLatch = new SOCountDownLatch(2);
        WaitingConsumer[] waitingConsumerArr = {new WaitingConsumer(i, mCSequence, ringQueue, cyclicBarrier, sOCountDownLatch), new WaitingConsumer(i, mCSequence, ringQueue, cyclicBarrier, sOCountDownLatch)};
        waitingConsumerArr[0].start();
        waitingConsumerArr[1].start();
        cyclicBarrier.await();
        int i2 = 0;
        do {
            long nextBully = sPSequence.nextBully();
            int i3 = i2;
            i2++;
            ((Event) ringQueue.get(nextBully)).value = i3;
            sPSequence.done(nextBully);
        } while (i2 != i);
        publishEOE(ringQueue, sPSequence);
        publishEOE(ringQueue, sPSequence);
        sOCountDownLatch.await();
        int[] iArr = new int[i];
        System.arraycopy(waitingConsumerArr[0].buf, 0, iArr, 0, waitingConsumerArr[0].finalIndex);
        System.arraycopy(waitingConsumerArr[1].buf, 0, iArr, waitingConsumerArr[0].finalIndex, waitingConsumerArr[1].finalIndex);
        Arrays.sort(iArr);
        for (int i4 = 0; i4 < iArr.length; i4++) {
            Assert.assertEquals(i4, iArr[i4]);
        }
    }

    /* JADX WARN: Type inference failed for: r0v26, types: [long, io.questdb.mp.RingQueue] */
    @Test
    public void testOneToOneBatched() throws BrokenBarrierException, InterruptedException {
        RingQueue ringQueue = new RingQueue(Event.FACTORY, 1024);
        SPSequence sPSequence = new SPSequence(1024);
        SCSequence sCSequence = new SCSequence();
        sPSequence.then(sCSequence).then(sPSequence);
        CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        new Thread(() -> {
            try {
                cyclicBarrier.await();
                Rnd rnd = new Rnd();
                int i = 0;
                while (i < 1048576) {
                    long next = sPSequence.next();
                    if (next > -1) {
                        long available = sPSequence.available();
                        while (next < available && i < 1048576) {
                            next++;
                            ((Event) ringQueue.get(ringQueue)).value = rnd.nextInt();
                            i++;
                        }
                        sPSequence.done(next - 1);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
        cyclicBarrier.await();
        int i = 0;
        Rnd rnd = new Rnd();
        while (i < 1048576) {
            long next = sCSequence.next();
            if (next > -1) {
                long available = sCSequence.available();
                while (next < available) {
                    next++;
                    Assert.assertEquals((long) rnd.nextInt(), ((Event) r0.get(r2)).value);
                    i++;
                }
                sCSequence.done(available - 1);
            }
        }
    }

    @Test
    public void testOneToOneBusy() throws Exception {
        LOG.info().$("testOneToOneBusy").$();
        int i = 1024 * 1024;
        RingQueue ringQueue = new RingQueue(Event.FACTORY, 1024);
        SPSequence sPSequence = new SPSequence(1024);
        SCSequence sCSequence = new SCSequence(new YieldingWaitStrategy());
        sPSequence.then(sCSequence).then(sPSequence);
        CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        BusyConsumer busyConsumer = new BusyConsumer(i, sCSequence, ringQueue, cyclicBarrier, countDownLatch);
        busyConsumer.start();
        cyclicBarrier.await();
        int i2 = 0;
        while (true) {
            long next = sPSequence.next();
            if (next >= 0) {
                int i3 = i2;
                i2++;
                ((Event) ringQueue.get(next)).value = i3;
                sPSequence.done(next);
                if (i2 == i) {
                    break;
                }
            }
        }
        publishEOE(ringQueue, sPSequence);
        countDownLatch.await();
        int[] iArr = busyConsumer.buf;
        for (int i4 = 0; i4 < iArr.length; i4++) {
            Assert.assertEquals(i4, iArr[i4]);
        }
    }

    @Test
    public void testOneToOneWaiting() throws Exception {
        LOG.info().$("testOneToOneWaiting").$();
        int i = 1024 * 1024;
        RingQueue ringQueue = new RingQueue(Event.FACTORY, 1024);
        SPSequence sPSequence = new SPSequence(1024);
        SCSequence sCSequence = new SCSequence(new YieldingWaitStrategy());
        sPSequence.then(sCSequence).then(sPSequence);
        CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        SOCountDownLatch sOCountDownLatch = new SOCountDownLatch(1);
        WaitingConsumer waitingConsumer = new WaitingConsumer(i, sCSequence, ringQueue, cyclicBarrier, sOCountDownLatch);
        waitingConsumer.start();
        cyclicBarrier.await();
        int i2 = 0;
        do {
            long nextBully = sPSequence.nextBully();
            int i3 = i2;
            i2++;
            ((Event) ringQueue.get(nextBully)).value = i3;
            sPSequence.done(nextBully);
        } while (i2 != i);
        publishEOE(ringQueue, sPSequence);
        sOCountDownLatch.await();
        int[] iArr = waitingConsumer.buf;
        for (int i4 = 0; i4 < iArr.length; i4++) {
            Assert.assertEquals(i4, iArr[i4]);
        }
    }

    @Test
    public void testOneToParallelMany() throws Exception {
        LOG.info().$("testOneToParallelMany").$();
        int i = 1024 * 1024;
        RingQueue ringQueue = new RingQueue(Event.FACTORY, 1024);
        SPSequence sPSequence = new SPSequence(1024);
        SCSequence sCSequence = new SCSequence();
        SCSequence sCSequence2 = new SCSequence();
        sPSequence.then(FanOut.to(sCSequence).and(sCSequence2)).then(sPSequence);
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
        CountDownLatch countDownLatch = new CountDownLatch(2);
        BusyConsumer[] busyConsumerArr = {new BusyConsumer(i, sCSequence, ringQueue, cyclicBarrier, countDownLatch), new BusyConsumer(i, sCSequence2, ringQueue, cyclicBarrier, countDownLatch)};
        busyConsumerArr[0].start();
        busyConsumerArr[1].start();
        cyclicBarrier.await();
        int i2 = 0;
        while (true) {
            long next = sPSequence.next();
            if (next >= 0) {
                int i3 = i2;
                i2++;
                ((Event) ringQueue.get(next)).value = i3;
                sPSequence.done(next);
                if (i2 == i) {
                    break;
                }
            } else {
                LockSupport.parkNanos(1L);
            }
        }
        publishEOE(ringQueue, sPSequence);
        publishEOE(ringQueue, sPSequence);
        countDownLatch.await();
        for (int i4 = 0; i4 < 2; i4++) {
            for (int i5 = 0; i5 < busyConsumerArr[i4].buf.length; i5++) {
                Assert.assertEquals(i5, busyConsumerArr[i4].buf[i5]);
            }
        }
    }

    @Test
    public void testOneToParallelSubscriber() throws Exception {
        LOG.info().$("testOneToParallelSubscriber").$();
        int i = 1024 * 1024;
        RingQueue ringQueue = new RingQueue(Event.FACTORY, 1024);
        SPSequence sPSequence = new SPSequence(1024);
        SCSequence sCSequence = new SCSequence();
        SCSequence sCSequence2 = new SCSequence();
        FanOut and = FanOut.to(sCSequence).and(sCSequence2);
        sPSequence.then(and).then(sPSequence);
        CyclicBarrier cyclicBarrier = new CyclicBarrier(4);
        CountDownLatch countDownLatch = new CountDownLatch(3);
        BusyConsumer[] busyConsumerArr = {new BusyConsumer(i, sCSequence, ringQueue, cyclicBarrier, countDownLatch), new BusyConsumer(i, sCSequence2, ringQueue, cyclicBarrier, countDownLatch)};
        BusySubscriber busySubscriber = new BusySubscriber(ringQueue, cyclicBarrier, countDownLatch, and, sPSequence);
        busySubscriber.start();
        busyConsumerArr[0].start();
        busyConsumerArr[1].start();
        cyclicBarrier.await();
        int i2 = 0;
        while (true) {
            long next = sPSequence.next();
            if (next >= 0) {
                int i3 = i2;
                i2++;
                ((Event) ringQueue.get(next)).value = i3;
                sPSequence.done(next);
                if (i2 == i) {
                    break;
                }
            }
        }
        publishEOE(ringQueue, sPSequence);
        publishEOE(ringQueue, sPSequence);
        countDownLatch.await();
        for (int i4 = 0; i4 < 2; i4++) {
            for (int i5 = 0; i5 < busyConsumerArr[i4].buf.length; i5++) {
                Assert.assertEquals(i5, busyConsumerArr[i4].buf[i5]);
            }
        }
        for (int i6 = 0; i6 < busySubscriber.buf.length; i6++) {
            Assert.assertTrue(busySubscriber.buf[i6] > 0);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static void publishEOE(RingQueue<Event> ringQueue, Sequence sequence) {
        long nextBully = sequence.nextBully();
        ((Event) ringQueue.get(nextBully)).value = Integer.MIN_VALUE;
        sequence.done(nextBully);
    }
}
