/*
 * Decompiled with CFR 0.152.
 */
package alluxio.server.ft;

import alluxio.ConfigurationRule;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.grpc.FileSystemMasterClientServiceGrpc;
import alluxio.grpc.GrpcChannel;
import alluxio.grpc.GrpcChannelBuilder;
import alluxio.grpc.GrpcServerAddress;
import alluxio.grpc.ListStatusPRequest;
import alluxio.master.ZookeeperConnectionErrorPolicy;
import alluxio.master.journal.JournalType;
import alluxio.multi.process.MasterNetAddress;
import alluxio.multi.process.MultiProcessCluster;
import alluxio.multi.process.PortCoordination;
import alluxio.testutils.AlluxioOperationThread;
import alluxio.testutils.BaseIntegrationTest;
import alluxio.util.CommonUtils;
import com.google.common.collect.ImmutableMap;
import io.grpc.Channel;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration tests exercising a multi-master cluster while its Zookeeper
 * ensemble is stopped or restarted.
 */
public class ZookeeperFailureIntegrationTest extends BaseIntegrationTest {
    private static final Logger LOG = LoggerFactory.getLogger(ZookeeperFailureIntegrationTest.class);

    /** Applies a small block size and short RPC retry settings to the global configuration. */
    @Rule
    public ConfigurationRule mConf = new ConfigurationRule(ImmutableMap.of(
            PropertyKey.USER_BLOCK_SIZE_BYTES_DEFAULT, "1000",
            PropertyKey.USER_RPC_RETRY_BASE_SLEEP_MS, "500",
            PropertyKey.USER_RPC_RETRY_MAX_SLEEP_MS, "500",
            PropertyKey.USER_RPC_RETRY_MAX_DURATION, "2500"),
            Configuration.modifiableGlobal());

    /** Cluster under test; created by each test method and destroyed in {@link #after()}. */
    public MultiProcessCluster mCluster;

    /**
     * Destroys the cluster if the test created one.
     *
     * @throws Exception if destroying the cluster fails
     */
    @After
    public void after() throws Exception {
        if (mCluster != null) {
            mCluster.destroy();
        }
    }

    /**
     * Stops Zookeeper while a client thread is running operations, waits for the
     * operations to start failing, verifies the RPC probe reports the service as
     * unavailable, then restarts Zookeeper and waits for operations to succeed again.
     *
     * @throws Exception if the cluster cannot be driven through the scenario
     */
    @Ignore(value="In Dora, Client does not use Master/Journal services.")
    @Test
    public void zkFailure() throws Exception {
        mCluster = MultiProcessCluster.newBuilder(PortCoordination.ZOOKEEPER_FAILURE)
                .setClusterName("ZookeeperFailure")
                .setNumMasters(2)
                .setNumWorkers(1)
                .addProperty(PropertyKey.MASTER_JOURNAL_TYPE, JournalType.UFS)
                .build();
        mCluster.start();

        AlluxioOperationThread thread = new AlluxioOperationThread(mCluster.getFileSystemClient());
        thread.start();
        CommonUtils.waitFor("a successful operation to be performed", () -> thread.successes() > 0);

        mCluster.stopZk();
        long zkStopTime = System.currentTimeMillis();

        // The counter is decremented each time the latest failure reported by the
        // operation thread differs from the one seen on the previous poll. Because
        // getAndAdd returns the value *before* the update, the wait ends on the first
        // poll that finds the counter already at (or below) zero, i.e. after three
        // changes of the latest failure have been observed.
        AtomicInteger failureCounter = new AtomicInteger(3);
        AtomicReference<Object> lastFailure = new AtomicReference<>();
        CommonUtils.waitFor("operations to start failing", () -> {
            Object previous = lastFailure.getAndSet(thread.getLatestFailure());
            int delta = previous != lastFailure.get() ? -1 : 0;
            return failureCounter.getAndAdd(delta) <= 0;
        });

        Assert.assertFalse(rpcServiceAvailable());
        LOG.info("First operation failed {}ms after stopping the Zookeeper cluster",
                System.currentTimeMillis() - zkStopTime);

        // Snapshot the success count so that recovery is detected by a strictly larger value.
        long successes = thread.successes();
        mCluster.restartZk();
        long zkStartTime = System.currentTimeMillis();
        CommonUtils.waitFor("another successful operation to be performed",
                () -> thread.successes() > successes);

        thread.interrupt();
        thread.join();
        LOG.info("Recovered after {}ms", System.currentTimeMillis() - zkStartTime);
        mCluster.notifySuccess();
    }

    /**
     * With the {@code STANDARD} connection error policy, expects the primary master
     * index to differ before and after a Zookeeper restart.
     *
     * @throws Exception if the cluster cannot be started or queried
     */
    @Ignore
    @Test
    public void zkConnectionPolicy_Standard() throws Exception {
        mCluster = MultiProcessCluster.newBuilder(PortCoordination.ZOOKEEPER_CONNECTION_POLICY_STANDARD)
                .setClusterName("ZookeeperConnectionPolicy_Standard")
                .setNumMasters(2)
                .setNumWorkers(0)
                .addProperty(PropertyKey.ZOOKEEPER_LEADER_CONNECTION_ERROR_POLICY,
                        ZookeeperConnectionErrorPolicy.STANDARD)
                .addProperty(PropertyKey.MASTER_JOURNAL_TYPE, JournalType.UFS)
                .build();
        mCluster.start();

        int leaderIdx = getPrimaryMasterIndex();
        mCluster.restartZk();
        int leaderIdx2 = getPrimaryMasterIndex();

        Assert.assertNotEquals(leaderIdx, leaderIdx2);
        mCluster.notifySuccess();
    }

    /**
     * With the {@code SESSION} connection error policy, expects the primary master
     * index to be the same before and after a Zookeeper restart.
     *
     * @throws Exception if the cluster cannot be started or queried
     */
    @Test
    public void zkConnectionPolicy_Session() throws Exception {
        mCluster = MultiProcessCluster.newBuilder(PortCoordination.ZOOKEEPER_CONNECTION_POLICY_SESSION)
                .setClusterName("ZookeeperConnectionPolicy_Session")
                .setNumMasters(2)
                .setNumWorkers(0)
                .addProperty(PropertyKey.ZOOKEEPER_LEADER_CONNECTION_ERROR_POLICY,
                        ZookeeperConnectionErrorPolicy.SESSION)
                .addProperty(PropertyKey.MASTER_JOURNAL_TYPE, JournalType.UFS)
                .build();
        mCluster.start();

        int leaderIdx = getPrimaryMasterIndex();
        mCluster.restartZk();
        int leaderIdx2 = getPrimaryMasterIndex();

        Assert.assertEquals(leaderIdx, leaderIdx2);
        mCluster.notifySuccess();
    }

    /**
     * Polls the cluster until a primary master index can be obtained.
     *
     * <p>Each attempt asks the cluster for the primary index with an argument of
     * {@code 30000}; attempts that throw are logged and retried.
     *
     * @return the index of the primary master
     * @throws Exception if waiting for the primary master fails
     */
    private int getPrimaryMasterIndex() throws Exception {
        AtomicInteger primaryIndex = new AtomicInteger();
        CommonUtils.waitFor("Getting primary master index", () -> {
            try {
                primaryIndex.set(mCluster.getPrimaryMasterIndex(30000));
                return true;
            } catch (Exception e) {
                LOG.warn("Could not get primary master index.", e);
                return false;
            }
        });
        return primaryIndex.get();
    }

    /**
     * Probes the RPC endpoint of the first master in the cluster's address list by
     * opening a channel and issuing a default {@code listStatus} request.
     *
     * @return {@code true} if the channel was built and the request was issued without
     *         an exception, {@code false} if any exception was thrown
     * @throws Exception declared for callers; failures of the probe itself are reported
     *         as {@code false}
     */
    private boolean rpcServiceAvailable() throws Exception {
        MasterNetAddress netAddress = mCluster.getMasterAddresses().get(0);
        InetSocketAddress address =
                new InetSocketAddress(netAddress.getHostname(), netAddress.getRpcPort());
        GrpcChannel channel = null;
        try {
            channel = GrpcChannelBuilder
                    .newBuilder(GrpcServerAddress.create(address), Configuration.global())
                    .build();
            FileSystemMasterClientServiceGrpc.FileSystemMasterClientServiceBlockingStub client =
                    FileSystemMasterClientServiceGrpc.newBlockingStub(channel);
            // NOTE(review): if listStatus is a server-streaming RPC, the blocking stub may
            // return a lazy iterator and failures would only surface on iteration - confirm.
            client.listStatus(ListStatusPRequest.getDefaultInstance());
            return true;
        } catch (Exception e) {
            // Any failure to connect or to issue the call means the service is not available.
            return false;
        } finally {
            // Release the channel so that repeated probes do not leak connections.
            if (channel != null) {
                channel.shutdown();
            }
        }
    }
}

