/*
 * Decompiled with CFR 0.152.
 */
package org.duracloud.mill.workman;

import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.duracloud.common.queue.TaskQueue;
import org.duracloud.common.queue.TimeoutException;
import org.duracloud.mill.workman.TaskWorker;
import org.duracloud.mill.workman.TaskWorkerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TaskWorkerManager {
    private Logger log = LoggerFactory.getLogger(TaskWorkerManager.class);
    public static final int DEFAULT_MAX_WORKERS = 5;
    public static final String MAX_WORKER_PROPERTY_KEY = "max-workers";
    public static final String MIN_WAIT_BEFORE_TAKE_KEY = "min-wait-before-take";
    public static final long DEFAULT_MIN_WAIT_BEFORE_TAKE = 15000L;
    private static final long DEFAULT_MAX_WAIT_BEFORE_TAKE = 480000L;
    private Long defaultMinWaitTime = 15000L;
    private TaskWorkerFactory factory;
    private ThreadPoolExecutor executor;
    private boolean stop = false;
    private Timer timer = new Timer();
    private List<TaskQueueExecutor> taskQueueExecutors;
    private TaskQueue deadLetterQueue = null;
    private List<TaskQueue> taskQueues;

    public TaskWorkerManager(List<TaskQueue> taskQueues, TaskQueue deadLetterQueue, TaskWorkerFactory factory) {
        if (factory == null) {
            throw new IllegalArgumentException("factory must be non-null");
        }
        if (taskQueues == null || taskQueues.isEmpty()) {
            throw new IllegalArgumentException("at least one taskQueue must be specified in the taskQueues list.");
        }
        if (deadLetterQueue == null) {
            throw new IllegalArgumentException("deadLetterQueue must be non-null");
        }
        this.factory = factory;
        this.taskQueues = taskQueues;
        int size = taskQueues.size();
        this.taskQueueExecutors = new ArrayList<TaskQueueExecutor>(size);
        for (int i = 0; i < size; ++i) {
            boolean lowestPriority = i == size - 1;
            long minWait = this.defaultMinWaitTime;
            long maxWait = 480000L;
            if (!lowestPriority) {
                minWait = 1000L;
                maxWait = 30000L;
            }
            this.taskQueueExecutors.add(new TaskQueueExecutor(taskQueues.get(i), minWait, maxWait));
        }
        this.deadLetterQueue = deadLetterQueue;
    }

    public void init() {
        this.defaultMinWaitTime = Long.valueOf(System.getProperty(MIN_WAIT_BEFORE_TAKE_KEY, "15000"));
        Integer maxThreadCount = Integer.valueOf(System.getProperty(MAX_WORKER_PROPERTY_KEY, String.valueOf(5)));
        this.executor = new ThreadPoolExecutor(maxThreadCount, maxThreadCount, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        this.executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        new Thread(new Runnable(){

            @Override
            public void run() {
                TaskWorkerManager.this.runManager();
            }
        }).start();
        this.timer.scheduleAtFixedRate(new TimerTask(){

            @Override
            public void run() {
                LinkedList<String> queueStats = new LinkedList<String>();
                for (TaskQueue queue : TaskWorkerManager.this.taskQueues) {
                    queueStats.add(this.formatQueueStat(queue));
                }
                queueStats.add(this.formatQueueStat(TaskWorkerManager.this.deadLetterQueue));
                TaskWorkerManager.this.log.info("Status: max_workers={} running_workers={} completed_workers={}, {}", TaskWorkerManager.this.getMaxWorkers(), TaskWorkerManager.this.executor.getActiveCount(), TaskWorkerManager.this.executor.getCompletedTaskCount(), StringUtils.join(queueStats, " "));
            }

            private String formatQueueStat(TaskQueue queue) {
                return queue.getName() + "_q_size=" + queue.size();
            }
        }, new Date(), 60000L);
    }

    private void runManager() {
        while (!this.stop) {
            try {
                if (this.isManagerTooBusy()) {
                    this.log.debug("manager is too busy, sleeping for 1 sec");
                    this.sleep(1000L);
                    continue;
                }
                boolean executedOne = false;
                for (TaskQueueExecutor taskQueueExecutor : this.taskQueueExecutors) {
                    if (!taskQueueExecutor.execute()) continue;
                    executedOne = true;
                    break;
                }
                if (executedOne) continue;
                this.sleep(this.defaultMinWaitTime);
            }
            catch (Exception ex) {
                this.log.error("unexpected failure in outer run manager while loop: " + ex.getMessage() + ". Ignoring...", ex);
            }
        }
    }

    private boolean isManagerTooBusy() {
        boolean tooBusy;
        int active = this.executor.getActiveCount();
        int maxPoolSize = this.executor.getMaximumPoolSize();
        int queueSize = this.executor.getQueue().size();
        boolean bl = tooBusy = active + queueSize >= maxPoolSize;
        if (tooBusy) {
            this.log.info("manager is too busy: active worker count = {}; workers awaiting execution (thread pool queue size) = {}", (Object)active, (Object)queueSize);
        }
        return tooBusy;
    }

    public int getMaxWorkers() {
        return this.executor.getMaximumPoolSize();
    }

    private void sleep(long ms) {
        try {
            Thread.sleep(ms);
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void destroy() {
        this.stop = true;
        this.timer.cancel();
        this.executor.shutdown();
        this.log.info("terminating...waiting for threads to complete processing...");
        while (!this.executor.isTerminated()) {
            this.sleep(1000L);
        }
        this.log.info("terminated");
    }

    private class TaskQueueExecutor {
        private TaskQueue taskQueue;
        private long currentWaitBeforeTaskMs;
        private Date nextAttempt = null;
        private long minWaitTime;
        private long maxWaitTime;

        public TaskQueueExecutor(TaskQueue taskQueue, long minWaitTime, long maxWaitTime) {
            this.taskQueue = taskQueue;
            this.minWaitTime = minWaitTime;
            this.maxWaitTime = maxWaitTime;
            this.currentWaitBeforeTaskMs = minWaitTime;
        }

        public boolean execute() {
            if (this.nextAttempt != null && this.nextAttempt.getTime() > System.currentTimeMillis()) {
                return false;
            }
            try {
                TaskWorker worker = TaskWorkerManager.this.factory.create(this.taskQueue.take(), this.taskQueue);
                TaskWorkerManager.this.executor.execute(worker);
                this.nextAttempt = null;
                this.currentWaitBeforeTaskMs = this.minWaitTime;
                return true;
            }
            catch (TimeoutException e) {
                TaskWorkerManager.this.log.debug("Timeout: {} queue is empty:  message={}", (Object)this.taskQueue.getName(), (Object)e.getMessage());
                this.nextAttempt = new Date(System.currentTimeMillis() + this.currentWaitBeforeTaskMs);
                this.currentWaitBeforeTaskMs = Math.min(this.currentWaitBeforeTaskMs * 2L, this.maxWaitTime);
                return false;
            }
        }
    }
}

