/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.streaming.object;

import io.qameta.allure.Description;
import io.qameta.allure.Feature;
import io.qameta.allure.Story;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mule.runtime.api.streaming.exception.StreamingBufferSizeExceededException;
import org.mule.runtime.api.streaming.object.CursorIterator;
import org.mule.runtime.api.streaming.object.CursorIteratorProvider;
import org.mule.runtime.core.api.streaming.object.InMemoryCursorIteratorConfig;
import org.mule.runtime.core.api.util.func.CheckedConsumer;
import org.mule.runtime.core.api.util.func.CheckedRunnable;
import org.mule.runtime.core.internal.streaming.object.AbstractObjectStreamingTestCase;
import org.mule.runtime.core.internal.streaming.object.InMemoryCursorIteratorProvider;
import org.mule.tck.size.SmallTest;

@RunWith(value=Parameterized.class)
@SmallTest
@Feature(value="Streaming")
@Story(value="Object Streaming")
public class CursorIteratorProviderTestCase
extends AbstractObjectStreamingTestCase {
    protected static final int DATA_SIZE = 500;
    private final int halfDataLength;
    private final InMemoryCursorIteratorConfig config;
    protected final ScheduledExecutorService executorService;
    private CursorIteratorProvider streamProvider;
    private CountDownLatch controlLatch;
    private CountDownLatch mainThreadLatch;

    @Parameterized.Parameters(name="{0}")
    public static Collection<Object[]> data() {
        return Arrays.asList({"Doesn't require expansion", 500, 500, 0, 500}, {"Requires expansion", 500, 100, 50, 500});
    }

    public CursorIteratorProviderTestCase(String name, int dataSize, int initialBufferSize, int bufferSizeIncrement, int maxBufferSize) {
        super(dataSize);
        this.config = new InMemoryCursorIteratorConfig(initialBufferSize, bufferSizeIncrement, maxBufferSize);
        this.executorService = Executors.newScheduledThreadPool(2);
        this.halfDataLength = this.data.size() / 2;
        this.resetLatches();
    }

    @Before
    public void before() {
        this.streamProvider = this.createStreamProvider(this.data);
    }

    protected CursorIteratorProvider createStreamProvider(List<Object> data) {
        return new InMemoryCursorIteratorProvider(this.toStreamingIterator(data), this.config);
    }

    @After
    public void after() {
        this.streamProvider.close();
        this.executorService.shutdownNow();
    }

    @Test
    @Description(value="fully consume stream in a single thread")
    public void readFullyWithInSingleCursor() throws IOException {
        this.withCursor((CheckedConsumer<CursorIterator>)((CheckedConsumer)cursor -> this.checkEquals(this.data, cursor)));
    }

    @Test
    @Description(value="Partially consume the stream, rewind back to zero and consume fully")
    public void rewindWhileStreamNotFullyConsumed() throws Exception {
        this.withCursor((CheckedConsumer<CursorIterator>)((CheckedConsumer)cursor -> {
            List read = this.read(cursor, this.halfDataLength);
            this.checkEquals(read, this.data.subList(0, this.halfDataLength));
            cursor.seek(0L);
            read = this.read(cursor, this.data.size());
            this.checkEquals(read, this.data);
        }));
    }

    @Test
    @Description(value="Consume the stream, go back to two different positions and consume again (each)")
    public void randomSeekWithOneOpenCursor() throws Exception {
        this.withCursor((CheckedConsumer<CursorIterator>)((CheckedConsumer)cursor -> {
            this.checkEquals(this.data, cursor);
            this.seekAndAssert((CursorIterator<Object>)cursor, 0L, 10);
            this.seekAndAssert((CursorIterator<Object>)cursor, this.halfDataLength, this.halfDataLength);
        }));
    }

    @Test
    @Description(value="Two open cursors consume the same stream, one after the other in the same thread")
    public void twoOpenCursorsConsumingTheStreamInSingleThread() throws Exception {
        this.withCursor((CheckedConsumer<CursorIterator>)((CheckedConsumer)cursor1 -> this.withCursor((CheckedConsumer<CursorIterator>)((CheckedConsumer)cursor2 -> {
            this.seekAndAssert((CursorIterator<Object>)cursor1, 0L, this.data.size());
            this.seekAndAssert((CursorIterator<Object>)cursor2, 0L, this.data.size());
        }))));
    }

    @Test
    @Description(value="Two open cursors consume different ends of the same stream, one after the other in the same thread")
    public void twoOpenCursorsReadingOppositeEndsOfTheStreamInSingleThread() throws Exception {
        this.withCursor((CheckedConsumer<CursorIterator>)((CheckedConsumer)cursor1 -> this.withCursor((CheckedConsumer<CursorIterator>)((CheckedConsumer)cursor2 -> {
            this.seekAndAssert((CursorIterator<Object>)cursor1, 0L, this.data.size() / 2);
            this.seekAndAssert((CursorIterator<Object>)cursor2, this.halfDataLength, this.halfDataLength);
        }))));
    }

    @Test
    @Description(value="Two open cursors consume the same stream concurrently, each on its own thread")
    public void twoOpenCursorsConsumingTheStreamConcurrently() throws Exception {
        this.withCursor((CheckedConsumer<CursorIterator>)((CheckedConsumer)cursor1 -> this.withCursor((CheckedConsumer<CursorIterator>)((CheckedConsumer)cursor2 -> this.doAsync(() -> this.seekAndAssert((CursorIterator<Object>)cursor1, 0L, this.data.size()), () -> this.seekAndAssert((CursorIterator<Object>)cursor2, 0L, this.data.size()))))));
    }

    @Test
    @Description(value="Two open cursors consume different ends of same stream concurrently, each on its own thread")
    public void twoOpenCursorsReadingOppositeEndsOfTheStreamConcurrently() throws Exception {
        this.withCursor((CheckedConsumer<CursorIterator>)((CheckedConsumer)cursor1 -> this.withCursor((CheckedConsumer<CursorIterator>)((CheckedConsumer)cursor2 -> this.doAsync(() -> this.seekAndAssert((CursorIterator<Object>)cursor1, 0L, this.data.size() / 2), () -> this.seekAndAssert((CursorIterator<Object>)cursor2, this.halfDataLength, this.halfDataLength))))));
    }

    @Test
    @Description(value="Seek different positions and verify that getPosition() is consistent")
    public void getPosition() throws Exception {
        this.withCursor((CheckedConsumer<CursorIterator>)((CheckedConsumer)cursor -> {
            Assert.assertThat((Object)cursor.getPosition(), (Matcher)CoreMatchers.is((Object)0L));
            cursor.seek(10L);
            Assert.assertThat((Object)cursor.getPosition(), (Matcher)CoreMatchers.is((Object)10L));
            cursor.seek(0L);
            Assert.assertThat((Object)cursor.getPosition(), (Matcher)CoreMatchers.is((Object)0L));
        }));
    }

    @Test
    @Description(value="Get the size of a stream")
    public void size() throws Exception {
        this.withCursor((CheckedConsumer<CursorIterator>)((CheckedConsumer)cursor -> Assert.assertThat((Object)cursor.getSize(), (Matcher)CoreMatchers.is((Object)this.data.size()))));
    }

    @Test(expected=StreamingBufferSizeExceededException.class)
    @Description(value="Exceed the maxBufferSize and expect exception")
    public void bufferSizeExceeded() throws Exception {
        this.data.add("I don't fit");
        this.streamProvider.close();
        this.streamProvider = this.createStreamProvider(this.data);
        this.withCursor((CheckedConsumer<CursorIterator>)((CheckedConsumer)cursor -> this.read(cursor, this.data.size())));
    }

    @Test
    @Description(value="Direct access to the last two items of the stream without traversing the whole cursor")
    public void getLastTwoItems() throws Exception {
        this.withCursor((CheckedConsumer<CursorIterator>)((CheckedConsumer)cursor -> {
            int size = this.data.size();
            cursor.seek((long)(size - 2));
            Assert.assertThat((Object)cursor.hasNext(), (Matcher)CoreMatchers.is((Object)true));
            Assert.assertThat((Object)cursor.next(), (Matcher)CoreMatchers.is(this.data.get(size - 2)));
            Assert.assertThat((Object)cursor.next(), (Matcher)CoreMatchers.is(this.data.get(size - 1)));
            Assert.assertThat((Object)cursor.hasNext(), (Matcher)CoreMatchers.is((Object)false));
        }));
    }

    @Test(expected=NoSuchElementException.class)
    @Description(value="Move the cursor to a non existing position")
    public void outOfBoundsHasNext() throws Exception {
        this.withCursor((CheckedConsumer<CursorIterator>)((CheckedConsumer)cursor -> {
            cursor.seek((long)(this.data.size() + 100));
            Assert.assertThat((Object)cursor.hasNext(), (Matcher)CoreMatchers.is((Object)false));
            cursor.next();
        }));
    }

    private void doAsync(CheckedRunnable task1, CheckedRunnable task2) throws Exception {
        this.resetLatches();
        Future future1 = this.doAsync(() -> {
            this.controlLatch.await();
            task1.run();
            this.mainThreadLatch.countDown();
        });
        Future future2 = this.doAsync(() -> {
            this.controlLatch.countDown();
            task2.run();
            this.mainThreadLatch.countDown();
        });
        this.awaitMainThreadLatch();
        Assert.assertThat(future1.get(), (Matcher)CoreMatchers.is((Matcher)CoreMatchers.nullValue()));
        Assert.assertThat(future2.get(), (Matcher)CoreMatchers.is((Matcher)CoreMatchers.nullValue()));
    }

    private Future doAsync(CheckedRunnable task) {
        return this.executorService.submit(() -> {
            try {
                task.run();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    private void awaitMainThreadLatch() throws InterruptedException {
        this.mainThreadLatch.await(1L, TimeUnit.SECONDS);
    }

    private void seekAndAssert(CursorIterator<Object> cursor, long position, int size) throws Exception {
        cursor.seek(position);
        List<Object> read = this.read(cursor, size);
        this.checkEquals(read, this.data.subList(Math.toIntExact(position), Math.toIntExact(position + (long)size)));
    }

    private void resetLatches() {
        this.controlLatch = new CountDownLatch(1);
        this.mainThreadLatch = new CountDownLatch(2);
    }

    private void withCursor(CheckedConsumer<CursorIterator> consumer) throws IOException {
        try (CursorIterator cursor = (CursorIterator)this.streamProvider.openCursor();){
            consumer.accept((Object)cursor);
        }
    }
}

