/*
 * Decompiled with CFR 0.152.
 */
package alluxio.client.fs.io;

import alluxio.AlluxioURI;
import alluxio.annotation.dora.DoraTestTodoItem;
import alluxio.client.block.BlockMasterClient;
import alluxio.client.file.FileInStream;
import alluxio.client.file.FileSystem;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.FileSystemMasterClient;
import alluxio.client.file.FileSystemTestUtils;
import alluxio.client.file.URIStatus;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.exception.status.UnavailableException;
import alluxio.grpc.CreateFilePOptions;
import alluxio.grpc.DecommissionWorkerPOptions;
import alluxio.grpc.GetStatusPOptions;
import alluxio.grpc.GrpcUtils;
import alluxio.grpc.OpenFilePOptions;
import alluxio.grpc.ReadPType;
import alluxio.grpc.WritePType;
import alluxio.security.user.TestUserState;
import alluxio.testutils.LocalAlluxioClusterResource;
import alluxio.util.SleepUtils;
import alluxio.util.ThreadFactoryUtils;
import alluxio.util.io.PathUtils;
import alluxio.wire.BlockLocation;
import alluxio.wire.FileBlockInfo;
import alluxio.wire.WorkerNetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.security.auth.Subject;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

@Ignore
@DoraTestTodoItem(action=DoraTestTodoItem.Action.FIX, owner="jiacheng", comment="check if decommission is still a relevant feature")
public class FileInStreamDecommissionIntegrationTest {
    private static final int BLOCK_SIZE = 0x100000;
    private static final int LENGTH = 0x200000;
    private static final int CLIENT_WORKER_LIST_REFRESH_INTERVAL = 2000;
    @Rule
    public LocalAlluxioClusterResource mLocalAlluxioClusterResource = new LocalAlluxioClusterResource.Builder().setNumWorkers(2).setProperty(PropertyKey.USER_BLOCK_SIZE_BYTES_DEFAULT, 0x100000).setProperty(PropertyKey.USER_WORKER_LIST_REFRESH_INTERVAL, "2s").setStartCluster(false).build();
    private FileSystem mFileSystem = null;
    private CreateFilePOptions mWriteBoth;
    private CreateFilePOptions mWriteAlluxio;
    private OpenFilePOptions mReadNoCache;
    private OpenFilePOptions mReadCachePromote;
    private String mTestPath;
    private ExecutorService mThreadPool;
    private String mCacheThroughFilePath;
    private String mMustCacheFilePath;
    @Rule
    public ExpectedException mThrown = ExpectedException.none();

    @Before
    public final void setUp() throws Exception {
        this.mLocalAlluxioClusterResource.start();
        this.mFileSystem = this.mLocalAlluxioClusterResource.get().getClient();
        WorkerNetAddress worker1 = this.mLocalAlluxioClusterResource.get().getWorkerAddress();
        this.mWriteBoth = CreateFilePOptions.newBuilder().setBlockSizeBytes(0x100000L).setWriteType(WritePType.CACHE_THROUGH).setWorkerLocation(GrpcUtils.toProto((WorkerNetAddress)worker1)).setRecursive(true).build();
        this.mWriteAlluxio = CreateFilePOptions.newBuilder().setBlockSizeBytes(0x100000L).setWriteType(WritePType.MUST_CACHE).setWorkerLocation(GrpcUtils.toProto((WorkerNetAddress)worker1)).setRecursive(true).build();
        this.mReadCachePromote = OpenFilePOptions.newBuilder().setReadType(ReadPType.CACHE_PROMOTE).build();
        this.mReadNoCache = OpenFilePOptions.newBuilder().setReadType(ReadPType.NO_CACHE).build();
        this.mTestPath = PathUtils.uniqPath();
        this.mCacheThroughFilePath = this.mTestPath + "/file_BOTH";
        this.mMustCacheFilePath = this.mTestPath + "/file_CACHE";
        AlluxioURI path0 = new AlluxioURI(this.mCacheThroughFilePath);
        FileSystemTestUtils.createByteFile((FileSystem)this.mFileSystem, (AlluxioURI)path0, (CreateFilePOptions)this.mWriteBoth, (int)0x200000);
        AlluxioURI path1 = new AlluxioURI(this.mMustCacheFilePath);
        FileSystemTestUtils.createByteFile((FileSystem)this.mFileSystem, (AlluxioURI)path1, (CreateFilePOptions)this.mWriteAlluxio, (int)0x200000);
        this.mThreadPool = Executors.newFixedThreadPool(1, ThreadFactoryUtils.build((String)"decommission-worker-%d", (boolean)true));
    }

    @After
    public final void tearDown() throws Exception {
        this.mLocalAlluxioClusterResource.stop();
        this.mThreadPool.shutdownNow();
    }

    private List<CreateFilePOptions> getOptionSet() {
        ArrayList<CreateFilePOptions> ret = new ArrayList<CreateFilePOptions>(2);
        ret.add(this.mWriteBoth);
        ret.add(this.mWriteAlluxio);
        return ret;
    }

    @Test
    public void readUfsFromUndecommissionedWorker() throws Exception {
        AlluxioURI uri = new AlluxioURI(this.mCacheThroughFilePath);
        FileSystemContext context = FileSystemContext.create((Subject)new TestUserState("test", Configuration.global()).getSubject(), (AlluxioConfiguration)Configuration.global());
        List availableWorkers = ((BlockMasterClient)context.acquireBlockMasterClientResource().get()).getWorkerInfoList();
        Assert.assertEquals((long)2L, (long)availableWorkers.size());
        URIStatus status = ((FileSystemMasterClient)context.acquireMasterClientResource().get()).getStatus(uri, GetStatusPOptions.getDefaultInstance());
        List blockInfos = status.getFileBlockInfos();
        FileBlockInfo block0 = (FileBlockInfo)blockInfos.get(0);
        BlockLocation loc0 = (BlockLocation)block0.getBlockInfo().getLocations().get(0);
        WorkerNetAddress workerToDecommission = loc0.getWorkerAddress();
        DecommissionWorkerPOptions decomOptions = DecommissionWorkerPOptions.newBuilder().setWorkerHostname(workerToDecommission.getHost()).setWorkerWebPort((long)workerToDecommission.getWebPort()).setCanRegisterAgain(true).build();
        ((BlockMasterClient)context.acquireBlockMasterClientResource().get()).decommissionWorker(decomOptions);
        FileInStream is = this.mFileSystem.openFile(uri, this.mReadCachePromote);
        byte[] ret = new byte[0x100000];
        int readLength = 0;
        int value = 0;
        while (value != -1) {
            value = is.read(ret);
            if (value == -1) continue;
            readLength += value;
        }
        Assert.assertEquals((long)readLength, (long)0x200000L);
        is.close();
        URIStatus statusAfterRead = ((FileSystemMasterClient)context.acquireMasterClientResource().get()).getStatus(uri, GetStatusPOptions.getDefaultInstance());
        Assert.assertEquals((long)2L, (long)statusAfterRead.getFileBlockInfos().size());
        List block0Locs = ((FileBlockInfo)statusAfterRead.getFileBlockInfos().get(0)).getBlockInfo().getLocations();
        Assert.assertEquals((long)1L, (long)block0Locs.size());
        Assert.assertNotEquals((Object)workerToDecommission, (Object)((BlockLocation)block0Locs.get(0)).getWorkerAddress());
        List block1Locs = ((FileBlockInfo)statusAfterRead.getFileBlockInfos().get(1)).getBlockInfo().getLocations();
        Assert.assertEquals((long)1L, (long)block1Locs.size());
        Assert.assertNotEquals((Object)workerToDecommission, (Object)((BlockLocation)block1Locs.get(0)).getWorkerAddress());
    }

    @Test
    public void cannotReadCacheFromDecommissionedWorker() throws Exception {
        AlluxioURI uri = new AlluxioURI(this.mMustCacheFilePath);
        FileSystemContext context = FileSystemContext.create((Subject)new TestUserState("test", Configuration.global()).getSubject(), (AlluxioConfiguration)Configuration.global());
        List availableWorkers = ((BlockMasterClient)context.acquireBlockMasterClientResource().get()).getWorkerInfoList();
        Assert.assertEquals((long)2L, (long)availableWorkers.size());
        URIStatus status = ((FileSystemMasterClient)context.acquireMasterClientResource().get()).getStatus(uri, GetStatusPOptions.getDefaultInstance());
        List blockInfos = status.getFileBlockInfos();
        FileBlockInfo block0 = (FileBlockInfo)blockInfos.get(0);
        BlockLocation loc0 = (BlockLocation)block0.getBlockInfo().getLocations().get(0);
        WorkerNetAddress targetWorker = loc0.getWorkerAddress();
        DecommissionWorkerPOptions decomOptions = DecommissionWorkerPOptions.newBuilder().setWorkerHostname(targetWorker.getHost()).setWorkerWebPort((long)targetWorker.getWebPort()).setCanRegisterAgain(true).build();
        ((BlockMasterClient)context.acquireBlockMasterClientResource().get()).decommissionWorker(decomOptions);
        FileInStream is = this.mFileSystem.openFile(uri, this.mReadCachePromote);
        Assert.assertThrows(UnavailableException.class, () -> {
            int value = is.read();
        });
        is.close();
    }

    @Test
    public void decommissionWhileReading() throws Exception {
        AlluxioURI uri = new AlluxioURI(this.mCacheThroughFilePath);
        FileSystemContext context = FileSystemContext.create((Subject)new TestUserState("test", Configuration.global()).getSubject(), (AlluxioConfiguration)Configuration.global());
        List availableWorkers = ((BlockMasterClient)context.acquireBlockMasterClientResource().get()).getWorkerInfoList();
        Assert.assertEquals((long)2L, (long)availableWorkers.size());
        URIStatus status = ((FileSystemMasterClient)context.acquireMasterClientResource().get()).getStatus(uri, GetStatusPOptions.getDefaultInstance());
        List blockInfos = status.getFileBlockInfos();
        FileBlockInfo block0 = (FileBlockInfo)blockInfos.get(0);
        BlockLocation loc0 = (BlockLocation)block0.getBlockInfo().getLocations().get(0);
        WorkerNetAddress targetWorker = loc0.getWorkerAddress();
        CountDownLatch streamActive = new CountDownLatch(1);
        CountDownLatch workerDecommissioned = new CountDownLatch(1);
        this.mThreadPool.submit(() -> {
            try {
                streamActive.await();
                DecommissionWorkerPOptions decomOptions = DecommissionWorkerPOptions.newBuilder().setWorkerHostname(targetWorker.getHost()).setWorkerWebPort((long)targetWorker.getWebPort()).setCanRegisterAgain(true).build();
                ((BlockMasterClient)context.acquireBlockMasterClientResource().get()).decommissionWorker(decomOptions);
                List updatedWorkers = ((BlockMasterClient)context.acquireBlockMasterClientResource().get()).getWorkerInfoList();
                Assert.assertEquals((long)1L, (long)updatedWorkers.size());
                workerDecommissioned.countDown();
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        });
        FileInStream is = this.mFileSystem.openFile(uri, this.mReadNoCache);
        byte[] ret = new byte[1024];
        int value = 0;
        int readLength = 0;
        boolean released = false;
        while (value != -1) {
            if (readLength > 1024 && !released) {
                streamActive.countDown();
                released = true;
                workerDecommissioned.await();
            }
            if ((value = is.read(ret)) == -1) continue;
            readLength += value;
        }
        Assert.assertEquals((long)readLength, (long)0x200000L);
        is.close();
    }

    @Test
    public void halfStreamFromAnotherWorker() throws Exception {
        AlluxioURI uri = new AlluxioURI(this.mCacheThroughFilePath);
        FileSystemContext context = FileSystemContext.create((Subject)new TestUserState("test", Configuration.global()).getSubject(), (AlluxioConfiguration)Configuration.global());
        List clusterWorkers = ((BlockMasterClient)context.acquireBlockMasterClientResource().get()).getWorkerInfoList();
        Assert.assertEquals((long)2L, (long)clusterWorkers.size());
        URIStatus status = ((FileSystemMasterClient)context.acquireMasterClientResource().get()).getStatus(uri, GetStatusPOptions.getDefaultInstance());
        List blockInfos = status.getFileBlockInfos();
        FileBlockInfo block0 = (FileBlockInfo)blockInfos.get(0);
        BlockLocation loc0 = (BlockLocation)block0.getBlockInfo().getLocations().get(0);
        WorkerNetAddress workerToDecommission = loc0.getWorkerAddress();
        CountDownLatch streamActive = new CountDownLatch(1);
        CountDownLatch workerDecommissioned = new CountDownLatch(1);
        this.mThreadPool.submit(() -> {
            try {
                streamActive.await();
                DecommissionWorkerPOptions decomOptions = DecommissionWorkerPOptions.newBuilder().setWorkerHostname(workerToDecommission.getHost()).setWorkerWebPort((long)workerToDecommission.getWebPort()).setCanRegisterAgain(true).build();
                ((BlockMasterClient)context.acquireBlockMasterClientResource().get()).decommissionWorker(decomOptions);
                List updatedWorkers = ((BlockMasterClient)context.acquireBlockMasterClientResource().get()).getWorkerInfoList();
                Assert.assertEquals((long)1L, (long)updatedWorkers.size());
                workerDecommissioned.countDown();
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        });
        FileInStream is = this.mFileSystem.openFile(uri, this.mReadCachePromote);
        byte[] ret = new byte[1024];
        int value = 0;
        int readLength = 0;
        boolean released = false;
        while (value != -1) {
            if (readLength == 0x100000 && !released) {
                streamActive.countDown();
                released = true;
                workerDecommissioned.await();
                SleepUtils.sleepMs((long)2000L);
                List usableWorkers = context.getCachedWorkers();
                Assert.assertEquals((long)1L, (long)usableWorkers.size());
            }
            if ((value = is.read(ret)) == -1) continue;
            readLength += value;
        }
        Assert.assertEquals((long)readLength, (long)0x200000L);
        URIStatus statusAfterRead = ((FileSystemMasterClient)context.acquireMasterClientResource().get()).getStatus(uri, GetStatusPOptions.getDefaultInstance());
        FileBlockInfo block1 = (FileBlockInfo)statusAfterRead.getFileBlockInfos().get(1);
        WorkerNetAddress cachedToWorker = ((BlockLocation)block1.getBlockInfo().getLocations().get(0)).getWorkerAddress();
        Assert.assertNotEquals((Object)cachedToWorker, (Object)workerToDecommission);
        is.close();
    }
}

