/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.supervisor.monitor;

import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.common.config.ConfigFactory;
import pl.allegro.tech.hermes.common.config.Configs;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.consumers.supervisor.ConsumersSupervisor;
import pl.allegro.tech.hermes.consumers.supervisor.workload.SupervisorController;

public class ConsumersRuntimeMonitor
implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(ConsumersRuntimeMonitor.class);
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("consumer-monitor-%d").build());
    private final int scanIntervalSeconds;
    private final ConsumersSupervisor consumerSupervisor;
    private final SupervisorController workloadSupervisor;
    private final MonitorMetrics monitorMetrics = new MonitorMetrics();

    public ConsumersRuntimeMonitor(ConsumersSupervisor consumerSupervisor, SupervisorController workloadSupervisor, HermesMetrics hermesMetrics, ConfigFactory configFactory) {
        this.consumerSupervisor = consumerSupervisor;
        this.workloadSupervisor = workloadSupervisor;
        this.scanIntervalSeconds = configFactory.getIntProperty(Configs.CONSUMER_WORKLOAD_MONITOR_SCAN_INTERVAL);
        hermesMetrics.registerGauge("consumers-workload.monitor.running", () -> this.monitorMetrics.running);
        hermesMetrics.registerGauge("consumers-workload.monitor.assigned", () -> this.monitorMetrics.assigned);
        hermesMetrics.registerGauge("consumers-workload.monitor.missing", () -> this.monitorMetrics.missing);
        hermesMetrics.registerGauge("consumers-workload.monitor.oversubscribed", () -> this.monitorMetrics.oversubscribed);
    }

    public void checkCorrectness() {
        Set<SubscriptionName> assignedSubscriptions = this.workloadSupervisor.assignedSubscriptions();
        Set<SubscriptionName> runningSubscriptions = this.consumerSupervisor.runningConsumers();
        this.monitorMetrics.assigned = assignedSubscriptions.size();
        this.monitorMetrics.running = runningSubscriptions.size();
        Set<SubscriptionName> missingSubscriptions = this.missing(assignedSubscriptions, runningSubscriptions);
        this.monitorMetrics.missing = missingSubscriptions.size();
        for (SubscriptionName subscriptionName : missingSubscriptions) {
            logger.warn("Missing consumer process for subscription: {}", (Object)subscriptionName);
        }
        Set<SubscriptionName> oversubscribedSubscriptions = this.oversubscribed(assignedSubscriptions, runningSubscriptions);
        this.monitorMetrics.oversubscribed = oversubscribedSubscriptions.size();
        for (SubscriptionName subscriptionName : oversubscribedSubscriptions) {
            logger.warn("Unwanted consumer process for subscription: {}", (Object)subscriptionName);
        }
        logger.info("Subscriptions assigned: {}, existing subscriptions: {}, missing: {}, oversubscribed: {}", new Object[]{assignedSubscriptions.size(), runningSubscriptions.size(), missingSubscriptions.size(), oversubscribedSubscriptions.size()});
    }

    @Override
    public void run() {
        try {
            this.checkCorrectness();
        }
        catch (Exception exception) {
            logger.error("Could not check correctness of assignments", (Throwable)exception);
        }
    }

    private Set<SubscriptionName> missing(Set<SubscriptionName> assignedSubscriptions, Set<SubscriptionName> runningSubscriptions) {
        return Sets.difference(assignedSubscriptions, runningSubscriptions).immutableCopy();
    }

    private Set<SubscriptionName> oversubscribed(Set<SubscriptionName> assignedSubscriptions, Set<SubscriptionName> runningSubscriptions) {
        return Sets.difference(runningSubscriptions, assignedSubscriptions).immutableCopy();
    }

    public void start() {
        this.executor.scheduleWithFixedDelay(this, this.scanIntervalSeconds, this.scanIntervalSeconds, TimeUnit.SECONDS);
    }

    public void shutdown() throws InterruptedException {
        this.executor.shutdown();
        this.executor.awaitTermination(1L, TimeUnit.MINUTES);
    }

    private static class MonitorMetrics {
        volatile int assigned;
        volatile int running;
        volatile int missing;
        volatile int oversubscribed;

        private MonitorMetrics() {
        }
    }
}

