/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.supervisor.process;

import java.time.Clock;
import java.util.Optional;
import java.util.concurrent.Future;
import org.jctools.queues.MpscArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.common.config.ConfigFactory;
import pl.allegro.tech.hermes.common.config.Configs;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.consumers.consumer.Consumer;
import pl.allegro.tech.hermes.consumers.supervisor.ConsumersExecutorService;
import pl.allegro.tech.hermes.consumers.supervisor.process.ConsumerProcess;
import pl.allegro.tech.hermes.consumers.supervisor.process.Retransmitter;
import pl.allegro.tech.hermes.consumers.supervisor.process.RunningConsumerProcesses;
import pl.allegro.tech.hermes.consumers.supervisor.process.Signal;

/**
 * Supervises consumer processes by reacting to {@link Signal}s.
 *
 * <p>Signals are buffered in a bounded queue via {@link #accept(Signal)} and handled when
 * {@link #run()} drains that queue. Each {@code run()} invocation first queues a
 * {@code KILL_RESTART} signal for every running process whose health-check timestamp is older
 * than the configured threshold, then processes all queued signals.
 */
public class ConsumerProcessSupervisor
implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerProcessSupervisor.class);

    /** Maximum number of signals that may wait in the queue; further offers are rejected. */
    private static final int SIGNAL_QUEUE_CAPACITY = 1000;

    private final MpscArrayQueue<Signal> taskQueue = new MpscArrayQueue<>(SIGNAL_QUEUE_CAPACITY);
    private final RunningConsumerProcesses runningProcesses = new RunningConsumerProcesses();
    private final Retransmitter retransmitter;
    private final ConsumersExecutorService executor;
    private final Clock clock;
    private final HermesMetrics metrics;
    /** A process whose last health-check refresh is older than this many milliseconds is restarted. */
    private final long unhealthyAfter;

    /**
     * @param executor      executes consumer processes and returns their execution handles
     * @param retransmitter passed to every {@link ConsumerProcess} created by this supervisor
     * @param clock         time source for health checks and loop timing
     * @param metrics       receives one counter increment per processed signal
     * @param configs       source of the {@code CONSUMER_BACKGROUND_SUPERVISOR_UNHEALTHY_AFTER} setting
     */
    public ConsumerProcessSupervisor(ConsumersExecutorService executor, Retransmitter retransmitter, Clock clock, HermesMetrics metrics, ConfigFactory configs) {
        this.executor = executor;
        this.retransmitter = retransmitter;
        this.clock = clock;
        this.metrics = metrics;
        this.unhealthyAfter = configs.getIntProperty(Configs.CONSUMER_BACKGROUND_SUPERVISOR_UNHEALTHY_AFTER);
    }

    /**
     * Queues a signal to be handled when {@link #run()} drains the queue.
     *
     * <p>If the queue is full the signal is dropped and an error is logged.
     *
     * @param signal the signal to queue
     */
    public void accept(Signal signal) {
        enqueue(signal);
    }

    /**
     * Performs one supervision pass: queues restarts for unhealthy processes, then processes
     * every queued signal.
     */
    @Override
    public void run() {
        logger.debug("Starting process supervisor loop");
        long startedAt = clock.millis();
        restartUnhealthy();
        taskQueue.drain(this::processSignal);
        logger.debug("Process supervisor loop took {} ms to check all consumers", clock.millis() - startedAt);
    }

    /**
     * Offers the signal to the queue, logging an error instead of silently losing it
     * when the bounded queue rejects the offer.
     */
    private void enqueue(Signal signal) {
        if (!taskQueue.offer(signal)) {
            logger.error("Signal queue is full (capacity {}), dropping signal {}", SIGNAL_QUEUE_CAPACITY, signal);
        }
    }

    /** Queues a {@code KILL_RESTART} signal for every running process that fails the health check. */
    private void restartUnhealthy() {
        runningProcesses.stream()
                .filter(process -> !isHealthy(process))
                .forEach(process -> enqueue(Signal.of(Signal.SignalType.KILL_RESTART, process.getSubscriptionName())));
    }

    /**
     * Handles a single signal. Any exception is logged and swallowed so that one failing
     * signal does not abort processing of the remaining ones.
     */
    private void processSignal(Signal signal) {
        try {
            logger.debug("Processing signal: {}", signal);
            metrics.counter("supervisor.signal." + signal.getType().name()).inc();
            switch (signal.getType()) {
                case START: {
                    start(signal.getTarget(), signal.getPayload());
                    break;
                }
                case RETRANSMIT:
                case UPDATE_SUBSCRIPTION:
                case UPDATE_TOPIC: {
                    // These signals are handled by the consumer process itself.
                    process(signal).accept(signal);
                    break;
                }
                case RESTART:
                case KILL_RESTART: {
                    ConsumerProcess target = process(signal);
                    // Capture the consumer before killing so it can be handed to the follow-up START.
                    Consumer consumer = target.getConsumer();
                    kill(target);
                    enqueue(Signal.of(Signal.SignalType.START, signal.getTarget(), consumer));
                    break;
                }
                case STOP: {
                    process(signal).accept(signal);
                    // The process is asked to stop first; its removal happens on the queued CLEANUP.
                    enqueue(Signal.of(Signal.SignalType.CLEANUP, signal.getTarget()));
                    break;
                }
                case CLEANUP: {
                    cleanup(process(signal));
                    break;
                }
            }
        }
        catch (Exception exception) {
            logger.error("Supervisor failed to process signal {}", signal, exception);
        }
    }

    /** Looks up the running process targeted by the signal. */
    private ConsumerProcess process(Signal signal) {
        return runningProcesses.getProcess(signal.getTarget());
    }

    /**
     * Removes the process from the running set and, if its task has not finished yet,
     * cancels the task with interruption.
     */
    private void kill(ConsumerProcess consumerProcess) {
        logger.info("Interrupting consumer process {}", consumerProcess);
        Future<?> task = runningProcesses.getExecutionHandle(consumerProcess);
        runningProcesses.remove(consumerProcess);
        if (task.isDone()) {
            logger.info("Consumer was already dead process {}", consumerProcess);
            return;
        }
        if (task.cancel(true)) {
            logger.info("Interrupted consumer process {}", consumerProcess);
        } else {
            logger.error("Failed to interrupt consumer process {}, possible stale consumer", consumerProcess);
        }
    }

    /**
     * @return {@code false} when the time since the process last refreshed its health check
     *         exceeds {@code unhealthyAfter} milliseconds, {@code true} otherwise
     */
    private boolean isHealthy(ConsumerProcess consumerProcess) {
        long delta = clock.millis() - consumerProcess.healtcheckRefreshTime();
        if (delta > unhealthyAfter) {
            logger.info("Lost contact with consumer {}, last seen {}ms ago", consumerProcess, delta);
            return false;
        }
        return true;
    }

    /**
     * Submits a consumer process for execution and registers its execution handle.
     *
     * <p>A new process is created when a consumer is provided; otherwise the already registered
     * process for the subscription is reused. If neither is available an error is logged and
     * nothing is started.
     */
    private void start(SubscriptionName subscriptionName, Optional<Consumer> consumer) {
        ConsumerProcess process;
        logger.info("Starting consumer process {}", subscriptionName);
        if (consumer.isPresent()) {
            process = new ConsumerProcess(subscriptionName, consumer.get(), retransmitter, clock);
        } else if (runningProcesses.hasProcess(subscriptionName)) {
            process = runningProcesses.getProcess(subscriptionName);
        } else {
            logger.error("Failed to start: no consumer process for {} and none provided", subscriptionName);
            return;
        }
        runningProcesses.add(process, executor.execute(process));
        logger.info("Started consumer process {}", process);
    }

    /** Final step of a stop: removes the process and cancels its task if it is still running. */
    private void cleanup(ConsumerProcess consumerProcess) {
        kill(consumerProcess);
    }

    /**
     * Delivers a {@code STOP} signal directly to every running process (bypassing the queue)
     * and shuts down the executor.
     */
    public void shutdown() {
        runningProcesses.stream().forEach(p -> p.accept(Signal.of(Signal.SignalType.STOP, p.getSubscriptionName())));
        executor.shutdown();
    }
}

