/*
 * Decompiled with CFR 0.152.
 */
package alluxio.job.master;

import alluxio.conf.PropertyKey;
import alluxio.exception.status.ResourceExhaustedException;
import alluxio.job.JobConfig;
import alluxio.job.SleepJobConfig;
import alluxio.job.plan.PlanDefinition;
import alluxio.job.plan.PlanDefinitionRegistryRule;
import alluxio.job.plan.SleepPlanDefinition;
import alluxio.job.util.JobTestUtils;
import alluxio.job.wire.JobInfo;
import alluxio.job.wire.JobWorkerHealth;
import alluxio.job.wire.Status;
import alluxio.job.workflow.composite.CompositeConfig;
import alluxio.master.LocalAlluxioJobCluster;
import alluxio.master.job.JobMaster;
import alluxio.testutils.BaseIntegrationTest;
import alluxio.testutils.LocalAlluxioClusterResource;
import alluxio.util.CommonUtils;
import alluxio.util.WaitForOptions;
import alluxio.wire.WorkerInfo;
import alluxio.worker.JobWorkerProcess;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

public final class JobMasterIntegrationTest
extends BaseIntegrationTest {
    private static final long WORKER_TIMEOUT_MS = 2000L;
    private static final long LOST_WORKER_INTERVAL_MS = 2000L;
    private JobMaster mJobMaster;
    private JobWorkerProcess mJobWorker;
    private LocalAlluxioJobCluster mLocalAlluxioJobCluster;
    @Rule
    public LocalAlluxioClusterResource mLocalAlluxioClusterResource = new LocalAlluxioClusterResource.Builder().setProperty(PropertyKey.JOB_MASTER_WORKER_HEARTBEAT_INTERVAL, 20).setProperty(PropertyKey.JOB_MASTER_WORKER_TIMEOUT, 2000L).setProperty(PropertyKey.JOB_MASTER_LOST_WORKER_INTERVAL, 2000L).build();
    @Rule
    public PlanDefinitionRegistryRule mJobRule = new PlanDefinitionRegistryRule(SleepJobConfig.class, (PlanDefinition)new SleepPlanDefinition());

    @Before
    public void before() throws Exception {
        this.mLocalAlluxioJobCluster = new LocalAlluxioJobCluster();
        this.mLocalAlluxioJobCluster.start();
        this.mJobMaster = this.mLocalAlluxioJobCluster.getMaster().getJobMaster();
        this.mJobWorker = this.mLocalAlluxioJobCluster.getWorker();
    }

    @After
    public void after() throws Exception {
        this.mLocalAlluxioJobCluster.stop();
    }

    @Test
    public void multipleTasksPerWorker() throws Exception {
        long jobId = this.mJobMaster.run((JobConfig)new SleepJobConfig(1L, 2));
        JobInfo jobStatus = this.mJobMaster.getStatus(jobId);
        Assert.assertEquals((long)2L, (long)jobStatus.getChildren().size());
        JobTestUtils.waitForJobStatus(this.mJobMaster, jobId, Status.COMPLETED);
        jobStatus = this.mJobMaster.getStatus(jobId);
        Assert.assertEquals((long)2L, (long)jobStatus.getChildren().size());
    }

    @Test
    @LocalAlluxioClusterResource.Config(confParams={"alluxio.job.master.job.capacity", "1", "alluxio.job.master.finished.job.retention.time", "0"})
    public void flowControl() throws Exception {
        block2: for (int i = 0; i < 10; ++i) {
            while (true) {
                try {
                    this.mJobMaster.run((JobConfig)new SleepJobConfig(100L));
                    continue block2;
                }
                catch (ResourceExhaustedException e) {
                    CommonUtils.sleepMs((long)100L);
                    continue;
                }
                break;
            }
        }
    }

    @Test
    public void restartMasterAndLoseWorker() throws Exception {
        long jobId = this.mJobMaster.run((JobConfig)new SleepJobConfig(1L));
        JobTestUtils.waitForJobStatus(this.mJobMaster, jobId, Status.COMPLETED);
        this.mJobMaster.stop();
        this.mJobMaster.start(Boolean.valueOf(true));
        CommonUtils.waitFor((String)"Worker to register with restarted job master", () -> !this.mJobMaster.getWorkerInfoList().isEmpty(), (WaitForOptions)WaitForOptions.defaults().setTimeoutMs(10000L));
        this.mJobWorker.stop();
        CommonUtils.sleepMs((long)4000L);
        Assert.assertTrue((boolean)this.mJobMaster.getWorkerInfoList().isEmpty());
    }

    @Test
    @LocalAlluxioClusterResource.Config(confParams={"alluxio.job.master.lost.worker.interval", "10000000"})
    public void restartMasterAndReregisterWorker() throws Exception {
        long jobId = this.mJobMaster.run((JobConfig)new SleepJobConfig(1L));
        JobTestUtils.waitForJobStatus(this.mJobMaster, jobId, Status.COMPLETED);
        this.mJobMaster.stop();
        this.mJobMaster.start(Boolean.valueOf(true));
        CommonUtils.waitFor((String)"Worker to register with restarted job master", () -> !this.mJobMaster.getWorkerInfoList().isEmpty(), (WaitForOptions)WaitForOptions.defaults().setTimeoutMs(10000L));
        long firstWorkerId = ((WorkerInfo)this.mJobMaster.getWorkerInfoList().get(0)).getId();
        this.mLocalAlluxioJobCluster.restartWorker();
        CommonUtils.waitFor((String)"Restarted worker to register with job master", () -> {
            List workerInfo = this.mJobMaster.getWorkerInfoList();
            return !workerInfo.isEmpty() && ((WorkerInfo)workerInfo.get(0)).getId() != firstWorkerId;
        }, (WaitForOptions)WaitForOptions.defaults().setTimeoutMs(10000L));
        Assert.assertEquals((long)1L, (long)this.mJobMaster.getWorkerInfoList().size());
    }

    @Test
    public void getAllWorkerHealth() throws Exception {
        AtomicReference singleton = new AtomicReference();
        CommonUtils.waitFor((String)"allWorkerHealth", () -> {
            List allWorkerHealth = this.mJobMaster.getAllWorkerHealth();
            singleton.set(allWorkerHealth);
            return allWorkerHealth.size() == 1;
        });
        List allWorkerHealth = (List)singleton.get();
        JobWorkerHealth workerHealth = (JobWorkerHealth)allWorkerHealth.get(0);
        Assert.assertNotNull((Object)workerHealth.getHostname());
        Assert.assertEquals((long)3L, (long)workerHealth.getLoadAverage().size());
    }

    @Test
    @LocalAlluxioClusterResource.Config(confParams={"alluxio.job.master.job.capacity", "20"})
    public void stopJobWorkerTasks() throws Exception {
        long jobId0 = this.mJobMaster.run((JobConfig)new SleepJobConfig(5000L));
        long jobId1 = this.mJobMaster.run((JobConfig)new SleepJobConfig(5000L));
        long jobId2 = this.mJobMaster.run((JobConfig)new SleepJobConfig(1L));
        long jobId3 = this.mJobMaster.run((JobConfig)new SleepJobConfig(1L));
        JobTestUtils.waitForJobStatus(this.mJobMaster, jobId2, Status.COMPLETED);
        JobTestUtils.waitForJobStatus(this.mJobMaster, jobId3, Status.COMPLETED);
        Assert.assertFalse((boolean)this.mJobMaster.getStatus(jobId1).getStatus().isFinished());
        Assert.assertFalse((boolean)this.mJobMaster.getStatus(jobId0).getStatus().isFinished());
        Assert.assertEquals((long)2L, (long)((JobWorkerHealth)this.mJobMaster.getAllWorkerHealth().get(0)).getNumActiveTasks());
        this.mJobMaster.setTaskPoolSize(0);
        long jobId5 = this.mJobMaster.run((JobConfig)new SleepJobConfig(1L));
        CommonUtils.sleepMs((long)300L);
        Assert.assertFalse((boolean)this.mJobMaster.getStatus(jobId5).getStatus().isFinished());
        Assert.assertEquals((long)0L, (long)((JobWorkerHealth)this.mJobMaster.getAllWorkerHealth().get(0)).getTaskPoolSize());
        Assert.assertEquals((long)2L, (long)((JobWorkerHealth)this.mJobMaster.getAllWorkerHealth().get(0)).getNumActiveTasks());
    }

    @Test
    @LocalAlluxioClusterResource.Config(confParams={"alluxio.job.master.job.capacity", "20"})
    public void throttleJobWorkerTasks() throws Exception {
        this.mJobMaster.setTaskPoolSize(1);
        long jobId0 = this.mJobMaster.run((JobConfig)new SleepJobConfig(1L));
        JobTestUtils.waitForJobStatus(this.mJobMaster, jobId0, Sets.newHashSet((Object[])new Status[]{Status.RUNNING, Status.COMPLETED}));
        long jobId1 = this.mJobMaster.run((JobConfig)new SleepJobConfig(50000L));
        JobTestUtils.waitForJobStatus(this.mJobMaster, jobId1, Status.RUNNING);
        JobTestUtils.waitForJobStatus(this.mJobMaster, jobId0, Status.COMPLETED);
        long jobId2 = this.mJobMaster.run((JobConfig)new SleepJobConfig(1L));
        long jobId3 = this.mJobMaster.run((JobConfig)new SleepJobConfig(1L));
        CommonUtils.sleepMs((long)300L);
        Assert.assertEquals((Object)Status.RUNNING, (Object)this.mJobMaster.getStatus(jobId1).getStatus());
        Assert.assertEquals((Object)Status.CREATED, (Object)this.mJobMaster.getStatus(jobId2).getStatus());
        Assert.assertEquals((Object)Status.CREATED, (Object)this.mJobMaster.getStatus(jobId3).getStatus());
        Assert.assertEquals((long)1L, (long)((JobWorkerHealth)this.mJobMaster.getAllWorkerHealth().get(0)).getTaskPoolSize());
        Assert.assertEquals((long)1L, (long)((JobWorkerHealth)this.mJobMaster.getAllWorkerHealth().get(0)).getNumActiveTasks());
        this.mJobMaster.cancel(jobId1);
        JobTestUtils.waitForJobStatus(this.mJobMaster, jobId2, Status.COMPLETED);
        JobTestUtils.waitForJobStatus(this.mJobMaster, jobId3, Status.COMPLETED);
    }

    @Test
    public void cancel() throws Exception {
        SleepJobConfig childJob1 = new SleepJobConfig(50000L);
        SleepJobConfig childJob2 = new SleepJobConfig(45000L);
        SleepJobConfig childJob3 = new SleepJobConfig(40000L);
        CompositeConfig jobConfig = new CompositeConfig(Lists.newArrayList((Object[])new JobConfig[]{childJob1, childJob2, childJob3}), Boolean.valueOf(false));
        long jobId = this.mJobMaster.run((JobConfig)jobConfig);
        JobInfo status = this.mJobMaster.getStatus(jobId);
        List children = status.getChildren();
        Assert.assertEquals((long)3L, (long)children.size());
        long child0 = ((JobInfo)children.get(0)).getId();
        long child1 = ((JobInfo)children.get(1)).getId();
        long child2 = ((JobInfo)children.get(2)).getId();
        this.mJobMaster.cancel(jobId);
        JobTestUtils.waitForJobStatus(this.mJobMaster, jobId, Status.CANCELED);
        JobTestUtils.waitForJobStatus(this.mJobMaster, child0, Status.CANCELED);
        JobTestUtils.waitForJobStatus(this.mJobMaster, child1, Status.CANCELED);
        JobTestUtils.waitForJobStatus(this.mJobMaster, child2, Status.CANCELED);
    }
}

