/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.conductor.client.automator;

import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.Uninterruptibles;
import com.netflix.conductor.client.automator.filters.PollFilter;
import com.netflix.conductor.client.config.PropertyFactory;
import com.netflix.conductor.client.events.dispatcher.EventDispatcher;
import com.netflix.conductor.client.events.taskrunner.PollCompleted;
import com.netflix.conductor.client.events.taskrunner.PollFailure;
import com.netflix.conductor.client.events.taskrunner.PollStarted;
import com.netflix.conductor.client.events.taskrunner.TaskExecutionCompleted;
import com.netflix.conductor.client.events.taskrunner.TaskExecutionFailure;
import com.netflix.conductor.client.events.taskrunner.TaskExecutionStarted;
import com.netflix.conductor.client.events.taskrunner.TaskRunnerEvent;
import com.netflix.conductor.client.http.TaskClient;
import com.netflix.conductor.client.worker.Worker;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskResult;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class TaskRunner {
    private static final Logger LOGGER = LoggerFactory.getLogger(TaskRunner.class);
    private final TaskClient taskClient;
    private final int updateRetryCount;
    private final ExecutorService executorService;
    private final int taskPollTimeout;
    private final Semaphore permits;
    private final Worker worker;
    private final int pollingIntervalInMillis;
    private final String taskType;
    private final int errorAt;
    private int pollingErrorCount;
    private String domain;
    private volatile boolean pollingAndExecuting = true;
    private final List<PollFilter> pollFilters;
    private final EventDispatcher<TaskRunnerEvent> eventDispatcher;
    private final LinkedBlockingQueue<Task> tasksTobeExecuted;
    private final boolean enableUpdateV2;
    private static final int LEASE_EXTEND_RETRY_COUNT = 3;
    private static final double LEASE_EXTEND_DURATION_FACTOR = 0.8;
    private final ScheduledExecutorService leaseExtendExecutorService;
    private Map<String, ScheduledFuture<?>> leaseExtendMap = new HashMap();
    private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler = (thread2, error) -> LOGGER.error("Uncaught exception. Thread {} will exit now", (Object)thread2, (Object)error);

    TaskRunner(Worker worker, TaskClient taskClient, int updateRetryCount, Map<String, String> taskToDomain, String workerNamePrefix, int threadCount, int taskPollTimeout, List<PollFilter> pollFilters, EventDispatcher<TaskRunnerEvent> eventDispatcher) {
        this.worker = worker;
        this.taskClient = taskClient;
        this.updateRetryCount = updateRetryCount;
        this.taskPollTimeout = taskPollTimeout;
        this.pollingIntervalInMillis = worker.getPollingInterval();
        this.taskType = worker.getTaskDefName();
        this.permits = new Semaphore(threadCount);
        this.pollFilters = pollFilters;
        this.eventDispatcher = eventDispatcher;
        this.tasksTobeExecuted = new LinkedBlockingQueue();
        this.enableUpdateV2 = Boolean.valueOf(System.getProperty("taskUpdateV2", "false"));
        LOGGER.info("taskUpdateV2 is set to {}", (Object)this.enableUpdateV2);
        this.domain = PropertyFactory.getString(this.taskType, "domain", null);
        if (this.domain == null) {
            this.domain = PropertyFactory.getString("all", "domain", null);
        }
        if (this.domain == null) {
            this.domain = taskToDomain.get(this.taskType);
        }
        int defaultLoggingInterval = 100;
        int errorInterval = PropertyFactory.getInteger(this.taskType, "log_interval", 0);
        if (errorInterval == 0) {
            errorInterval = PropertyFactory.getInteger("all", "log_interval", 0);
        }
        if (errorInterval == 0) {
            errorInterval = defaultLoggingInterval;
        }
        this.errorAt = errorInterval;
        LOGGER.info("Polling errors will be sampled at every {} error (after the first 100 errors) for taskType {}", (Object)this.errorAt, (Object)this.taskType);
        this.executorService = Executors.newFixedThreadPool(threadCount, new BasicThreadFactory.Builder().namingPattern(workerNamePrefix).uncaughtExceptionHandler(this.uncaughtExceptionHandler).build());
        LOGGER.info("Starting Worker for taskType '{}' with {} threads, {} ms polling interval and domain {}", this.taskType, threadCount, this.pollingIntervalInMillis, this.domain);
        LOGGER.info("Polling errors for taskType {} will be printed at every {} occurrence.", (Object)this.taskType, (Object)this.errorAt);
        LOGGER.info("Initialized the task lease extend executor");
        this.leaseExtendExecutorService = Executors.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("workflow-lease-extend-%d").daemon(true).uncaughtExceptionHandler(this.uncaughtExceptionHandler).build());
    }

    public void pollAndExecute() {
        Stopwatch stopwatch = null;
        while (this.pollingAndExecuting && !Thread.currentThread().isInterrupted()) {
            try {
                List<Task> tasks = this.pollTasksForWorker();
                if (tasks.isEmpty()) {
                    if (stopwatch == null) {
                        stopwatch = Stopwatch.createStarted();
                    }
                    Uninterruptibles.sleepUninterruptibly(this.pollingIntervalInMillis, TimeUnit.MILLISECONDS);
                    continue;
                }
                if (stopwatch != null) {
                    stopwatch.stop();
                    LOGGER.trace("Poller for task {} waited for {} ms before getting {} tasks to execute", this.taskType, stopwatch.elapsed(TimeUnit.MILLISECONDS), tasks.size());
                    stopwatch = null;
                }
                tasks.forEach(task -> {
                    Future<Task> taskFuture = this.executorService.submit(() -> this.processTask((Task)task));
                    if (task.getResponseTimeoutSeconds() > 0L && this.worker.leaseExtendEnabled()) {
                        ScheduledFuture<?> scheduledFuture = this.leaseExtendMap.get(task.getTaskId());
                        if (scheduledFuture != null) {
                            scheduledFuture.cancel(false);
                        }
                        long delay = Math.round((double)task.getResponseTimeoutSeconds() * 0.8);
                        ScheduledFuture<?> leaseExtendFuture = this.leaseExtendExecutorService.scheduleWithFixedDelay(this.extendLease((Task)task, taskFuture), delay, delay, TimeUnit.SECONDS);
                        this.leaseExtendMap.put(task.getTaskId(), leaseExtendFuture);
                    }
                });
            }
            catch (Throwable t2) {
                LOGGER.error(t2.getMessage(), t2);
            }
        }
    }

    public void shutdown(int timeout2) {
        try {
            this.pollingAndExecuting = false;
            this.executorService.shutdown();
            if (this.executorService.awaitTermination(timeout2, TimeUnit.SECONDS)) {
                LOGGER.debug("tasks completed, shutting down");
            } else {
                LOGGER.warn(String.format("forcing shutdown after waiting for %s second", timeout2));
                this.executorService.shutdownNow();
            }
        }
        catch (InterruptedException ie) {
            LOGGER.warn("shutdown interrupted, invoking shutdownNow");
            this.executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    private List<Task> pollTasksForWorker() {
        this.eventDispatcher.publish(new PollStarted(this.taskType));
        if (this.worker.paused()) {
            LOGGER.trace("Worker {} has been paused. Not polling anymore!", (Object)this.worker.getClass());
            return List.of();
        }
        for (PollFilter filter : this.pollFilters) {
            if (filter.filter(this.taskType, this.domain)) continue;
            LOGGER.trace("Filter returned false, not polling.");
            return List.of();
        }
        int pollCount = 0;
        while (this.permits.tryAcquire()) {
            ++pollCount;
        }
        if (pollCount == 0) {
            return List.of();
        }
        List<Object> tasks = new LinkedList();
        Stopwatch stopwatch = Stopwatch.createStarted();
        try {
            LOGGER.trace("Polling task of type: {} in domain: '{}' with size {}", this.taskType, this.domain, pollCount);
            tasks = this.pollTask(pollCount);
            this.permits.release(pollCount - tasks.size());
            stopwatch.stop();
            long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
            LOGGER.debug("Time taken to poll {} task with a batch size of {} is {} ms", this.taskType, tasks.size(), elapsed);
            this.eventDispatcher.publish(new PollCompleted(this.taskType, elapsed));
        }
        catch (Throwable e) {
            this.permits.release(pollCount - tasks.size());
            boolean printError = this.pollingErrorCount < 100 || this.pollingErrorCount % this.errorAt == 0;
            ++this.pollingErrorCount;
            if (this.pollingErrorCount > 10000000) {
                this.pollingErrorCount = 0;
            }
            if (printError) {
                LOGGER.error("Error polling for taskType: {}, error = {}", this.taskType, e.getMessage(), e);
            }
            if (stopwatch.isRunning()) {
                stopwatch.stop();
            }
            long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
            this.eventDispatcher.publish(new PollFailure(this.taskType, elapsed, e));
        }
        return tasks;
    }

    private List<Task> pollTask(int count) {
        if (count < 1) {
            return Collections.emptyList();
        }
        LOGGER.trace("in memory queue size for tasks: {}", (Object)this.tasksTobeExecuted.size());
        ArrayList<Task> polled = new ArrayList<Task>(count);
        this.tasksTobeExecuted.drainTo(polled, count);
        if (!polled.isEmpty()) {
            return polled;
        }
        String workerId = this.worker.getIdentity();
        LOGGER.debug("poll {} in the domain {} with batch size {}", this.taskType, this.domain, count);
        return this.taskClient.batchPollTasksInDomain(this.taskType, this.domain, workerId, count, this.taskPollTimeout);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Task processTask(Task task) {
        this.eventDispatcher.publish(new TaskExecutionStarted(this.taskType, task.getTaskId(), this.worker.getIdentity()));
        LOGGER.trace("Executing task: {} of type: {} in worker: {} at {}", task.getTaskId(), this.taskType, this.worker.getClass().getSimpleName(), this.worker.getIdentity());
        LOGGER.trace("task {} is getting executed after {} ms of getting polled", (Object)task.getTaskId(), (Object)(System.currentTimeMillis() - task.getStartTime()));
        Stopwatch stopwatch = Stopwatch.createStarted();
        try {
            this.executeTask(this.worker, task);
            stopwatch.stop();
            long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
            LOGGER.trace("Took {} ms to execute and update task with id {}", (Object)elapsed, (Object)task.getTaskId());
        }
        catch (Throwable t2) {
            task.setStatus(Task.Status.FAILED);
            TaskResult result2 = new TaskResult(task);
            this.handleException(t2, result2, this.worker, task);
        }
        finally {
            this.permits.release();
        }
        return task;
    }

    private void executeTask(Worker worker, Task task) {
        if (task == null || task.getTaskDefName().isEmpty()) {
            LOGGER.warn("Empty task {}", (Object)worker.getTaskDefName());
            return;
        }
        Stopwatch stopwatch = Stopwatch.createStarted();
        TaskResult result2 = null;
        try {
            LOGGER.trace("Executing task: {} in worker: {} at {}", task.getTaskId(), worker.getClass().getSimpleName(), worker.getIdentity());
            result2 = worker.execute(task);
            stopwatch.stop();
            this.eventDispatcher.publish(new TaskExecutionCompleted(this.taskType, task.getTaskId(), worker.getIdentity(), stopwatch.elapsed(TimeUnit.MILLISECONDS)));
            result2.setWorkflowInstanceId(task.getWorkflowInstanceId());
            result2.setTaskId(task.getTaskId());
            result2.setWorkerId(worker.getIdentity());
        }
        catch (Exception e) {
            if (stopwatch.isRunning()) {
                stopwatch.stop();
            }
            this.eventDispatcher.publish(new TaskExecutionFailure(this.taskType, task.getTaskId(), worker.getIdentity(), e, stopwatch.elapsed(TimeUnit.MILLISECONDS)));
            LOGGER.error("Unable to execute task: {} of type: {}", task.getTaskId(), task.getTaskDefName(), e);
            if (result2 == null) {
                task.setStatus(Task.Status.FAILED);
                result2 = new TaskResult(task);
            }
            this.handleException(e, result2, worker, task);
        }
        LOGGER.trace("Task: {} executed by worker: {} at {} with status: {}", new Object[]{task.getTaskId(), worker.getClass().getSimpleName(), worker.getIdentity(), result2.getStatus()});
        Stopwatch updateStopWatch = Stopwatch.createStarted();
        this.updateTaskResult(this.updateRetryCount, task, result2, worker);
        updateStopWatch.stop();
        LOGGER.trace("Time taken to update the {} {} ms", (Object)task.getTaskType(), (Object)updateStopWatch.elapsed(TimeUnit.MILLISECONDS));
    }

    private void updateTaskResult(int count, Task task, TaskResult result2, Worker worker) {
        try {
            Optional optionalExternalStorageLocation = this.retryOperation(taskResult -> this.upload((TaskResult)taskResult, task.getTaskType()), count, result2, "evaluateAndUploadLargePayload");
            if (optionalExternalStorageLocation.isPresent()) {
                result2.setExternalOutputPayloadStoragePath((String)optionalExternalStorageLocation.get());
                result2.setOutputData(null);
            }
            if (this.enableUpdateV2) {
                Task nextTask = this.retryOperation(this.taskClient::updateTaskV2, count, result2, "updateTaskV2");
                if (nextTask != null) {
                    this.tasksTobeExecuted.add(nextTask);
                }
            } else {
                this.retryOperation(taskResult -> {
                    this.taskClient.updateTask((TaskResult)taskResult);
                    return null;
                }, count, result2, "updateTask");
            }
        }
        catch (Exception e) {
            worker.onErrorUpdate(task);
            LOGGER.error(String.format("Failed to update result: %s for task: %s in worker: %s", result2.toString(), task.getTaskDefName(), worker.getIdentity()), e);
        }
    }

    private Optional<String> upload(TaskResult result2, String taskType) {
        try {
            return this.taskClient.evaluateAndUploadLargePayload(result2.getOutputData(), taskType);
        }
        catch (IllegalArgumentException iae) {
            result2.setReasonForIncompletion(iae.getMessage());
            result2.setOutputData(null);
            result2.setStatus(TaskResult.Status.FAILED_WITH_TERMINAL_ERROR);
            return Optional.empty();
        }
    }

    private <T, R> R retryOperation(Function<T, R> operation, int count, T input, String opName) {
        for (int index = 0; index < count; ++index) {
            try {
                return operation.apply(input);
            }
            catch (Exception e) {
                LOGGER.error("Error executing {}", (Object)opName, (Object)e);
                Uninterruptibles.sleepUninterruptibly(500L * (long)(count + 1), TimeUnit.MILLISECONDS);
                continue;
            }
        }
        throw new RuntimeException("Exhausted retries performing " + opName);
    }

    private void handleException(Throwable t2, TaskResult result2, Worker worker, Task task) {
        LOGGER.error(String.format("Error while executing task %s", task.toString()), t2);
        result2.setStatus(TaskResult.Status.FAILED);
        result2.setReasonForIncompletion("Error while executing the task: " + t2);
        StringWriter stringWriter = new StringWriter();
        t2.printStackTrace(new PrintWriter(stringWriter));
        result2.log(stringWriter.toString());
        this.updateTaskResult(this.updateRetryCount, task, result2, worker);
    }

    private Runnable extendLease(Task task, Future<Task> taskCompletableFuture) {
        return () -> {
            if (taskCompletableFuture.isDone()) {
                LOGGER.warn("Task processing for {} completed, but its lease extend was not cancelled", (Object)task.getTaskId());
                return;
            }
            LOGGER.info("Attempting to extend lease for {}", (Object)task.getTaskId());
            try {
                TaskResult result2 = new TaskResult(task);
                result2.setExtendLease(true);
                this.retryOperation(taskResult -> {
                    this.taskClient.updateTask((TaskResult)taskResult);
                    return null;
                }, 3, result2, "extend lease");
            }
            catch (Exception e) {
                LOGGER.error("Failed to extend lease for {}", (Object)task.getTaskId(), (Object)e);
            }
        };
    }
}

