/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.supervisor.workload.selective;

import java.util.Set;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.admin.AdminOperationsCallback;
import pl.allegro.tech.hermes.common.admin.zookeeper.ZookeeperAdminCache;
import pl.allegro.tech.hermes.common.config.ConfigFactory;
import pl.allegro.tech.hermes.common.config.Configs;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.consumers.subscription.cache.SubscriptionsCache;
import pl.allegro.tech.hermes.consumers.supervisor.ConsumersSupervisor;
import pl.allegro.tech.hermes.consumers.supervisor.workload.SubscriptionAssignmentRegistry;
import pl.allegro.tech.hermes.consumers.supervisor.workload.SupervisorController;
import pl.allegro.tech.hermes.consumers.supervisor.workload.WorkTracker;
import pl.allegro.tech.hermes.consumers.supervisor.workload.selective.BalancingJob;
import pl.allegro.tech.hermes.consumers.supervisor.workload.selective.ConsumerNodesRegistry;
import pl.allegro.tech.hermes.consumers.supervisor.workload.selective.SelectiveWorkBalancer;
import pl.allegro.tech.hermes.domain.notifications.InternalNotificationsBus;
import pl.allegro.tech.hermes.domain.notifications.SubscriptionCallback;
import pl.allegro.tech.hermes.domain.notifications.TopicCallback;

public class SelectiveSupervisorController
implements SupervisorController {
    private static final Logger logger = LoggerFactory.getLogger(SelectiveSupervisorController.class);
    private final ConsumersSupervisor supervisor;
    private final InternalNotificationsBus notificationsBus;
    private final SubscriptionsCache subscriptionsCache;
    private final SubscriptionAssignmentRegistry registry;
    private final WorkTracker workTracker;
    private final ConsumerNodesRegistry consumersRegistry;
    private final ZookeeperAdminCache adminCache;
    private final ConfigFactory configFactory;
    private final HermesMetrics metrics;
    private final ExecutorService assignmentExecutor;

    public SelectiveSupervisorController(ConsumersSupervisor supervisor, InternalNotificationsBus notificationsBus, SubscriptionsCache subscriptionsCache, SubscriptionAssignmentRegistry registry, WorkTracker workTracker, ConsumerNodesRegistry consumersRegistry, ZookeeperAdminCache adminCache, ExecutorService assignmentExecutor, ConfigFactory configFactory, HermesMetrics metrics) {
        this.supervisor = supervisor;
        this.notificationsBus = notificationsBus;
        this.subscriptionsCache = subscriptionsCache;
        this.registry = registry;
        this.workTracker = workTracker;
        this.consumersRegistry = consumersRegistry;
        this.adminCache = adminCache;
        this.assignmentExecutor = assignmentExecutor;
        this.configFactory = configFactory;
        this.metrics = metrics;
    }

    @Override
    public void onSubscriptionAssigned(Subscription subscription) {
        logger.info("Scheduling assignment consumer for {}", (Object)subscription.getQualifiedName());
        this.assignmentExecutor.execute(() -> {
            logger.info("Assigning consumer for {}", (Object)subscription.getQualifiedName());
            this.supervisor.assignConsumerForSubscription(subscription);
            logger.info("Consumer assigned for {}", (Object)subscription.getQualifiedName());
        });
    }

    @Override
    public void onAssignmentRemoved(SubscriptionName subscription) {
        logger.info("Scheduling assignment removal consumer for {}", (Object)subscription.getQualifiedName());
        this.assignmentExecutor.execute(() -> {
            logger.info("Removing assignment from consumer for {}", (Object)subscription.getQualifiedName());
            this.supervisor.deleteConsumerForSubscriptionName(subscription);
            logger.info("Consumer removed for {}", (Object)subscription.getName());
        });
    }

    public void onSubscriptionChanged(Subscription subscription) {
        if (this.workTracker.isAssignedTo(subscription.getQualifiedName(), this.getId())) {
            logger.info("Updating subscription {}", (Object)subscription.getName());
            this.supervisor.updateSubscription(subscription);
        }
    }

    public void onTopicChanged(Topic topic) {
        for (Subscription subscription : this.subscriptionsCache.subscriptionsOfTopic(topic.getName())) {
            if (!this.workTracker.isAssignedTo(subscription.getQualifiedName(), this.getId())) continue;
            this.supervisor.updateTopic(subscription, topic);
        }
    }

    @Override
    public void start() throws Exception {
        this.adminCache.start();
        this.adminCache.addCallback((AdminOperationsCallback)this);
        this.notificationsBus.registerSubscriptionCallback((SubscriptionCallback)this);
        this.notificationsBus.registerTopicCallback((TopicCallback)this);
        this.registry.registerAssignementCallback(this);
        this.supervisor.start();
        this.consumersRegistry.start();
        if (this.configFactory.getBooleanProperty(Configs.CONSUMER_WORKLOAD_AUTO_REBALANCE)) {
            this.consumersRegistry.registerLeaderLatchListener(new BalancingJob(this.consumersRegistry, this.subscriptionsCache, new SelectiveWorkBalancer(this.configFactory.getIntProperty(Configs.CONSUMER_WORKLOAD_CONSUMERS_PER_SUBSCRIPTION), this.configFactory.getIntProperty(Configs.CONSUMER_WORKLOAD_MAX_SUBSCRIPTIONS_PER_CONSUMER)), this.workTracker, this.metrics, this.configFactory.getIntProperty(Configs.CONSUMER_WORKLOAD_REBALANCE_INTERVAL), this.configFactory.getStringProperty(Configs.KAFKA_CLUSTER_NAME)));
        } else {
            logger.info("Automatic workload rebalancing is disabled.");
        }
        logger.info("Consumer boot complete. Workload config: [{}]", (Object)this.configFactory.print(new Configs[]{Configs.CONSUMER_WORKLOAD_NODE_ID, Configs.CONSUMER_WORKLOAD_ALGORITHM, Configs.CONSUMER_WORKLOAD_REBALANCE_INTERVAL, Configs.CONSUMER_WORKLOAD_CONSUMERS_PER_SUBSCRIPTION, Configs.CONSUMER_WORKLOAD_MAX_SUBSCRIPTIONS_PER_CONSUMER}));
        this.registry.start();
    }

    @Override
    public Set<SubscriptionName> assignedSubscriptions() {
        return this.registry.createSnapshot().getSubscriptionsForConsumerNode(this.getId());
    }

    @Override
    public void shutdown() throws InterruptedException {
        this.supervisor.shutdown();
    }

    public String getId() {
        return this.consumersRegistry.getId();
    }

    public boolean isLeader() {
        return this.consumersRegistry.isLeader();
    }

    public void onRetransmissionStarts(SubscriptionName subscription) throws Exception {
        logger.info("Triggering retransmission for subscription {}", (Object)subscription);
        if (this.workTracker.isAssignedTo(subscription, this.getId())) {
            this.supervisor.retransmit(subscription);
        }
    }

    public void restartConsumer(SubscriptionName subscription) throws Exception {
        if (this.workTracker.isAssignedTo(subscription, this.getId())) {
            this.supervisor.restartConsumer(subscription);
        }
    }
}

