/*
 * Decompiled with CFR 0.152.
 */
package org.mydotey.artemis.taskdispatcher;

import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.mydotey.artemis.config.ArtemisConfig;
import org.mydotey.artemis.metric.ArtemisMetricManagers;
import org.mydotey.artemis.taskdispatcher.ProcessingResult;
import org.mydotey.artemis.taskdispatcher.Task;
import org.mydotey.artemis.taskdispatcher.TaskAcceptor;
import org.mydotey.artemis.taskdispatcher.TaskErrorCode;
import org.mydotey.artemis.taskdispatcher.TaskProcessor;
import org.mydotey.artemis.trace.ArtemisTraceExecutor;
import org.mydotey.artemis.util.Loops;
import org.mydotey.caravan.util.metric.EventMetric;
import org.mydotey.caravan.util.metric.MetricConfig;
import org.mydotey.java.ObjectExtension;
import org.mydotey.java.collection.CollectionExtension;
import org.mydotey.scf.Property;
import org.mydotey.scf.filter.RangeValueFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class TaskExecutor<T extends Task, W> {
    private static final String IDENTITY_FORMAT = ".task-executor";
    private static final Logger _logger = LoggerFactory.getLogger(TaskExecutor.class);
    private String _executorId;
    private Property<String, Integer> _threadCountProperty;
    private List<Thread> _workerThreadExtension = new ArrayList<Thread>();
    protected AtomicBoolean _isShutdown = new AtomicBoolean();
    private TaskAcceptor<T, W> _taskAcceptor;
    private TaskProcessor<T, W> _taskProcessor;
    private String _executeTraceKey;
    private String _executeFailedTaskEventType;
    private EventMetric _executeFailedTaskEventMetric;

    public TaskExecutor(String dispatcherId, TaskAcceptor<T, W> taskAcceptor, TaskProcessor<T, W> taskProcessor) {
        ObjectExtension.requireNonBlank((String)dispatcherId, (String)"dispatcherId");
        ObjectExtension.requireNonNull(taskAcceptor, (String)"acceptorExecutor");
        ObjectExtension.requireNonNull(taskProcessor, (String)"taskProcessor");
        this._executorId = dispatcherId + IDENTITY_FORMAT;
        this._taskAcceptor = taskAcceptor;
        this._taskProcessor = taskProcessor;
        this._executeTraceKey = this._executorId + ".execute";
        this._executeFailedTaskEventType = this._executeTraceKey + ".failed-task";
        this._executeFailedTaskEventMetric = ArtemisMetricManagers.DEFAULT.eventMetricManager().getMetric(this._executeFailedTaskEventType, new MetricConfig((Map)ImmutableMap.of((Object)"distribution", (Object)this._executeFailedTaskEventType)));
        this._threadCountProperty = ArtemisConfig.properties().getIntProperty((Object)(this._executorId + ".thread-count"), Integer.valueOf(20), (Function)new RangeValueFilter((Comparable)Integer.valueOf(1), (Comparable)Integer.valueOf(100)));
        Runnable executiontask = new Runnable(){

            @Override
            public void run() {
                TaskExecutor.this.loopExecute();
            }
        };
        for (int i = 0; i < (Integer)this._threadCountProperty.getValue(); ++i) {
            Thread workerThread = new Thread(executiontask);
            this._workerThreadExtension.add(workerThread);
            workerThread.setDaemon(true);
            workerThread.start();
        }
    }

    public void shutdown() {
        if (!this._isShutdown.compareAndSet(false, true)) {
            return;
        }
        for (Thread workerThread : this._workerThreadExtension) {
            workerThread.interrupt();
        }
    }

    private void loopExecute() {
        while (!this._isShutdown.get()) {
            Loops.executeWithoutTightLoop(() -> {
                try {
                    W work = this._taskAcceptor.pollWork();
                    ArtemisTraceExecutor.INSTANCE.execute(this._executeTraceKey, () -> this.execute(work));
                }
                catch (Throwable ex) {
                    _logger.error("Execute task error.", ex);
                }
            });
        }
    }

    private void execute(W work) {
        ProcessingResult<T> result = this._taskProcessor.process(work);
        if (CollectionExtension.isEmpty(result.failedTasks())) {
            return;
        }
        for (Task task : result.failedTasks()) {
            String taskErrorCode = task.errorCode().toString();
            ArtemisTraceExecutor.INSTANCE.markEvent(this._executeFailedTaskEventType, taskErrorCode);
            this._executeFailedTaskEventMetric.addEvent(taskErrorCode);
            if (TaskErrorCode.RERUNNABLE_ERROR_CODES.contains((Object)task.errorCode())) {
                this._taskAcceptor.reaccept(task);
                return;
            }
            _logger.warn("Discarding a task due to non-rerunnable error. Task ErrorCode: {}", (Object)task.errorCode());
        }
    }
}

