/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.supervisor.workload.selective;

import com.codahale.metrics.Timer;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.consumers.subscription.cache.SubscriptionsCache;
import pl.allegro.tech.hermes.consumers.supervisor.workload.WorkTracker;
import pl.allegro.tech.hermes.consumers.supervisor.workload.selective.ConsumerNodesRegistry;
import pl.allegro.tech.hermes.consumers.supervisor.workload.selective.SelectiveWorkBalancer;
import pl.allegro.tech.hermes.consumers.supervisor.workload.selective.WorkBalancingResult;

public class BalancingJob
implements LeaderLatchListener,
Runnable {
    private final ConsumerNodesRegistry consumersRegistry;
    private final SubscriptionsCache subscriptionsCache;
    private final SelectiveWorkBalancer workBalancer;
    private final WorkTracker workTracker;
    private final HermesMetrics metrics;
    private final String kafkaCluster;
    private final ScheduledExecutorService executorService;
    private final int intervalSeconds;
    private ScheduledFuture job;
    private static final Logger logger = LoggerFactory.getLogger(BalancingJob.class);

    public BalancingJob(ConsumerNodesRegistry consumersRegistry, SubscriptionsCache subscriptionsCache, SelectiveWorkBalancer workBalancer, WorkTracker workTracker, HermesMetrics metrics, int intervalSeconds, String kafkaCluster) {
        this.consumersRegistry = consumersRegistry;
        this.subscriptionsCache = subscriptionsCache;
        this.workBalancer = workBalancer;
        this.workTracker = workTracker;
        this.metrics = metrics;
        this.kafkaCluster = kafkaCluster;
        this.executorService = Executors.newSingleThreadScheduledExecutor();
        this.intervalSeconds = intervalSeconds;
    }

    @Override
    public void run() {
        if (this.consumersRegistry.isLeader()) {
            try (Timer.Context ctx = this.metrics.consumersWorkloadRebalanceDurationTimer(this.kafkaCluster).time();){
                logger.info("Initializing workload balance.");
                WorkBalancingResult work = this.workBalancer.balance(this.subscriptionsCache.listActiveSubscriptionNames(), this.consumersRegistry.list(), this.workTracker.getAssignments());
                WorkTracker.WorkDistributionChanges changes = this.workTracker.apply(work.getAssignmentsView());
                logger.info("Finished workload balance {}, {}", (Object)work.toString(), (Object)changes.toString());
                this.metrics.reportConsumersWorkloadStats(this.kafkaCluster, work.getMissingResources(), changes.getDeletedAssignmentsCount(), changes.getCreatedAssignmentsCount());
            }
        }
    }

    public void isLeader() {
        this.job = this.executorService.scheduleAtFixedRate(this, this.intervalSeconds, this.intervalSeconds, TimeUnit.SECONDS);
    }

    public void notLeader() {
        this.job.cancel(false);
    }
}

