/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.supervisor.workload.selective;

import com.codahale.metrics.Timer;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.consumers.subscription.cache.SubscriptionsCache;
import pl.allegro.tech.hermes.consumers.supervisor.workload.SubscriptionAssignmentView;
import pl.allegro.tech.hermes.consumers.supervisor.workload.WorkTracker;
import pl.allegro.tech.hermes.consumers.supervisor.workload.selective.ConsumerNodesRegistry;
import pl.allegro.tech.hermes.consumers.supervisor.workload.selective.SelectiveWorkBalancer;
import pl.allegro.tech.hermes.consumers.supervisor.workload.selective.WorkBalancingResult;

/**
 * Periodic job that rebalances subscription assignments between consumer nodes.
 *
 * <p>The job runs on its own single-threaded scheduled executor. On every run it refreshes the
 * consumer nodes registry and, when this node is the leader and the work tracker is ready,
 * computes a new balance and applies it through the {@link WorkTracker}. The outcome of the last
 * applied balance is published through four gauges registered in the constructor.
 */
public class BalancingJob
implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(BalancingJob.class);
    private final ConsumerNodesRegistry consumersRegistry;
    private final SubscriptionsCache subscriptionsCache;
    private final SelectiveWorkBalancer workBalancer;
    private final WorkTracker workTracker;
    private final HermesMetrics metrics;
    private final String kafkaCluster;
    private final ScheduledExecutorService executorService;
    private final int intervalSeconds;
    // Handle of the scheduled task; stays null until start() is called.
    private ScheduledFuture<?> job;
    private final BalancingJobMetrics balancingMetrics = new BalancingJobMetrics();

    /**
     * Creates the job, its executor and registers the balancing gauges. Nothing is scheduled
     * until {@link #start()} is called.
     *
     * @param consumersRegistry registry refreshed on every run and asked for leadership and the node list
     * @param subscriptionsCache source of the active subscription names to balance
     * @param workBalancer computes the target assignments
     * @param workTracker provides the current assignments and applies the computed ones
     * @param metrics registry used for the gauges and the rebalance duration timer
     * @param intervalSeconds initial delay and period of the job, in seconds
     * @param kafkaCluster cluster name used in gauge names and for the rebalance timer
     */
    public BalancingJob(ConsumerNodesRegistry consumersRegistry, SubscriptionsCache subscriptionsCache, SelectiveWorkBalancer workBalancer, WorkTracker workTracker, HermesMetrics metrics, int intervalSeconds, String kafkaCluster) {
        this.consumersRegistry = consumersRegistry;
        this.subscriptionsCache = subscriptionsCache;
        this.workBalancer = workBalancer;
        this.workTracker = workTracker;
        this.metrics = metrics;
        this.kafkaCluster = kafkaCluster;
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("BalancingExecutor-%d").build();
        this.executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
        this.intervalSeconds = intervalSeconds;
        metrics.registerGauge(this.gaugeName(kafkaCluster, "selective.all-assignments"), () -> this.balancingMetrics.allAssignments);
        metrics.registerGauge(this.gaugeName(kafkaCluster, "selective.missing-resources"), () -> this.balancingMetrics.missingResources);
        // NOTE(review): the two names below start with '.', so the resulting gauge name contains
        // a double dot ("<cluster>..selective...."), unlike the two gauges above. Kept as-is
        // because renaming a published metric may break existing dashboards - confirm and align.
        metrics.registerGauge(this.gaugeName(kafkaCluster, ".selective.deleted-assignments"), () -> this.balancingMetrics.deletedAssignments);
        metrics.registerGauge(this.gaugeName(kafkaCluster, ".selective.created-assignments"), () -> this.balancingMetrics.createdAssignments);
    }

    /**
     * Builds a gauge name of the form {@code consumers-workload.<kafkaCluster>.<name>}.
     */
    private String gaugeName(String kafkaCluster, String name) {
        return "consumers-workload." + kafkaCluster + "." + name;
    }

    /**
     * Executes a single balancing round.
     *
     * <p>When this node is the leader and the work tracker is ready, a new balance is computed and,
     * if leadership is still held afterwards, applied; the gauges are then updated. When the node
     * is not the leader or the tracker is not ready, the gauges are reset to zero. If leadership
     * is lost between computing and applying, the gauges are left untouched.
     */
    @Override
    public void run() {
        // An exception escaping a task scheduled with scheduleAtFixedRate suppresses all
        // subsequent executions, so failures are logged here instead of being propagated.
        try {
            this.consumersRegistry.refresh();
            if (this.consumersRegistry.isLeader() && this.workTracker.isReady()) {
                // The timer context is closed by try-with-resources, which records the duration.
                try (Timer.Context ctx = this.metrics.consumersWorkloadRebalanceDurationTimer(this.kafkaCluster).time()) {
                    logger.info("Initializing workload balance.");
                    SubscriptionAssignmentView initialState = this.workTracker.getAssignments();
                    WorkBalancingResult work = this.workBalancer.balance(this.subscriptionsCache.listActiveSubscriptionNames(), this.consumersRegistry.list(), initialState);
                    // Leadership is re-checked because it may have changed while balancing.
                    if (this.consumersRegistry.isLeader()) {
                        logger.info("Applying workload balance changes {}", work.toString());
                        WorkTracker.WorkDistributionChanges changes = this.workTracker.apply(initialState, work.getAssignmentsView());
                        logger.info("Finished workload balance {}, {}", work.toString(), changes.toString());
                        this.updateMetrics(work, changes);
                    } else {
                        logger.info("Lost leadership before applying changes");
                    }
                }
            } else {
                this.balancingMetrics.reset();
            }
        } catch (Exception e) {
            logger.error("Caught exception when running balancing job", e);
        }
    }

    /**
     * Schedules this job at a fixed rate, using {@code intervalSeconds} both as the initial delay
     * and as the period.
     */
    public void start() {
        this.job = this.executorService.scheduleAtFixedRate(this, this.intervalSeconds, this.intervalSeconds, TimeUnit.SECONDS);
    }

    /**
     * Cancels the scheduled job without interrupting a run in progress, shuts the executor down
     * and waits up to one minute for it to terminate. Safe to call when {@link #start()} was
     * never invoked.
     *
     * @throws InterruptedException if interrupted while waiting for the executor to terminate
     */
    public void stop() throws InterruptedException {
        ScheduledFuture<?> scheduled = this.job;
        // job is only assigned in start(); guard so the executor is still shut down otherwise.
        if (scheduled != null) {
            scheduled.cancel(false);
        }
        this.executorService.shutdown();
        if (!this.executorService.awaitTermination(1L, TimeUnit.MINUTES)) {
            logger.warn("Balancing executor did not terminate within 1 minute");
        }
    }

    /**
     * Copies the figures of an applied balance into the values exposed by the gauges.
     *
     * @param balancingResult result the assignment count and missing resources are read from
     * @param changes applied changes the created/deleted assignment counts are read from
     */
    private void updateMetrics(WorkBalancingResult balancingResult, WorkTracker.WorkDistributionChanges changes) {
        this.balancingMetrics.allAssignments = balancingResult.getAssignmentsView().getAllAssignments().size();
        this.balancingMetrics.missingResources = balancingResult.getMissingResources();
        this.balancingMetrics.createdAssignments = changes.getCreatedAssignmentsCount();
        this.balancingMetrics.deletedAssignments = changes.getDeletedAssignmentsCount();
    }

    /**
     * Holder of the values read by the gauge suppliers registered in the enclosing class's
     * constructor. Fields are volatile; they are written by {@code run()} via
     * {@code updateMetrics} and {@code reset}.
     */
    private static class BalancingJobMetrics {
        volatile int allAssignments;
        volatile int missingResources;
        volatile int deletedAssignments;
        volatile int createdAssignments;

        private BalancingJobMetrics() {
        }

        /** Sets every value back to zero. */
        void reset() {
            this.allAssignments = 0;
            this.missingResources = 0;
            this.deletedAssignments = 0;
            this.createdAssignments = 0;
        }
    }
}

