/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.supervisor.monitor;

import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.common.config.ConfigFactory;
import pl.allegro.tech.hermes.common.config.Configs;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.consumers.subscription.cache.SubscriptionsCache;
import pl.allegro.tech.hermes.consumers.supervisor.ConsumersSupervisor;
import pl.allegro.tech.hermes.consumers.supervisor.workload.SupervisorController;

/**
 * Periodically compares the subscriptions assigned to this node (as reported by the
 * {@link SupervisorController}) with the consumers actually running (as reported by the
 * {@link ConsumersSupervisor}) and reconciles the two sets: consumers are created for
 * assigned-but-not-running subscriptions and deleted for running-but-not-assigned ones.
 *
 * <p>The outcome of every scan is logged and published through four gauges registered under
 * the {@code consumers-workload.monitor.*} prefix.
 */
public class ConsumersRuntimeMonitor
implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(ConsumersRuntimeMonitor.class);

    /** Single-threaded scheduler on which the periodic scan runs. */
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("consumer-monitor-%d").build());
    private final int scanIntervalSeconds;
    private final ConsumersSupervisor consumerSupervisor;
    private final SupervisorController workloadSupervisor;
    private final SubscriptionsCache subscriptionsCache;
    private final MonitorMetrics monitorMetrics = new MonitorMetrics();

    /** Handle of the scheduled scan; stays {@code null} until {@link #start()} is called. */
    private ScheduledFuture<?> monitoringTask;

    /**
     * Creates the monitor and registers its gauges. The scan itself is not scheduled until
     * {@link #start()} is called.
     *
     * @param consumerSupervisor source of the running consumers and the component used to
     *                           create and delete them
     * @param workloadSupervisor source of the subscriptions assigned to this node
     * @param hermesMetrics      registry in which the monitor gauges are registered
     * @param subscriptionsCache used to resolve a subscription name into a subscription
     * @param configFactory      provides the scan interval (in seconds)
     */
    public ConsumersRuntimeMonitor(ConsumersSupervisor consumerSupervisor, SupervisorController workloadSupervisor, HermesMetrics hermesMetrics, SubscriptionsCache subscriptionsCache, ConfigFactory configFactory) {
        this.consumerSupervisor = consumerSupervisor;
        this.workloadSupervisor = workloadSupervisor;
        this.subscriptionsCache = subscriptionsCache;
        this.scanIntervalSeconds = configFactory.getIntProperty(Configs.CONSUMER_WORKLOAD_MONITOR_SCAN_INTERVAL);
        hermesMetrics.registerGauge("consumers-workload.monitor.running", () -> this.monitorMetrics.running);
        hermesMetrics.registerGauge("consumers-workload.monitor.assigned", () -> this.monitorMetrics.assigned);
        hermesMetrics.registerGauge("consumers-workload.monitor.missing", () -> this.monitorMetrics.missing);
        hermesMetrics.registerGauge("consumers-workload.monitor.oversubscribed", () -> this.monitorMetrics.oversubscribed);
    }

    /**
     * Performs a single scan: computes missing and unwanted consumers, logs them, updates the
     * gauges and then fixes the runtime.
     *
     * <p>Any {@link Exception} is caught and logged rather than propagated; a task that throws
     * would have its subsequent executions suppressed by the scheduled executor.
     */
    @Override
    public void run() {
        try {
            Set<SubscriptionName> assigned = this.workloadSupervisor.assignedSubscriptions();
            Set<SubscriptionName> running = this.consumerSupervisor.runningConsumers();
            Set<SubscriptionName> missing = this.missing(assigned, running);
            Set<SubscriptionName> oversubscribed = this.oversubscribed(assigned, running);
            this.log(assigned, running, missing, oversubscribed);
            this.updateMetrics(assigned, running, missing, oversubscribed);
            this.ensureCorrectness(missing, oversubscribed);
        }
        catch (Exception exception) {
            logger.error("Could not check correctness of assignments", exception);
        }
    }

    /**
     * Schedules the scan with a fixed delay of {@code scanIntervalSeconds} between runs; the
     * first run happens after the same delay.
     */
    public void start() {
        this.monitoringTask = this.executor.scheduleWithFixedDelay(this, this.scanIntervalSeconds, this.scanIntervalSeconds, TimeUnit.SECONDS);
    }

    /**
     * Cancels the scheduled scan (without interrupting a scan already in progress), shuts the
     * executor down and waits up to one minute for it to terminate.
     *
     * <p>Safe to call even if {@link #start()} was never invoked; the executor is still shut
     * down in that case.
     *
     * @throws InterruptedException if interrupted while waiting for the executor to terminate
     */
    public void shutdown() throws InterruptedException {
        ScheduledFuture<?> task = this.monitoringTask;
        // start() may never have been called; without this guard shutdown() would fail with a
        // NullPointerException and leave the executor thread pool running.
        if (task != null) {
            task.cancel(false);
        }
        this.executor.shutdown();
        this.executor.awaitTermination(1L, TimeUnit.MINUTES);
    }

    /**
     * Creates a consumer for every missing subscription and deletes the consumer of every
     * oversubscribed one.
     */
    private void ensureCorrectness(Set<SubscriptionName> missing, Set<SubscriptionName> oversubscribed) {
        if (!missing.isEmpty() || !oversubscribed.isEmpty()) {
            logger.info("Fixing runtime. Creating {} and killing {} consumers", missing.size(), oversubscribed.size());
        }
        missing.stream().map(this.subscriptionsCache::getSubscription).forEach(this.consumerSupervisor::assignConsumerForSubscription);
        oversubscribed.forEach(this.consumerSupervisor::deleteConsumerForSubscriptionName);
    }

    /** Logs a warning per missing/unwanted consumer followed by a summary of the set sizes. */
    private void log(Set<SubscriptionName> assigned, Set<SubscriptionName> running, Set<SubscriptionName> missing, Set<SubscriptionName> oversubscribed) {
        for (SubscriptionName subscriptionName : missing) {
            logger.warn("Missing consumer process for subscription: {}", subscriptionName);
        }
        for (SubscriptionName subscriptionName : oversubscribed) {
            logger.warn("Unwanted consumer process for subscription: {}", subscriptionName);
        }
        logger.info("Subscriptions assigned: {}, existing subscriptions: {}, missing: {}, oversubscribed: {}", assigned.size(), running.size(), missing.size(), oversubscribed.size());
    }

    /** Stores the sizes of the given sets in the fields read by the registered gauges. */
    private void updateMetrics(Set<SubscriptionName> assigned, Set<SubscriptionName> running, Set<SubscriptionName> missing, Set<SubscriptionName> oversubscribed) {
        this.monitorMetrics.assigned = assigned.size();
        this.monitorMetrics.running = running.size();
        this.monitorMetrics.missing = missing.size();
        this.monitorMetrics.oversubscribed = oversubscribed.size();
    }

    /** Returns an immutable copy of the subscriptions that are assigned but not running. */
    private Set<SubscriptionName> missing(Set<SubscriptionName> assignedSubscriptions, Set<SubscriptionName> runningSubscriptions) {
        return Sets.difference(assignedSubscriptions, runningSubscriptions).immutableCopy();
    }

    /** Returns an immutable copy of the subscriptions that are running but not assigned. */
    private Set<SubscriptionName> oversubscribed(Set<SubscriptionName> assignedSubscriptions, Set<SubscriptionName> runningSubscriptions) {
        return Sets.difference(runningSubscriptions, assignedSubscriptions).immutableCopy();
    }

    /**
     * Holder of the latest scan results. Fields are {@code volatile} because they are written
     * by {@link #updateMetrics} and read by the gauge suppliers registered in the constructor.
     */
    private static class MonitorMetrics {
        volatile int assigned;
        volatile int running;
        volatile int missing;
        volatile int oversubscribed;

        private MonitorMetrics() {
        }
    }
}

