/*
 * Decompiled with CFR 0.152.
 */
package alluxio.multi.process;

import alluxio.AlluxioTestDirectory;
import alluxio.AlluxioURI;
import alluxio.Configuration;
import alluxio.ConfigurationRule;
import alluxio.ConfigurationTestUtils;
import alluxio.PropertyKey;
import alluxio.cli.Format;
import alluxio.client.MetaMasterClient;
import alluxio.client.RetryHandlingMetaMasterClient;
import alluxio.client.file.FileSystem;
import alluxio.client.file.FileSystemContext;
import alluxio.exception.status.UnavailableException;
import alluxio.master.MasterClientConfig;
import alluxio.master.MasterInquireClient;
import alluxio.master.SingleMasterInquireClient;
import alluxio.master.ZkMasterInquireClient;
import alluxio.multi.process.Master;
import alluxio.multi.process.MasterNetAddress;
import alluxio.multi.process.Worker;
import alluxio.network.PortUtils;
import alluxio.util.CommonUtils;
import alluxio.util.WaitForOptions;
import alluxio.util.io.PathUtils;
import alluxio.util.network.NetworkAddressUtils;
import alluxio.wire.MasterInfo;
import alluxio.zookeeper.RestartableTestingServer;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.io.Closer;
import java.io.Closeable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.io.Charsets;
import org.apache.commons.io.FileUtils;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * JUnit {@link TestRule} that starts a set of Alluxio {@link Master}s and {@link Worker}s for a
 * test and tears them down afterwards. Instances are created through {@link #newBuilder()}.
 *
 * <p>All public instance methods are {@code synchronized} on the cluster object.
 */
@ThreadSafe
public final class MultiProcessCluster
implements TestRule {
    private static final Logger LOG = LoggerFactory.getLogger(MultiProcessCluster.class);
    // Directory that saveWorkdir() moves the work-directory tarball into.
    private static final File ARTIFACTS_DIR = new File("./target/artifacts");
    // File that the output of the tar process started by saveWorkdir() is appended to.
    private static final File TESTS_LOG = new File("./target/logs/tests.log");
    // Cluster-wide properties; start() adds defaults and addresses, and every node's site file includes them.
    private final Map<PropertyKey, String> mProperties;
    // Per-master property overrides, keyed by master index.
    private final Map<Integer, Map<PropertyKey, String>> mMasterProperties;
    // Per-worker property overrides, keyed by worker index.
    private final Map<Integer, Map<PropertyKey, String>> mWorkerProperties;
    private final int mNumMasters;
    private final int mNumWorkers;
    // Requested cluster name with a random long appended (see the constructor).
    private final String mClusterName;
    // Collects masters, workers, the ZooKeeper server and client contexts; closed in destroy().
    private final Closer mCloser;
    private final List<Master> mMasters;
    private final List<Worker> mWorkers;
    private DeployMode mDeployMode;
    // Assigned in start(); null before that.
    private File mWorkDir;
    // Assigned in start(); index i is the address of master i.
    private List<MasterNetAddress> mMasterAddresses;
    private State mState;
    // Only assigned when start() runs in ZOOKEEPER_HA mode; null otherwise.
    private RestartableTestingServer mCuratorServer;
    // Set by notifySuccess(); while false, destroy() archives the work directory first.
    private boolean mSuccess;

    /**
     * Creates a cluster in the {@code NOT_STARTED} state. Called only by {@link Builder#build()}.
     *
     * @param properties cluster-wide properties (stored by reference)
     * @param masterProperties per-master property overrides keyed by master index
     * @param workerProperties per-worker property overrides keyed by worker index
     * @param numMasters number of masters to create on start
     * @param numWorkers number of workers to create on start
     * @param clusterName name prefix; a random long is appended to it
     * @param mode the deploy mode to start in
     */
    private MultiProcessCluster(Map<PropertyKey, String> properties, Map<Integer, Map<PropertyKey, String>> masterProperties, Map<Integer, Map<PropertyKey, String>> workerProperties, int numMasters, int numWorkers, String clusterName, DeployMode mode) {
        // Configuration handed over by the builder.
        mProperties = properties;
        mMasterProperties = masterProperties;
        mWorkerProperties = workerProperties;
        mNumMasters = numMasters;
        mNumWorkers = numWorkers;
        mDeployMode = mode;
        // The random suffix distinguishes clusters that were given the same name.
        mClusterName = clusterName + ThreadLocalRandom.current().nextLong();

        // Runtime state, populated by start().
        mCloser = Closer.create();
        mMasters = new ArrayList<>();
        mWorkers = new ArrayList<>();
        mState = State.NOT_STARTED;
        mSuccess = false;
    }

    /**
     * Starts the cluster: creates the work directory, picks master addresses, applies the
     * deploy-mode specific properties, fills in test configuration defaults, formats the journal,
     * writes the per-node configuration files, and finally creates and starts every master and
     * worker.
     *
     * @throws IllegalStateException if the cluster is already started or was destroyed
     * @throws Exception if any startup step fails
     */
    public synchronized void start() throws Exception {
        Preconditions.checkState(mState != State.STARTED, "Cannot start while already started");
        Preconditions.checkState(mState != State.DESTROYED, "Cannot start a destroyed cluster");
        mWorkDir = AlluxioTestDirectory.createTemporaryDirectory(mClusterName);
        // Marked as started up front because createMaster/createWorker require the STARTED state.
        mState = State.STARTED;
        mMasterAddresses = generateMasterAddresses(mNumMasters);
        LOG.info("Master addresses: {}", mMasterAddresses);

        switch (mDeployMode) {
            case NON_HA: {
                // Point every node at the first (only) master.
                MasterNetAddress primary = mMasterAddresses.get(0);
                mProperties.put(PropertyKey.MASTER_HOSTNAME, primary.getHostname());
                mProperties.put(PropertyKey.MASTER_RPC_PORT, Integer.toString(primary.getRpcPort()));
                mProperties.put(PropertyKey.MASTER_WEB_PORT, Integer.toString(primary.getWebPort()));
                break;
            }
            case ZOOKEEPER_HA: {
                File zkDir = AlluxioTestDirectory.createTemporaryDirectory("zk");
                mCuratorServer = mCloser.register(new RestartableTestingServer(-1, zkDir));
                mProperties.put(PropertyKey.ZOOKEEPER_ENABLED, "true");
                mProperties.put(PropertyKey.ZOOKEEPER_ADDRESS, mCuratorServer.getConnectString());
                break;
            }
            default: {
                throw new IllegalStateException("Unknown deploy mode: " + mDeployMode.toString());
            }
        }

        // Fill in test defaults without overriding explicit settings; the RPC retry duration
        // default is deliberately left out.
        String localHost = NetworkAddressUtils.getLocalHostName();
        for (Map.Entry<?, ?> defaultEntry : ConfigurationTestUtils.testConfigurationDefaults(localHost, mWorkDir.getAbsolutePath()).entrySet()) {
            PropertyKey key = (PropertyKey) defaultEntry.getKey();
            if (!mProperties.containsKey(key) && !key.equals(PropertyKey.USER_RPC_RETRY_MAX_DURATION)) {
                mProperties.put(key, (String) defaultEntry.getValue());
            }
        }

        mProperties.put(PropertyKey.MASTER_MOUNT_TABLE_ROOT_UFS, PathUtils.concatPath(mWorkDir, "underFSStorage"));
        new File(mProperties.get(PropertyKey.MASTER_MOUNT_TABLE_ROOT_UFS)).mkdirs();
        formatJournal();
        writeConf();
        LOG.info("Starting alluxio cluster {} with base directory {}", mClusterName, mWorkDir.getAbsolutePath());

        for (int masterIndex = 0; masterIndex < mNumMasters; masterIndex++) {
            createMaster(masterIndex).start();
        }
        for (int workerIndex = 0; workerIndex < mNumWorkers; workerIndex++) {
            createWorker(workerIndex).start();
        }
    }

    /**
     * Waits until the root path can be queried through a file system client, then closes the
     * master whose RPC port matches the primary RPC address.
     *
     * @param timeoutMs how long to wait for a primary master, in milliseconds
     * @return the index of the master that was closed
     * @throws RuntimeException if the primary address is unavailable or matches no known master
     */
    public synchronized int waitForAndKillPrimaryMaster(int timeoutMs) {
        final FileSystem fs = getFileSystemClient();
        MasterInquireClient inquireClient = getMasterInquireClient();
        CommonUtils.waitFor("a primary master to be serving", input -> {
            try {
                fs.getStatus(new AlluxioURI("/"));
                return true;
            }
            catch (UnavailableException e) {
                // Not serving yet; keep polling.
                return false;
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }, WaitForOptions.defaults().setTimeoutMs(timeoutMs));

        int primaryRpcPort;
        try {
            primaryRpcPort = inquireClient.getPrimaryRpcAddress().getPort();
        }
        catch (UnavailableException e) {
            throw new RuntimeException(e);
        }

        // Masters and their addresses share indexes, so the matching address identifies the master.
        for (int idx = 0; idx < mMasterAddresses.size(); idx++) {
            if (mMasterAddresses.get(idx).getRpcPort() == primaryRpcPort) {
                mMasters.get(idx).close();
                return idx;
            }
        }
        throw new RuntimeException(String.format("No master found with RPC port %d. Master addresses: %s", primaryRpcPort, mMasterAddresses));
    }

    /**
     * Waits until the number of master plus worker addresses reported by the meta master equals
     * the configured number of masters plus workers.
     *
     * @param timeoutMs how long to wait, in milliseconds
     */
    public synchronized void waitForAllNodesRegistered(int timeoutMs) {
        MetaMasterClient client = getMetaMasterClient();
        final int expectedNodes = mNumMasters + mNumWorkers;
        CommonUtils.waitFor("all nodes registered", ignored -> {
            try {
                MasterInfo info = client.getMasterInfo(null);
                int registeredMasters = info.getMasterAddresses().size();
                int registeredWorkers = info.getWorkerAddresses().size();
                return registeredMasters + registeredWorkers == expectedNodes;
            }
            catch (UnavailableException e) {
                // The master is not reachable yet; keep polling.
                return false;
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }, WaitForOptions.defaults().setTimeoutMs(timeoutMs));
    }

    /**
     * Creates a file system client for this cluster. The underlying context is registered with
     * the cluster's closer, so it is closed when the cluster is destroyed.
     *
     * @return a new file system client
     * @throws IllegalStateException if the cluster is not in the started state
     */
    public synchronized FileSystem getFileSystemClient() {
        Preconditions.checkState(mState == State.STARTED, "must be in the started state to get an fs client, but state was %s", mState);
        MasterInquireClient inquireClient = getMasterInquireClient();
        FileSystemContext context = mCloser.register(FileSystemContext.create(null, inquireClient));
        return FileSystem.Factory.get(context);
    }

    /**
     * Creates a meta master client that locates the master through this cluster's inquire client.
     *
     * @return a new meta master client
     * @throws IllegalStateException if the cluster is not in the started state
     */
    public synchronized MetaMasterClient getMetaMasterClient() {
        Preconditions.checkState(mState == State.STARTED, "must be in the started state to get a meta master client, but state was %s", mState);
        MasterClientConfig clientConfig = new MasterClientConfig().withMasterInquireClient(getMasterInquireClient());
        return new RetryHandlingMetaMasterClient(clientConfig);
    }

    /**
     * Marks the test as successful, so that {@link #destroy()} does not archive the work
     * directory.
     */
    public synchronized void notifySuccess() {
        mSuccess = true;
    }

    /**
     * Archives the cluster work directory with {@code tar -czf} and moves the resulting
     * {@code .tar.gz} into the artifacts directory. The output of {@code tar} is appended to the
     * tests log.
     *
     * @throws IllegalStateException if the cluster is not in the started state
     * @throws IOException if the tar process cannot be started, produces no tarball, or the
     *         tarball cannot be moved
     */
    public synchronized void saveWorkdir() throws IOException {
        Preconditions.checkState(mState == State.STARTED, "cluster must be started before you can save its work directory");
        ARTIFACTS_DIR.mkdirs();
        // The redirects below append to TESTS_LOG, so make sure its directory exists first.
        File logDir = TESTS_LOG.getParentFile();
        if (logDir != null) {
            logDir.mkdirs();
        }

        File parentDir = mWorkDir.getParentFile();
        File tarball = new File(parentDir, mWorkDir.getName() + ".tar.gz");
        // Relative names are used, with the parent as working directory, so the archive contains
        // only the work directory's own name.
        ProcessBuilder pb = new ProcessBuilder("tar", "-czf", tarball.getName(), mWorkDir.getName());
        pb.directory(parentDir);
        pb.redirectOutput(ProcessBuilder.Redirect.appendTo(TESTS_LOG));
        pb.redirectError(ProcessBuilder.Redirect.appendTo(TESTS_LOG));
        Process p = pb.start();
        int exitCode;
        try {
            exitCode = p.waitFor();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        // tar can exit non-zero and still produce a usable archive (for example when files change
        // while being read), so a non-zero code is only reported, not treated as fatal.
        if (exitCode != 0) {
            LOG.warn("tar exited with code {} while archiving {}; see {} for details", exitCode, mWorkDir.getAbsolutePath(), TESTS_LOG.getPath());
        }
        if (!tarball.isFile()) {
            throw new IOException(String.format("tar (exit code %d) did not produce %s; see %s for details", exitCode, tarball.getAbsolutePath(), TESTS_LOG.getPath()));
        }

        File finalTarball = new File(ARTIFACTS_DIR, tarball.getName());
        FileUtils.moveFile(tarball, finalTarball);
        LOG.info("Saved cluster {} to {}", mClusterName, finalTarball.getAbsolutePath());
    }

    /**
     * Destroys the cluster by closing everything registered with the closer. If the test was not
     * marked successful and the cluster was started, the work directory is archived first.
     * Calling this on an already destroyed cluster is a no-op.
     *
     * @throws IOException if archiving the work directory or closing a resource fails; resources
     *         are closed even when archiving fails
     */
    public synchronized void destroy() throws IOException {
        if (mState == State.DESTROYED) {
            return;
        }
        try {
            // saveWorkdir() requires the STARTED state; a cluster that never started has no work
            // directory to archive.
            if (!mSuccess && mState == State.STARTED) {
                saveWorkdir();
            }
        }
        finally {
            // Always release the registered resources, even if archiving failed; otherwise a
            // failed archive would leave everything registered with the closer open.
            try {
                mCloser.close();
            }
            finally {
                mState = State.DESTROYED;
            }
            LOG.info("Destroyed cluster {}", mClusterName);
        }
    }

    /**
     * Starts every master that has been created, in creation order.
     */
    public synchronized void startMasters() {
        for (Master master : mMasters) {
            master.start();
        }
    }

    /**
     * Starts the master with the given index.
     *
     * @param i the index of the master to start
     * @throws IllegalStateException if the cluster is not in the started state
     */
    public synchronized void startMaster(int i) throws IOException {
        Preconditions.checkState(mState == State.STARTED, "Must be in a started state to start masters");
        Master master = mMasters.get(i);
        master.start();
    }

    /**
     * Starts the worker with the given index.
     *
     * @param i the index of the worker to start
     * @throws IllegalStateException if the cluster is not in the started state
     */
    public synchronized void startWorker(int i) throws IOException {
        Preconditions.checkState(mState == State.STARTED, "Must be in a started state to start workers");
        Worker worker = mWorkers.get(i);
        worker.start();
    }

    /**
     * Closes every master that has been created, in creation order.
     */
    public synchronized void stopMasters() {
        for (Master master : mMasters) {
            master.close();
        }
    }

    /**
     * Closes the master with the given index.
     *
     * @param i the index of the master to stop
     */
    public synchronized void stopMaster(int i) throws IOException {
        Master master = mMasters.get(i);
        master.close();
    }

    /**
     * Applies a configuration update to every master that has been created.
     *
     * @param key the property to update
     * @param value the new value, possibly null; passed through to each master unchanged
     */
    public synchronized void updateMasterConf(PropertyKey key, @Nullable String value) {
        for (Master master : mMasters) {
            master.updateConf(key, value);
        }
    }

    /**
     * Replaces the deploy mode consulted by {@link #start()} and
     * {@link #getMasterInquireClient()}.
     *
     * @param mode the new deploy mode
     */
    public synchronized void updateDeployMode(DeployMode mode) {
        mDeployMode = mode;
    }

    /**
     * Closes the worker with the given index.
     *
     * @param i the index of the worker to stop
     */
    public synchronized void stopWorker(int i) throws IOException {
        Worker worker = mWorkers.get(i);
        worker.close();
    }

    /**
     * Returns the master addresses generated by {@link #start()}. The internal list itself is
     * returned, not a copy; it is null until the cluster has been started.
     *
     * @return the list of master addresses
     */
    public synchronized List<MasterNetAddress> getMasterAddresses() {
        return mMasterAddresses;
    }

    /**
     * Stops the ZooKeeper testing server.
     *
     * @throws NullPointerException if no ZooKeeper server exists; one is only created when the
     *         cluster is started in {@code ZOOKEEPER_HA} mode
     * @throws IOException if stopping the server fails
     */
    public synchronized void stopZk() throws IOException {
        // Same guard as restartZk(), so a missing server fails with a named message.
        Preconditions.checkNotNull(mCuratorServer, "mCuratorServer");
        mCuratorServer.stop();
    }

    /**
     * Restarts the ZooKeeper testing server.
     *
     * @throws NullPointerException if no ZooKeeper server has been created
     * @throws Exception if the restart fails
     */
    public synchronized void restartZk() throws Exception {
        Preconditions.checkNotNull(mCuratorServer, "mCuratorServer");
        mCuratorServer.restart();
    }

    /**
     * Creates (but does not start) the master with the given index, registers it with the closer
     * and adds it to the list of masters.
     *
     * @param i the master index; selects the address and the conf/logs directory names
     * @return the created master
     * @throws IllegalStateException if the cluster is not in the started state
     */
    private synchronized Master createMaster(int i) throws IOException {
        Preconditions.checkState(mState == State.STARTED, "Must be in a started state to create masters");
        MasterNetAddress address = mMasterAddresses.get(i);
        File confDir = new File(mWorkDir, "conf-master" + i);
        File logsDir = new File(mWorkDir, "logs-master" + i);
        logsDir.mkdirs();

        Map<PropertyKey, String> conf = new HashMap<>();
        conf.put(PropertyKey.LOGGER_TYPE, "MASTER_LOGGER");
        conf.put(PropertyKey.CONF_DIR, confDir.getAbsolutePath());
        conf.put(PropertyKey.LOGS_DIR, logsDir.getAbsolutePath());
        conf.put(PropertyKey.MASTER_HOSTNAME, address.getHostname());
        conf.put(PropertyKey.MASTER_RPC_PORT, Integer.toString(address.getRpcPort()));
        conf.put(PropertyKey.MASTER_WEB_PORT, Integer.toString(address.getWebPort()));

        Master master = mCloser.register(new Master(logsDir, conf));
        mMasters.add(master);
        return master;
    }

    /**
     * Creates (but does not start) the worker with the given index, registers it with the closer
     * and adds it to the list of workers. Each worker gets its own logs directory, ramdisk
     * directory and three free ports (rpc, data, web).
     *
     * @param i the worker index; selects the conf/logs/ramdisk directory names
     * @return the created worker
     * @throws IllegalStateException if the cluster is not in the started state
     */
    private synchronized Worker createWorker(int i) throws IOException {
        Preconditions.checkState(mState == State.STARTED, "Must be in a started state to create workers");
        File confDir = new File(mWorkDir, "conf-worker" + i);
        File logsDir = new File(mWorkDir, "logs-worker" + i);
        File ramdisk = new File(mWorkDir, "ramdisk" + i);
        logsDir.mkdirs();
        ramdisk.mkdirs();

        int rpcPort = PortUtils.getFreePort();
        int dataPort = PortUtils.getFreePort();
        int webPort = PortUtils.getFreePort();

        Map<PropertyKey, String> conf = new HashMap<>();
        conf.put(PropertyKey.LOGGER_TYPE, "WORKER_LOGGER");
        conf.put(PropertyKey.CONF_DIR, confDir.getAbsolutePath());
        // The ramdisk directory backs storage tier level 0.
        conf.put(PropertyKey.Template.WORKER_TIERED_STORE_LEVEL_DIRS_PATH.format(0), ramdisk.getAbsolutePath());
        conf.put(PropertyKey.LOGS_DIR, logsDir.getAbsolutePath());
        conf.put(PropertyKey.WORKER_RPC_PORT, Integer.toString(rpcPort));
        conf.put(PropertyKey.WORKER_DATA_PORT, Integer.toString(dataPort));
        conf.put(PropertyKey.WORKER_WEB_PORT, Integer.toString(webPort));

        Worker worker = mCloser.register(new Worker(logsDir, conf));
        mWorkers.add(worker);
        LOG.info("Created worker with (rpc, data, web) ports ({}, {}, {})", rpcPort, dataPort, webPort);
        return worker;
    }

    /**
     * Formats the master journal. The journal folder from this cluster's properties is applied
     * through a {@link ConfigurationRule} resource for the duration of the format call.
     *
     * @throws RuntimeException wrapping any {@link IOException} raised while formatting or while
     *         closing the configuration resource
     */
    public synchronized void formatJournal() throws IOException {
        String journalFolder = mProperties.get(PropertyKey.MASTER_JOURNAL_FOLDER);
        try (Closeable ignored = new ConfigurationRule(PropertyKey.MASTER_JOURNAL_FOLDER, journalFolder).toResource()) {
            Format.format(Format.Mode.MASTER);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Creates a client for locating the primary master, according to the current deploy mode.
     *
     * @return a single-master inquire client in {@code NON_HA} mode, or a ZooKeeper-based one in
     *         {@code ZOOKEEPER_HA} mode
     * @throws IllegalStateException if {@code NON_HA} mode is used with a number of masters other
     *         than one, or the deploy mode is unknown
     */
    public synchronized MasterInquireClient getMasterInquireClient() {
        if (mDeployMode == DeployMode.NON_HA) {
            Preconditions.checkState(mMasters.size() == 1, "Running with multiple masters requires Zookeeper to be enabled");
            MasterNetAddress only = mMasterAddresses.get(0);
            return new SingleMasterInquireClient(new InetSocketAddress(only.getHostname(), only.getRpcPort()));
        }
        if (mDeployMode == DeployMode.ZOOKEEPER_HA) {
            String connectString = mCuratorServer.getConnectString();
            String electionPath = Configuration.get(PropertyKey.ZOOKEEPER_ELECTION_PATH);
            String leaderPath = Configuration.get(PropertyKey.ZOOKEEPER_LEADER_PATH);
            return ZkMasterInquireClient.getClient(connectString, electionPath, leaderPath);
        }
        throw new IllegalStateException("Unknown deploy mode: " + mDeployMode.toString());
    }

    /**
     * Writes one configuration directory per master ({@code conf-master<i>}) and per worker
     * ({@code conf-worker<i>}) under the work directory. Nodes without an override entry get only
     * the cluster-wide properties.
     *
     * @throws IOException if a configuration file cannot be written
     */
    private void writeConf() throws IOException {
        for (int masterIndex = 0; masterIndex < mNumMasters; masterIndex++) {
            Map<PropertyKey, String> overrides = mMasterProperties.getOrDefault(masterIndex, new HashMap<>());
            writeConfToFile(new File(mWorkDir, "conf-master" + masterIndex), overrides);
        }
        for (int workerIndex = 0; workerIndex < mNumWorkers; workerIndex++) {
            Map<PropertyKey, String> overrides = mWorkerProperties.getOrDefault(workerIndex, new HashMap<>());
            writeConfToFile(new File(mWorkDir, "conf-worker" + workerIndex), overrides);
        }
    }

    /**
     * Wraps a test statement so that the cluster is started before the test runs and destroyed
     * afterwards. A JVM shutdown hook that also destroys the cluster is registered on every call,
     * as a fallback.
     *
     * @param base the statement to run inside the cluster's lifetime
     * @param description the test description (unused)
     * @return a statement that starts the cluster, evaluates {@code base}, then destroys the cluster
     */
    @Override
    public Statement apply(final Statement base, Description description) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                destroy();
            }
            catch (IOException | RuntimeException e) {
                // Runtime failures are caught too, so they are logged rather than escaping the
                // hook thread.
                LOG.warn("Failed to clean up test cluster processes: {}", e.toString());
            }
        }));
        return new Statement(){

            @Override
            public void evaluate() throws Throwable {
                try {
                    MultiProcessCluster.this.start();
                    base.evaluate();
                }
                finally {
                    try {
                        MultiProcessCluster.this.destroy();
                    }
                    catch (Throwable t) {
                        // Logged rather than rethrown so a cleanup failure never masks the test result.
                        LOG.error("Failed to destroy cluster", t);
                    }
                }
            }
        };
    }

    /**
     * Writes {@code alluxio-site.properties} into the given directory. The file contains the
     * cluster-wide properties overlaid with the given node-specific properties, one
     * {@code key=value} line per property, encoded as UTF-8.
     *
     * @param dir the directory to write into; created if missing
     * @param properties node-specific properties that override the cluster-wide ones
     * @throws IOException if the file cannot be written
     */
    private void writeConfToFile(File dir, Map<PropertyKey, String> properties) throws IOException {
        // Node-specific values win over the cluster-wide ones.
        Map<PropertyKey, String> merged = new HashMap<>(mProperties);
        merged.putAll(properties);

        StringBuilder sb = new StringBuilder();
        for (Map.Entry<PropertyKey, String> entry : merged.entrySet()) {
            sb.append(String.format("%s=%s%n", entry.getKey(), entry.getValue()));
        }

        dir.mkdirs();
        try (FileOutputStream out = new FileOutputStream(new File(dir, "alluxio-site.properties"))) {
            out.write(sb.toString().getBytes(Charsets.UTF_8));
        }
    }

    /**
     * Generates one address per master, each using the local host name and two free ports.
     *
     * @param numMasters the number of addresses to generate
     * @return the generated addresses; empty if {@code numMasters} is not positive
     */
    private static List<MasterNetAddress> generateMasterAddresses(int numMasters) throws IOException {
        List<MasterNetAddress> addresses = new ArrayList<>();
        int remaining = numMasters;
        while (remaining-- > 0) {
            // Argument order is (hostname, rpc port, web port), as read back through
            // getRpcPort()/getWebPort() elsewhere in this class.
            String hostname = NetworkAddressUtils.getLocalHostName();
            int rpcPort = PortUtils.getFreePort();
            int webPort = PortUtils.getFreePort();
            addresses.add(new MasterNetAddress(hostname, rpcPort, webPort));
        }
        return addresses;
    }

    /**
     * Creates a builder for a {@link MultiProcessCluster}.
     *
     * @return a new builder with default settings
     */
    public static Builder newBuilder() {
        return new Builder();
    }

    /**
     * Builder for {@link MultiProcessCluster}. Defaults: one master, one worker, cluster name
     * {@code "AlluxioMiniCluster"}, deploy mode {@code NON_HA}, and no extra properties.
     */
    public static final class Builder {
        private final Map<PropertyKey, String> mProperties = new HashMap<>();
        private Map<Integer, Map<PropertyKey, String>> mMasterProperties = new HashMap<>();
        private Map<Integer, Map<PropertyKey, String>> mWorkerProperties = new HashMap<>();
        private int mNumMasters = 1;
        private int mNumWorkers = 1;
        private String mClusterName = "AlluxioMiniCluster";
        private DeployMode mDeployMode = DeployMode.NON_HA;

        // Instances are obtained through MultiProcessCluster.newBuilder().
        private Builder() {
        }

        /**
         * Adds a cluster-wide property.
         *
         * @param key the property key; must not be {@code ZOOKEEPER_ENABLED}
         * @param value the property value
         * @return this builder
         * @throws IllegalStateException if {@code key} is {@code ZOOKEEPER_ENABLED}
         */
        public Builder addProperty(PropertyKey key, String value) {
            Preconditions.checkState(!key.equals(PropertyKey.ZOOKEEPER_ENABLED), "Enable Zookeeper via #setDeployMode instead of #addProperty");
            mProperties.put(key, value);
            return this;
        }

        /**
         * Adds every entry of the given map through {@link #addProperty(PropertyKey, String)}.
         *
         * @param properties the cluster-wide properties to add
         * @return this builder
         */
        public Builder addProperties(Map<PropertyKey, String> properties) {
            for (Map.Entry<PropertyKey, String> entry : properties.entrySet()) {
                addProperty(entry.getKey(), entry.getValue());
            }
            return this;
        }

        /**
         * Sets the per-master property overrides, replacing any previously set map.
         *
         * @param properties overrides keyed by master index
         * @return this builder
         */
        public Builder setMasterProperties(Map<Integer, Map<PropertyKey, String>> properties) {
            mMasterProperties = properties;
            return this;
        }

        /**
         * Sets the per-worker property overrides, replacing any previously set map.
         *
         * @param properties overrides keyed by worker index
         * @return this builder
         */
        public Builder setWorkerProperties(Map<Integer, Map<PropertyKey, String>> properties) {
            mWorkerProperties = properties;
            return this;
        }

        /**
         * @param mode the deploy mode for the cluster
         * @return this builder
         */
        public Builder setDeployMode(DeployMode mode) {
            mDeployMode = mode;
            return this;
        }

        /**
         * @param numMasters the number of masters to start
         * @return this builder
         */
        public Builder setNumMasters(int numMasters) {
            mNumMasters = numMasters;
            return this;
        }

        /**
         * @param numWorkers the number of workers to start
         * @return this builder
         */
        public Builder setNumWorkers(int numWorkers) {
            mNumWorkers = numWorkers;
            return this;
        }

        /**
         * @param clusterName the cluster name prefix; the cluster appends a random suffix
         * @return this builder
         */
        public Builder setClusterName(String clusterName) {
            mClusterName = clusterName;
            return this;
        }

        /**
         * Validates the per-node property indexes and builds the cluster.
         *
         * @return a new, not yet started cluster
         * @throws IllegalStateException if a master or worker property index is negative or not
         *         smaller than the configured number of masters or workers
         */
        public MultiProcessCluster build() {
            Preconditions.checkState(mMasterProperties.keySet().stream().noneMatch(key -> key >= mNumMasters || key < 0), "The master indexes in master properties should be bigger or equal to zero and small than %s", mNumMasters);
            Preconditions.checkState(mWorkerProperties.keySet().stream().noneMatch(key -> key >= mNumWorkers || key < 0), "The worker indexes in worker properties should be bigger or equal to zero and small than %s", mNumWorkers);
            // The cluster adds entries to its property map while starting, so give it a private
            // copy; otherwise clusters built from the same builder would share those entries.
            Map<PropertyKey, String> clusterProperties = new HashMap<>(mProperties);
            return new MultiProcessCluster(clusterProperties, mMasterProperties, mWorkerProperties, mNumMasters, mNumWorkers, mClusterName, mDeployMode);
        }
    }

    /**
     * How the cluster's masters are deployed.
     */
    public enum DeployMode {
        /** Clients are pointed directly at the first master's address. */
        NON_HA,
        /** A ZooKeeper testing server is started and clients locate the primary master through it. */
        ZOOKEEPER_HA
    }

    private static enum State {
        NOT_STARTED,
        STARTED,
        DESTROYED;

    }
}

