/*
 * Decompiled with CFR 0.152.
 */
package org.mydotey.artemis.taskdispatcher;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.mydotey.artemis.config.ArtemisConfig;
import org.mydotey.artemis.metric.ArtemisMetricManagers;
import org.mydotey.artemis.taskdispatcher.Task;
import org.mydotey.artemis.taskdispatcher.TrafficShaper;
import org.mydotey.artemis.trace.ArtemisTraceExecutor;
import org.mydotey.artemis.util.Loops;
import org.mydotey.caravan.util.metric.EventMetric;
import org.mydotey.caravan.util.metric.MetricConfig;
import org.mydotey.java.ObjectExtension;
import org.mydotey.java.ThreadExtension;
import org.mydotey.java.collection.MultiWriteBatchReadList;
import org.mydotey.scf.Property;
import org.mydotey.scf.filter.RangeValueFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Buffers submitted tasks and hands them out as units of work.
 *
 * <p>Callers add tasks through {@link #accept(Task)} and {@link #reaccept(Task)}. A daemon
 * thread started by the constructor repeatedly swaps out the two intake lists, merges their
 * content into {@link #_acceptedTasks} / {@link #_processingOrder} and then calls the
 * subclass hook {@link #assignWork()}. Consumers obtain work through {@link #pollWork()},
 * which reads from {@link #_workQueue}.
 *
 * <p>NOTE(review): the constructor starts the acceptor thread, so {@link #assignWork()} can
 * be invoked before a subclass constructor has finished initialising its own fields —
 * confirm that every subclass tolerates this.
 *
 * @param <T> the task type
 * @param <W> the unit of work handed to consumers (built from tasks by the subclass)
 */
abstract class TaskAcceptor<T extends Task, W> {
    private static final String IDENTITY_FORMAT = ".task-acceptor";
    private static final Logger _logger = LoggerFactory.getLogger(TaskAcceptor.class);
    protected Property<String, Integer> _maxBufferSizeProperty;
    protected Property<String, Integer> _writeCompleteWaitProperty;
    protected Property<String, Integer> _acceptListInitCapacityProperty;
    protected Property<String, Integer> _reacceptListInitCapacityProperty;
    private final AtomicBoolean _isShutdown = new AtomicBoolean(false);
    private final Thread _acceptorThread;
    // Intake lists; replaced wholesale by resetAcceptList() on every loop iteration.
    private volatile MultiWriteBatchReadList<T> _acceptList;
    private volatile MultiWriteBatchReadList<T> _reacceptList;
    // Latest task per task id, and the order in which task ids are to be processed.
    // Within this class both are only touched by drainAccept().
    protected Map<String, T> _acceptedTasks = new HashMap<String, T>();
    protected Deque<String> _processingOrder = new LinkedList<String>();
    protected BlockingQueue<W> _workQueue = new LinkedBlockingQueue<W>();
    protected AtomicInteger _pendingTaskCount = new AtomicInteger();
    private final TrafficShaper _trafficShaper;
    protected final String _acceptorId;
    private final String _assignWorkTraceKey;
    private final String _drainAcceptTraceKey;
    protected String _workStatusEventType;
    protected EventMetric _workStatusEventMetric;
    protected String _pollWorkResultEventType;
    protected EventMetric _pollWorkResultEventMetric;
    protected String _taskStatusEventType;
    protected EventMetric _taskStatusEventMetric;
    private final String _drainAcceptTaskTypeEventType;
    protected EventMetric _drainAcceptTaskTypeEventMetric;

    /**
     * Creates the acceptor, resolves its metrics and configuration properties and starts
     * the daemon thread that runs the drain/assign loop.
     *
     * @param dispatcherId identifier of the owning dispatcher; must not be blank
     *                     (checked via {@code ObjectExtension.requireNonBlank})
     */
    public TaskAcceptor(String dispatcherId) {
        ObjectExtension.requireNonBlank((String)dispatcherId, (String)"dispatcherId");
        this._acceptorId = dispatcherId + IDENTITY_FORMAT;
        this._assignWorkTraceKey = this._acceptorId + ".assign-work";
        this._drainAcceptTraceKey = this._acceptorId + ".drain-accept";

        this._workStatusEventType = this._acceptorId + ".work-status";
        this._workStatusEventMetric = TaskAcceptor.eventMetric(this._workStatusEventType);
        this._pollWorkResultEventType = this._acceptorId + ".poll-work-result";
        this._pollWorkResultEventMetric = TaskAcceptor.eventMetric(this._pollWorkResultEventType);
        this._taskStatusEventType = this._acceptorId + ".task-status";
        this._taskStatusEventMetric = TaskAcceptor.eventMetric(this._taskStatusEventType);
        this._drainAcceptTaskTypeEventType = this._acceptorId + ".task-Type";
        this._drainAcceptTaskTypeEventMetric = TaskAcceptor.eventMetric(this._drainAcceptTaskTypeEventType);

        this._maxBufferSizeProperty = this.intProperty(".max-buffer-size", 10000, 100, 100000);
        this._writeCompleteWaitProperty = this.intProperty(".write-complete-wait", 5, 0, 200);
        this._acceptListInitCapacityProperty = this.intProperty(".accept-list.init-capacity", 10000, 0, 100000);
        this._reacceptListInitCapacityProperty = this.intProperty(".reaccept-list.init-capacity", 1000, 0, 100000);

        this.resetAcceptList();
        this._trafficShaper = new TrafficShaper(dispatcherId);
        this._acceptorThread = new Thread(new Runnable(){

            @Override
            public void run() {
                TaskAcceptor.this.loopAssign();
            }
        });
        this._acceptorThread.setDaemon(true);
        this._acceptorThread.start();
    }

    /**
     * Looks up the event metric registered under {@code eventType}, configured with a
     * single {@code "distribution"} entry whose value is the event type itself.
     */
    private static EventMetric eventMetric(String eventType) {
        return ArtemisMetricManagers.DEFAULT.eventMetricManager().getMetric(eventType, new MetricConfig(ImmutableMap.of("distribution", eventType)));
    }

    /**
     * Resolves the integer property {@code _acceptorId + keySuffix} with the given default
     * and a {@code RangeValueFilter} built from {@code min} and {@code max}.
     *
     * <p>The raw {@code Function}/{@code Comparable} casts are kept from the decompiled
     * output because the generic signatures of the configuration API are not visible here.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private Property<String, Integer> intProperty(String keySuffix, int defaultValue, int min, int max) {
        return ArtemisConfig.properties().getIntProperty(this._acceptorId + keySuffix, Integer.valueOf(defaultValue), (Function)new RangeValueFilter((Comparable)Integer.valueOf(min), (Comparable)Integer.valueOf(max)));
    }

    /**
     * Queues a task for acceptance on the next drain.
     *
     * @param task the task to accept
     */
    public void accept(T task) {
        this._acceptList.add(task);
    }

    /**
     * Queues a task for re-acceptance on the next drain and reports the task's error code
     * to the traffic shaper as a failure.
     *
     * @param task the task to re-accept
     */
    public void reaccept(T task) {
        this._reacceptList.add(task);
        this._trafficShaper.markFail(task.errorCode());
    }

    /**
     * Marks the acceptor as shut down and interrupts the acceptor thread. Only the first
     * call has any effect.
     */
    public void shutdown() {
        if (this._isShutdown.compareAndSet(false, true)) {
            this._acceptorThread.interrupt();
        }
    }

    /**
     * Blocks until a non-empty unit of work is available and returns it.
     *
     * <p>The work queue is polled with a one second timeout so the shutdown flag is
     * re-checked regularly. Work that is empty after {@link #filterExpiredTask(Object)} is
     * discarded and polling continues. Before returning, the method sleeps for the delay
     * reported by the traffic shaper when that delay is positive.
     *
     * @return the next unit of work that is not empty after expired-task filtering
     * @throws RuntimeException if the acceptor has been shut down, or wrapping any checked
     *                          exception raised while waiting; when that exception is an
     *                          {@link InterruptedException} the calling thread's interrupt
     *                          status is restored before throwing
     */
    public W pollWork() {
        try {
            W work;
            while (true) {
                if (this._isShutdown.get()) {
                    throw new RuntimeException("TaskAcceptor has been shutdown.");
                }
                work = this._workQueue.poll(1L, TimeUnit.SECONDS);
                if (work == null) {
                    this.markPollWorkResult("no-work");
                    continue;
                }
                // The pending count covers everything taken off the queue, including
                // tasks that are about to be filtered out as expired.
                this._pendingTaskCount.addAndGet(-this.getWorkSize(work));
                work = this.filterExpiredTask(work);
                if (!this.isEmptyWork(work)) {
                    break;
                }
                this.markPollWorkResult("work-expired");
            }
            int delay = this._trafficShaper.transmissionDelay();
            if (delay > 0) {
                this.markPollWorkResult("traffic-shaped");
                ThreadExtension.sleep((int)delay);
            }
            this.markPollWorkResult("work-available");
            return work;
        }
        catch (InterruptedException ex) {
            // Keep the interruption visible to the caller instead of silently clearing it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(ex);
        }
        catch (Error | RuntimeException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    /** Records a poll outcome both as a trace event and on the poll-result metric. */
    private void markPollWorkResult(String result) {
        ArtemisTraceExecutor.INSTANCE.markEvent(this._pollWorkResultEventType, result);
        this._pollWorkResultEventMetric.addEvent(result);
    }

    /**
     * Returns the size of {@code work}; {@link #pollWork()} subtracts this value from
     * {@link #_pendingTaskCount}.
     */
    protected abstract int getWorkSize(W var1);

    /** Returns whether {@code work} is empty; {@link #pollWork()} skips empty work. */
    protected abstract boolean isEmptyWork(W var1);

    /**
     * Returns the work that {@link #pollWork()} should continue with after expired tasks
     * have been filtered from {@code work}.
     */
    protected abstract W filterExpiredTask(W var1);

    /**
     * Hook invoked on the acceptor thread once per loop iteration, after newly submitted
     * tasks have been drained.
     */
    protected abstract void assignWork();

    /**
     * Returns whether the task's expiry time is at or before the current system time.
     *
     * @param task the task to check
     * @return {@code true} if {@code task.expiryTime() <= System.currentTimeMillis()}
     */
    protected boolean isExpiredTask(T task) {
        return task.expiryTime() <= System.currentTimeMillis();
    }

    /**
     * Returns whether the pending task count has reached the configured maximum buffer
     * size.
     */
    protected boolean isBufferFull() {
        return this._pendingTaskCount.get() >= this._maxBufferSizeProperty.getValue().intValue();
    }

    /**
     * Body of the acceptor thread: until shutdown, swap out the intake lists, wait for the
     * configured write-complete interval, drain the swapped-out lists and call
     * {@link #assignWork()}. Any failure within one iteration is logged and the loop goes on.
     */
    private void loopAssign() {
        while (!this._isShutdown.get()) {
            Loops.executeWithoutTightLoop(() -> {
                try {
                    MultiWriteBatchReadList<T> acceptList = this._acceptList;
                    MultiWriteBatchReadList<T> reacceptList = this._reacceptList;
                    this.resetAcceptList();
                    // Presumably gives writers that still hold the old list references
                    // time to finish their add() before the lists are read — TODO confirm.
                    ThreadExtension.sleep(this._writeCompleteWaitProperty.getValue().intValue());
                    ArtemisTraceExecutor.INSTANCE.execute(this._drainAcceptTraceKey, () -> this.drainAccept(acceptList.getAll(), reacceptList.getAll()));
                    ArtemisTraceExecutor.INSTANCE.execute(this._assignWorkTraceKey, () -> this.assignWork());
                }
                catch (Throwable ex) {
                    _logger.error("Assign work failed.", ex);
                }
            });
        }
    }

    /**
     * Merges drained tasks into {@link #_acceptedTasks} and {@link #_processingOrder}.
     *
     * <p>Accepted tasks: a task with a new id is appended to the processing order; a task
     * whose id is already known replaces the stored task and inherits its submit time.
     *
     * <p>Re-accepted tasks are visited in reverse list order: a task whose id is already
     * known is dropped; otherwise it is placed at the head of the processing order with a
     * submit time one less than that of the current head task (when there is one).
     *
     * @param acceptList   tasks drained from the accept list
     * @param reacceptList tasks drained from the re-accept list
     */
    private void drainAccept(List<T> acceptList, List<T> reacceptList) {
        for (T task : acceptList) {
            T previousTask = this._acceptedTasks.put(task.taskId(), task);
            if (previousTask == null) {
                this._processingOrder.add(task.taskId());
                this.markDrainAcceptTaskType("accept-new");
                continue;
            }
            task.resetSubmitTime(previousTask.submitTime());
            this.markDrainAcceptTaskType("accept-replace");
        }
        for (T task : Lists.reverse(reacceptList)) {
            if (this._acceptedTasks.containsKey(task.taskId())) {
                this.markDrainAcceptTaskType("reaccept-drop");
                continue;
            }
            String headTaskId = this._processingOrder.peek();
            if (headTaskId != null) {
                T headTask = this._acceptedTasks.get(headTaskId);
                if (headTask != null) {
                    task.resetSubmitTime(headTask.submitTime() - 1L);
                }
            }
            this._acceptedTasks.put(task.taskId(), task);
            this._processingOrder.addFirst(task.taskId());
            this.markDrainAcceptTaskType("reaccept-new");
        }
    }

    /** Records a drain outcome both as a trace event and on the drain task-type metric. */
    private void markDrainAcceptTaskType(String taskType) {
        ArtemisTraceExecutor.INSTANCE.markEvent(this._drainAcceptTaskTypeEventType, taskType);
        this._drainAcceptTaskTypeEventMetric.addEvent(taskType);
    }

    /**
     * Replaces both intake lists with fresh instances created with the currently
     * configured initial capacities.
     */
    private void resetAcceptList() {
        this._acceptList = new MultiWriteBatchReadList<>(this._acceptListInitCapacityProperty.getValue().intValue());
        this._reacceptList = new MultiWriteBatchReadList<>(this._reacceptListInitCapacityProperty.getValue().intValue());
    }
}

