/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.fs.azurebfs;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsScaleTest;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.hamcrest.Matcher;
import org.hamcrest.core.IsEqual;
import org.hamcrest.core.IsNot;
import org.junit.Test;

public class ITestAzureBlobFileSystemFlush
extends AbstractAbfsScaleTest {
    private static final int BASE_SIZE = 1024;
    private static final int ONE_THOUSAND = 1000;
    private static final int TEST_BUFFER_SIZE = 5120000;
    private static final int ONE_MB = 0x100000;
    private static final int FLUSH_TIMES = 200;
    private static final int THREAD_SLEEP_TIME = 1000;
    private static final int TEST_FILE_LENGTH = 0x800000;
    private static final int WAITING_TIME = 1000;

    @Test
    public void testAbfsOutputStreamAsyncFlushWithRetainUncommittedData() throws Exception {
        byte[] b;
        AzureBlobFileSystem fs = this.getFileSystem();
        Path testFilePath = this.path(this.methodName.getMethodName());
        try (FSDataOutputStream stream = fs.create(testFilePath);){
            b = new byte[5120000];
            new Random().nextBytes(b);
            for (int i = 0; i < 2; ++i) {
                stream.write(b);
                for (int j = 0; j < 200; ++j) {
                    stream.flush();
                    Thread.sleep(10L);
                }
            }
        }
        byte[] r = new byte[5120000];
        try (FSDataInputStream inputStream = fs.open(testFilePath, 0x400000);){
            while (inputStream.available() != 0) {
                int result = inputStream.read(r);
                ITestAzureBlobFileSystemFlush.assertNotEquals((String)"read returned -1", (long)-1L, (long)result);
                ITestAzureBlobFileSystemFlush.assertArrayEquals((String)"buffer read from stream", (byte[])r, (byte[])b);
            }
        }
    }

    @Test
    public void testAbfsOutputStreamSyncFlush() throws Exception {
        byte[] b;
        AzureBlobFileSystem fs = this.getFileSystem();
        Path testFilePath = this.path(this.methodName.getMethodName());
        try (FSDataOutputStream stream = fs.create(testFilePath);){
            b = new byte[5120000];
            new Random().nextBytes(b);
            stream.write(b);
            for (int i = 0; i < 200; ++i) {
                stream.hsync();
                stream.hflush();
                Thread.sleep(10L);
            }
        }
        byte[] r = new byte[5120000];
        try (FSDataInputStream inputStream = fs.open(testFilePath, 0x400000);){
            int result = inputStream.read(r);
            ITestAzureBlobFileSystemFlush.assertNotEquals((long)-1L, (long)result);
            ITestAzureBlobFileSystemFlush.assertArrayEquals((byte[])r, (byte[])b);
        }
    }

    @Test
    public void testWriteHeavyBytesToFileSyncFlush() throws Exception {
        ExecutorService es;
        AzureBlobFileSystem fs = this.getFileSystem();
        Path testFilePath = this.path(this.methodName.getMethodName());
        try (final FSDataOutputStream stream = fs.create(testFilePath);){
            es = Executors.newFixedThreadPool(10);
            final byte[] b = new byte[5120000];
            new Random().nextBytes(b);
            ArrayList<Future<Void>> tasks = new ArrayList<Future<Void>>();
            for (int i = 0; i < 200; ++i) {
                Callable<Void> callable = new Callable<Void>(){

                    @Override
                    public Void call() throws Exception {
                        stream.write(b);
                        return null;
                    }
                };
                tasks.add(es.submit(callable));
            }
            boolean shouldStop = false;
            while (!shouldStop) {
                shouldStop = true;
                for (Future future : tasks) {
                    if (future.isDone()) continue;
                    stream.hsync();
                    shouldStop = false;
                    Thread.sleep(1000L);
                }
            }
            tasks.clear();
        }
        es.shutdownNow();
        FileStatus fileStatus = fs.getFileStatus(testFilePath);
        long expectedWrites = 1024000000L;
        ITestAzureBlobFileSystemFlush.assertEquals((String)("Wrong file length in " + testFilePath), (long)expectedWrites, (long)fileStatus.getLen());
    }

    @Test
    public void testWriteHeavyBytesToFileAsyncFlush() throws Exception {
        AzureBlobFileSystem fs = this.getFileSystem();
        ExecutorService es = Executors.newFixedThreadPool(10);
        Path testFilePath = this.path(this.methodName.getMethodName());
        try (final FSDataOutputStream stream = fs.create(testFilePath);){
            final byte[] b = new byte[5120000];
            new Random().nextBytes(b);
            ArrayList<Future<Void>> tasks = new ArrayList<Future<Void>>();
            for (int i = 0; i < 200; ++i) {
                Callable<Void> callable = new Callable<Void>(){

                    @Override
                    public Void call() throws Exception {
                        stream.write(b);
                        return null;
                    }
                };
                tasks.add(es.submit(callable));
            }
            boolean shouldStop = false;
            while (!shouldStop) {
                shouldStop = true;
                for (Future future : tasks) {
                    if (future.isDone()) continue;
                    stream.flush();
                    shouldStop = false;
                }
            }
            Thread.sleep(1000L);
            tasks.clear();
        }
        es.shutdownNow();
        FileStatus fileStatus = fs.getFileStatus(testFilePath);
        ITestAzureBlobFileSystemFlush.assertEquals((long)1024000000L, (long)fileStatus.getLen());
    }

    @Test
    public void testFlushWithFlushEnabled() throws Exception {
        this.testFlush(true);
    }

    @Test
    public void testFlushWithFlushDisabled() throws Exception {
        this.testFlush(false);
    }

    private void testFlush(boolean flushEnabled) throws Exception {
        AzureBlobFileSystem fs = this.getFileSystem();
        fs.getAbfsStore().getAbfsConfiguration().setEnableFlush(flushEnabled);
        Path testFilePath = this.path(this.methodName.getMethodName());
        byte[] buffer = this.getRandomBytesArray();
        ITestAzureBlobFileSystemFlush.assertEquals((long)fs.getAbfsStore().getAbfsConfiguration().getWriteBufferSize(), (long)buffer.length);
        try (FSDataOutputStream stream = fs.create(testFilePath);){
            stream.write(buffer);
            AbfsOutputStream abfsStream = (AbfsOutputStream)stream.getWrappedStream();
            abfsStream.waitForPendingUploads();
            stream.flush();
            this.validate((InputStream)fs.open(testFilePath), buffer, flushEnabled);
        }
    }

    @Test
    public void testHflushWithFlushEnabled() throws Exception {
        AzureBlobFileSystem fs = this.getFileSystem();
        byte[] buffer = this.getRandomBytesArray();
        String fileName = UUID.randomUUID().toString();
        Path testFilePath = this.path(fileName);
        try (FSDataOutputStream stream = this.getStreamAfterWrite(fs, testFilePath, buffer, true);){
            stream.hflush();
            this.validate((FileSystem)fs, testFilePath, buffer, true);
        }
    }

    @Test
    public void testHflushWithFlushDisabled() throws Exception {
        AzureBlobFileSystem fs = this.getFileSystem();
        byte[] buffer = this.getRandomBytesArray();
        Path testFilePath = this.path(this.methodName.getMethodName());
        try (FSDataOutputStream stream = this.getStreamAfterWrite(fs, testFilePath, buffer, false);){
            stream.hflush();
            this.validate((FileSystem)fs, testFilePath, buffer, false);
        }
    }

    @Test
    public void testHsyncWithFlushEnabled() throws Exception {
        AzureBlobFileSystem fs = this.getFileSystem();
        byte[] buffer = this.getRandomBytesArray();
        Path testFilePath = this.path(this.methodName.getMethodName());
        try (FSDataOutputStream stream = this.getStreamAfterWrite(fs, testFilePath, buffer, true);){
            stream.hsync();
            this.validate((FileSystem)fs, testFilePath, buffer, true);
        }
    }

    @Test
    public void testStreamCapabilitiesWithFlushDisabled() throws Exception {
        AzureBlobFileSystem fs = this.getFileSystem();
        byte[] buffer = this.getRandomBytesArray();
        Path testFilePath = this.path(this.methodName.getMethodName());
        try (FSDataOutputStream stream = this.getStreamAfterWrite(fs, testFilePath, buffer, false);){
            ITestAzureBlobFileSystemFlush.assertFalse((boolean)stream.hasCapability("hflush"));
            ITestAzureBlobFileSystemFlush.assertFalse((boolean)stream.hasCapability("hsync"));
            ITestAzureBlobFileSystemFlush.assertFalse((boolean)stream.hasCapability("dropbehind"));
            ITestAzureBlobFileSystemFlush.assertFalse((boolean)stream.hasCapability("in:readahead"));
            ITestAzureBlobFileSystemFlush.assertFalse((boolean)stream.hasCapability("in:unbuffer"));
        }
    }

    @Test
    public void testStreamCapabilitiesWithFlushEnabled() throws Exception {
        AzureBlobFileSystem fs = this.getFileSystem();
        byte[] buffer = this.getRandomBytesArray();
        Path testFilePath = this.path(this.methodName.getMethodName());
        try (FSDataOutputStream stream = this.getStreamAfterWrite(fs, testFilePath, buffer, true);){
            ITestAzureBlobFileSystemFlush.assertTrue((boolean)stream.hasCapability("hflush"));
            ITestAzureBlobFileSystemFlush.assertTrue((boolean)stream.hasCapability("hsync"));
            ITestAzureBlobFileSystemFlush.assertFalse((boolean)stream.hasCapability("dropbehind"));
            ITestAzureBlobFileSystemFlush.assertFalse((boolean)stream.hasCapability("in:readahead"));
            ITestAzureBlobFileSystemFlush.assertFalse((boolean)stream.hasCapability("in:unbuffer"));
        }
    }

    @Test
    public void testHsyncWithFlushDisabled() throws Exception {
        AzureBlobFileSystem fs = this.getFileSystem();
        byte[] buffer = this.getRandomBytesArray();
        Path testFilePath = this.path(this.methodName.getMethodName());
        try (FSDataOutputStream stream = this.getStreamAfterWrite(fs, testFilePath, buffer, false);){
            stream.hsync();
            this.validate((FileSystem)fs, testFilePath, buffer, false);
        }
    }

    private byte[] getRandomBytesArray() {
        byte[] b = new byte[0x800000];
        new Random().nextBytes(b);
        return b;
    }

    private FSDataOutputStream getStreamAfterWrite(AzureBlobFileSystem fs, Path path, byte[] buffer, boolean enableFlush) throws IOException {
        fs.getAbfsStore().getAbfsConfiguration().setEnableFlush(enableFlush);
        FSDataOutputStream stream = fs.create(path);
        stream.write(buffer);
        return stream;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void validate(InputStream stream, byte[] writeBuffer, boolean isEqual) throws IOException {
        try {
            byte[] readBuffer = new byte[writeBuffer.length];
            int numBytesRead = stream.read(readBuffer, 0, readBuffer.length);
            if (isEqual) {
                ITestAzureBlobFileSystemFlush.assertArrayEquals((String)"Bytes read do not match bytes written.", (byte[])writeBuffer, (byte[])readBuffer);
            } else {
                ITestAzureBlobFileSystemFlush.assertThat((String)"Bytes read unexpectedly match bytes written.", (Object)readBuffer, (Matcher)IsNot.not((Matcher)IsEqual.equalTo((Object)writeBuffer)));
            }
        }
        finally {
            stream.close();
        }
    }

    private void validate(FileSystem fs, Path path, byte[] writeBuffer, boolean isEqual) throws IOException {
        String filePath = path.toUri().toString();
        try (FSDataInputStream inputStream = fs.open(path);){
            byte[] readBuffer = new byte[0x800000];
            int numBytesRead = inputStream.read(readBuffer, 0, readBuffer.length);
            if (isEqual) {
                ITestAzureBlobFileSystemFlush.assertArrayEquals((String)String.format("Bytes read do not match bytes written to %1$s", filePath), (byte[])writeBuffer, (byte[])readBuffer);
            } else {
                ITestAzureBlobFileSystemFlush.assertThat((String)String.format("Bytes read unexpectedly match bytes written to %1$s", filePath), (Object)readBuffer, (Matcher)IsNot.not((Matcher)IsEqual.equalTo((Object)writeBuffer)));
            }
        }
    }
}

