/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.supervisor.workload;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.admin.AdminOperationsCallback;
import pl.allegro.tech.hermes.common.admin.zookeeper.ZookeeperAdminCache;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.consumers.registry.ConsumerNodesRegistry;
import pl.allegro.tech.hermes.consumers.subscription.cache.SubscriptionsCache;
import pl.allegro.tech.hermes.consumers.supervisor.ConsumersSupervisor;
import pl.allegro.tech.hermes.consumers.supervisor.workload.BalancingJob;
import pl.allegro.tech.hermes.consumers.supervisor.workload.BalancingListener;
import pl.allegro.tech.hermes.consumers.supervisor.workload.ClusterAssignmentCache;
import pl.allegro.tech.hermes.consumers.supervisor.workload.ConsumerAssignmentCache;
import pl.allegro.tech.hermes.consumers.supervisor.workload.ConsumerAssignmentRegistry;
import pl.allegro.tech.hermes.consumers.supervisor.workload.SubscriptionAssignmentAware;
import pl.allegro.tech.hermes.consumers.supervisor.workload.WorkBalancer;
import pl.allegro.tech.hermes.consumers.supervisor.workload.WorkBalancingParameters;
import pl.allegro.tech.hermes.domain.notifications.InternalNotificationsBus;
import pl.allegro.tech.hermes.domain.notifications.SubscriptionCallback;
import pl.allegro.tech.hermes.domain.notifications.TopicCallback;
import pl.allegro.tech.hermes.domain.workload.constraints.WorkloadConstraintsRepository;

public class WorkloadSupervisor
implements SubscriptionCallback,
TopicCallback,
SubscriptionAssignmentAware,
AdminOperationsCallback {
    private static final Logger logger = LoggerFactory.getLogger(WorkloadSupervisor.class);
    private final ConsumersSupervisor supervisor;
    private final InternalNotificationsBus notificationsBus;
    private final SubscriptionsCache subscriptionsCache;
    private final ConsumerAssignmentCache assignmentCache;
    private final ConsumerNodesRegistry consumersRegistry;
    private final BalancingJob balancingJob;
    private final ZookeeperAdminCache adminCache;
    private final WorkBalancingParameters workBalancingParameters;
    private final ExecutorService assignmentExecutor;
    private final ScheduledExecutorService rebalanceScheduler;

    public WorkloadSupervisor(ConsumersSupervisor supervisor, InternalNotificationsBus notificationsBus, SubscriptionsCache subscriptionsCache, ConsumerAssignmentCache assignmentCache, ConsumerAssignmentRegistry consumerAssignmentRegistry, ClusterAssignmentCache clusterAssignmentCache, ConsumerNodesRegistry consumersRegistry, ZookeeperAdminCache adminCache, ExecutorService assignmentExecutor, WorkBalancingParameters workBalancingParameters, String kafkaClusterName, MetricsFacade metrics, WorkloadConstraintsRepository workloadConstraintsRepository, WorkBalancer workBalancer, BalancingListener balancingListener) {
        this.supervisor = supervisor;
        this.notificationsBus = notificationsBus;
        this.subscriptionsCache = subscriptionsCache;
        this.assignmentCache = assignmentCache;
        this.consumersRegistry = consumersRegistry;
        this.adminCache = adminCache;
        this.assignmentExecutor = assignmentExecutor;
        this.workBalancingParameters = workBalancingParameters;
        this.balancingJob = new BalancingJob(consumersRegistry, workBalancingParameters, subscriptionsCache, clusterAssignmentCache, consumerAssignmentRegistry, workBalancer, metrics, kafkaClusterName, workloadConstraintsRepository, balancingListener);
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("balancing-executor-%d").build();
        this.rebalanceScheduler = Executors.newSingleThreadScheduledExecutor(threadFactory);
    }

    @Override
    public void onSubscriptionAssigned(SubscriptionName subscriptionName) {
        Subscription subscription = this.subscriptionsCache.getSubscription(subscriptionName);
        logger.info("Scheduling assignment consumer for {}", (Object)subscription.getQualifiedName());
        this.assignmentExecutor.execute(() -> {
            logger.info("Assigning consumer for {}", (Object)subscription.getQualifiedName());
            this.supervisor.assignConsumerForSubscription(subscription);
            logger.info("Consumer assigned for {}", (Object)subscription.getQualifiedName());
        });
    }

    @Override
    public void onAssignmentRemoved(SubscriptionName subscription) {
        logger.info("Scheduling assignment removal consumer for {}", (Object)subscription.getQualifiedName());
        this.assignmentExecutor.execute(() -> {
            logger.info("Removing assignment from consumer for {}", (Object)subscription.getQualifiedName());
            this.supervisor.deleteConsumerForSubscriptionName(subscription);
            logger.info("Consumer removed for {}", (Object)subscription.getName());
        });
    }

    public void onSubscriptionChanged(Subscription subscription) {
        if (this.assignmentCache.isAssignedTo(subscription.getQualifiedName())) {
            logger.info("Updating subscription {}", (Object)subscription.getName());
            this.supervisor.updateSubscription(subscription);
        }
    }

    public void onTopicChanged(Topic topic) {
        for (Subscription subscription : this.subscriptionsCache.subscriptionsOfTopic(topic.getName())) {
            if (!this.assignmentCache.isAssignedTo(subscription.getQualifiedName())) continue;
            this.supervisor.updateTopic(subscription, topic);
        }
    }

    public void start() throws Exception {
        long startTime = System.currentTimeMillis();
        this.adminCache.start();
        this.adminCache.addCallback((AdminOperationsCallback)this);
        this.notificationsBus.registerSubscriptionCallback((SubscriptionCallback)this);
        this.notificationsBus.registerTopicCallback((TopicCallback)this);
        this.assignmentCache.registerAssignmentCallback(this);
        this.supervisor.start();
        if (this.workBalancingParameters.isAutoRebalance()) {
            this.rebalanceScheduler.scheduleWithFixedDelay(this.balancingJob, this.workBalancingParameters.getRebalanceInterval().toMillis(), this.workBalancingParameters.getRebalanceInterval().toMillis(), TimeUnit.MILLISECONDS);
        } else {
            logger.info("Automatic workload rebalancing is disabled.");
        }
        logger.info("Consumer boot complete in {} ms.", (Object)(System.currentTimeMillis() - startTime));
    }

    public Set<SubscriptionName> assignedSubscriptions() {
        return this.assignmentCache.getConsumerSubscriptions();
    }

    public void shutdown() throws Exception {
        this.rebalanceScheduler.shutdown();
        this.rebalanceScheduler.awaitTermination(1L, TimeUnit.MINUTES);
        this.supervisor.shutdown();
    }

    @Override
    public Optional<String> watchedConsumerId() {
        return Optional.of(this.consumersRegistry.getConsumerId());
    }

    public String consumerId() {
        return this.consumersRegistry.getConsumerId();
    }

    public boolean isLeader() {
        return this.consumersRegistry.isLeader();
    }

    public void onRetransmissionStarts(SubscriptionName subscription) throws Exception {
        if (this.assignmentCache.isAssignedTo(subscription)) {
            logger.info("Triggering retransmission for subscription {}", (Object)subscription);
            this.supervisor.retransmit(subscription);
        }
    }
}

