/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.conductor.client.automator;

import com.netflix.appinfo.InstanceInfo;
import com.netflix.conductor.client.automator.PollingSemaphore;
import com.netflix.conductor.client.config.PropertyFactory;
import com.netflix.conductor.client.http.TaskClient;
import com.netflix.conductor.client.telemetry.MetricsContainer;
import com.netflix.conductor.client.worker.Worker;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskResult;
import com.netflix.discovery.EurekaClient;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.Spectator;
import com.netflix.spectator.api.patterns.ThreadPoolMonitor;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.commons.lang3.time.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class TaskPollExecutor {
    private static final Logger LOGGER = LoggerFactory.getLogger(TaskPollExecutor.class);
    private static final Registry REGISTRY = Spectator.globalRegistry();
    private final EurekaClient eurekaClient;
    private final TaskClient taskClient;
    private final int updateRetryCount;
    private final ExecutorService executorService;
    private final Map<String, PollingSemaphore> pollingSemaphoreMap;
    private final Map<String, String> taskToDomain;
    private static final String DOMAIN = "domain";
    private static final String OVERRIDE_DISCOVERY = "pollOutOfDiscovery";
    private static final String ALL_WORKERS = "all";
    private static final int LEASE_EXTEND_RETRY_COUNT = 3;
    private static final double LEASE_EXTEND_DURATION_FACTOR = 0.8;
    private ScheduledExecutorService leaseExtendExecutorService;
    Map<String, ScheduledFuture<?>> leaseExtendMap = new HashMap();
    private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler = (thread, error) -> {
        MetricsContainer.incrementUncaughtExceptionCount();
        LOGGER.error("Uncaught exception. Thread {} will exit now", (Object)thread, (Object)error);
    };

    TaskPollExecutor(EurekaClient eurekaClient, TaskClient taskClient, int updateRetryCount, Map<String, String> taskToDomain, String workerNamePrefix, Map<String, Integer> taskThreadCount) {
        this.eurekaClient = eurekaClient;
        this.taskClient = taskClient;
        this.updateRetryCount = updateRetryCount;
        this.taskToDomain = taskToDomain;
        this.pollingSemaphoreMap = new HashMap<String, PollingSemaphore>();
        int totalThreadCount = 0;
        for (Map.Entry<String, Integer> entry : taskThreadCount.entrySet()) {
            String taskType = entry.getKey();
            int count = entry.getValue();
            totalThreadCount += count;
            this.pollingSemaphoreMap.put(taskType, new PollingSemaphore(count));
        }
        LOGGER.info("Initialized the TaskPollExecutor with {} threads", (Object)totalThreadCount);
        this.executorService = Executors.newFixedThreadPool(totalThreadCount, (ThreadFactory)new BasicThreadFactory.Builder().namingPattern(workerNamePrefix).uncaughtExceptionHandler(this.uncaughtExceptionHandler).build());
        ThreadPoolMonitor.attach((Registry)REGISTRY, (ThreadPoolExecutor)((ThreadPoolExecutor)this.executorService), (String)workerNamePrefix);
        LOGGER.info("Initialized the task lease extend executor");
        this.leaseExtendExecutorService = Executors.newSingleThreadScheduledExecutor((ThreadFactory)new BasicThreadFactory.Builder().namingPattern("workflow-lease-extend-%d").daemon(true).uncaughtExceptionHandler(this.uncaughtExceptionHandler).build());
    }

    void pollAndExecute(Worker worker) {
        Boolean discoveryOverride = Optional.ofNullable(PropertyFactory.getBoolean(worker.getTaskDefName(), OVERRIDE_DISCOVERY, null)).orElseGet(() -> PropertyFactory.getBoolean(ALL_WORKERS, OVERRIDE_DISCOVERY, false));
        if (this.eurekaClient != null && !this.eurekaClient.getInstanceRemoteStatus().equals((Object)InstanceInfo.InstanceStatus.UP) && !discoveryOverride.booleanValue()) {
            LOGGER.debug("Instance is NOT UP in discovery - will not poll");
            return;
        }
        if (worker.paused()) {
            MetricsContainer.incrementTaskPausedCount(worker.getTaskDefName());
            LOGGER.debug("Worker {} has been paused. Not polling anymore!", worker.getClass());
            return;
        }
        String taskType = worker.getTaskDefName();
        PollingSemaphore pollingSemaphore = this.getPollingSemaphore(taskType);
        int slotsToAcquire = pollingSemaphore.availableSlots();
        if (slotsToAcquire <= 0 || !pollingSemaphore.acquireSlots(slotsToAcquire)) {
            return;
        }
        int acquiredTasks = 0;
        try {
            String domain = Optional.ofNullable(PropertyFactory.getString(taskType, DOMAIN, null)).orElseGet(() -> Optional.ofNullable(PropertyFactory.getString(ALL_WORKERS, DOMAIN, null)).orElse(this.taskToDomain.get(taskType)));
            LOGGER.debug("Polling task of type: {} in domain: '{}'", (Object)taskType, (Object)domain);
            List tasks = (List)MetricsContainer.getPollTimer(taskType).record(() -> this.taskClient.batchPollTasksInDomain(taskType, domain, worker.getIdentity(), slotsToAcquire, worker.getBatchPollTimeoutInMS()));
            acquiredTasks = tasks.size();
            for (Task task : tasks) {
                if (Objects.nonNull(task) && StringUtils.isNotBlank((CharSequence)task.getTaskId())) {
                    MetricsContainer.incrementTaskPollCount(taskType, 1);
                    LOGGER.debug("Polled task: {} of type: {} in domain: '{}', from worker: {}", new Object[]{task.getTaskId(), taskType, domain, worker.getIdentity()});
                    CompletableFuture<Task> taskCompletableFuture = CompletableFuture.supplyAsync(() -> this.processTask(task, worker, pollingSemaphore), this.executorService);
                    if (task.getResponseTimeoutSeconds() > 0L && worker.leaseExtendEnabled()) {
                        ScheduledFuture<?> leaseExtendFuture = this.leaseExtendExecutorService.scheduleWithFixedDelay(this.extendLease(task, taskCompletableFuture), Math.round((double)task.getResponseTimeoutSeconds() * 0.8), Math.round((double)task.getResponseTimeoutSeconds() * 0.8), TimeUnit.SECONDS);
                        this.leaseExtendMap.put(task.getTaskId(), leaseExtendFuture);
                    }
                    taskCompletableFuture.whenComplete(this::finalizeTask);
                    continue;
                }
                pollingSemaphore.complete(1);
            }
        }
        catch (Exception e) {
            MetricsContainer.incrementTaskPollErrorCount(worker.getTaskDefName(), e);
            LOGGER.error("Error when polling for tasks", (Throwable)e);
        }
        pollingSemaphore.complete(slotsToAcquire - acquiredTasks);
    }

    void shutdown(int timeout) {
        this.shutdownAndAwaitTermination(this.executorService, timeout);
        this.shutdownAndAwaitTermination(this.leaseExtendExecutorService, timeout);
        this.leaseExtendMap.clear();
    }

    void shutdownAndAwaitTermination(ExecutorService executorService, int timeout) {
        try {
            executorService.shutdown();
            if (executorService.awaitTermination(timeout, TimeUnit.SECONDS)) {
                LOGGER.debug("tasks completed, shutting down");
            } else {
                LOGGER.warn(String.format("forcing shutdown after waiting for %s second", timeout));
                executorService.shutdownNow();
            }
        }
        catch (InterruptedException ie) {
            LOGGER.warn("shutdown interrupted, invoking shutdownNow");
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Task processTask(Task task, Worker worker, PollingSemaphore pollingSemaphore) {
        LOGGER.debug("Executing task: {} of type: {} in worker: {} at {}", new Object[]{task.getTaskId(), task.getTaskDefName(), worker.getClass().getSimpleName(), worker.getIdentity()});
        try {
            this.executeTask(worker, task);
        }
        catch (Throwable t) {
            task.setStatus(Task.Status.FAILED);
            TaskResult result = new TaskResult(task);
            this.handleException(t, result, worker, task);
        }
        finally {
            pollingSemaphore.complete(1);
        }
        return task;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void executeTask(Worker worker, Task task) {
        StopWatch stopwatch = new StopWatch();
        stopwatch.start();
        TaskResult result = null;
        try {
            LOGGER.debug("Executing task: {} in worker: {} at {}", new Object[]{task.getTaskId(), worker.getClass().getSimpleName(), worker.getIdentity()});
            result = worker.execute(task);
            result.setWorkflowInstanceId(task.getWorkflowInstanceId());
            result.setTaskId(task.getTaskId());
            result.setWorkerId(worker.getIdentity());
        }
        catch (Exception e) {
            LOGGER.error("Unable to execute task: {} of type: {}", new Object[]{task.getTaskId(), task.getTaskDefName(), e});
            if (result == null) {
                task.setStatus(Task.Status.FAILED);
                result = new TaskResult(task);
            }
            this.handleException(e, result, worker, task);
        }
        finally {
            stopwatch.stop();
            MetricsContainer.getExecutionTimer(worker.getTaskDefName()).record(stopwatch.getTime(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
        }
        LOGGER.debug("Task: {} executed by worker: {} at {} with status: {}", new Object[]{task.getTaskId(), worker.getClass().getSimpleName(), worker.getIdentity(), result.getStatus()});
        this.updateTaskResult(this.updateRetryCount, task, result, worker);
    }

    private void finalizeTask(Task task, Throwable throwable) {
        if (throwable != null) {
            LOGGER.error("Error processing task: {} of type: {}", new Object[]{task.getTaskId(), task.getTaskType(), throwable});
            MetricsContainer.incrementTaskExecutionErrorCount(task.getTaskType(), throwable);
        } else {
            LOGGER.debug("Task:{} of type:{} finished processing with status:{}", new Object[]{task.getTaskId(), task.getTaskDefName(), task.getStatus()});
            String taskId = task.getTaskId();
            ScheduledFuture<?> leaseExtendFuture = this.leaseExtendMap.get(taskId);
            if (leaseExtendFuture != null) {
                leaseExtendFuture.cancel(true);
                this.leaseExtendMap.remove(taskId);
            }
        }
    }

    private void updateTaskResult(int count, Task task, TaskResult result, Worker worker) {
        try {
            Optional optionalExternalStorageLocation = this.retryOperation(taskResult -> this.upload((TaskResult)taskResult, task.getTaskType()), count, result, "evaluateAndUploadLargePayload");
            if (optionalExternalStorageLocation.isPresent()) {
                result.setExternalOutputPayloadStoragePath((String)optionalExternalStorageLocation.get());
                result.setOutputData(null);
            }
            this.retryOperation(taskResult -> {
                this.taskClient.updateTask((TaskResult)taskResult);
                return null;
            }, count, result, "updateTask");
        }
        catch (Exception e) {
            worker.onErrorUpdate(task);
            MetricsContainer.incrementTaskUpdateErrorCount(worker.getTaskDefName(), e);
            LOGGER.error(String.format("Failed to update result: %s for task: %s in worker: %s", result.toString(), task.getTaskDefName(), worker.getIdentity()), (Throwable)e);
        }
    }

    private Optional<String> upload(TaskResult result, String taskType) {
        try {
            return this.taskClient.evaluateAndUploadLargePayload(result.getOutputData(), taskType);
        }
        catch (IllegalArgumentException iae) {
            result.setReasonForIncompletion(iae.getMessage());
            result.setOutputData(null);
            result.setStatus(TaskResult.Status.FAILED_WITH_TERMINAL_ERROR);
            return Optional.empty();
        }
    }

    private <T, R> R retryOperation(Function<T, R> operation, int count, T input, String opName) {
        int index = 0;
        while (index < count) {
            try {
                return operation.apply(input);
            }
            catch (Exception e) {
                ++index;
                try {
                    Thread.sleep(500L);
                }
                catch (InterruptedException ie) {
                    LOGGER.error("Retry interrupted", (Throwable)ie);
                }
            }
        }
        throw new RuntimeException("Exhausted retries performing " + opName);
    }

    private void handleException(Throwable t, TaskResult result, Worker worker, Task task) {
        LOGGER.error(String.format("Error while executing task %s", task.toString()), t);
        MetricsContainer.incrementTaskExecutionErrorCount(worker.getTaskDefName(), t);
        result.setStatus(TaskResult.Status.FAILED);
        result.setReasonForIncompletion("Error while executing the task: " + String.valueOf(t));
        StringWriter stringWriter = new StringWriter();
        t.printStackTrace(new PrintWriter(stringWriter));
        result.log(stringWriter.toString());
        this.updateTaskResult(this.updateRetryCount, task, result, worker);
    }

    private PollingSemaphore getPollingSemaphore(String taskType) {
        return this.pollingSemaphoreMap.get(taskType);
    }

    private Runnable extendLease(Task task, CompletableFuture<Task> taskCompletableFuture) {
        return () -> {
            if (taskCompletableFuture.isDone()) {
                LOGGER.warn("Task processing for {} completed, but its lease extend was not cancelled", (Object)task.getTaskId());
                return;
            }
            LOGGER.info("Attempting to extend lease for {}", (Object)task.getTaskId());
            try {
                TaskResult result = new TaskResult(task);
                result.setExtendLease(true);
                this.retryOperation(taskResult -> {
                    this.taskClient.updateTask((TaskResult)taskResult);
                    return null;
                }, 3, result, "extend lease");
                MetricsContainer.incrementTaskLeaseExtendCount(task.getTaskDefName(), 1);
            }
            catch (Exception e) {
                MetricsContainer.incrementTaskLeaseExtendErrorCount(task.getTaskDefName(), e);
                LOGGER.error("Failed to extend lease for {}", (Object)task.getTaskId(), (Object)e);
            }
        };
    }
}

