/*
 * Decompiled with CFR 0.152.
 */
package alluxio.server.ft.journal.raft;

import alluxio.AlluxioURI;
import alluxio.client.file.FileSystem;
import alluxio.conf.PropertyKey;
import alluxio.exception.FileAlreadyExistsException;
import alluxio.exception.FileDoesNotExistException;
import alluxio.grpc.MetricValue;
import alluxio.grpc.MountPOptions;
import alluxio.master.journal.JournalType;
import alluxio.master.journal.raft.RaftJournalSystem;
import alluxio.master.journal.raft.RaftJournalUtils;
import alluxio.master.journal.raft.SnapshotDirStateMachineStorage;
import alluxio.metrics.MetricKey;
import alluxio.multi.process.MultiProcessCluster;
import alluxio.multi.process.PortCoordination;
import alluxio.server.ft.journal.raft.EmbeddedJournalIntegrationTestBase;
import alluxio.util.CommonUtils;
import alluxio.util.WaitForOptions;
import alluxio.util.io.PathUtils;
import java.io.File;
import java.io.FileWriter;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.server.storage.RaftStorageImpl;
import org.apache.ratis.server.storage.StorageImplUtils;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

@Ignore(value="In Dora, Client does not use Master/Journal services.")
public class EmbeddedJournalIntegrationTestFaultTolerance
extends EmbeddedJournalIntegrationTestBase {
    private static final int RESTART_TIMEOUT_MS = 360000;
    private static final int NUM_MASTERS = 3;
    private static final int NUM_WORKERS = 0;
    @Rule
    public TemporaryFolder mFolder = new TemporaryFolder();

    /**
     * Starts an embedded-journal cluster, creates a directory, kills the primary master and
     * asserts that the directory is still reported as existing through the same client.
     *
     * @throws Exception if the cluster cannot be started or a file system call fails
     */
    @Test
    public void failover() throws Exception {
        this.mCluster = MultiProcessCluster.newBuilder(PortCoordination.EMBEDDED_JOURNAL_FAILOVER)
                .setClusterName("EmbeddedJournalFaultTolerance_failover")
                .setNumMasters(NUM_MASTERS)
                .setNumWorkers(NUM_WORKERS)
                .addProperty(PropertyKey.MASTER_JOURNAL_TYPE, JournalType.EMBEDDED)
                .addProperty(PropertyKey.MASTER_JOURNAL_FLUSH_TIMEOUT_MS, "5min")
                // Short election timeouts, as configured by the original test.
                .addProperty(PropertyKey.MASTER_EMBEDDED_JOURNAL_MIN_ELECTION_TIMEOUT, "750ms")
                .addProperty(PropertyKey.MASTER_EMBEDDED_JOURNAL_MAX_ELECTION_TIMEOUT, "1500ms")
                .build();
        this.mCluster.start();

        FileSystem fileSystem = this.mCluster.getFileSystemClient();
        AlluxioURI directory = new AlluxioURI("/dir");
        fileSystem.createDirectory(directory);

        // The argument is presumably a timeout in milliseconds -- confirm against
        // MultiProcessCluster.
        this.mCluster.waitForAndKillPrimaryMaster(5000);

        boolean stillThere = fileSystem.exists(directory);
        Assert.assertTrue(stillThere);
        this.mCluster.notifySuccess();
    }

    /**
     * Mounts a local folder, writes files directly into it, reads them through Alluxio, then
     * stops and starts all masters and checks that the same files are still readable with the
     * same contents.
     *
     * @throws Exception if the cluster, the mount, or any read/write fails
     */
    @Test
    public void syncMetadataEJFailOver() throws Exception {
        final int numFiles = 100;
        final String fileName = "someFile";
        final String contents = "contents";

        this.mCluster = MultiProcessCluster
                .newBuilder(PortCoordination.EMBEDDED_JOURNAL_FAILOVER_METADATA_SYNC)
                .setClusterName("EmbeddedJournalFaultTolerance_syncMetadataFailOver")
                .setNumMasters(NUM_MASTERS)
                .setNumWorkers(1)
                .addProperty(PropertyKey.MASTER_JOURNAL_TYPE, JournalType.EMBEDDED)
                .addProperty(PropertyKey.MASTER_JOURNAL_FLUSH_TIMEOUT_MS, "5min")
                .addProperty(PropertyKey.MASTER_JOURNAL_CHECKPOINT_PERIOD_ENTRIES, 1000)
                .addProperty(PropertyKey.MASTER_JOURNAL_LOG_SIZE_BYTES_MAX, "50KB")
                .addProperty(PropertyKey.MASTER_EMBEDDED_JOURNAL_MIN_ELECTION_TIMEOUT, "3s")
                .addProperty(PropertyKey.MASTER_EMBEDDED_JOURNAL_MAX_ELECTION_TIMEOUT, "6s")
                .addProperty(PropertyKey.MASTER_STANDBY_HEARTBEAT_INTERVAL, "5s")
                .build();
        this.mCluster.start();
        this.mCluster.waitForAllNodesRegistered(30000);

        String ufsPath = this.mFolder.newFolder().getAbsoluteFile().toString();
        String ufsUri = "file://" + ufsPath;
        MountPOptions options = MountPOptions.newBuilder().build();
        FileSystem client = this.mCluster.getFileSystemClient();
        AlluxioURI mountPath = new AlluxioURI("/mnt1");
        client.mount(mountPath, new AlluxioURI(ufsUri), options);

        // Write the files straight into the mounted folder, bypassing Alluxio, so that the
        // reads below are the first time Alluxio sees them.
        for (int i = 0; i < numFiles; ++i) {
            String target = Paths.get(PathUtils.concatPath(ufsPath, fileName + i)).toString();
            try (FileWriter fw = new FileWriter(target)) {
                fw.write(contents + i);
            }
        }
        this.assertSyncedFilesReadable(client, mountPath, fileName, contents, numFiles);

        this.mCluster.stopMasters();
        this.mCluster.startMasters();
        this.mCluster.waitForAllNodesRegistered(30000);

        // Use a fresh client after the restart, as the original test did.
        client = this.mCluster.getFileSystemClient();
        this.assertSyncedFilesReadable(client, mountPath, fileName, contents, numFiles);

        // Every other multi-master test in this class reports success to the cluster;
        // this one previously did not.
        this.mCluster.notifySuccess();
    }

    /**
     * Reads {@code fileName + i} under {@code mountPath} for every {@code i} in
     * {@code [0, numFiles)} and asserts that each file contains {@code contents + i}.
     * Each stream is closed after it has been read.
     *
     * @param client the file system client to read through
     * @param mountPath the Alluxio directory that holds the files
     * @param fileName common file name prefix
     * @param contents common content prefix
     * @param numFiles number of files to check
     * @throws Exception if a file cannot be opened or read
     */
    private void assertSyncedFilesReadable(FileSystem client, AlluxioURI mountPath,
            String fileName, String contents, int numFiles) throws Exception {
        for (int i = 0; i < numFiles; ++i) {
            // IOUtils.toString does not close the stream it is handed, so close it here.
            try (InputStream in = client.openFile(mountPath.join(fileName + i))) {
                Assert.assertEquals(contents + i, IOUtils.toString(in, Charset.defaultCharset()));
            }
        }
    }

    /**
     * Creates enough directories to exceed the configured checkpoint period, waits for a
     * snapshot to appear in the primary master's raft directory, stops the masters and then
     * verifies that a latest snapshot can be loaded from that directory.
     *
     * @throws Exception if the cluster, the file system calls, or the storage access fails
     */
    @Test
    public void copySnapshotToMaster() throws Exception {
        this.mCluster = MultiProcessCluster
                .newBuilder(PortCoordination.EMBEDDED_JOURNAL_SNAPSHOT_MASTER)
                .setClusterName("EmbeddedJournalFaultTolerance_copySnapshotToMaster")
                .setNumMasters(NUM_MASTERS)
                .setNumWorkers(NUM_WORKERS)
                .addProperty(PropertyKey.MASTER_JOURNAL_TYPE, JournalType.EMBEDDED)
                .addProperty(PropertyKey.MASTER_JOURNAL_FLUSH_TIMEOUT_MS, "5min")
                .addProperty(PropertyKey.MASTER_JOURNAL_CHECKPOINT_PERIOD_ENTRIES, 1000)
                .addProperty(PropertyKey.MASTER_JOURNAL_LOG_SIZE_BYTES_MAX, "50KB")
                .addProperty(PropertyKey.MASTER_EMBEDDED_JOURNAL_MIN_ELECTION_TIMEOUT, "3s")
                .addProperty(PropertyKey.MASTER_EMBEDDED_JOURNAL_MAX_ELECTION_TIMEOUT, "6s")
                .addProperty(PropertyKey.MASTER_STANDBY_HEARTBEAT_INTERVAL, "5s")
                .build();
        this.mCluster.start();

        AlluxioURI testDir = new AlluxioURI("/dir");
        FileSystem fs = this.mCluster.getFileSystemClient();
        fs.createDirectory(testDir);
        // 2000 creations, twice the checkpoint period configured above.
        for (int i = 0; i < 2000; ++i) {
            fs.createDirectory(testDir.join("file" + i));
        }

        int primaryMasterIndex = this.mCluster.getPrimaryMasterIndex(5000);
        String leaderJournalPath = this.mCluster.getJournalDir(primaryMasterIndex);
        File raftDir = new File(RaftJournalUtils.getRaftJournalDir(new File(leaderJournalPath)),
                RaftJournalSystem.RAFT_GROUP_UUID.toString());
        this.waitForSnapshot(raftDir);
        this.mCluster.stopMasters();

        // RaftStorage is Closeable; open it in try-with-resources so it is released even
        // when the assertion fails.
        try (RaftStorageImpl rs = StorageImplUtils.newRaftStorage(raftDir,
                RaftServerConfigKeys.Log.CorruptionPolicy.getDefault(),
                RaftStorage.StartupOption.RECOVER,
                RaftServerConfigKeys.STORAGE_FREE_SPACE_MIN_DEFAULT.getSize())) {
            rs.initialize();
            SnapshotDirStateMachineStorage storage = new SnapshotDirStateMachineStorage();
            storage.init(rs);
            SnapshotInfo snapshot = storage.getLatestSnapshot();
            Assert.assertNotNull(snapshot);
        }
        this.mCluster.notifySuccess();
    }

    /**
     * Picks the master after the current primary, creates enough directories to exceed the
     * checkpoint period, triggers a checkpoint, wipes that master's journal directory while it
     * is stopped, restarts it, and verifies that a snapshot shows up in its raft directory and
     * can be loaded as the latest snapshot.
     *
     * @throws Exception if the cluster, the file system calls, or the storage access fails
     */
    @Test
    public void copySnapshotToFollower() throws Exception {
        this.mCluster = MultiProcessCluster
                .newBuilder(PortCoordination.EMBEDDED_JOURNAL_SNAPSHOT_FOLLOWER)
                .setClusterName("EmbeddedJournalFaultTolerance_copySnapshotToFollower")
                .setNumMasters(NUM_MASTERS)
                .setNumWorkers(NUM_WORKERS)
                .addProperty(PropertyKey.MASTER_JOURNAL_TYPE, JournalType.EMBEDDED)
                .addProperty(PropertyKey.MASTER_JOURNAL_FLUSH_TIMEOUT_MS, "5min")
                .addProperty(PropertyKey.MASTER_JOURNAL_CHECKPOINT_PERIOD_ENTRIES, 1000)
                .addProperty(PropertyKey.MASTER_JOURNAL_LOG_SIZE_BYTES_MAX, "50KB")
                .addProperty(PropertyKey.MASTER_EMBEDDED_JOURNAL_MIN_ELECTION_TIMEOUT, "3s")
                .addProperty(PropertyKey.MASTER_EMBEDDED_JOURNAL_MAX_ELECTION_TIMEOUT, "6s")
                .addProperty(PropertyKey.MASTER_STANDBY_HEARTBEAT_INTERVAL, "5s")
                .build();
        this.mCluster.start();

        // The master right after the primary (wrapping around) is the one that has to catch up.
        int catchUpMasterIndex = (this.mCluster.getPrimaryMasterIndex(5000) + 1) % NUM_MASTERS;

        AlluxioURI testDir = new AlluxioURI("/dir");
        FileSystem fs = this.mCluster.getFileSystemClient();
        fs.createDirectory(testDir);
        for (int i = 0; i < 2000; ++i) {
            fs.createDirectory(testDir.join("file" + i));
        }
        this.mCluster.getMetaMasterClient().checkpoint();

        // Remove the catch-up master's journal while it is stopped so that it restarts with
        // an empty journal directory.
        this.mCluster.stopMaster(catchUpMasterIndex);
        File catchupJournalDir = new File(this.mCluster.getJournalDir(catchUpMasterIndex));
        FileUtils.deleteDirectory(catchupJournalDir);
        Assert.assertTrue(catchupJournalDir.mkdirs());
        this.mCluster.startMaster(catchUpMasterIndex);

        File raftDir = new File(RaftJournalUtils.getRaftJournalDir(catchupJournalDir),
                RaftJournalSystem.RAFT_GROUP_UUID.toString());
        this.waitForSnapshot(raftDir);
        this.mCluster.stopMaster(catchUpMasterIndex);

        // RaftStorage is Closeable; open it in try-with-resources so it is released even
        // when the assertion fails.
        try (RaftStorageImpl rs = StorageImplUtils.newRaftStorage(raftDir,
                RaftServerConfigKeys.Log.CorruptionPolicy.getDefault(),
                RaftStorage.StartupOption.RECOVER,
                RaftServerConfigKeys.STORAGE_FREE_SPACE_MIN_DEFAULT.getSize())) {
            rs.initialize();
            SnapshotDirStateMachineStorage storage = new SnapshotDirStateMachineStorage();
            storage.init(rs);
            SnapshotInfo snapshot = storage.getLatestSnapshot();
            Assert.assertNotNull(snapshot);
        }
        this.mCluster.notifySuccess();
    }

    /**
     * Blocks until the {@code sm} sub-directory of the given raft directory contains at least
     * one entry, polling every 200 ms for at most {@code RESTART_TIMEOUT_MS}.
     *
     * @param raftDir the raft group directory whose {@code sm} sub-directory is watched
     * @throws InterruptedException if the waiting thread is interrupted
     * @throws TimeoutException if no entry appears within the timeout
     */
    private void waitForSnapshot(File raftDir) throws InterruptedException, TimeoutException {
        final int retryIntervalMs = 200;
        File snapshotDir = new File(raftDir, "sm");
        CommonUtils.waitFor("snapshot is downloaded", () -> {
            // list() returns null while the directory does not exist yet. The former extra
            // check on the first entry measured the length of its *name*, which is never
            // empty, so it has been dropped without changing the outcome.
            String[] entries = snapshotDir.list();
            return entries != null && entries.length > 0;
        }, WaitForOptions.defaults().setInterval(retryIntervalMs).setTimeoutMs(RESTART_TIMEOUT_MS));
    }

    /**
     * Creates 500 files with a checkpoint period of 50 entries and asserts that the snapshot
     * download timer metric reports at least {@code 500 / 50 * 3 / 2} downloads.
     *
     * @throws Exception if the cluster, a file creation, or the metrics call fails
     */
    @Test
    public void snapshotTransferLoad() throws Exception {
        final int fileCount = 500;
        final int checkpointPeriod = 50;
        this.mCluster = MultiProcessCluster
                .newBuilder(PortCoordination.EMBEDDED_JOURNAL_SNAPSHOT_TRANSFER_LOAD)
                .setClusterName("EmbeddedJournalTransferLeadership_snapshotTransferLoad")
                .setNumMasters(NUM_MASTERS)
                .setNumWorkers(NUM_WORKERS)
                .addProperty(PropertyKey.MASTER_JOURNAL_TYPE, JournalType.EMBEDDED)
                .addProperty(PropertyKey.MASTER_JOURNAL_FLUSH_TIMEOUT_MS, "5min")
                .addProperty(PropertyKey.MASTER_EMBEDDED_JOURNAL_MIN_ELECTION_TIMEOUT, "750ms")
                .addProperty(PropertyKey.MASTER_EMBEDDED_JOURNAL_MAX_ELECTION_TIMEOUT, "1500ms")
                .addProperty(PropertyKey.MASTER_JOURNAL_CHECKPOINT_PERIOD_ENTRIES, checkpointPeriod)
                .addProperty(PropertyKey.MASTER_JOURNAL_REQUEST_INFO_TIMEOUT, "50ms")
                .build();
        this.mCluster.start();

        int created = 0;
        while (created < fileCount) {
            // A client is requested per file, exactly as the original loop did.
            this.mCluster.getFileSystemClient()
                    .createFile(new AlluxioURI(String.format("/%d", created)))
                    .close();
            ++created;
        }

        Map<String, MetricValue> metrics = this.mCluster.getMetricsMasterClient().getMetrics();
        Assert.assertTrue(
                metrics.containsKey(MetricKey.MASTER_EMBEDDED_JOURNAL_SNAPSHOT_DOWNLOAD_TIMER.getName()));
        MetricValue downloadTimer =
                metrics.get(MetricKey.MASTER_EMBEDDED_JOURNAL_SNAPSHOT_DOWNLOAD_TIMER.getName());
        long count = (long) downloadTimer.getDoubleValue();

        // Integer arithmetic, evaluated left to right: 500 / 50 * 3 / 2 == 15.
        long expected = fileCount / checkpointPeriod * 3 / 2;
        String message = String.format("Expected at least %d snapshots, got %d", expected, count);
        Assert.assertTrue(message, count >= expected);
        this.mCluster.notifySuccess();
    }

    /**
     * On a single-master cluster with a 1KB journal log size limit, checks the number of
     * snapshots and {@code log_} files in the raft directory at three points: after start-up
     * (0 snapshots, 1 log file), after creating two files (0 snapshots, 2 log files), and after
     * an explicit checkpoint (1 snapshot, 1 log file).
     *
     * @throws Exception if the cluster, a file system call, or a directory walk fails
     */
    @Test
    public void singleMasterSnapshotPurgeLogFiles() throws Exception {
        this.mCluster = MultiProcessCluster
                .newBuilder(PortCoordination.EMBEDDED_JOURNAL_SNAPSHOT_SINGLE_MASTER)
                .setClusterName("EmbeddedJournalTransferLeadership_singleMasterSnapshot")
                .setNumMasters(1)
                .setNumWorkers(NUM_WORKERS)
                .addProperty(PropertyKey.MASTER_JOURNAL_TYPE, JournalType.EMBEDDED)
                .addProperty(PropertyKey.MASTER_JOURNAL_FLUSH_TIMEOUT_MS, "5min")
                .addProperty(PropertyKey.MASTER_EMBEDDED_JOURNAL_MIN_ELECTION_TIMEOUT, "750ms")
                .addProperty(PropertyKey.MASTER_EMBEDDED_JOURNAL_MAX_ELECTION_TIMEOUT, "1500ms")
                .addProperty(PropertyKey.MASTER_JOURNAL_LOG_SIZE_BYTES_MAX, "1KB")
                .build();
        this.mCluster.start();
        this.mCluster.waitForAllNodesRegistered(5000);

        File journalDir = new File(this.mCluster.getJournalDir(0));
        String raftJournalRoot = RaftJournalUtils.getRaftJournalDir(journalDir).toString();
        String groupDirName = RaftJournalSystem.RAFT_GROUP_UUID.toString();
        Path raftDir = Paths.get(raftJournalRoot, groupDirName);

        // Freshly started: nothing snapshotted yet, one log file.
        this.expectSnapshots(raftDir, 0);
        this.expectLogFiles(raftDir, 1);

        // NOTE(review): the streams returned by createFile are not closed here. Closing them
        // might add journal entries and change the log file count asserted below -- confirm
        // before changing.
        this.mCluster.getFileSystemClient().createFile(new AlluxioURI("/testfile0"));
        this.mCluster.getFileSystemClient().createFile(new AlluxioURI("/testfile1"));
        this.expectSnapshots(raftDir, 0);
        this.expectLogFiles(raftDir, 2);

        // After the checkpoint one snapshot exists and only one log file is left.
        this.mCluster.getMetaMasterClient().checkpoint();
        this.expectSnapshots(raftDir, 1);
        this.expectLogFiles(raftDir, 1);
        this.mCluster.notifySuccess();
    }

    /**
     * Asserts that the number of paths under {@code raftDir} (searched recursively) that
     * {@code SnapshotDirStateMachineStorage.matchSnapshotPath} matches equals
     * {@code numExpected}.
     *
     * @param raftDir root directory to walk
     * @param numExpected expected number of matching paths
     * @throws Exception if the directory cannot be walked
     */
    private void expectSnapshots(Path raftDir, int numExpected) throws Exception {
        long snapshotsFound;
        // Files.walk(start) walks to unlimited depth; the stream must be closed after use.
        try (Stream<Path> entries = Files.walk(raftDir)) {
            snapshotsFound = entries
                    .filter(entry -> SnapshotDirStateMachineStorage.matchSnapshotPath(entry).matches())
                    .count();
        }
        String message = "Expected " + numExpected + " snapshot(s) to be taken";
        Assert.assertEquals(message, (long) numExpected, snapshotsFound);
    }

    /**
     * Asserts that the number of paths under {@code raftDir} (searched recursively) whose
     * file name starts with {@code log_} equals {@code numExpected}.
     *
     * @param raftDir root directory to walk
     * @param numExpected expected number of matching paths
     * @throws Exception if the directory cannot be walked
     */
    private void expectLogFiles(Path raftDir, int numExpected) throws Exception {
        long logFilesFound;
        // Files.walk(start) walks to unlimited depth; the stream must be closed after use.
        try (Stream<Path> entries = Files.walk(raftDir)) {
            logFilesFound = entries
                    .filter(entry -> {
                        String name = entry.getFileName().toString();
                        return name.startsWith("log_");
                    })
                    .count();
        }
        String message = "Expected " + numExpected + " log file(s)";
        Assert.assertEquals(message, (long) numExpected, logFilesFound);
    }

    /**
     * Creates a directory and then, three times in a row, restarts all masters and asserts
     * that the directory is still reported as existing.
     *
     * @throws Exception if the cluster or a file system call fails
     */
    @Test
    public void restart() throws Exception {
        this.mCluster = MultiProcessCluster.newBuilder(PortCoordination.EMBEDDED_JOURNAL_RESTART)
                .setClusterName("EmbeddedJournalFaultTolerance_restart")
                .setNumMasters(NUM_MASTERS)
                .setNumWorkers(NUM_WORKERS)
                .addProperty(PropertyKey.MASTER_JOURNAL_TYPE, JournalType.EMBEDDED)
                .addProperty(PropertyKey.MASTER_JOURNAL_FLUSH_TIMEOUT_MS, "5min")
                .addProperty(PropertyKey.MASTER_EMBEDDED_JOURNAL_MIN_ELECTION_TIMEOUT, "750ms")
                .addProperty(PropertyKey.MASTER_EMBEDDED_JOURNAL_MAX_ELECTION_TIMEOUT, "1500ms")
                .build();
        this.mCluster.start();

        FileSystem fileSystem = this.mCluster.getFileSystemClient();
        AlluxioURI directory = new AlluxioURI("/dir");
        fileSystem.createDirectory(directory);

        final int restartRounds = 3;
        for (int round = 0; round < restartRounds; ++round) {
            this.restartMasters();
            Assert.assertTrue(fileSystem.exists(directory));
        }
        this.mCluster.notifySuccess();
    }

    /**
     * Runs ten {@link OperationThread}s that continuously create and delete directories while
     * all masters are restarted twice. After each restart the shared success counter is reset
     * and the test waits until it reaches 11 again, or until a worker records a failure.
     *
     * <p>The worker threads are always interrupted and joined before the method returns.
     *
     * @throws Throwable the first failure observed from a worker thread, or any error from the
     *         cluster or the wait itself
     */
    @Test
    public void restartStress() throws Throwable {
        final int numThreads = 10;
        final int numRestarts = 2;
        final int requiredSuccesses = 11;

        this.mCluster = MultiProcessCluster
                .newBuilder(PortCoordination.EMBEDDED_JOURNAL_RESTART_STRESS)
                .setClusterName("EmbeddedJournalFaultTolerance_restartStress")
                .setNumMasters(NUM_MASTERS)
                .setNumWorkers(NUM_WORKERS)
                .addProperty(PropertyKey.MASTER_JOURNAL_TYPE, JournalType.EMBEDDED)
                .addProperty(PropertyKey.MASTER_JOURNAL_FLUSH_TIMEOUT_MS, "5min")
                .addProperty(PropertyKey.MASTER_EMBEDDED_JOURNAL_MIN_ELECTION_TIMEOUT, "750ms")
                .addProperty(PropertyKey.MASTER_EMBEDDED_JOURNAL_MAX_ELECTION_TIMEOUT, "1500ms")
                .build();
        this.mCluster.start();

        AtomicReference<Throwable> failure = new AtomicReference<>();
        AtomicInteger successes = new AtomicInteger(0);
        FileSystem fs = this.mCluster.getFileSystemClient();
        List<OperationThread> threads = new ArrayList<>();
        try {
            for (int i = 0; i < numThreads; ++i) {
                OperationThread operationThread = new OperationThread(fs, i, failure, successes);
                operationThread.start();
                threads.add(operationThread);
            }
            for (int i = 0; i < numRestarts; ++i) {
                this.restartMasters();
                successes.set(0);
                // Also stop waiting as soon as a worker reports a failure. Otherwise a failed
                // worker would stall the test until the timeout and the real cause would be
                // hidden behind a TimeoutException.
                CommonUtils.waitFor("11 successes",
                        () -> failure.get() != null || successes.get() >= requiredSuccesses,
                        WaitForOptions.defaults().setTimeoutMs(RESTART_TIMEOUT_MS));
                Throwable error = failure.get();
                if (error != null) {
                    throw error;
                }
            }
        } finally {
            // Stop the workers regardless of the outcome and wait for them to finish.
            threads.forEach(Thread::interrupt);
            for (Thread thread : threads) {
                thread.join();
            }
        }
        this.mCluster.notifySuccess();
    }

    /**
     * Stops masters 0 through {@code NUM_MASTERS - 1} in index order and then starts them
     * again in the same order.
     *
     * @throws Exception if stopping or starting a master fails
     */
    private void restartMasters() throws Exception {
        // Stop every master before starting any of them again.
        for (int masterIndex = 0; masterIndex < NUM_MASTERS; ++masterIndex) {
            this.mCluster.stopMaster(masterIndex);
        }
        for (int masterIndex = 0; masterIndex < NUM_MASTERS; ++masterIndex) {
            this.mCluster.startMaster(masterIndex);
        }
    }

    /**
     * Worker thread that, until interrupted, repeatedly creates 300 directories and then
     * deletes them, incrementing a shared counter after each complete create/delete pass.
     * A verification failure or any exception is stored in the shared failure reference.
     */
    private static class OperationThread
    extends Thread {
        private final FileSystem mFs;
        private final int mThreadNum;
        private final AtomicReference<Throwable> mFailure;
        private final AtomicInteger mSuccessCounter;

        /**
         * @param fs the file system client used for all operations
         * @param threadNum number of this thread, used in the thread and directory names
         * @param failure shared slot that receives a failure raised by this thread
         * @param successCounter shared counter incremented after each complete pass
         */
        public OperationThread(FileSystem fs, int threadNum, AtomicReference<Throwable> failure, AtomicInteger successCounter) {
            super("operation-test-thread-" + threadNum);
            this.mFs = fs;
            this.mThreadNum = threadNum;
            this.mFailure = failure;
            this.mSuccessCounter = successCounter;
        }

        /**
         * Runs {@link #runInternal()}, printing and recording any exception it throws.
         */
        @Override
        public void run() {
            try {
                this.runInternal();
            }
            catch (Exception e) {
                e.printStackTrace();
                this.mFailure.set(e);
            }
        }

        /**
         * Performs create/delete passes until the thread's interrupt flag is observed at the
         * start of a pass. Returns early, after recording a failure, when a directory that was
         * just created does not exist or a directory that was just deleted still exists.
         *
         * @throws Exception if a file system call fails with anything other than the
         *         already-exists / does-not-exist cases tolerated below
         */
        public void runInternal() throws Exception {
            final int numDirs = 300;
            while (!Thread.interrupted()) {
                for (int n = 0; n < numDirs; ++n) {
                    AlluxioURI target = this.formatDirName(n);
                    boolean createdNow;
                    try {
                        this.mFs.createDirectory(target);
                        createdNow = true;
                    }
                    catch (FileAlreadyExistsException e) {
                        // Tolerated: the directory is already there, skip the check.
                        createdNow = false;
                    }
                    if (createdNow && !this.mFs.exists(target)) {
                        this.mFailure.set(new RuntimeException(String.format("Directory %s does not exist", target)));
                        return;
                    }
                }
                for (int n = 0; n < numDirs; ++n) {
                    AlluxioURI target = this.formatDirName(n);
                    boolean deletedNow;
                    try {
                        this.mFs.delete(target);
                        deletedNow = true;
                    }
                    catch (FileDoesNotExistException e) {
                        // Tolerated: the directory is already gone, skip the check.
                        deletedNow = false;
                    }
                    if (deletedNow && this.mFs.exists(target)) {
                        this.mFailure.set(new RuntimeException(String.format("Directory %s still exists", target)));
                        return;
                    }
                }
                this.mSuccessCounter.incrementAndGet();
            }
        }

        /**
         * Builds the path {@code /dir-<threadNum>-<dirNum>} for this thread.
         *
         * @param dirNum index of the directory within a pass
         * @return the directory URI
         */
        private AlluxioURI formatDirName(int dirNum) {
            String path = String.format("/dir-%d-%d", this.mThreadNum, dirNum);
            return new AlluxioURI(path);
        }
    }
}

