/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.supervisor.process;

import java.time.Clock;
import java.util.ArrayList;
import java.util.Set;
import java.util.concurrent.Future;
import org.jctools.queues.MpscArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.common.config.ConfigFactory;
import pl.allegro.tech.hermes.common.config.Configs;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.consumers.consumer.Consumer;
import pl.allegro.tech.hermes.consumers.supervisor.ConsumersExecutorService;
import pl.allegro.tech.hermes.consumers.supervisor.process.ConsumerProcess;
import pl.allegro.tech.hermes.consumers.supervisor.process.Retransmitter;
import pl.allegro.tech.hermes.consumers.supervisor.process.RunningConsumerProcesses;
import pl.allegro.tech.hermes.consumers.supervisor.process.Signal;
import pl.allegro.tech.hermes.consumers.supervisor.process.SignalsFilter;

/**
 * Supervises consumer processes by handling {@link Signal}s taken from an internal queue.
 *
 * <p>Signals are submitted with {@link #accept(Signal)}. Each call to {@link #run()} performs one
 * supervision pass: it enqueues a restart signal for every process whose health-check timestamp is
 * stale, drains the queue, passes the drained signals through {@link SignalsFilter} and processes
 * whatever the filter returns.
 */
public class ConsumerProcessSupervisor implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerProcessSupervisor.class);

    /** Pending signals; capacity is fixed, so {@code offer} may reject signals when it is full. */
    private final MpscArrayQueue<Signal> taskQueue = new MpscArrayQueue<>(1000);
    private final RunningConsumerProcesses runningProcesses = new RunningConsumerProcesses();
    private final Retransmitter retransmitter;
    private final ConsumersExecutorService executor;
    private final Clock clock;
    private final HermesMetrics metrics;
    private final SignalsFilter signalsFilter;
    /** Milliseconds without a health-check refresh after which a process is considered unhealthy. */
    private final long unhealthyAfter;
    /** Milliseconds added to the current time to compute when a follow-up kill signal is due. */
    private final long killAfter;

    /**
     * Creates a supervisor.
     *
     * @param executor executor used to run consumer processes
     * @param retransmitter passed to every {@link ConsumerProcess} created by this supervisor
     * @param clock time source for health checks, kill deadlines and loop timing
     * @param metrics registry on which per-signal-type counters are incremented
     * @param configs source of the "unhealthy after" and "kill after" settings
     */
    public ConsumerProcessSupervisor(ConsumersExecutorService executor, Retransmitter retransmitter, Clock clock, HermesMetrics metrics, ConfigFactory configs) {
        this.executor = executor;
        this.retransmitter = retransmitter;
        this.clock = clock;
        this.metrics = metrics;
        this.unhealthyAfter = configs.getIntProperty(Configs.CONSUMER_BACKGROUND_SUPERVISOR_UNHEALTHY_AFTER);
        this.killAfter = configs.getIntProperty(Configs.CONSUMER_BACKGROUND_SUPERVISOR_KILL_AFTER);
        this.signalsFilter = new SignalsFilter(this.taskQueue, clock);
    }

    /**
     * Queues a signal to be handled by a later {@link #run()} pass.
     *
     * @param signal signal to enqueue; if the queue is full the signal is dropped and an error is logged
     */
    public void accept(Signal signal) {
        this.enqueue(signal);
    }

    /**
     * @return the subscription names reported by the running-process registry
     */
    public Set<SubscriptionName> existingConsumers() {
        return this.runningProcesses.existingConsumers();
    }

    /**
     * Runs a single supervision pass: flags unhealthy processes, then drains, filters and processes
     * the queued signals.
     */
    @Override
    public void run() {
        logger.debug("Starting process supervisor loop");
        long loopStart = this.clock.millis();

        this.restartUnhealthy();

        ArrayList<Signal> signalsToProcess = new ArrayList<>();
        this.taskQueue.drain(signalsToProcess::add);
        this.signalsFilter
                .filterSignals(signalsToProcess, this.runningProcesses.existingConsumers())
                .forEach(this::processSignal);

        logger.debug("Process supervisor loop took {} ms to check all consumers", this.clock.millis() - loopStart);
    }

    /** Enqueues a RESTART_UNHEALTHY signal for every registered process that fails the health check. */
    private void restartUnhealthy() {
        this.runningProcesses.stream()
                .filter(consumerProcess -> !this.isHealthy(consumerProcess))
                .forEach(consumerProcess -> this.enqueue(
                        Signal.of(Signal.SignalType.RESTART_UNHEALTHY, consumerProcess.getSubscriptionName())));
    }

    /**
     * Dispatches one signal according to its type. Any exception thrown while handling the signal
     * is logged and swallowed so that the remaining signals of the pass are still processed.
     */
    private void processSignal(Signal signal) {
        try {
            logger.debug("Processing signal: {}", signal);
            this.metrics.counter("supervisor.signal." + signal.getType().name()).inc();
            switch (signal.getType()) {
                case START: {
                    this.start(signal.getTarget(), (Consumer) signal.getPayload());
                    break;
                }
                // These signals are simply forwarded to the target process.
                case RETRANSMIT:
                case UPDATE_SUBSCRIPTION:
                case UPDATE_TOPIC:
                case RESTART: {
                    this.process(signal).accept(signal);
                    break;
                }
                case STOP: {
                    this.process(signal).accept(signal);
                    // Schedule a kill in case the process does not stop on its own.
                    this.enqueue(Signal.of(Signal.SignalType.KILL, signal.getTarget(), this.killTime()));
                    break;
                }
                case KILL: {
                    this.kill(signal.getTarget());
                    break;
                }
                case RESTART_UNHEALTHY: {
                    this.process(signal).accept(Signal.of(Signal.SignalType.RESTART, signal.getTarget()));
                    this.enqueue(Signal.of(Signal.SignalType.KILL_UNHEALTHY, signal.getTarget(), this.killTime()));
                    break;
                }
                case KILL_UNHEALTHY: {
                    // Grab the consumer before the process is deregistered so it can be started again.
                    Consumer consumer = this.runningProcesses.getProcess(signal.getTarget()).getConsumer();
                    this.enqueue(Signal.of(Signal.SignalType.START, signal.getTarget(), consumer));
                    this.kill(signal.getTarget());
                    // The original code fell through into CLEANUP here without a marker. The cleanup is
                    // kept (now explicit) because the START queued above is aborted while a process is
                    // still registered for the subscription.
                    // NOTE(review): confirm that immediate deregistration is the intended behaviour.
                    this.cleanup(signal.getTarget());
                    break;
                }
                case CLEANUP: {
                    this.cleanup(signal.getTarget());
                    break;
                }
            }
        }
        catch (Exception exception) {
            logger.error("Supervisor failed to process signal {}", signal, exception);
        }
    }

    /** @return the clock time, in milliseconds, at which a follow-up kill signal becomes due */
    private long killTime() {
        return this.clock.millis() + this.killAfter;
    }

    /** @return the registered process targeted by the given signal, as returned by the registry */
    private ConsumerProcess process(Signal signal) {
        return this.runningProcesses.getProcess(signal.getTarget());
    }

    /**
     * Interrupts the task running the consumer process of the given subscription, if one is
     * registered and has not finished yet.
     */
    private void kill(SubscriptionName subscriptionName) {
        // Fixed: the guard was inverted, so registered processes were never interrupted and
        // unregistered ones led to a lookup of a missing execution handle.
        if (!this.runningProcesses.hasProcess(subscriptionName)) {
            logger.info("Process for subscription {} no longer exists", subscriptionName);
            return;
        }

        logger.info("Interrupting consumer process for subscription {}", subscriptionName);
        Future<?> task = this.runningProcesses.getExecutionHandle(subscriptionName);
        if (task.isDone()) {
            logger.info("Consumer was already dead process {}", subscriptionName);
        } else if (task.cancel(true)) {
            logger.info("Interrupted consumer process {}", subscriptionName);
        } else {
            logger.error("Failed to interrupt consumer process {}, possible stale consumer", subscriptionName);
        }
    }

    /**
     * @return {@code false} (after logging) when the process has not refreshed its health check
     *         for more than {@code unhealthyAfter} milliseconds, {@code true} otherwise
     */
    private boolean isHealthy(ConsumerProcess consumerProcess) {
        long delta = this.clock.millis() - consumerProcess.healtcheckRefreshTime();
        if (delta > this.unhealthyAfter) {
            logger.info("Lost contact with consumer {}, last seen {}ms ago", consumerProcess, delta);
            return false;
        }
        return true;
    }

    /**
     * Creates, submits and registers a consumer process for the subscription unless one is
     * already registered.
     */
    private void start(SubscriptionName subscriptionName, Consumer consumer) {
        logger.info("Starting consumer process {}", subscriptionName);
        if (this.runningProcesses.hasProcess(subscriptionName)) {
            logger.info("Abort consumer process start: process for subscription {} is already running", subscriptionName);
            return;
        }

        ConsumerProcess process = new ConsumerProcess(subscriptionName, consumer, this.retransmitter, this::handleProcessShutdown, this.clock);
        // Raw type kept on purpose: the parameter type expected by RunningConsumerProcesses.add is not visible here.
        Future future = this.executor.execute(process);
        this.runningProcesses.add(process, future);
        logger.info("Started consumer process {}", process);
    }

    /** Callback handed to each {@link ConsumerProcess}; queues a CLEANUP signal for the subscription. */
    private void handleProcessShutdown(SubscriptionName subscriptionName) {
        this.accept(Signal.of(Signal.SignalType.CLEANUP, subscriptionName));
    }

    /** Removes the subscription's process from the running-process registry. */
    private void cleanup(SubscriptionName subscriptionName) {
        this.runningProcesses.remove(subscriptionName);
    }

    /** Sends a STOP signal directly to every registered process and then shuts the executor down. */
    public void shutdown() {
        this.runningProcesses.stream().forEach(p -> p.accept(Signal.of(Signal.SignalType.STOP, p.getSubscriptionName())));
        this.executor.shutdown();
    }

    /** Offers a signal to the queue and logs an error instead of silently dropping it when the queue is full. */
    private void enqueue(Signal signal) {
        if (!this.taskQueue.offer(signal)) {
            logger.error("Signal queue is full, dropping signal {}", signal);
        }
    }
}

