/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.streaming.bytes;

import io.qameta.allure.Feature;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mule.runtime.api.streaming.bytes.CursorStream;
import org.mule.runtime.api.streaming.bytes.CursorStreamProvider;
import org.mule.runtime.api.util.DataSize;
import org.mule.runtime.api.util.DataUnit;
import org.mule.runtime.core.api.streaming.bytes.ByteBufferManager;
import org.mule.runtime.core.api.streaming.bytes.InMemoryCursorStreamConfig;
import org.mule.runtime.core.api.streaming.bytes.InMemoryCursorStreamProvider;
import org.mule.runtime.core.api.util.func.CheckedConsumer;
import org.mule.runtime.core.api.util.func.CheckedRunnable;
import org.mule.runtime.core.internal.streaming.bytes.AbstractByteStreamingTestCase;
import org.mule.runtime.core.internal.streaming.bytes.PoolingByteBufferManager;
import org.mule.tck.size.SmallTest;

@RunWith(value=Parameterized.class)
@SmallTest
@Feature(value="Streaming")
public class CursorStreamProviderTestCase
extends AbstractByteStreamingTestCase {
    private final int halfDataLength;
    private final int bufferSize;
    private final int maxBufferSize;
    protected final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
    private CursorStreamProvider streamProvider;
    private CountDownLatch controlLatch;
    private CountDownLatch mainThreadLatch;
    private ExecutorService allocationScheduler;
    protected PoolingByteBufferManager bufferManager;

    @Parameterized.Parameters(name="{0}")
    public static Collection<Object[]> data() {
        return Arrays.asList({"Doesn't require expansion", 262144, 0x100000, 0x200000}, {"Requires expansion", 0x100000, 262144, 0x200000});
    }

    public CursorStreamProviderTestCase(String name, int dataSize, int bufferSize, int maxBufferSize) {
        super(dataSize);
        this.bufferSize = bufferSize;
        this.maxBufferSize = maxBufferSize;
        this.halfDataLength = this.data.length() / 2;
        this.resetLatches();
    }

    @Before
    public void before() {
        this.allocationScheduler = Executors.newSingleThreadExecutor();
        this.bufferManager = new PoolingByteBufferManager(this.allocationScheduler);
        ByteArrayInputStream dataStream = new ByteArrayInputStream(this.data.getBytes());
        this.streamProvider = this.createStreamProvider(this.bufferSize, this.maxBufferSize, dataStream);
    }

    protected CursorStreamProvider createStreamProvider(int bufferSize, int maxBufferSize, ByteArrayInputStream dataStream) {
        InMemoryCursorStreamConfig config = new InMemoryCursorStreamConfig(new DataSize(bufferSize, DataUnit.BYTE), new DataSize(bufferSize / 2, DataUnit.BYTE), new DataSize(maxBufferSize, DataUnit.BYTE));
        return new InMemoryCursorStreamProvider((InputStream)dataStream, config, (ByteBufferManager)this.bufferManager);
    }

    @After
    public void after() {
        this.streamProvider.close();
        this.executorService.shutdownNow();
        this.bufferManager.dispose();
        this.allocationScheduler.shutdownNow();
    }

    @Test
    public void readFullyWithInSingleCursor() throws IOException {
        this.withCursor((CheckedConsumer<CursorStream>)((CheckedConsumer)cursor -> Assert.assertThat((Object)IOUtils.toString((InputStream)cursor), (Matcher)CoreMatchers.equalTo((Object)this.data))));
    }

    @Test
    public void readFullyByteByByteWithSingleCursor() throws IOException {
        this.withCursor((CheckedConsumer<CursorStream>)((CheckedConsumer)cursor -> {
            for (int i = 0; i < this.data.length(); ++i) {
                Assert.assertThat((Object)Character.valueOf((char)cursor.read()), (Matcher)CoreMatchers.equalTo((Object)Character.valueOf(this.data.charAt(i))));
            }
        }));
    }

    @Test
    public void partialReadOnSingleCursor() throws Exception {
        byte[] dest = new byte[this.halfDataLength];
        this.withCursor((CheckedConsumer<CursorStream>)((CheckedConsumer)cursor -> {
            cursor.read(dest, 0, this.halfDataLength);
            Assert.assertThat((Object)this.toString(dest), (Matcher)CoreMatchers.equalTo((Object)this.data.substring(0, this.halfDataLength)));
        }));
    }

    @Test
    public void rewindWhileStreamNotFullyConsumed() throws Exception {
        this.withCursor((CheckedConsumer<CursorStream>)((CheckedConsumer)cursor -> {
            byte[] dest = new byte[this.halfDataLength];
            cursor.read(dest, 0, this.halfDataLength);
            Assert.assertThat((Object)this.toString(dest), (Matcher)CoreMatchers.equalTo((Object)this.data.substring(0, this.halfDataLength)));
            cursor.seek(0L);
            dest = new byte[this.data.length()];
            cursor.read(dest, 0, dest.length);
            Assert.assertThat((Object)this.toString(dest), (Matcher)CoreMatchers.equalTo((Object)this.data));
        }));
    }

    @Test
    public void partialReadWithOffsetOnSingleCursor() throws Exception {
        byte[] dest = new byte[this.halfDataLength + 2];
        dest[0] = "!".getBytes()[0];
        dest[1] = dest[0];
        this.withCursor((CheckedConsumer<CursorStream>)((CheckedConsumer)cursor -> {
            cursor.read(dest, 2, this.halfDataLength);
            Assert.assertThat((Object)this.toString(dest), (Matcher)CoreMatchers.equalTo((Object)("!!" + this.data.substring(0, this.halfDataLength))));
        }));
    }

    @Test
    public void randomSeekWithOneOpenCursor() throws Exception {
        this.withCursor((CheckedConsumer<CursorStream>)((CheckedConsumer)cursor -> {
            Assert.assertThat((Object)IOUtils.toString((InputStream)cursor), (Matcher)CoreMatchers.equalTo((Object)this.data));
            this.seekAndAssert((CursorStream)cursor, 0L, 10);
            this.seekAndAssert((CursorStream)cursor, this.halfDataLength, this.halfDataLength);
        }));
    }

    @Test
    public void twoOpenCursorsConsumingTheStreamInSingleThread() throws Exception {
        this.withCursor((CheckedConsumer<CursorStream>)((CheckedConsumer)cursor1 -> this.withCursor((CheckedConsumer<CursorStream>)((CheckedConsumer)cursor2 -> {
            this.seekAndAssert((CursorStream)cursor1, 0L, this.data.length());
            this.seekAndAssert((CursorStream)cursor2, 0L, this.data.length());
        }))));
    }

    @Test
    public void twoOpenCursorsReadingOppositeEndsOfTheStreamInSingleThread() throws Exception {
        this.withCursor((CheckedConsumer<CursorStream>)((CheckedConsumer)cursor1 -> this.withCursor((CheckedConsumer<CursorStream>)((CheckedConsumer)cursor2 -> {
            this.seekAndAssert((CursorStream)cursor1, 0L, this.data.length() / 2);
            this.seekAndAssert((CursorStream)cursor2, this.halfDataLength, this.halfDataLength);
        }))));
    }

    @Test
    public void twoOpenCursorsConsumingTheStreamConcurrently() throws Exception {
        this.withCursor((CheckedConsumer<CursorStream>)((CheckedConsumer)cursor1 -> this.withCursor((CheckedConsumer<CursorStream>)((CheckedConsumer)cursor2 -> this.doAsync(() -> this.seekAndAssert((CursorStream)cursor1, 0L, this.data.length()), () -> this.seekAndAssert((CursorStream)cursor2, 0L, this.data.length()))))));
    }

    @Test
    public void twoOpenCursorsReadingOppositeEndsOfTheStreamConcurrently() throws Exception {
        this.withCursor((CheckedConsumer<CursorStream>)((CheckedConsumer)cursor1 -> this.withCursor((CheckedConsumer<CursorStream>)((CheckedConsumer)cursor2 -> this.doAsync(() -> this.seekAndAssert((CursorStream)cursor1, 0L, this.data.length() / 2), () -> this.seekAndAssert((CursorStream)cursor2, this.halfDataLength, this.halfDataLength))))));
    }

    @Test
    public void getPosition() throws Exception {
        this.withCursor((CheckedConsumer<CursorStream>)((CheckedConsumer)cursor -> {
            Assert.assertThat((Object)cursor.getPosition(), (Matcher)CoreMatchers.is((Object)0L));
            cursor.seek(10L);
            Assert.assertThat((Object)cursor.getPosition(), (Matcher)CoreMatchers.is((Object)10L));
            cursor.seek(0L);
            Assert.assertThat((Object)cursor.getPosition(), (Matcher)CoreMatchers.is((Object)0L));
        }));
    }

    @Test
    public void consumeByChunksShorterThanBufferSize() throws Exception {
        this.withCursor((CheckedConsumer<CursorStream>)((CheckedConsumer)cursor -> Assert.assertThat((Object)this.readByChunks((InputStream)cursor, this.bufferSize / 2), (Matcher)CoreMatchers.equalTo((Object)this.data))));
    }

    @Test
    public void consumeByChunksWhichOverlapWithBuffer() throws Exception {
        StringBuilder accumulator = new StringBuilder();
        this.withCursor((CheckedConsumer<CursorStream>)((CheckedConsumer)cursor -> {
            int chunkSize = this.bufferSize / 2;
            byte[] buffer = new byte[chunkSize];
            int read = cursor.read(buffer, 0, chunkSize);
            this.append(buffer, read, accumulator);
            accumulator.append(this.readByChunks((InputStream)cursor, this.bufferSize));
        }));
        Assert.assertThat((Object)accumulator.toString(), (Matcher)CoreMatchers.equalTo((Object)this.data));
    }

    @Test
    public void getSliceWhichStartsBehindInCurrentSegmentButEndsInTheCurrent() throws Exception {
        if (this.data.length() < this.bufferSize) {
            return;
        }
        int len = this.bufferSize + 20;
        byte[] dest = new byte[len];
        this.withCursor((CheckedConsumer<CursorStream>)((CheckedConsumer)cursor -> {
            Assert.assertThat((Object)cursor.read(dest, 0, len), (Matcher)CoreMatchers.is((Object)len));
            Assert.assertThat((Object)this.toString(dest), (Matcher)CoreMatchers.equalTo((Object)this.data.substring(0, len)));
            int position = this.bufferSize - 30;
            cursor.seek((long)position);
            Assert.assertThat((Object)cursor.read(dest, 0, len), (Matcher)CoreMatchers.is((Object)len));
            Assert.assertThat((Object)this.toString(dest), (Matcher)CoreMatchers.equalTo((Object)this.data.substring(position, position + len)));
        }));
    }

    @Test
    public void getSliceWhichStartsInCurrentSegmentButEndsInTheNext() throws Exception {
        if (this.data.length() < this.bufferSize) {
            return;
        }
        int position = this.bufferSize - 10;
        int len = this.bufferSize / 2;
        byte[] dest = new byte[len];
        this.withCursor((CheckedConsumer<CursorStream>)((CheckedConsumer)cursor -> {
            cursor.seek((long)position);
            Assert.assertThat((Object)cursor.read(dest, 0, len), (Matcher)CoreMatchers.is((Object)len));
            Assert.assertThat((Object)this.toString(dest), (Matcher)CoreMatchers.equalTo((Object)this.data.substring(position, position + len)));
        }));
    }

    @Test
    public void dataLengthMatchesMaxBufferSizeExactly() throws Exception {
        this.data = RandomStringUtils.randomAlphabetic((int)this.maxBufferSize);
        ByteArrayInputStream dataStream = new ByteArrayInputStream(this.data.getBytes());
        InMemoryCursorStreamConfig config = new InMemoryCursorStreamConfig(new DataSize(this.maxBufferSize, DataUnit.BYTE), new DataSize(0, DataUnit.BYTE), new DataSize(this.maxBufferSize, DataUnit.BYTE));
        this.streamProvider = new InMemoryCursorStreamProvider((InputStream)dataStream, config, (ByteBufferManager)this.bufferManager);
        this.withCursor((CheckedConsumer<CursorStream>)((CheckedConsumer)cursor -> Assert.assertThat((Object)IOUtils.toString((InputStream)cursor), (Matcher)CoreMatchers.equalTo((Object)this.data))));
    }

    @Test
    public void mark() throws Exception {
        this.withCursor((CheckedConsumer<CursorStream>)((CheckedConsumer)cursor -> {
            int mark = 10;
            Assert.assertThat((Object)cursor.read(new byte[10], 0, 10), (Matcher)CoreMatchers.is((Object)10));
            long position = cursor.getPosition();
            cursor.mark(100);
            Assert.assertThat((Object)cursor.read(new byte[100], 0, 100), (Matcher)CoreMatchers.is((Object)100));
            cursor.reset();
            Assert.assertThat((Object)this.toString((InputStream)cursor), (Matcher)CoreMatchers.equalTo((Object)this.data.substring(Math.toIntExact(position))));
        }));
    }

    @Test(expected=IOException.class)
    public void ioExceptionIfClosed() throws Exception {
        CursorStream cursor = (CursorStream)this.streamProvider.openCursor();
        cursor.close();
        cursor.read();
    }

    private void doAsync(CheckedRunnable task1, CheckedRunnable task2) throws Exception {
        this.resetLatches();
        Future future1 = this.doAsync(() -> {
            this.controlLatch.await();
            task1.run();
            this.mainThreadLatch.countDown();
        });
        Future future2 = this.doAsync(() -> {
            this.controlLatch.countDown();
            task2.run();
            this.mainThreadLatch.countDown();
        });
        this.awaitMainThreadLatch();
        Assert.assertThat(future1.get(), (Matcher)CoreMatchers.is((Matcher)CoreMatchers.nullValue()));
        Assert.assertThat(future2.get(), (Matcher)CoreMatchers.is((Matcher)CoreMatchers.nullValue()));
    }

    private Future doAsync(CheckedRunnable task) {
        return this.executorService.submit(() -> {
            try {
                task.run();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    private void awaitMainThreadLatch() throws InterruptedException {
        this.mainThreadLatch.await(1L, TimeUnit.SECONDS);
    }

    private void seekAndAssert(CursorStream cursor, long position, int length) throws Exception {
        byte[] randomBytes = new byte[length];
        cursor.seek(position);
        cursor.read(randomBytes, 0, length);
        Assert.assertThat((Object)this.toString(randomBytes), (Matcher)CoreMatchers.equalTo((Object)this.data.substring(Math.toIntExact(position), Math.toIntExact(position + (long)length))));
    }

    private void resetLatches() {
        this.controlLatch = new CountDownLatch(1);
        this.mainThreadLatch = new CountDownLatch(2);
    }

    private void withCursor(CheckedConsumer<CursorStream> consumer) throws IOException {
        try (CursorStream cursor = (CursorStream)this.streamProvider.openCursor();){
            consumer.accept((Object)cursor);
        }
    }

    private String readByChunks(InputStream stream, int chunkSize) throws IOException {
        int read;
        StringBuilder accumulator = new StringBuilder();
        do {
            byte[] buffer = new byte[chunkSize];
            read = stream.read(buffer, 0, chunkSize);
            this.append(buffer, read, accumulator);
        } while (read > 0);
        return accumulator.toString();
    }

    private void append(byte[] buffer, int read, StringBuilder accumulator) {
        for (int i = 0; i < read; ++i) {
            accumulator.append((char)buffer[i]);
        }
    }
}

