/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.supervisor.process;

import java.time.Clock;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import org.jctools.queues.MessagePassingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.common.config.ConfigFactory;
import pl.allegro.tech.hermes.common.config.Configs;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.consumers.queue.MonitoredMpscQueue;
import pl.allegro.tech.hermes.consumers.supervisor.ConsumersExecutorService;
import pl.allegro.tech.hermes.consumers.supervisor.process.ConsumerProcess;
import pl.allegro.tech.hermes.consumers.supervisor.process.Retransmitter;
import pl.allegro.tech.hermes.consumers.supervisor.process.RunningConsumerProcesses;
import pl.allegro.tech.hermes.consumers.supervisor.process.RunningSubscriptionStatus;
import pl.allegro.tech.hermes.consumers.supervisor.process.Signal;
import pl.allegro.tech.hermes.consumers.supervisor.process.SignalsFilter;

/**
 * Supervises consumer processes: starts them, forwards signals to them, and
 * interrupts and removes them.
 *
 * <p>Signals are handed in through {@link #accept(Signal)} and stored in an internal
 * queue. Each invocation of {@link #run()} first enqueues restart signals for processes
 * that report themselves unhealthy, then drains the queue, passes the drained signals
 * through a {@link SignalsFilter} and dispatches whatever the filter returns.
 */
public class ConsumerProcessSupervisor implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerProcessSupervisor.class);

    /** Queue of signals waiting to be handled by the next {@link #run()} iteration. */
    private final MonitoredMpscQueue<Signal> taskQueue;
    private final RunningConsumerProcesses runningProcesses = new RunningConsumerProcesses();
    private final Retransmitter retransmitter;
    private final ConsumersExecutorService executor;
    private final Clock clock;
    private final HermesMetrics metrics;
    private final SignalsFilter signalsFilter;
    /** Passed unchanged to every {@link ConsumerProcess} created by {@link #start(Signal)}. */
    private final long unhealthyAfter;
    /** Offset added to the current clock time when scheduling KILL / KILL_UNHEALTHY follow-ups. */
    private final long killAfter;

    /**
     * Creates a supervisor with an empty set of running processes.
     *
     * @param executor      executes the consumer processes and is shut down by {@link #shutdown()}
     * @param retransmitter handed to every created {@link ConsumerProcess}
     * @param clock         time source for kill deadlines and loop timing
     * @param metrics       registry for signal counters and the signal queue
     * @param configs       source of the unhealthy-after, kill-after and signal queue size settings
     */
    public ConsumerProcessSupervisor(ConsumersExecutorService executor, Retransmitter retransmitter, Clock clock, HermesMetrics metrics, ConfigFactory configs) {
        this.executor = executor;
        this.retransmitter = retransmitter;
        this.clock = clock;
        this.metrics = metrics;
        this.unhealthyAfter = configs.getIntProperty(Configs.CONSUMER_BACKGROUND_SUPERVISOR_UNHEALTHY_AFTER);
        this.killAfter = configs.getIntProperty(Configs.CONSUMER_BACKGROUND_SUPERVISOR_KILL_AFTER);
        this.taskQueue = new MonitoredMpscQueue<>(metrics, "signalQueue", configs.getIntProperty(Configs.CONSUMER_SIGNAL_PROCESSING_QUEUE_SIZE));
        this.signalsFilter = new SignalsFilter(this.taskQueue, clock);
    }

    /**
     * Enqueues a signal; it is dispatched by a later {@link #run()} invocation.
     *
     * @param signal signal to enqueue
     */
    public void accept(Signal signal) {
        // NOTE(review): the boolean result of offer() is ignored here; presumably the queue
        // reports rejected elements itself - confirm against MonitoredMpscQueue.
        this.taskQueue.offer(signal);
    }

    /**
     * @return the subscriptions reported by the running-process registry
     */
    public Set<SubscriptionName> existingConsumers() {
        return this.runningProcesses.existingConsumers();
    }

    /**
     * Performs one supervisor iteration: requests restarts of unhealthy processes, drains
     * the signal queue, filters the drained signals and processes the remaining ones.
     * A failure while processing one signal is logged and does not stop the others.
     */
    @Override
    public void run() {
        logger.debug("Starting process supervisor loop");
        long currentTime = this.clock.millis();
        this.restartUnhealthy();

        List<Signal> signalsToProcess = new ArrayList<>();
        MessagePassingQueue.Consumer<Signal> collector = signalsToProcess::add;
        this.taskQueue.drain(collector);
        this.signalsFilter.filterSignals(signalsToProcess, this.runningProcesses.existingConsumers())
                .forEach(this::tryToProcessSignal);

        logger.debug("Process supervisor loop took {} ms to check all consumers", this.clock.millis() - currentTime);
    }

    /**
     * Enqueues a RESTART_UNHEALTHY signal for every running process whose
     * {@code isHealthy()} returns {@code false}.
     */
    private void restartUnhealthy() {
        this.runningProcesses.stream()
                .filter(consumerProcess -> !consumerProcess.isHealthy())
                .forEach(consumerProcess -> {
                    Signal restartUnhealthy = Signal.of(Signal.SignalType.RESTART_UNHEALTHY, consumerProcess.getSubscriptionName());
                    // Exactly three placeholders for three arguments; the trailing one carries
                    // the signal id/type, as in the other messages of this class.
                    // NOTE(review): the wording assumes lastSeen() is an elapsed time in
                    // milliseconds rather than a timestamp - confirm against ConsumerProcess.
                    logger.info("Lost contact with consumer {}, last seen {}ms ago. {}",
                            consumerProcess, consumerProcess.lastSeen(), restartUnhealthy.getLogWithIdAndType());
                    this.taskQueue.offer(restartUnhealthy);
                });
    }

    /**
     * Processes a signal and logs, instead of propagating, any exception it causes.
     */
    private void tryToProcessSignal(Signal signal) {
        try {
            this.processSignal(signal);
        } catch (Exception exception) {
            // The exception is the last argument and has no placeholder of its own.
            logger.error("Supervisor failed to process signal {}", signal, exception);
        }
    }

    /**
     * Increments the per-type signal counter and dispatches the signal by type.
     *
     * <ul>
     *   <li>START - starts a new process.</li>
     *   <li>RETRANSMIT, UPDATE_SUBSCRIPTION, UPDATE_TOPIC, RESTART, COMMIT - forwarded to
     *       the target process.</li>
     *   <li>STOP - forwarded to the target process, then a KILL child signal is enqueued
     *       with {@link #killTime()}.</li>
     *   <li>KILL - interrupts the target process.</li>
     *   <li>RESTART_UNHEALTHY - a RESTART child signal is sent to the target process, then
     *       a KILL_UNHEALTHY child signal is enqueued with {@link #killTime()}.</li>
     *   <li>KILL_UNHEALTHY - a START child signal carrying the process's consumer is
     *       enqueued, then the process is interrupted.</li>
     *   <li>CLEANUP - removes the process from the registry.</li>
     * </ul>
     *
     * Signals that require a target process are dropped (and counted) when none exists.
     */
    private void processSignal(Signal signal) {
        logger.debug("Processing signal: {}", signal);
        this.metrics.counter("supervisor.signal." + signal.getType().name()).inc();
        switch (signal.getType()) {
            case START:
                this.start(signal);
                break;
            case RETRANSMIT:
            case UPDATE_SUBSCRIPTION:
            case UPDATE_TOPIC:
            case RESTART:
            case COMMIT:
                this.onConsumerProcess(signal, consumerProcess -> consumerProcess.accept(signal));
                break;
            case STOP:
                this.onConsumerProcess(signal, consumerProcess -> {
                    consumerProcess.accept(signal);
                    this.taskQueue.offer(signal.createChild(Signal.SignalType.KILL, this.killTime()));
                });
                break;
            case KILL:
                this.kill(signal);
                break;
            case RESTART_UNHEALTHY:
                this.onConsumerProcess(signal, consumerProcess -> {
                    consumerProcess.accept(signal.createChild(Signal.SignalType.RESTART));
                    this.taskQueue.offer(signal.createChild(Signal.SignalType.KILL_UNHEALTHY, this.killTime()));
                });
                break;
            case KILL_UNHEALTHY:
                this.onConsumerProcess(signal, consumerProcess -> {
                    // The START signal is enqueued before the kill so that it carries the
                    // consumer taken from the process that is about to be interrupted.
                    this.taskQueue.offer(signal.createChild(Signal.SignalType.START, this.clock.millis(), consumerProcess.getConsumer()));
                    this.kill(signal);
                });
                break;
            case CLEANUP:
                this.cleanup(signal);
                break;
            default:
                logger.warn("Unknown signal {}", signal);
        }
    }

    /**
     * @return the current clock time in milliseconds plus the configured kill-after offset
     */
    private long killTime() {
        return this.clock.millis() + this.killAfter;
    }

    /**
     * Runs the action on the signal's target process, or counts and logs the signal as
     * dropped when no such process is registered.
     */
    private void onConsumerProcess(Signal signal, Consumer<ConsumerProcess> consumerProcessConsumer) {
        if (this.runningProcesses.hasProcess(signal.getTarget())) {
            consumerProcessConsumer.accept(this.runningProcesses.getProcess(signal.getTarget()));
        } else {
            this.metrics.counter("supervisor.signal.dropped." + signal.getType().name()).inc();
            logger.warn("Dropping signal {} as target consumer process does not exist.", signal);
        }
    }

    /**
     * Interrupts the target process through its execution handle. A handle that is already
     * done is not cancelled; its process is removed from the registry instead.
     */
    private void kill(Signal signal) {
        if (!this.runningProcesses.hasProcess(signal.getTarget())) {
            logger.info("Process for subscription {} no longer exists. {}", signal.getTarget(), signal.getLogWithIdAndType());
            return;
        }

        logger.info("Interrupting consumer process for subscription {}. {}", signal.getTarget(), signal.getLogWithIdAndType());
        Future<?> task = this.runningProcesses.getExecutionHandle(signal.getTarget());
        if (task.isDone()) {
            this.runningProcesses.remove(signal.getTarget());
            logger.info("Consumer was already dead process {}. {}", signal.getTarget(), signal.getLogWithIdAndType());
            return;
        }

        // cancel(true) requests interruption of the thread running the task.
        if (task.cancel(true)) {
            logger.info("Interrupted consumer process {}. {}", signal.getTarget(), signal.getLogWithIdAndType());
        } else {
            logger.error("Failed to interrupt consumer process {}, possible stale consumer. {}", signal.getTarget(), signal.getLogWithIdAndType());
        }
    }

    /**
     * Creates, submits and registers a consumer process for the signal's target, unless a
     * process for that target is already registered.
     */
    private void start(Signal start) {
        logger.info("Starting consumer process for subscription {}. {}", start.getTarget(), start.getLogWithIdAndType());
        if (this.runningProcesses.hasProcess(start.getTarget())) {
            logger.info("Abort consumer process start: process for subscription {} is already running. {}", start.getTarget(), start.getLogWithIdAndType());
            return;
        }

        ConsumerProcess process = new ConsumerProcess(start, this.retransmitter, this::handleProcessShutdown, this.clock, this.unhealthyAfter);
        this.runningProcesses.add(process, this.executor.execute(process));
        logger.info("Started consumer process for subscription {}. {}", start.getTarget(), start.getLogWithIdAndType());
    }

    /**
     * Shutdown callback handed to every {@link ConsumerProcess}; enqueues a CLEANUP signal
     * for the process's subscription.
     *
     * @param lastSignal the last signal the process handled
     */
    private void handleProcessShutdown(Signal lastSignal) {
        // Thread.interrupted() also clears the interrupt flag of the calling thread.
        if (Thread.interrupted()) {
            Signal cleanup = Signal.of(Signal.SignalType.CLEANUP, lastSignal.getTarget());
            logger.info("Consumer process was interrupted. Its last processed signal is {}. Accepting {}", lastSignal, cleanup);
            this.accept(cleanup);
        } else {
            this.accept(lastSignal.createChild(Signal.SignalType.CLEANUP));
        }
    }

    /**
     * Removes the signal's target process from the registry.
     */
    private void cleanup(Signal signal) {
        logger.info("Removing consumer process for subscription {}. {}", signal.getTarget(), signal.getLogWithIdAndType());
        this.runningProcesses.remove(signal.getTarget());
    }

    /**
     * Sends a STOP signal directly to every running process and shuts the executor down.
     */
    public void shutdown() {
        this.runningProcesses.stream().forEach(p -> p.accept(Signal.of(Signal.SignalType.STOP, p.getSubscriptionName())));
        this.executor.shutdown();
    }

    /**
     * @return status entries for the running subscriptions, as reported by the registry
     */
    public List<RunningSubscriptionStatus> listRunningSubscriptions() {
        return this.runningProcesses.listRunningSubscriptions();
    }

    /**
     * @return the number of running processes, as reported by the registry
     */
    public Integer countRunningSubscriptions() {
        return this.runningProcesses.count();
    }
}

