/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.supervisor.workload;

import com.codahale.metrics.Timer;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.consumers.registry.ConsumerNodesRegistry;
import pl.allegro.tech.hermes.consumers.subscription.cache.SubscriptionsCache;
import pl.allegro.tech.hermes.consumers.supervisor.workload.BalancingListener;
import pl.allegro.tech.hermes.consumers.supervisor.workload.ClusterAssignmentCache;
import pl.allegro.tech.hermes.consumers.supervisor.workload.ConsumerAssignmentRegistry;
import pl.allegro.tech.hermes.consumers.supervisor.workload.SubscriptionAssignmentView;
import pl.allegro.tech.hermes.consumers.supervisor.workload.WorkBalancer;
import pl.allegro.tech.hermes.consumers.supervisor.workload.WorkBalancingParameters;
import pl.allegro.tech.hermes.consumers.supervisor.workload.WorkBalancingResult;
import pl.allegro.tech.hermes.consumers.supervisor.workload.WorkDistributionChanges;
import pl.allegro.tech.hermes.consumers.supervisor.workload.WorkloadConstraints;
import pl.allegro.tech.hermes.domain.workload.constraints.ConsumersWorkloadConstraints;
import pl.allegro.tech.hermes.domain.workload.constraints.WorkloadConstraintsRepository;

class BalancingJob
implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(BalancingJob.class);
    private final ConsumerNodesRegistry consumersRegistry;
    private final WorkBalancingParameters workBalancingParameters;
    private final SubscriptionsCache subscriptionsCache;
    private final ClusterAssignmentCache clusterAssignmentCache;
    private final ConsumerAssignmentRegistry consumerAssignmentRegistry;
    private final WorkBalancer workBalancer;
    private final HermesMetrics metrics;
    private final String kafkaCluster;
    private final WorkloadConstraintsRepository workloadConstraintsRepository;
    private final BalancingListener balancingListener;
    private final BalancingJobMetrics balancingMetrics = new BalancingJobMetrics();

    BalancingJob(ConsumerNodesRegistry consumersRegistry, WorkBalancingParameters workBalancingParameters, SubscriptionsCache subscriptionsCache, ClusterAssignmentCache clusterAssignmentCache, ConsumerAssignmentRegistry consumerAssignmentRegistry, WorkBalancer workBalancer, HermesMetrics metrics, String kafkaCluster, WorkloadConstraintsRepository workloadConstraintsRepository, BalancingListener balancingListener) {
        this.consumersRegistry = consumersRegistry;
        this.workBalancingParameters = workBalancingParameters;
        this.subscriptionsCache = subscriptionsCache;
        this.clusterAssignmentCache = clusterAssignmentCache;
        this.consumerAssignmentRegistry = consumerAssignmentRegistry;
        this.workBalancer = workBalancer;
        this.metrics = metrics;
        this.kafkaCluster = kafkaCluster;
        this.workloadConstraintsRepository = workloadConstraintsRepository;
        this.balancingListener = balancingListener;
        metrics.registerGauge(this.gaugeName(kafkaCluster, ".all-assignments"), () -> this.balancingMetrics.allAssignments);
        metrics.registerGauge(this.gaugeName(kafkaCluster, ".missing-resources"), () -> this.balancingMetrics.missingResources);
        metrics.registerGauge(this.gaugeName(kafkaCluster, ".deleted-assignments"), () -> this.balancingMetrics.deletedAssignments);
        metrics.registerGauge(this.gaugeName(kafkaCluster, ".created-assignments"), () -> this.balancingMetrics.createdAssignments);
    }

    private String gaugeName(String kafkaCluster, String name) {
        return "consumers-workload." + kafkaCluster + "." + name;
    }

    @Override
    public void run() {
        block11: {
            try {
                this.consumersRegistry.refresh();
                if (this.consumersRegistry.isLeader()) {
                    try (Timer.Context ctx = this.metrics.consumersWorkloadRebalanceDurationTimer(this.kafkaCluster).time();){
                        logger.info("Initializing workload balance.");
                        this.clusterAssignmentCache.refresh();
                        SubscriptionAssignmentView initialState = this.clusterAssignmentCache.createSnapshot();
                        List<String> activeConsumers = this.consumersRegistry.listConsumerNodes();
                        List<SubscriptionName> activeSubscriptions = this.subscriptionsCache.listActiveSubscriptionNames();
                        this.balancingListener.onBeforeBalancing(activeConsumers);
                        WorkBalancingResult work = this.workBalancer.balance(activeSubscriptions, activeConsumers, initialState, this.prepareWorkloadConstraints(activeConsumers));
                        if (this.consumersRegistry.isLeader()) {
                            logger.info("Applying workload balance changes");
                            WorkDistributionChanges changes = this.calculateWorkDistributionChanges(initialState, work);
                            this.applyWorkloadChanges(changes, work);
                            logger.info("Finished workload balance");
                            this.clusterAssignmentCache.refresh();
                            this.balancingListener.onAfterBalancing(changes);
                            this.updateMetrics(work, changes);
                        } else {
                            logger.info("Lost leadership before applying changes");
                        }
                        break block11;
                    }
                }
                this.balancingMetrics.reset();
                this.balancingListener.onBalancingSkipped();
            }
            catch (Exception e) {
                logger.error("Caught exception when running balancing job", (Throwable)e);
            }
        }
    }

    private WorkloadConstraints prepareWorkloadConstraints(List<String> activeConsumers) {
        ConsumersWorkloadConstraints constraints = this.workloadConstraintsRepository.getConsumersWorkloadConstraints();
        return WorkloadConstraints.builder().withActiveConsumers(activeConsumers.size()).withConsumersPerSubscription(this.workBalancingParameters.getConsumersPerSubscription()).withMaxSubscriptionsPerConsumer(this.workBalancingParameters.getMaxSubscriptionsPerConsumer()).withSubscriptionConstraints(constraints.getSubscriptionConstraints()).withTopicConstraints(constraints.getTopicConstraints()).build();
    }

    private WorkDistributionChanges calculateWorkDistributionChanges(SubscriptionAssignmentView initialState, WorkBalancingResult workBalancingResult) {
        SubscriptionAssignmentView balancedState = workBalancingResult.getAssignmentsView();
        SubscriptionAssignmentView deletions = initialState.deletions(balancedState);
        SubscriptionAssignmentView additions = initialState.additions(balancedState);
        return new WorkDistributionChanges(deletions, additions);
    }

    private void applyWorkloadChanges(WorkDistributionChanges changes, WorkBalancingResult workBalancingResult) {
        SubscriptionAssignmentView balancedState = workBalancingResult.getAssignmentsView();
        for (String consumerId : changes.getModifiedConsumerNodes()) {
            this.consumerAssignmentRegistry.updateAssignments(consumerId, balancedState.getSubscriptionsForConsumerNode(consumerId));
        }
    }

    private void updateMetrics(WorkBalancingResult balancingResult, WorkDistributionChanges changes) {
        this.balancingMetrics.allAssignments = balancingResult.getAssignmentsView().getAllAssignments().size();
        this.balancingMetrics.missingResources = balancingResult.getMissingResources();
        this.balancingMetrics.createdAssignments = changes.getCreatedAssignmentsCount();
        this.balancingMetrics.deletedAssignments = changes.getDeletedAssignmentsCount();
    }

    private static class BalancingJobMetrics {
        volatile int allAssignments;
        volatile int missingResources;
        volatile int deletedAssignments;
        volatile int createdAssignments;

        private BalancingJobMetrics() {
        }

        void reset() {
            this.allAssignments = 0;
            this.missingResources = 0;
            this.deletedAssignments = 0;
            this.createdAssignments = 0;
        }
    }
}

