/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.supervisor.process;

import java.time.Clock;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.jctools.queues.MessagePassingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.common.config.ConfigFactory;
import pl.allegro.tech.hermes.common.config.Configs;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.consumers.queue.MonitoredMpscQueue;
import pl.allegro.tech.hermes.consumers.queue.MpscQueue;
import pl.allegro.tech.hermes.consumers.queue.WaitFreeDrainMpscQueue;
import pl.allegro.tech.hermes.consumers.supervisor.ConsumersExecutorService;
import pl.allegro.tech.hermes.consumers.supervisor.process.ConsumerProcess;
import pl.allegro.tech.hermes.consumers.supervisor.process.ConsumerProcessKiller;
import pl.allegro.tech.hermes.consumers.supervisor.process.ConsumerProcessSupplier;
import pl.allegro.tech.hermes.consumers.supervisor.process.RunningConsumerProcess;
import pl.allegro.tech.hermes.consumers.supervisor.process.RunningConsumerProcesses;
import pl.allegro.tech.hermes.consumers.supervisor.process.RunningSubscriptionStatus;
import pl.allegro.tech.hermes.consumers.supervisor.process.Signal;
import pl.allegro.tech.hermes.consumers.supervisor.process.SignalsFilter;

/**
 * Supervises consumer processes: starts them, forwards signals to them, stops them and
 * replaces the ones that report themselves as unhealthy.
 *
 * <p>Signals handed to {@link #accept(Signal)} are only enqueued. They are acted upon by
 * {@link #run()}, which drains the queue, passes the drained signals through the
 * {@link SignalsFilter} and processes whatever the filter returns.
 */
public class ConsumerProcessSupervisor implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerProcessSupervisor.class);

    private final MpscQueue<Signal> taskQueue;
    private final RunningConsumerProcesses runningConsumerProcesses;
    private final ConsumerProcessKiller processKiller;
    private final ConsumersExecutorService executor;
    private final Clock clock;
    private final HermesMetrics metrics;
    private final SignalsFilter signalsFilter;
    private final ConsumerProcessSupplier processFactory;

    /**
     * Creates a supervisor and registers gauges for the number of running and dying processes.
     *
     * @param executor       executes created consumer processes
     * @param clock          time source, also shared with the collaborators created here
     * @param metrics        registry for the signal queue monitoring, counters and gauges
     * @param configs        provides the signal queue size and the kill-after setting
     * @param processFactory creates a consumer process for a subscription
     */
    public ConsumerProcessSupervisor(ConsumersExecutorService executor, Clock clock, HermesMetrics metrics, ConfigFactory configs, ConsumerProcessSupplier processFactory) {
        this.executor = executor;
        this.clock = clock;
        this.metrics = metrics;
        this.processFactory = processFactory;

        int signalQueueSize = configs.getIntProperty(Configs.CONSUMER_SIGNAL_PROCESSING_QUEUE_SIZE);
        this.taskQueue = new MonitoredMpscQueue<Signal>(new WaitFreeDrainMpscQueue<Signal>(signalQueueSize), metrics, "signalQueue");
        this.signalsFilter = new SignalsFilter(this.taskQueue, clock);
        this.runningConsumerProcesses = new RunningConsumerProcesses(clock);
        this.processKiller = new ConsumerProcessKiller(configs.getIntProperty(Configs.CONSUMER_BACKGROUND_SUPERVISOR_KILL_AFTER), clock);

        metrics.registerRunningConsumerProcessesCountGauge(this.runningConsumerProcesses::count);
        metrics.registerDyingConsumerProcessesCountGauge(this.processKiller::countDying);
    }

    /**
     * Enqueues a signal; it is handled by a later invocation of {@link #run()}.
     *
     * @param signal signal to enqueue
     * @return this supervisor, to allow chaining
     */
    public ConsumerProcessSupervisor accept(Signal signal) {
        // NOTE(review): the result of offer() is not inspected here - confirm that the queue
        // implementation reports rejected signals on its own.
        taskQueue.offer(signal);
        return this;
    }

    /**
     * @return names of subscriptions as reported by the running-process registry
     */
    public Set<SubscriptionName> existingConsumers() {
        return runningConsumerProcesses.existingConsumers();
    }

    /**
     * Performs one supervision pass: restarts unhealthy processes, lets the killer deal with
     * dying ones, then drains the signal queue and processes the filtered signals.
     */
    @Override
    public void run() {
        logger.debug("Starting process supervisor loop");
        long startedAt = clock.millis();

        restartUnhealthy();
        processKiller.killAllDying();

        List<Signal> signalsToProcess = new ArrayList<>();
        taskQueue.drain(signalsToProcess::add);
        signalsFilter.filterSignals(signalsToProcess).forEach(this::tryToProcessSignal);

        logger.debug("Process supervisor loop took {} ms to check all consumers", clock.millis() - startedAt);
    }

    /**
     * Sends a STOP signal directly to every running consumer process, invokes the killer for
     * dying processes and shuts the executor down.
     */
    public void shutdown() {
        // NOTE(review): processes stopped here are not handed to processKiller.observe(), unlike
        // in stop(Signal) - confirm this is intended.
        runningConsumerProcesses.stream().forEach(running -> {
            ConsumerProcess consumerProcess = running.getConsumerProcess();
            consumerProcess.accept(Signal.of(Signal.SignalType.STOP, consumerProcess.getSubscriptionName()));
        });
        processKiller.killAllDying();
        executor.shutdown();
    }

    /**
     * @return status list as reported by the running-process registry
     */
    public List<RunningSubscriptionStatus> runningSubscriptionsStatus() {
        return runningConsumerProcesses.listRunningSubscriptions();
    }

    /**
     * @return number of processes held by the running-process registry
     */
    public Integer countRunningProcesses() {
        return runningConsumerProcesses.count();
    }

    /**
     * Kills every process that reports itself as not healthy, removes it from the registry and
     * enqueues a START signal for its subscription.
     */
    private void restartUnhealthy() {
        // Collect first: the loop below removes entries from the registry the stream came from.
        List<RunningConsumerProcess> unhealthy = runningConsumerProcesses.stream()
                .filter(running -> !running.getConsumerProcess().isHealthy())
                .collect(Collectors.toList());

        for (RunningConsumerProcess running : unhealthy) {
            logger.info("Lost contact with consumer {} (last seen {}ms ago). Attempting to kill this process and spawn new one.", running.getConsumerProcess(), running.getConsumerProcess().lastSeen());
            processKiller.kill(running);
            runningConsumerProcesses.remove(running);
            taskQueue.offer(Signal.of(Signal.SignalType.START, running.getSubscription().getQualifiedName(), running.getSubscription()));
        }
    }

    /**
     * Processes a single signal, logging (rather than propagating) any exception so that the
     * remaining signals of the same pass are still processed.
     */
    private void tryToProcessSignal(Signal signal) {
        try {
            processSignal(signal);
        } catch (Exception exception) {
            logger.error("Supervisor failed to process signal {}", signal, exception);
        }
    }

    /**
     * Counts the signal in metrics and dispatches it by type.
     */
    private void processSignal(Signal signal) {
        logger.debug("Processing signal: {}", signal);
        metrics.counter("supervisor.signal." + signal.getType().name()).inc();

        switch (signal.getType()) {
            case START:
                start(signal);
                break;
            case UPDATE_SUBSCRIPTION:
                updateSubscription(signal);
                break;
            case UPDATE_TOPIC:
            case RETRANSMIT:
            case COMMIT:
                // These are simply forwarded to the target process, if it is running.
                forRunningConsumerProcess(signal, running -> running.getConsumerProcess().accept(signal));
                break;
            case STOP:
                stop(signal);
                break;
            default:
                logger.warn("Unknown signal {}", signal);
        }
    }

    /**
     * Handles a subscription update for a running process; drops the signal when the target
     * process is not running.
     */
    private void updateSubscription(Signal signal) {
        if (!runningConsumerProcesses.hasProcess(signal.getTarget())) {
            drop(signal);
            return;
        }
        stopOrUpdateConsumer(signal);
    }

    /**
     * Stops the target process when the updated subscription has a different delivery type than
     * the running one; otherwise forwards the update signal to the process.
     *
     * @throws IllegalArgumentException when the signal's payload is not a {@link Subscription}
     */
    private void stopOrUpdateConsumer(Signal signal) {
        // Validate the payload the same way start(Signal) does instead of casting blindly.
        Subscription signalSubscription = getSubscriptionFromPayload(signal);

        if (deliveryTypesEqual(signalSubscription)) {
            forRunningConsumerProcess(signal, running -> running.getConsumerProcess().accept(signal));
            return;
        }
        logger.info("Stopping subscription: {} because of delivery type update", signalSubscription.getQualifiedName());
        stop(Signal.of(Signal.SignalType.STOP, signal.getTarget()));
    }

    /**
     * @return {@code true} when the given subscription has the same delivery type as the
     *         subscription of the running process registered under the same qualified name
     */
    private boolean deliveryTypesEqual(Subscription signalSubscription) {
        Subscription runningSubscription = runningConsumerProcesses
                .getProcess(signalSubscription.getQualifiedName())
                .getSubscription();
        return signalSubscription.getDeliveryType() == runningSubscription.getDeliveryType();
    }

    /**
     * Hands the target process over to the killer for observation, removes it from the registry
     * and forwards the signal to it. The signal is dropped when the target is not running.
     */
    private void stop(Signal signal) {
        forRunningConsumerProcess(signal, running -> {
            processKiller.observe(running);
            runningConsumerProcesses.remove(running);
            running.getConsumerProcess().accept(signal);
        });
    }

    /**
     * Applies the action to the running process targeted by the signal, or drops the signal when
     * no such process is registered.
     */
    private void forRunningConsumerProcess(Signal signal, Consumer<RunningConsumerProcess> consumerProcessConsumer) {
        if (!runningConsumerProcesses.hasProcess(signal.getTarget())) {
            drop(signal);
            return;
        }
        consumerProcessConsumer.accept(runningConsumerProcesses.getProcess(signal.getTarget()));
    }

    /**
     * Counts and logs a signal that could not be delivered because its target is not running.
     */
    private void drop(Signal signal) {
        metrics.counter("supervisor.signal.dropped." + signal.getType().name()).inc();
        logger.warn("Dropping signal {} as running target consumer process does not exist.", signal);
    }

    /**
     * Creates and executes a consumer process for the subscription carried by the signal.
     *
     * <p>When a process for the target is dying, the signal is re-enqueued so the start is
     * attempted again on a later pass. When a process is already running, nothing is started.
     *
     * @throws IllegalArgumentException when the signal's payload is not a {@link Subscription}
     */
    private void start(Signal start) {
        Subscription subscription = getSubscriptionFromPayload(start);

        if (!hasProcess(start.getTarget())) {
            try {
                logger.info("Creating consumer for {}", subscription.getQualifiedName());
                ConsumerProcess process = processFactory.createProcess(subscription, start, processKiller::cleanup);
                logger.info("Created consumer for {}. {}", subscription.getQualifiedName(), start.getLogWithIdAndType());

                logger.info("Starting consumer process for subscription {}. {}", start.getTarget(), start.getLogWithIdAndType());
                Future<?> future = executor.execute(process);
                logger.info("Consumer for {} was added for execution. {}", subscription.getQualifiedName(), start.getLogWithIdAndType());

                runningConsumerProcesses.add(process, future);
                logger.info("Started consumer process for subscription {}. {}", start.getTarget(), start.getLogWithIdAndType());
            } catch (Exception ex) {
                logger.error("Failed to create consumer for subscription {}", subscription.getQualifiedName(), ex);
            }
        } else if (processKiller.isDying(start.getTarget())) {
            logger.info("Consumer process for {} is already dying, startup deferred.", subscription.getQualifiedName());
            // Put the signal back on the queue so the start is retried on a later pass.
            accept(start);
        } else {
            logger.info("Abort consumer process start: process for subscription {} is already running. {}", start.getTarget(), start.getLogWithIdAndType());
        }
    }

    /**
     * @return {@code true} when a process for the subscription is either running or dying
     */
    private boolean hasProcess(SubscriptionName subscriptionName) {
        return runningConsumerProcesses.hasProcess(subscriptionName) || processKiller.isDying(subscriptionName);
    }

    /**
     * Extracts the subscription carried by a signal.
     *
     * @throws IllegalArgumentException when the payload is {@code null} or not a {@link Subscription}
     */
    private Subscription getSubscriptionFromPayload(Signal startSignal) {
        Object payload = startSignal.getPayload();
        if (!(payload instanceof Subscription)) {
            throw new IllegalArgumentException("Signal's payload has to be Subscription type");
        }
        return (Subscription) payload;
    }
}

