/*
 * Decompiled with CFR 0.152.
 */
package alluxio.client.fs.io;

import alluxio.AlluxioURI;
import alluxio.annotation.dora.DoraTestTodoItem;
import alluxio.client.block.BlockMasterClient;
import alluxio.client.file.FileInStream;
import alluxio.client.file.FileOutStream;
import alluxio.client.file.FileSystem;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.FileSystemMasterClient;
import alluxio.client.file.URIStatus;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.exception.ExceptionMessage;
import alluxio.exception.status.UnavailableException;
import alluxio.grpc.CreateFilePOptions;
import alluxio.grpc.DecommissionWorkerPOptions;
import alluxio.grpc.GetStatusPOptions;
import alluxio.grpc.GrpcUtils;
import alluxio.grpc.OpenFilePOptions;
import alluxio.grpc.ReadPType;
import alluxio.grpc.WritePType;
import alluxio.security.user.TestUserState;
import alluxio.testutils.LocalAlluxioClusterResource;
import alluxio.util.SleepUtils;
import alluxio.util.ThreadFactoryUtils;
import alluxio.util.io.PathUtils;
import alluxio.wire.BlockLocation;
import alluxio.wire.FileBlockInfo;
import alluxio.wire.WorkerNetAddress;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.security.auth.Subject;
import org.apache.ratis.util.Preconditions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

@Ignore
@DoraTestTodoItem(action=DoraTestTodoItem.Action.FIX, owner="jiacheng", comment="check if decommission is kept in dora")
public class FileOutStreamDecommissionIntegrationTest {
    private static final int BLOCK_SIZE = 0x100000;
    private static final int LENGTH = 0x200000;
    private static final int CLIENT_WORKER_LIST_REFRESH_INTERVAL = 2000;
    @Rule
    public LocalAlluxioClusterResource mLocalAlluxioClusterResource = new LocalAlluxioClusterResource.Builder().setNumWorkers(2).setProperty(PropertyKey.USER_BLOCK_SIZE_BYTES_DEFAULT, 0x100000).setProperty(PropertyKey.USER_WORKER_LIST_REFRESH_INTERVAL, "2s").setProperty(PropertyKey.USER_FILE_WRITE_INIT_MAX_DURATION, "2s").setStartCluster(false).build();
    private FileSystem mFileSystem = null;
    private CreateFilePOptions mWriteBoth;
    private CreateFilePOptions mWriteAlluxio;
    private OpenFilePOptions mReadNoCache;
    private OpenFilePOptions mReadCachePromote;
    private String mTestPath;
    private ExecutorService mThreadPool;
    private String mCacheThroughFilePath;
    private String mMustCacheFilePath;
    @Rule
    public ExpectedException mThrown = ExpectedException.none();

    @Before
    public final void setUp() throws Exception {
        this.mLocalAlluxioClusterResource.start();
        this.mFileSystem = this.mLocalAlluxioClusterResource.get().getClient();
        WorkerNetAddress worker1 = this.mLocalAlluxioClusterResource.get().getWorkerAddress();
        this.mWriteBoth = CreateFilePOptions.newBuilder().setBlockSizeBytes(0x100000L).setWriteType(WritePType.CACHE_THROUGH).setWorkerLocation(GrpcUtils.toProto((WorkerNetAddress)worker1)).setRecursive(true).build();
        this.mWriteAlluxio = CreateFilePOptions.newBuilder().setBlockSizeBytes(0x100000L).setWriteType(WritePType.MUST_CACHE).setWorkerLocation(GrpcUtils.toProto((WorkerNetAddress)worker1)).setRecursive(true).build();
        this.mReadCachePromote = OpenFilePOptions.newBuilder().setReadType(ReadPType.CACHE_PROMOTE).build();
        this.mReadNoCache = OpenFilePOptions.newBuilder().setReadType(ReadPType.NO_CACHE).build();
        this.mTestPath = PathUtils.uniqPath();
        this.mCacheThroughFilePath = this.mTestPath + "/file_BOTH";
        this.mMustCacheFilePath = this.mTestPath + "/file_CACHE";
        this.mThreadPool = Executors.newFixedThreadPool(1, ThreadFactoryUtils.build((String)"decommission-worker-%d", (boolean)true));
    }

    @After
    public final void tearDown() throws Exception {
        this.mLocalAlluxioClusterResource.stop();
        this.mThreadPool.shutdownNow();
    }

    private List<CreateFilePOptions> getOptionSet() {
        ArrayList<CreateFilePOptions> ret = new ArrayList<CreateFilePOptions>(2);
        ret.add(this.mWriteBoth);
        ret.add(this.mWriteAlluxio);
        return ret;
    }

    @Test
    public void writeUfsFromUndecommissionedWorker() throws Exception {
        int writeLength;
        AlluxioURI uri = new AlluxioURI(this.mCacheThroughFilePath);
        FileSystemContext context = FileSystemContext.create((Subject)new TestUserState("test", Configuration.global()).getSubject(), (AlluxioConfiguration)Configuration.global());
        List availableWorkers = ((BlockMasterClient)context.acquireBlockMasterClientResource().get()).getWorkerInfoList();
        Assert.assertEquals((long)2L, (long)availableWorkers.size());
        WorkerNetAddress workerToDecommission = this.mLocalAlluxioClusterResource.get().getWorkerAddress();
        DecommissionWorkerPOptions decomOptions = DecommissionWorkerPOptions.newBuilder().setWorkerHostname(workerToDecommission.getHost()).setWorkerWebPort((long)workerToDecommission.getWebPort()).setCanRegisterAgain(true).build();
        ((BlockMasterClient)context.acquireBlockMasterClientResource().get()).decommissionWorker(decomOptions);
        CreateFilePOptions writeOptions = CreateFilePOptions.newBuilder().setBlockSizeBytes(0x100000L).setWriteType(WritePType.CACHE_THROUGH).setRecursive(true).build();
        FileOutStream os = this.mFileSystem.createFile(uri, writeOptions);
        byte[] ret = new byte[0x100000];
        for (writeLength = 0; writeLength < 0x200000; writeLength += ret.length) {
            os.write(ret);
        }
        Assert.assertEquals((long)writeLength, (long)0x200000L);
        os.close();
        URIStatus status = ((FileSystemMasterClient)context.acquireMasterClientResource().get()).getStatus(uri, GetStatusPOptions.getDefaultInstance());
        Assert.assertEquals((long)0x200000L, (long)status.getLength());
        Assert.assertEquals((long)2L, (long)status.getFileBlockInfos().size());
        List block0Locs = ((FileBlockInfo)status.getFileBlockInfos().get(0)).getBlockInfo().getLocations();
        Assert.assertEquals((long)1L, (long)block0Locs.size());
        Assert.assertNotEquals((Object)workerToDecommission, (Object)((BlockLocation)block0Locs.get(0)).getWorkerAddress());
        List block1Locs = ((FileBlockInfo)status.getFileBlockInfos().get(1)).getBlockInfo().getLocations();
        Assert.assertEquals((long)1L, (long)block1Locs.size());
        Assert.assertNotEquals((Object)workerToDecommission, (Object)((BlockLocation)block1Locs.get(0)).getWorkerAddress());
        FileInStream is = this.mFileSystem.openFile(uri, this.mReadCachePromote);
        int readLength = 0;
        int res = 0;
        while (res != -1) {
            res = is.read(ret);
            if (res == -1) continue;
            readLength += res;
        }
        Assert.assertEquals((long)readLength, (long)0x200000L);
    }

    @Test
    public void cannotWriteFromDecommissionedWorker() throws Exception {
        AlluxioURI uri = new AlluxioURI(this.mCacheThroughFilePath);
        FileSystemContext context = FileSystemContext.create((Subject)new TestUserState("test", Configuration.global()).getSubject(), (AlluxioConfiguration)Configuration.global());
        List availableWorkers = ((BlockMasterClient)context.acquireBlockMasterClientResource().get()).getWorkerInfoList();
        Assert.assertEquals((long)2L, (long)availableWorkers.size());
        WorkerNetAddress workerToDecommission = this.mLocalAlluxioClusterResource.get().getWorkerAddress();
        DecommissionWorkerPOptions decomOptions = DecommissionWorkerPOptions.newBuilder().setWorkerHostname(workerToDecommission.getHost()).setWorkerWebPort((long)workerToDecommission.getWebPort()).setCanRegisterAgain(true).build();
        ((BlockMasterClient)context.acquireBlockMasterClientResource().get()).decommissionWorker(decomOptions);
        CreateFilePOptions writeOptions = CreateFilePOptions.newBuilder().setBlockSizeBytes(0x100000L).setWriteType(WritePType.CACHE_THROUGH).setWorkerLocation(GrpcUtils.toProto((WorkerNetAddress)workerToDecommission)).setRecursive(true).build();
        Exception e = (Exception)Assert.assertThrows(UnavailableException.class, () -> {
            FileOutStream os = this.mFileSystem.createFile(uri, writeOptions);
            os.write(7);
            os.close();
        });
        Preconditions.assertTrue((boolean)e.getMessage().contains(ExceptionMessage.NO_WORKER_AVAILABLE.getMessage(new Object[0])));
    }

    @Test
    public void decommissionWhileWriting() throws Exception {
        int writeLength;
        AlluxioURI uri = new AlluxioURI(this.mCacheThroughFilePath);
        FileSystemContext context = FileSystemContext.create((Subject)new TestUserState("test", Configuration.global()).getSubject(), (AlluxioConfiguration)Configuration.global());
        List availableWorkers = ((BlockMasterClient)context.acquireBlockMasterClientResource().get()).getWorkerInfoList();
        Assert.assertEquals((long)2L, (long)availableWorkers.size());
        WorkerNetAddress workerToDecommission = this.mLocalAlluxioClusterResource.get().getWorkerAddress();
        CountDownLatch streamActive = new CountDownLatch(1);
        CountDownLatch workerDecommissioned = new CountDownLatch(1);
        this.mThreadPool.submit(() -> {
            try {
                streamActive.await();
                DecommissionWorkerPOptions decomOptions = DecommissionWorkerPOptions.newBuilder().setWorkerHostname(workerToDecommission.getHost()).setWorkerWebPort((long)workerToDecommission.getWebPort()).setCanRegisterAgain(true).build();
                ((BlockMasterClient)context.acquireBlockMasterClientResource().get()).decommissionWorker(decomOptions);
                List updatedWorkers = ((BlockMasterClient)context.acquireBlockMasterClientResource().get()).getWorkerInfoList();
                Assert.assertEquals((long)1L, (long)updatedWorkers.size());
                workerDecommissioned.countDown();
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        });
        CreateFilePOptions writeOptions = CreateFilePOptions.newBuilder().setBlockSizeBytes(0x100000L).setWriteType(WritePType.CACHE_THROUGH).setWorkerLocation(GrpcUtils.toProto((WorkerNetAddress)workerToDecommission)).setRecursive(true).build();
        FileOutStream os = this.mFileSystem.createFile(uri, writeOptions);
        byte[] ret = new byte[1024];
        boolean released = false;
        for (writeLength = 0; writeLength < 0x200000; writeLength += ret.length) {
            if (writeLength > 1024 && !released) {
                streamActive.countDown();
                released = true;
                workerDecommissioned.await();
            }
            os.write(ret);
        }
        Assert.assertEquals((long)writeLength, (long)0x200000L);
        os.close();
        URIStatus status = ((FileSystemMasterClient)context.acquireMasterClientResource().get()).getStatus(uri, GetStatusPOptions.getDefaultInstance());
        List blockInfos = status.getFileBlockInfos();
        FileBlockInfo block0 = (FileBlockInfo)blockInfos.get(0);
        Assert.assertEquals((long)0L, (long)block0.getBlockInfo().getLocations().size());
        FileBlockInfo block1 = (FileBlockInfo)blockInfos.get(1);
        Assert.assertEquals((long)0L, (long)block1.getBlockInfo().getLocations().size());
        FileInStream is = this.mFileSystem.openFile(uri, this.mReadCachePromote);
        int readLength = 0;
        int res = 0;
        while (res != -1) {
            res = is.read(ret);
            if (res == -1) continue;
            readLength += res;
        }
        Assert.assertEquals((long)readLength, (long)0x200000L);
    }

    @Test
    public void halfCacheThroughStreamDecommission() throws Exception {
        int writeLength;
        AlluxioURI uri = new AlluxioURI(this.mCacheThroughFilePath);
        FileSystemContext context = FileSystemContext.create((Subject)new TestUserState("test", Configuration.global()).getSubject(), (AlluxioConfiguration)Configuration.global());
        List clusterWorkers = ((BlockMasterClient)context.acquireBlockMasterClientResource().get()).getWorkerInfoList();
        Assert.assertEquals((long)2L, (long)clusterWorkers.size());
        WorkerNetAddress workerToDecommission = this.mLocalAlluxioClusterResource.get().getWorkerAddress();
        CountDownLatch streamActive = new CountDownLatch(1);
        CountDownLatch workerDecommissioned = new CountDownLatch(1);
        this.mThreadPool.submit(() -> {
            try {
                streamActive.await();
                DecommissionWorkerPOptions decomOptions = DecommissionWorkerPOptions.newBuilder().setWorkerHostname(workerToDecommission.getHost()).setWorkerWebPort((long)workerToDecommission.getWebPort()).setCanRegisterAgain(true).build();
                ((BlockMasterClient)context.acquireBlockMasterClientResource().get()).decommissionWorker(decomOptions);
                List updatedWorkers = ((BlockMasterClient)context.acquireBlockMasterClientResource().get()).getWorkerInfoList();
                Assert.assertEquals((long)1L, (long)updatedWorkers.size());
                workerDecommissioned.countDown();
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        });
        FileOutStream os = this.mFileSystem.createFile(uri, this.mWriteBoth);
        byte[] ret = new byte[1024];
        boolean released = false;
        for (writeLength = 0; writeLength < 0x200000; writeLength += ret.length) {
            if (writeLength == 0x100000 && !released) {
                streamActive.countDown();
                released = true;
                workerDecommissioned.await();
                SleepUtils.sleepMs((long)2000L);
                List usableWorkers = context.getCachedWorkers();
                Assert.assertEquals((long)1L, (long)usableWorkers.size());
            }
            os.write(ret);
        }
        Assert.assertEquals((long)writeLength, (long)0x200000L);
        os.close();
        URIStatus statusAfterRead = ((FileSystemMasterClient)context.acquireMasterClientResource().get()).getStatus(uri, GetStatusPOptions.getDefaultInstance());
        Assert.assertEquals((long)0x200000L, (long)statusAfterRead.getLength());
        Assert.assertEquals((long)2L, (long)statusAfterRead.getFileBlockInfos().size());
        Assert.assertEquals((long)0L, (long)((FileBlockInfo)statusAfterRead.getFileBlockInfos().get(0)).getBlockInfo().getLocations().size());
        Assert.assertEquals((long)0L, (long)((FileBlockInfo)statusAfterRead.getFileBlockInfos().get(1)).getBlockInfo().getLocations().size());
        FileInStream is = this.mFileSystem.openFile(uri, this.mReadCachePromote);
        int readLength = 0;
        int res = 0;
        while (res != -1) {
            res = is.read(ret);
            if (res == -1) continue;
            readLength += res;
        }
        Assert.assertEquals((long)readLength, (long)0x200000L);
    }

    @Test
    public void halfStreamMustCacheDecommission() throws Exception {
        AlluxioURI uri = new AlluxioURI(this.mMustCacheFilePath);
        FileSystemContext context = FileSystemContext.create((Subject)new TestUserState("test", Configuration.global()).getSubject(), (AlluxioConfiguration)Configuration.global());
        List clusterWorkers = ((BlockMasterClient)context.acquireBlockMasterClientResource().get()).getWorkerInfoList();
        Assert.assertEquals((long)2L, (long)clusterWorkers.size());
        WorkerNetAddress workerToDecommission = this.mLocalAlluxioClusterResource.get().getWorkerAddress();
        CountDownLatch streamActive = new CountDownLatch(1);
        CountDownLatch workerDecommissioned = new CountDownLatch(1);
        this.mThreadPool.submit(() -> {
            try {
                streamActive.await();
                DecommissionWorkerPOptions decomOptions = DecommissionWorkerPOptions.newBuilder().setWorkerHostname(workerToDecommission.getHost()).setWorkerWebPort((long)workerToDecommission.getWebPort()).setCanRegisterAgain(true).build();
                ((BlockMasterClient)context.acquireBlockMasterClientResource().get()).decommissionWorker(decomOptions);
                List updatedWorkers = ((BlockMasterClient)context.acquireBlockMasterClientResource().get()).getWorkerInfoList();
                Assert.assertEquals((long)1L, (long)updatedWorkers.size());
                workerDecommissioned.countDown();
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        });
        FileOutStream os = this.mFileSystem.createFile(uri, this.mWriteAlluxio);
        byte[] ret = new byte[1024];
        boolean released = false;
        for (int writeLength = 0; writeLength < 0x200000; writeLength += ret.length) {
            if (writeLength == 0x100000 && !released) {
                streamActive.countDown();
                released = true;
                workerDecommissioned.await();
                SleepUtils.sleepMs((long)2000L);
                List usableWorkers = context.getCachedWorkers();
                Assert.assertEquals((long)1L, (long)usableWorkers.size());
                Assert.assertThrows(IOException.class, () -> os.write(ret));
                break;
            }
            os.write(ret);
        }
        os.close();
        URIStatus statusAfterRead = ((FileSystemMasterClient)context.acquireMasterClientResource().get()).getStatus(uri, GetStatusPOptions.getDefaultInstance());
        Assert.assertEquals((long)0L, (long)statusAfterRead.getLength());
    }
}

