/*
 * Decompiled with CFR 0.152.
 */
package org.camunda.bpm.engine.test.concurrency;

import java.util.List;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.camunda.bpm.engine.ManagementService;
import org.camunda.bpm.engine.OptimisticLockingException;
import org.camunda.bpm.engine.RepositoryService;
import org.camunda.bpm.engine.RuntimeService;
import org.camunda.bpm.engine.impl.ProcessEngineLogger;
import org.camunda.bpm.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.camunda.bpm.engine.impl.cmd.AcquireJobsCmd;
import org.camunda.bpm.engine.impl.cmd.ExecuteJobsCmd;
import org.camunda.bpm.engine.impl.cmd.SetJobDefinitionPriorityCmd;
import org.camunda.bpm.engine.impl.cmd.SuspendJobCmd;
import org.camunda.bpm.engine.impl.cmd.SuspendJobDefinitionCmd;
import org.camunda.bpm.engine.impl.interceptor.Command;
import org.camunda.bpm.engine.impl.interceptor.CommandContext;
import org.camunda.bpm.engine.impl.interceptor.CommandExecutor;
import org.camunda.bpm.engine.impl.jobexecutor.AcquiredJobs;
import org.camunda.bpm.engine.impl.jobexecutor.ExecuteJobHelper;
import org.camunda.bpm.engine.impl.jobexecutor.JobExecutor;
import org.camunda.bpm.engine.impl.jobexecutor.JobFailureCollector;
import org.camunda.bpm.engine.impl.management.UpdateJobDefinitionSuspensionStateBuilderImpl;
import org.camunda.bpm.engine.impl.management.UpdateJobSuspensionStateBuilderImpl;
import org.camunda.bpm.engine.impl.persistence.entity.JobEntity;
import org.camunda.bpm.engine.impl.util.ClockUtil;
import org.camunda.bpm.engine.management.JobDefinition;
import org.camunda.bpm.engine.repository.ProcessDefinition;
import org.camunda.bpm.engine.runtime.Job;
import org.camunda.bpm.engine.runtime.ProcessInstance;
import org.camunda.bpm.engine.test.Deployment;
import org.camunda.bpm.engine.test.concurrency.ControllableThread;
import org.camunda.bpm.engine.test.concurrency.ControlledCommand;
import org.camunda.bpm.engine.test.util.ProcessEngineTestRule;
import org.camunda.bpm.engine.test.util.ProvidedProcessEngineRule;
import org.camunda.bpm.model.bpmn.Bpmn;
import org.camunda.bpm.model.bpmn.BpmnModelInstance;
import org.camunda.bpm.model.bpmn.builder.IntermediateCatchEventBuilder;
import org.camunda.bpm.model.bpmn.builder.ServiceTaskBuilder;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
import org.slf4j.Logger;

public class ConcurrentJobExecutorTest {
    /** Logger shared by the test methods and the helper threads; assigned once. */
    private static final Logger LOG = ProcessEngineLogger.TEST_LOGGER.getLogger();

    // The two rules are plain fields; only the chain below is registered with JUnit.
    protected ProvidedProcessEngineRule engineRule = new ProvidedProcessEngineRule();
    protected ProcessEngineTestRule testRule = new ProcessEngineTestRule(this.engineRule);

    /** Applies the engine rule as the outer rule and the test rule inside it. */
    @Rule
    public RuleChain ruleChain = RuleChain.outerRule((TestRule)this.engineRule).around((TestRule)this.testRule);

    // Services and configuration, populated from the engine rule in initServices().
    protected RuntimeService runtimeService;
    protected RepositoryService repositoryService;
    protected ManagementService managementService;
    protected ProcessEngineConfigurationImpl processEngineConfiguration;

    /**
     * Thread most recently started through {@code startAndWaitUntilControlIsReturned()}.
     * Each helper thread stores itself here before starting and reads the value in
     * {@code run()} when it builds its {@link ControlledCommand}.
     */
    protected static ControllableThread activeThread;

    /** Process model with a single asynchronous service task; built in the static initializer. */
    protected static final BpmnModelInstance SIMPLE_ASYNC_PROCESS;

    /**
     * Looks up the engine configuration and the services used by the tests
     * from the engine rule before every test method.
     */
    @Before
    public void initServices() {
        processEngineConfiguration = engineRule.getProcessEngineConfiguration();
        managementService = engineRule.getManagementService();
        repositoryService = engineRule.getRepositoryService();
        runtimeService = engineRule.getRuntimeService();
    }

    /**
     * Resets the engine clock and deletes every job that is still returned by the
     * job query, each one in its own command.
     */
    @After
    public void tearDown() {
        ClockUtil.reset();

        List<Job> leftoverJobs = managementService.createJobQuery().list();
        for (final Job leftover : leftoverJobs) {
            CommandExecutor txExecutor = processEngineConfiguration.getCommandExecutorTxRequired();
            Command<Void> deleteLeftover = new Command<Void>() {
                @Override
                public Void execute(CommandContext commandContext) {
                    // the query result is deleted through its entity type
                    ((JobEntity) leftover).delete();
                    return null;
                }
            };
            txExecutor.execute(deleteLeftover);
        }
    }

    /**
     * Starts executing a job on a controlled thread, deletes the job from the test
     * thread while the execution is paused, and expects the execution thread to end
     * with an {@link OptimisticLockingException}.
     */
    @Test
    public void testCompetingJobExecutionDeleteJobDuringExecution() {
        // given: a process whose service task is executed via an async-before job
        BpmnModelInstance asyncServiceTaskProcess = Bpmn.createExecutableProcess("process")
            .startEvent()
            .serviceTask("task")
                .camundaAsyncBefore()
                .camundaExpression("${true}")
            .endEvent()
            .done();
        testRule.deploy(asyncServiceTaskProcess);
        runtimeService.startProcessInstanceByKey("process");
        Job currentJob = managementService.createJobQuery().singleResult();

        // when: the execution thread is paused and the job is deleted underneath it
        JobExecutionThread threadOne = new JobExecutionThread(currentJob.getId());
        threadOne.startAndWaitUntilControlIsReturned();
        managementService.deleteJob(currentJob.getId());

        LOG.debug("test thread notifies thread 1");
        threadOne.proceedAndWaitTillDone();

        // then
        Assert.assertTrue(threadOne.exception instanceof OptimisticLockingException);
    }

    /**
     * Runs two executions of the same job: the first is paused, the clock is moved
     * past the job executor's lock time, the job is acquired again and executed by a
     * second thread. The first thread is resumed before the second; the test expects
     * the first to fail with an {@link OptimisticLockingException}, the second to
     * succeed, and no job to remain.
     */
    @Test
    public void shouldCompleteTimeoutRetryWhenTimeoutedJobCompletesInbetween() {
        // given
        BpmnModelInstance asyncServiceTaskProcess = Bpmn.createExecutableProcess("process")
            .startEvent()
            .serviceTask("task")
                .camundaAsyncBefore()
                .camundaExpression("${true}")
            .endEvent()
            .done();
        testRule.deploy(asyncServiceTaskProcess);
        runtimeService.startProcessInstanceByKey("process");
        Job currentJob = managementService.createJobQuery().singleResult();

        // first execution is started and paused
        JobExecutionThread threadOne = new JobExecutionThread(currentJob.getId());
        threadOne.startAndWaitUntilControlIsReturned();

        // move the clock beyond the lock time so the job can be acquired again
        long lockTimeInMillis = engineRule.getProcessEngineConfiguration().getJobExecutor().getLockTimeInMillis();
        ClockUtil.offset(lockTimeInMillis + 10000L);

        JobAcquisitionThread acquisitionThread = new JobAcquisitionThread();
        acquisitionThread.startAndWaitUntilControlIsReturned();
        acquisitionThread.proceedAndWaitTillDone();

        // second execution of the same job is started and paused
        JobExecutionThread threadTwo = new JobExecutionThread(currentJob.getId());
        threadTwo.startAndWaitUntilControlIsReturned();

        // when: thread one finishes first, then thread two
        threadOne.proceedAndWaitTillDone();
        threadTwo.proceedAndWaitTillDone();

        // then
        Assertions.assertThat((Throwable) threadOne.exception)
            .isInstanceOf(OptimisticLockingException.class)
            .hasMessageContaining("DELETE MessageEntity")
            .hasMessageContaining("Entity was updated by another transaction concurrently");
        Assertions.assertThat((Throwable) threadTwo.exception).isNull();
        Assertions.assertThat(managementService.createJobQuery().count()).isEqualTo(0L);
    }

    /**
     * Executes the two jobs of the "miParallelSubprocess" process on two controlled
     * threads. The first thread is expected to finish without an exception, the second
     * with one. The remaining job must keep the retries it had before, carry an
     * exception message, and have neither lock owner nor lock expiration time.
     */
    @Test
    @Deployment
    public void testCompetingJobExecutionDefaultRetryStrategy() {
        // given
        runtimeService.startProcessInstanceByKey("miParallelSubprocess");
        // typed list instead of a raw List so no per-element casts are needed
        List<Job> currentJobs = managementService.createJobQuery().list();
        Assert.assertEquals(2, currentJobs.size());

        // when: both executions are started and paused
        JobExecutionThread threadOne = new JobExecutionThread(currentJobs.get(0).getId());
        threadOne.startAndWaitUntilControlIsReturned();

        JobExecutionThread threadTwo = new JobExecutionThread(currentJobs.get(1).getId());
        threadTwo.startAndWaitUntilControlIsReturned();

        // then: the first one completes cleanly
        LOG.debug("test thread notifies thread 1");
        threadOne.proceedAndWaitTillDone();
        Assert.assertNull(threadOne.exception);

        // and the second one fails
        LOG.debug("test thread notifies thread 2");
        threadTwo.proceedAndWaitTillDone();
        Assert.assertNotNull(threadTwo.exception);

        Job remainingJob = managementService.createJobQuery().singleResult();
        // retries are compared with the value read before the execution attempt
        Assert.assertEquals(currentJobs.get(1).getRetries(), remainingJob.getRetries());
        Assert.assertNotNull(remainingJob.getExceptionMessage());

        JobEntity jobEntity = (JobEntity) remainingJob;
        Assert.assertNull(jobEntity.getLockOwner());
        Assert.assertNull(jobEntity.getLockExpirationTime());
    }

    /**
     * Executes the two jobs of the "miParallelSubprocess" process on two controlled
     * threads. The first thread is expected to finish without an exception, the second
     * with one. The remaining job must have 5 retries, an exception message, no lock
     * owner and a due date.
     */
    @Test
    @Deployment
    public void testCompetingJobExecutionFoxRetryStrategy() {
        // given
        runtimeService.startProcessInstanceByKey("miParallelSubprocess");
        // typed list instead of a raw List so no per-element casts are needed
        List<Job> currentJobs = managementService.createJobQuery().list();
        Assert.assertEquals(2, currentJobs.size());

        // when: both executions are started and paused
        JobExecutionThread threadOne = new JobExecutionThread(currentJobs.get(0).getId());
        threadOne.startAndWaitUntilControlIsReturned();

        JobExecutionThread threadTwo = new JobExecutionThread(currentJobs.get(1).getId());
        threadTwo.startAndWaitUntilControlIsReturned();

        // then: the first one completes cleanly
        LOG.debug("test thread notifies thread 1");
        threadOne.proceedAndWaitTillDone();
        Assert.assertNull(threadOne.exception);

        // and the second one fails
        LOG.debug("test thread notifies thread 2");
        threadTwo.proceedAndWaitTillDone();
        Assert.assertNotNull(threadTwo.exception);

        Job remainingJob = managementService.createJobQuery().singleResult();
        Assert.assertEquals(5, remainingJob.getRetries());
        Assert.assertNotNull(remainingJob.getExceptionMessage());

        JobEntity jobEntity = (JobEntity) remainingJob;
        Assert.assertNull(jobEntity.getLockOwner());
        Assert.assertNotNull(jobEntity.getDuedate());
    }

    /**
     * Interleaves a job execution with a suspension of the job definition (including
     * its jobs) in both orders. When the suspension completes first the execution is
     * expected to end with an exception; when the execution completes first neither
     * thread is expected to record one.
     */
    @Test
    public void testCompletingJobExecutionSuspendDuringExecution() {
        final String processKey = "simpleAsyncProcess";
        testRule.deploy(SIMPLE_ASYNC_PROCESS);
        runtimeService.startProcessInstanceByKey(processKey);
        Job job = managementService.createJobQuery().singleResult();

        // round one: suspension finishes before the paused execution continues
        JobExecutionThread executionthread = new JobExecutionThread(job.getId());
        executionthread.startAndWaitUntilControlIsReturned();

        JobSuspensionThread jobSuspensionThread = new JobSuspensionThread(processKey);
        jobSuspensionThread.startAndWaitUntilControlIsReturned();

        jobSuspensionThread.proceedAndWaitTillDone();
        executionthread.proceedAndWaitTillDone();

        Assert.assertNull(jobSuspensionThread.exception);
        Assert.assertNotNull(executionthread.exception);

        // round two: execution finishes before the paused suspension continues
        executionthread = new JobExecutionThread(job.getId());
        executionthread.startAndWaitUntilControlIsReturned();

        jobSuspensionThread = new JobSuspensionThread(processKey);
        jobSuspensionThread.startAndWaitUntilControlIsReturned();

        executionthread.proceedAndWaitTillDone();
        jobSuspensionThread.proceedAndWaitTillDone();

        Assert.assertNull(jobSuspensionThread.exception);
        Assert.assertNull(executionthread.exception);
    }

    /**
     * Interleaves job acquisition with a suspension of the job definition (including
     * its jobs) in both orders. When the suspension completes first, the expected
     * acquisition outcome depends on
     * {@code testRule.isOptimisticLockingExceptionSuppressible()}: either no exception
     * and zero acquired jobs, or an {@link OptimisticLockingException} and no result.
     * When the acquisition completes first, neither thread may record an exception.
     */
    @Test
    public void testCompletingSuspendJobDuringAcquisition() {
        final String processKey = "simpleAsyncProcess";
        testRule.deploy(SIMPLE_ASYNC_PROCESS);
        runtimeService.startProcessInstanceByKey(processKey);

        // round one: suspension finishes before the paused acquisition continues
        JobAcquisitionThread acquisitionThread = new JobAcquisitionThread();
        acquisitionThread.startAndWaitUntilControlIsReturned();

        JobSuspensionThread jobSuspensionThread = new JobSuspensionThread(processKey);
        jobSuspensionThread.startAndWaitUntilControlIsReturned();

        jobSuspensionThread.proceedAndWaitTillDone();
        acquisitionThread.proceedAndWaitTillDone();

        Assert.assertNull(jobSuspensionThread.exception);

        if (!testRule.isOptimisticLockingExceptionSuppressible()) {
            Assertions.assertThat((Throwable) acquisitionThread.exception).isInstanceOf(OptimisticLockingException.class);
            Assert.assertNull(acquisitionThread.acquiredJobs);
        } else {
            Assert.assertNull(acquisitionThread.exception);
            Assert.assertEquals(0, acquisitionThread.acquiredJobs.size());
        }

        // round two: acquisition finishes before the paused suspension continues
        acquisitionThread = new JobAcquisitionThread();
        acquisitionThread.startAndWaitUntilControlIsReturned();

        jobSuspensionThread = new JobSuspensionThread(processKey);
        jobSuspensionThread.startAndWaitUntilControlIsReturned();

        acquisitionThread.proceedAndWaitTillDone();
        jobSuspensionThread.proceedAndWaitTillDone();

        Assert.assertNull(jobSuspensionThread.exception);
        Assert.assertNull(acquisitionThread.exception);
    }

    /**
     * Suspends a process definition while an instance waits in a receive task, then
     * signals the instance so it reaches the timer catch event. Expects one suspended
     * job, no active job, and the process instance still counted as active.
     */
    @Test
    public void testCompletingSuspendedJobDuringRunningInstance() {
        // given: receive task followed by a timer catch event
        BpmnModelInstance receiveThenTimerProcess = Bpmn.createExecutableProcess("process")
            .startEvent()
            .receiveTask()
            .intermediateCatchEvent()
                .timerWithDuration("PT0M")
            .endEvent()
            .done();
        testRule.deploy(receiveThenTimerProcess);

        ProcessDefinition processDefinition = repositoryService.createProcessDefinitionQuery().singleResult();
        String definitionId = processDefinition.getId();
        ProcessInstance processInstance = runtimeService.startProcessInstanceById(definitionId);

        // when: only the definition is suspended, the instance stays active
        repositoryService.suspendProcessDefinitionById(definitionId);
        Assert.assertEquals(1L, runtimeService.createProcessInstanceQuery().active().count());

        runtimeService.signal(processInstance.getId());

        // then
        Assert.assertEquals(1L, managementService.createJobQuery().suspended().count());
        Assert.assertEquals(0L, managementService.createJobQuery().active().count());
        Assert.assertEquals(1L, runtimeService.createProcessInstanceQuery().active().count());
    }

    /**
     * Pauses the execution of one of two jobs, updates the job definition priority to
     * 42 with cascade while the execution is paused, and then resumes the execution.
     * All jobs must carry the new priority after the update. The outcome of the
     * execution depends on {@code testRule.isOptimisticLockingExceptionSuppressible()}:
     * either no exception and one remaining job, or an
     * {@link OptimisticLockingException} and two remaining jobs.
     */
    @Test
    public void testCompletingUpdateJobDefinitionPriorityDuringExecution() {
        // given: two instances, hence two jobs for the same job definition
        testRule.deploy(SIMPLE_ASYNC_PROCESS);
        runtimeService.startProcessInstanceByKey("simpleAsyncProcess");
        runtimeService.startProcessInstanceByKey("simpleAsyncProcess");
        JobDefinition jobDefinition = managementService.createJobDefinitionQuery().singleResult();
        // typed lists: iterating a raw List yields Object and cannot be bound to a Job loop variable
        List<Job> jobs = managementService.createJobQuery().list();

        // when: execution of the first job is paused
        JobExecutionThread executionThread = new JobExecutionThread(jobs.get(0).getId());
        executionThread.startAndWaitUntilControlIsReturned();

        // and the priority update runs to completion in the meantime
        JobDefinitionPriorityThread priorityThread = new JobDefinitionPriorityThread(jobDefinition.getId(), 42L, true);
        priorityThread.startAndWaitUntilControlIsReturned();
        priorityThread.proceedAndWaitTillDone();

        // then: every job has the new priority
        List<Job> currentJobs = managementService.createJobQuery().list();
        for (Job job : currentJobs) {
            Assert.assertEquals(42L, job.getPriority());
        }

        // and the resumed execution behaves according to the suppression setting
        executionThread.proceedAndWaitTillDone();
        long remainingJobCount = managementService.createJobQuery().count();
        if (testRule.isOptimisticLockingExceptionSuppressible()) {
            Assert.assertNull(executionThread.exception);
            Assert.assertEquals(1L, remainingJobCount);
        } else {
            Assertions.assertThat((Throwable) executionThread.exception).isInstanceOf(OptimisticLockingException.class);
            Assert.assertEquals(2L, remainingJobCount);
        }
    }

    /**
     * Starts a job suspension (by job definition id) and a cascading priority update
     * to 42 concurrently, lets the suspension finish first and the priority update
     * second, and expects both jobs to be suspended with priority 42.
     */
    @Test
    public void testCompletingSuspensionJobDuringPriorityUpdate() {
        // given: two instances, hence two jobs for the same job definition
        testRule.deploy(SIMPLE_ASYNC_PROCESS);
        runtimeService.startProcessInstanceByKey("simpleAsyncProcess");
        runtimeService.startProcessInstanceByKey("simpleAsyncProcess");
        JobDefinition jobDefinition = managementService.createJobDefinitionQuery().singleResult();

        // when: both commands are started and paused
        JobSuspensionByJobDefinitionThread suspensionThread = new JobSuspensionByJobDefinitionThread(jobDefinition.getId());
        suspensionThread.startAndWaitUntilControlIsReturned();

        JobDefinitionPriorityThread priorityUpdateThread = new JobDefinitionPriorityThread(jobDefinition.getId(), 42L, true);
        priorityUpdateThread.startAndWaitUntilControlIsReturned();

        // suspension completes first, then the priority update
        suspensionThread.proceedAndWaitTillDone();
        priorityUpdateThread.proceedAndWaitTillDone();

        // then
        // typed list: iterating a raw List yields Object and cannot be bound to a Job loop variable
        List<Job> updatedJobs = managementService.createJobQuery().list();
        Assert.assertEquals(2, updatedJobs.size());
        for (Job job : updatedJobs) {
            Assert.assertEquals(42L, job.getPriority());
            Assert.assertTrue(job.isSuspended());
        }
    }

    // Builds the shared "simpleAsyncProcess" model: start event, one service task
    // evaluating ${true} with async-before enabled, end event.
    static {
        SIMPLE_ASYNC_PROCESS = Bpmn.createExecutableProcess("simpleAsyncProcess")
            .startEvent()
            .serviceTask()
                .camundaExpression("${true}")
                .camundaAsyncBefore()
            .endEvent()
            .done();
    }

    /**
     * Controllable thread that executes a single job through {@link ExecuteJobHelper},
     * with the {@link ExecuteJobsCmd} wrapped in a {@link ControlledCommand}.
     * An {@link OptimisticLockingException} thrown by the execution is kept in
     * {@link #exception}; other exceptions are not caught here.
     */
    public class JobExecutionThread
    extends ControllableThread {
        /** Set when the execution failed with an optimistic locking exception; otherwise null. */
        OptimisticLockingException exception;
        /** Id of the job this thread executes. */
        String jobId;

        JobExecutionThread(String jobId) {
            this.jobId = jobId;
        }

        @Override
        public synchronized void startAndWaitUntilControlIsReturned() {
            // register this thread as the active one before it starts; run() reads it back
            activeThread = this;
            super.startAndWaitUntilControlIsReturned();
        }

        @Override
        public void run() {
            try {
                JobFailureCollector failureCollector = new JobFailureCollector(jobId);
                CommandExecutor txExecutor = processEngineConfiguration.getCommandExecutorTxRequired();
                Command<Void> controlledExecution =
                    new ControlledCommand<Void>(activeThread, new ExecuteJobsCmd(jobId, failureCollector));
                ExecuteJobHelper.executeJob(jobId, txExecutor, failureCollector, controlledExecution);
            }
            catch (OptimisticLockingException e) {
                exception = e;
            }
            LOG.debug(getName() + " ends");
        }
    }

    /**
     * Controllable thread that runs an {@link AcquireJobsCmd} for the configured job
     * executor, wrapped in a {@link ControlledCommand}. The result is stored in
     * {@link #acquiredJobs}; an {@link OptimisticLockingException} is stored in
     * {@link #exception} instead, leaving {@link #acquiredJobs} unassigned.
     */
    public class JobAcquisitionThread
    extends ControllableThread {
        /** Set when the acquisition failed with an optimistic locking exception; otherwise null. */
        OptimisticLockingException exception;
        /** Result of the acquisition command; stays null if the command threw. */
        AcquiredJobs acquiredJobs;

        @Override
        public synchronized void startAndWaitUntilControlIsReturned() {
            // register this thread as the active one before it starts; run() reads it back
            activeThread = this;
            super.startAndWaitUntilControlIsReturned();
        }

        @Override
        public void run() {
            try {
                JobExecutor jobExecutor = processEngineConfiguration.getJobExecutor();
                CommandExecutor txExecutor = processEngineConfiguration.getCommandExecutorTxRequired();
                Command<AcquiredJobs> controlledAcquisition =
                    new ControlledCommand<AcquiredJobs>(activeThread, new AcquireJobsCmd(jobExecutor));
                acquiredJobs = txExecutor.execute(controlledAcquisition);
            }
            catch (OptimisticLockingException e) {
                exception = e;
            }
            LOG.debug(getName() + " ends");
        }
    }

    /**
     * Controllable thread that runs a {@link SuspendJobDefinitionCmd} selected by
     * process definition key, with jobs included, wrapped in a
     * {@link ControlledCommand}. An {@link OptimisticLockingException} thrown by the
     * command is kept in {@link #exception}.
     */
    public class JobSuspensionThread
    extends ControllableThread {
        /** Set when the suspension failed with an optimistic locking exception; otherwise null. */
        OptimisticLockingException exception;
        /** Key of the process definition whose job definitions are suspended. */
        String processDefinitionKey;

        public JobSuspensionThread(String processDefinitionKey) {
            this.processDefinitionKey = processDefinitionKey;
        }

        @Override
        public synchronized void startAndWaitUntilControlIsReturned() {
            // register this thread as the active one before it starts; run() reads it back
            activeThread = this;
            super.startAndWaitUntilControlIsReturned();
        }

        @Override
        public void run() {
            try {
                CommandExecutor txExecutor = processEngineConfiguration.getCommandExecutorTxRequired();
                Command<Void> controlledSuspension =
                    new ControlledCommand<Void>(activeThread, createSuspendJobCommand());
                txExecutor.execute(controlledSuspension);
            }
            catch (OptimisticLockingException e) {
                exception = e;
            }
            LOG.debug(getName() + " ends");
        }

        /**
         * Builds the suspension command for {@link #processDefinitionKey} with
         * {@code includeJobs(true)}.
         */
        protected Command<Void> createSuspendJobCommand() {
            UpdateJobDefinitionSuspensionStateBuilderImpl suspensionBuilder =
                new UpdateJobDefinitionSuspensionStateBuilderImpl()
                    .byProcessDefinitionKey(processDefinitionKey)
                    .includeJobs(true);
            return new SuspendJobDefinitionCmd(suspensionBuilder);
        }
    }

    /**
     * Controllable thread that runs a {@link SetJobDefinitionPriorityCmd} with the
     * given job definition id, priority and cascade flag, wrapped in a
     * {@link ControlledCommand}. An {@link OptimisticLockingException} thrown by the
     * command is kept in {@link #exception}.
     */
    public class JobDefinitionPriorityThread
    extends ControllableThread {
        /** Set when the update failed with an optimistic locking exception; otherwise null. */
        OptimisticLockingException exception;
        /** Id of the job definition whose priority is set. */
        String jobDefinitionId;
        /** Priority value passed to the command. */
        Long priority;
        /** Cascade flag passed to the command. */
        boolean cascade;

        public JobDefinitionPriorityThread(String jobDefinitionId, Long priority, boolean cascade) {
            this.jobDefinitionId = jobDefinitionId;
            this.priority = priority;
            this.cascade = cascade;
        }

        @Override
        public synchronized void startAndWaitUntilControlIsReturned() {
            // register this thread as the active one before it starts; run() reads it back
            activeThread = this;
            super.startAndWaitUntilControlIsReturned();
        }

        @Override
        public void run() {
            try {
                CommandExecutor txExecutor = processEngineConfiguration.getCommandExecutorTxRequired();
                Command<Void> controlledPriorityUpdate = new ControlledCommand<Void>(
                    activeThread, new SetJobDefinitionPriorityCmd(jobDefinitionId, priority, cascade));
                txExecutor.execute(controlledPriorityUpdate);
            }
            catch (OptimisticLockingException e) {
                exception = e;
            }
        }
    }

    /**
     * Controllable thread that runs a {@link SuspendJobCmd} selected by job definition
     * id, wrapped in a {@link ControlledCommand}. An
     * {@link OptimisticLockingException} thrown by the command is kept in
     * {@link #exception}.
     */
    public class JobSuspensionByJobDefinitionThread
    extends ControllableThread {
        /** Set when the suspension failed with an optimistic locking exception; otherwise null. */
        OptimisticLockingException exception;
        /** Id of the job definition whose jobs are suspended. */
        String jobDefinitionId;

        public JobSuspensionByJobDefinitionThread(String jobDefinitionId) {
            this.jobDefinitionId = jobDefinitionId;
        }

        @Override
        public synchronized void startAndWaitUntilControlIsReturned() {
            // register this thread as the active one before it starts; run() reads it back
            activeThread = this;
            super.startAndWaitUntilControlIsReturned();
        }

        @Override
        public void run() {
            try {
                CommandExecutor txExecutor = processEngineConfiguration.getCommandExecutorTxRequired();
                Command<Void> controlledSuspension =
                    new ControlledCommand<Void>(activeThread, createSuspendJobCommand());
                txExecutor.execute(controlledSuspension);
            }
            catch (OptimisticLockingException e) {
                exception = e;
            }
            LOG.debug(getName() + " ends");
        }

        /** Builds the job suspension command for {@link #jobDefinitionId}. */
        protected SuspendJobCmd createSuspendJobCommand() {
            UpdateJobSuspensionStateBuilderImpl suspensionBuilder =
                new UpdateJobSuspensionStateBuilderImpl().byJobDefinitionId(jobDefinitionId);
            return new SuspendJobCmd(suspensionBuilder);
        }
    }
}

