/*
 * Decompiled with CFR 0.152.
 */
package alluxio.server.ft;

import alluxio.AlluxioURI;
import alluxio.annotation.dora.DoraTestTodoItem;
import alluxio.client.WriteType;
import alluxio.client.block.BlockStoreClient;
import alluxio.client.block.BlockWorkerInfo;
import alluxio.client.block.policy.BlockLocationPolicy;
import alluxio.client.block.policy.options.GetWorkerOptions;
import alluxio.client.block.stream.BlockInStream;
import alluxio.client.block.stream.BlockOutStream;
import alluxio.client.file.FileInStream;
import alluxio.client.file.FileSystem;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.FileSystemTestUtils;
import alluxio.client.file.URIStatus;
import alluxio.client.file.options.InStreamOptions;
import alluxio.client.file.options.OutStreamOptions;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.grpc.CreateFilePOptions;
import alluxio.grpc.OpenFilePOptions;
import alluxio.grpc.WritePType;
import alluxio.testutils.BaseIntegrationTest;
import alluxio.testutils.LocalAlluxioClusterResource;
import alluxio.util.io.BufferUtils;
import alluxio.wire.BlockInfo;
import alluxio.wire.BlockLocation;
import alluxio.wire.FileBlockInfo;
import alluxio.wire.WorkerNetAddress;
import com.google.common.io.ByteStreams;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.Optional;
import java.util.stream.StreamSupport;
import org.apache.commons.io.IOUtils;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;

@Ignore
@DoraTestTodoItem(action=DoraTestTodoItem.Action.FIX, owner="bowen", comment="fix the tests")
public final class MultiWorkerIntegrationTest
extends BaseIntegrationTest {
    private static final int NUM_WORKERS = 4;
    private static final int WORKER_MEMORY_SIZE_BYTES = 0x100000;
    private static final int BLOCK_SIZE_BYTES = 524288;
    @Rule
    public LocalAlluxioClusterResource mResource = new LocalAlluxioClusterResource.Builder().setProperty(PropertyKey.WORKER_RAMDISK_SIZE, 0x100000).setProperty(PropertyKey.USER_BLOCK_SIZE_BYTES_DEFAULT, 524288).setProperty(PropertyKey.USER_FILE_BUFFER_BYTES, 524288).setNumWorkers(4).build();

    @Test
    @LocalAlluxioClusterResource.Config(confParams={"alluxio.user.block.write.location.policy.class", "alluxio.client.block.policy.RoundRobinPolicy"})
    public void writeLargeFile() throws Exception {
        int fileSize = 0x400000;
        AlluxioURI file = new AlluxioURI("/test");
        FileSystem fs = this.mResource.get().getClient();
        FileSystemTestUtils.createByteFile((FileSystem)fs, (String)file.getPath(), (int)fileSize, (CreateFilePOptions)CreateFilePOptions.newBuilder().setWriteType(WritePType.MUST_CACHE).build());
        URIStatus status = fs.getStatus(file);
        Assert.assertEquals((long)100L, (long)status.getInAlluxioPercentage());
        try (FileInStream inStream = fs.openFile(file);){
            Assert.assertEquals((long)fileSize, (long)IOUtils.toByteArray((InputStream)inStream).length);
        }
    }

    @Test
    @LocalAlluxioClusterResource.Config(confParams={"alluxio.user.block.size.bytes.default", "16MB", "alluxio.user.streaming.reader.chunk.size.bytes", "64KB", "alluxio.user.block.read.retry.max.duration", "1s", "alluxio.worker.ramdisk.size", "1GB"})
    public void readRecoverFromLostWorker() throws Exception {
        int offset = 0x1100000;
        int length = 0x2100000;
        int total = offset + length;
        AlluxioURI filePath = new AlluxioURI("/test");
        this.createFileOnWorker(total, filePath, this.mResource.get().getWorkerAddress());
        FileSystem fs = this.mResource.get().getClient();
        try (FileInStream in = fs.openFile(filePath, OpenFilePOptions.getDefaultInstance());){
            byte[] buf = new byte[total];
            int size = in.read(buf, 0, offset);
            this.replicateFileBlocks(filePath);
            this.mResource.get().getWorkerProcess().stop();
            Assert.assertEquals((long)total, (long)(size += in.read(buf, offset, length)));
            Assert.assertTrue((boolean)BufferUtils.equalIncreasingByteArray((int)offset, (int)size, (byte[])buf));
        }
    }

    @Test
    @LocalAlluxioClusterResource.Config(confParams={"alluxio.user.block.size.bytes.default", "4MB", "alluxio.user.streaming.reader.chunk.size.bytes", "64KB", "alluxio.user.block.read.retry.max.duration", "1s", "alluxio.worker.ramdisk.size", "1GB"})
    public void readOneRecoverFromLostWorker() throws Exception {
        int offset = 0x100000;
        int length = 0x500000;
        int total = offset + length;
        AlluxioURI filePath = new AlluxioURI("/test");
        FileSystem fs = this.mResource.get().getClient();
        this.createFileOnWorker(total, filePath, this.mResource.get().getWorkerAddress());
        try (FileInStream in = fs.openFile(filePath, OpenFilePOptions.getDefaultInstance());){
            byte[] buf = new byte[total];
            int size = in.read(buf, 0, offset);
            this.replicateFileBlocks(filePath);
            this.mResource.get().getWorkerProcess().stop();
            for (int i = 0; i < length; ++i) {
                int result = in.read();
                Assert.assertEquals((long)result, (long)(i + size & 0xFF));
            }
        }
    }

    @Test
    @LocalAlluxioClusterResource.Config(confParams={"alluxio.user.block.size.bytes.default", "4MB", "alluxio.user.streaming.reader.chunk.size.bytes", "64KB", "alluxio.user.block.read.retry.max.duration", "1s", "alluxio.worker.ramdisk.size", "1GB"})
    public void positionReadRecoverFromLostWorker() throws Exception {
        int offset = 0x100000;
        int length = 0x700000;
        int total = offset + length;
        AlluxioURI filePath = new AlluxioURI("/test");
        FileSystem fs = this.mResource.get().getClient();
        this.createFileOnWorker(total, filePath, this.mResource.get().getWorkerAddress());
        try (FileInStream in = fs.openFile(filePath, OpenFilePOptions.getDefaultInstance());){
            byte[] buf = new byte[length];
            this.replicateFileBlocks(filePath);
            this.mResource.get().getWorkerProcess().stop();
            int size = in.positionedRead((long)offset, buf, 0, length);
            Assert.assertEquals((long)length, (long)size);
            Assert.assertTrue((boolean)BufferUtils.equalIncreasingByteArray((int)offset, (int)size, (byte[])buf));
        }
    }

    private void createFileOnWorker(int total, AlluxioURI filePath, WorkerNetAddress address) throws IOException {
        FindFirstBlockLocationPolicy.sWorkerAddress = address;
        Class previousPolicy = Configuration.getClass((PropertyKey)PropertyKey.USER_BLOCK_WRITE_LOCATION_POLICY);
        Configuration.set((PropertyKey)PropertyKey.USER_BLOCK_WRITE_LOCATION_POLICY, (Object)FindFirstBlockLocationPolicy.class.getName());
        FileSystemTestUtils.createByteFile((FileSystem)this.mResource.get().getClient(), (AlluxioURI)filePath, (CreateFilePOptions)CreateFilePOptions.newBuilder().setWriteType(WritePType.MUST_CACHE).build(), (int)total);
        Configuration.set((PropertyKey)PropertyKey.USER_BLOCK_WRITE_LOCATION_POLICY, (Object)previousPolicy);
    }

    private void replicateFileBlocks(AlluxioURI filePath) throws Exception {
        FileSystemContext fsContext = FileSystemContext.create((AlluxioConfiguration)Configuration.global());
        BlockStoreClient store = BlockStoreClient.create((FileSystemContext)fsContext);
        URIStatus status = this.mResource.get().getClient().getStatus(filePath);
        List blocks = status.getFileBlockInfos();
        List workers = fsContext.getCachedWorkers();
        for (FileBlockInfo block : blocks) {
            BlockInfo blockInfo = block.getBlockInfo();
            WorkerNetAddress src = ((BlockLocation)blockInfo.getLocations().get(0)).getWorkerAddress();
            WorkerNetAddress dest = workers.stream().filter(candidate -> !candidate.getNetAddress().equals((Object)src)).findFirst().orElseThrow(() -> new IllegalStateException("Expected worker")).getNetAddress();
            BlockOutStream outStream = store.getOutStream(blockInfo.getBlockId(), blockInfo.getLength(), dest, OutStreamOptions.defaults((FileSystemContext)fsContext).setBlockSizeBytes(0x800000L).setWriteType(WriteType.MUST_CACHE));
            Throwable throwable = null;
            try {
                BlockInStream inStream = store.getInStream(blockInfo.getBlockId(), new InStreamOptions(status, Configuration.global()));
                Throwable throwable2 = null;
                try {
                    ByteStreams.copy((InputStream)inStream, (OutputStream)outStream);
                }
                catch (Throwable throwable3) {
                    throwable2 = throwable3;
                    throw throwable3;
                }
                finally {
                    if (inStream == null) continue;
                    if (throwable2 != null) {
                        try {
                            inStream.close();
                        }
                        catch (Throwable throwable4) {
                            throwable2.addSuppressed(throwable4);
                        }
                        continue;
                    }
                    inStream.close();
                }
            }
            catch (Throwable throwable5) {
                throwable = throwable5;
                throw throwable5;
            }
            finally {
                if (outStream == null) continue;
                if (throwable != null) {
                    try {
                        outStream.close();
                    }
                    catch (Throwable throwable6) {
                        throwable.addSuppressed(throwable6);
                    }
                    continue;
                }
                outStream.close();
            }
        }
    }

    public static class FindFirstBlockLocationPolicy
    implements BlockLocationPolicy {
        private static WorkerNetAddress sWorkerAddress;

        public FindFirstBlockLocationPolicy(AlluxioConfiguration ignored) {
        }

        public Optional<WorkerNetAddress> getWorker(GetWorkerOptions options) {
            return StreamSupport.stream(options.getBlockWorkerInfos().spliterator(), false).filter(x -> x.getNetAddress().equals((Object)sWorkerAddress)).findFirst().map(BlockWorkerInfo::getNetAddress);
        }
    }
}

