/*
 * Decompiled with CFR 0.152.
 */
package org.camunda.bpm.engine.test.cockroachdb;

import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.camunda.bpm.engine.CrdbTransactionRetryException;
import org.camunda.bpm.engine.ManagementService;
import org.camunda.bpm.engine.ProcessEngineBootstrapCommand;
import org.camunda.bpm.engine.ProcessEngineConfiguration;
import org.camunda.bpm.engine.RuntimeService;
import org.camunda.bpm.engine.externaltask.LockedExternalTask;
import org.camunda.bpm.engine.impl.BootstrapEngineCommand;
import org.camunda.bpm.engine.impl.ProcessEngineImpl;
import org.camunda.bpm.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.camunda.bpm.engine.impl.cmd.FetchExternalTasksCmd;
import org.camunda.bpm.engine.impl.cmd.SetJobDefinitionPriorityCmd;
import org.camunda.bpm.engine.impl.cmd.SuspendJobDefinitionCmd;
import org.camunda.bpm.engine.impl.externaltask.TopicFetchInstruction;
import org.camunda.bpm.engine.impl.interceptor.Command;
import org.camunda.bpm.engine.impl.interceptor.CommandContext;
import org.camunda.bpm.engine.impl.jobexecutor.JobExecutor;
import org.camunda.bpm.engine.impl.management.UpdateJobDefinitionSuspensionStateBuilderImpl;
import org.camunda.bpm.engine.impl.test.RequiredDatabase;
import org.camunda.bpm.engine.management.JobDefinition;
import org.camunda.bpm.engine.runtime.Job;
import org.camunda.bpm.engine.test.Deployment;
import org.camunda.bpm.engine.test.concurrency.ConcurrencyTestHelper;
import org.camunda.bpm.engine.test.jobexecutor.ControllableJobExecutor;
import org.camunda.bpm.engine.test.jobexecutor.RecordingAcquireJobsRunnable;
import org.camunda.bpm.engine.test.util.ProcessEngineBootstrapRule;
import org.camunda.bpm.engine.test.util.ProcessEngineTestRule;
import org.camunda.bpm.engine.test.util.ProvidedProcessEngineRule;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;

@RequiredDatabase(includes={"cockroachdb"})
public class CockroachDBRetriesTest
extends ConcurrencyTestHelper {
    protected static final int DEFAULT_NUM_JOBS_TO_ACQUIRE = 3;
    protected static final int COMMAND_RETRIES = 3;
    protected static final String PROCESS_ENGINE_NAME = "failingProcessEngine";
    @ClassRule
    public static ProcessEngineBootstrapRule bootstrapRule = new ProcessEngineBootstrapRule(c -> c.setCommandRetries(3));
    protected ProvidedProcessEngineRule engineRule = new ProvidedProcessEngineRule(bootstrapRule);
    protected ProcessEngineTestRule testRule = new ProcessEngineTestRule(this.engineRule);
    @Rule
    public RuleChain ruleChain = RuleChain.outerRule((TestRule)this.engineRule).around((TestRule)this.testRule);
    protected ControllableJobExecutor jobExecutor1;
    protected ControllableJobExecutor jobExecutor2;
    protected ConcurrencyTestHelper.ThreadControl acquisitionThread1;
    protected ConcurrencyTestHelper.ThreadControl acquisitionThread2;
    protected RuntimeService runtimeService;
    protected ManagementService managementService;

    @Before
    public void setUp() throws Exception {
        this.processEngineConfiguration = this.engineRule.getProcessEngineConfiguration();
        this.runtimeService = this.engineRule.getRuntimeService();
        this.managementService = this.engineRule.getManagementService();
        this.jobExecutor1 = new ControllableJobExecutor((ProcessEngineImpl)this.engineRule.getProcessEngine());
        this.jobExecutor1.setMaxJobsPerAcquisition(3);
        this.acquisitionThread1 = this.jobExecutor1.getAcquisitionThreadControl();
        this.processEngineConfiguration.setJobExecutor((JobExecutor)this.jobExecutor1);
        this.jobExecutor2 = new ControllableJobExecutor((ProcessEngineImpl)this.engineRule.getProcessEngine());
        this.jobExecutor2.setMaxJobsPerAcquisition(3);
        this.acquisitionThread2 = this.jobExecutor2.getAcquisitionThreadControl();
    }

    @After
    public void tearDown() throws Exception {
        this.jobExecutor1.shutdown();
        this.jobExecutor2.shutdown();
    }

    @Test
    @Deployment(resources={"org/camunda/bpm/engine/test/jobexecutor/simpleAsyncProcess.bpmn20.xml"})
    public void shouldRetryJobAcquisition() {
        int numJobs = 4;
        for (int i = 0; i < numJobs; ++i) {
            this.engineRule.getRuntimeService().startProcessInstanceByKey("simpleAsyncProcess").getId();
        }
        this.jobExecutor1.start();
        this.jobExecutor2.start();
        this.acquisitionThread1.waitForSync();
        this.acquisitionThread2.waitForSync();
        this.acquisitionThread1.makeContinueAndWaitForSync();
        this.acquisitionThread2.makeContinueAndWaitForSync();
        this.acquisitionThread1.makeContinueAndWaitForSync();
        this.acquisitionThread2.makeContinueAndWaitForSync();
        this.acquisitionThread2.makeContinueAndWaitForSync();
        this.acquisitionThread2.makeContinueAndWaitForSync();
        long currentJobs = this.engineRule.getManagementService().createJobQuery().active().count();
        Assertions.assertThat((long)currentJobs).isEqualTo(0L);
        Assertions.assertThat((Throwable)this.acquisitionThread2.getException()).isNull();
        List<RecordingAcquireJobsRunnable.RecordedWaitEvent> jobAcquisition2WaitEvents = this.jobExecutor2.getAcquireJobsRunnable().getWaitEvents();
        Assertions.assertThat(jobAcquisition2WaitEvents).hasSize(1);
        Exception acquisitionException = jobAcquisition2WaitEvents.get(0).getAcquisitionException();
        Assertions.assertThat((Throwable)acquisitionException).isNull();
    }

    @Test
    @Deployment(resources={"org/camunda/bpm/engine/test/concurrency/CompetingExternalTaskFetchingTest.testCompetingExternalTaskFetching.bpmn20.xml"})
    public void shouldRetryExternalTaskFetchAndLock() {
        RuntimeService runtimeService = this.engineRule.getRuntimeService();
        int numTasksToFetch = 3;
        int numExternalTasks = numTasksToFetch + 1;
        for (int i = 0; i < numExternalTasks; ++i) {
            runtimeService.startProcessInstanceByKey("oneExternalTaskProcess");
        }
        ConcurrencyTestHelper.ThreadControl thread1 = this.executeControllableCommand(new ControlledFetchAndLockCommand(numTasksToFetch, "thread1", "externalTaskTopic"));
        ConcurrencyTestHelper.ThreadControl thread2 = this.executeControllableCommand(new ControlledFetchAndLockCommand(numTasksToFetch, "thread2", "externalTaskTopic"));
        thread1.waitForSync();
        thread2.waitForSync();
        thread1.makeContinueAndWaitForSync();
        thread2.makeContinueAndWaitForSync();
        thread1.waitUntilDone();
        thread2.waitUntilDone(true);
        List tasks = this.engineRule.getExternalTaskService().createExternalTaskQuery().list();
        List thread1Tasks = tasks.stream().filter(t -> "thread1".equals(t.getWorkerId())).collect(Collectors.toList());
        List thread2Tasks = tasks.stream().filter(t -> "thread2".equals(t.getWorkerId())).collect(Collectors.toList());
        Assertions.assertThat((List)tasks).hasSize(numExternalTasks);
        Assertions.assertThat(thread1Tasks).hasSize(numTasksToFetch);
        Assertions.assertThat(thread2Tasks).hasSize(numExternalTasks - numTasksToFetch);
    }

    @Test
    @Deployment(resources={"org/camunda/bpm/engine/test/jobexecutor/simpleAsyncProcess.bpmn20.xml"})
    public void shouldRetryJobExecutionTxAfterJobPriorityOLE() {
        int numJobs = 4;
        for (int i = 0; i < numJobs; ++i) {
            this.engineRule.getRuntimeService().startProcessInstanceByKey("simpleAsyncProcess");
        }
        JobDefinition jobDefinition = (JobDefinition)this.managementService.createJobDefinitionQuery().singleResult();
        this.jobExecutor1.start();
        this.acquisitionThread1.waitForSync();
        ConcurrencyTestHelper.ThreadControl jobPriorityThread = this.executeControllableCommand(new ControllableJobDefinitionPriorityCommand(jobDefinition.getId(), 42L, true));
        jobPriorityThread.waitForSync();
        this.acquisitionThread1.makeContinueAndWaitForSync();
        jobPriorityThread.makeContinue();
        jobPriorityThread.waitUntilDone(true);
        this.acquisitionThread1.makeContinueAndWaitForSync();
        this.acquisitionThread1.makeContinueAndWaitForSync();
        this.acquisitionThread1.makeContinueAndWaitForSync();
        this.acquisitionThread1.ignoreFutureSyncs();
        Job currentJob = (Job)this.engineRule.getManagementService().createJobQuery().active().singleResult();
        Assertions.assertThat((Object)currentJob).isNotNull();
        Assert.assertEquals((long)42L, (long)currentJob.getPriority());
        Assertions.assertThat((Throwable)this.acquisitionThread1.getException()).isNull();
    }

    @Test
    @Deployment(resources={"org/camunda/bpm/engine/test/jobexecutor/simpleAsyncProcess.bpmn20.xml"})
    public void shouldRetryAcquisitionJobTxAfterJobSuspensionOLE() {
        String processDefinitionKey = "simpleAsyncProcess";
        this.runtimeService.startProcessInstanceByKey(processDefinitionKey);
        this.jobExecutor1.start();
        this.acquisitionThread1.reportInterrupts();
        this.acquisitionThread1.waitForSync();
        ConcurrencyTestHelper.ThreadControl jobSuspensionThread = this.executeControllableCommand(new ControllableJobSuspensionCommand(processDefinitionKey));
        jobSuspensionThread.reportInterrupts();
        jobSuspensionThread.waitForSync();
        this.acquisitionThread1.makeContinueAndWaitForSync();
        jobSuspensionThread.makeContinue();
        jobSuspensionThread.waitUntilDone(true);
        this.acquisitionThread1.makeContinueAndWaitForSync();
        this.acquisitionThread1.makeContinueAndWaitForSync();
        this.acquisitionThread1.makeContinueAndWaitForSync();
        Assert.assertNull((Object)jobSuspensionThread.getException());
        Assert.assertNull((Object)this.acquisitionThread1.getException());
        long jobCount = this.managementService.createJobQuery().suspended().count();
        Assertions.assertThat((long)jobCount).isOne();
    }

    @Test
    public void shouldRethrowBootstrapEngineOleWhenRetriesAreExhausted() {
        FailingProcessEngineBootstrapCommand bootstrapCommand = new FailingProcessEngineBootstrapCommand();
        ProcessEngineConfigurationImpl processEngineConfiguration = ((ProcessEngineConfigurationImpl)ProcessEngineConfiguration.createProcessEngineConfigurationFromResource((String)"camunda.cfg.xml")).setCommandRetries(3).setProcessEngineName(PROCESS_ENGINE_NAME);
        processEngineConfiguration.setProcessEngineBootstrapCommand((ProcessEngineBootstrapCommand)bootstrapCommand);
        Assertions.assertThatThrownBy(() -> processEngineConfiguration.buildProcessEngine()).isInstanceOf(CrdbTransactionRetryException.class);
        Assertions.assertThat((int)bootstrapCommand.getTries()).isEqualTo(4);
    }

    @Test
    public void shouldNotRetryCommandByDefault() {
        CrdbFailingCommand failingCommand = new CrdbFailingCommand();
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> {
            Void cfr_ignored_0 = (Void)this.processEngineConfiguration.getCommandExecutorTxRequired().execute((Command)failingCommand);
        }).isInstanceOf(CrdbTransactionRetryException.class)).hasMessageContaining("Does not retry");
        Assertions.assertThat((int)failingCommand.getTries()).isEqualTo(1);
    }

    protected class ControllableJobSuspensionCommand
    extends ConcurrencyTestHelper.ControllableCommand<Void> {
        protected SuspendJobDefinitionCmd suspendJobDefinitionCmd;

        public ControllableJobSuspensionCommand(String processDefinitionKey) {
            UpdateJobDefinitionSuspensionStateBuilderImpl builder = new UpdateJobDefinitionSuspensionStateBuilderImpl().byProcessDefinitionKey(processDefinitionKey).includeJobs(true);
            this.suspendJobDefinitionCmd = new SuspendJobDefinitionCmd(builder);
        }

        public Void execute(CommandContext commandContext) {
            this.monitor.sync();
            this.suspendJobDefinitionCmd.execute(commandContext);
            this.monitor.sync();
            return null;
        }
    }

    protected class ControllableJobDefinitionPriorityCommand
    extends ConcurrencyTestHelper.ControllableCommand<Void> {
        protected SetJobDefinitionPriorityCmd jobDefinitionPriorityCmd;

        public ControllableJobDefinitionPriorityCommand(String jobDefinitionId, Long priority, boolean cascade) {
            this.jobDefinitionPriorityCmd = new SetJobDefinitionPriorityCmd(jobDefinitionId, priority, cascade);
        }

        public Void execute(CommandContext commandContext) {
            this.monitor.sync();
            this.jobDefinitionPriorityCmd.execute(commandContext);
            this.monitor.sync();
            return null;
        }
    }

    protected static class CrdbFailingCommand
    implements Command<Void> {
        protected int tries = 0;

        protected CrdbFailingCommand() {
        }

        public Void execute(CommandContext commandContext) {
            ++this.tries;
            throw new CrdbTransactionRetryException("Does not retry");
        }

        public int getTries() {
            return this.tries;
        }
    }

    protected static class FailingProcessEngineBootstrapCommand
    extends BootstrapEngineCommand {
        protected int tries = 0;

        public Void execute(CommandContext commandContext) {
            ++this.tries;
            throw new CrdbTransactionRetryException("The Process Engine Bootstrap has failed.");
        }

        public int getTries() {
            return this.tries;
        }

        public boolean isRetryable() {
            return super.isRetryable();
        }
    }

    protected static class ControlledFetchAndLockCommand
    extends ConcurrencyTestHelper.ControllableCommand<List<LockedExternalTask>> {
        protected FetchExternalTasksCmd wrappedCmd;

        public ControlledFetchAndLockCommand(int numTasks, String workerId, String topic) {
            HashMap<String, TopicFetchInstruction> instructions = new HashMap<String, TopicFetchInstruction>();
            TopicFetchInstruction instruction = new TopicFetchInstruction(topic, 10000L);
            instructions.put(topic, instruction);
            this.wrappedCmd = new FetchExternalTasksCmd(workerId, numTasks, instructions);
        }

        public List<LockedExternalTask> execute(CommandContext commandContext) {
            this.monitor.sync();
            List tasks = this.wrappedCmd.execute(commandContext);
            this.monitor.sync();
            return tasks;
        }

        public boolean isRetryable() {
            return this.wrappedCmd.isRetryable();
        }
    }
}

