/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.supervisor.workload.mirror;

import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.admin.AdminOperationsCallback;
import pl.allegro.tech.hermes.common.admin.zookeeper.ZookeeperAdminCache;
import pl.allegro.tech.hermes.common.config.ConfigFactory;
import pl.allegro.tech.hermes.common.config.Configs;
import pl.allegro.tech.hermes.consumers.subscription.cache.SubscriptionsCache;
import pl.allegro.tech.hermes.consumers.supervisor.ConsumersSupervisor;
import pl.allegro.tech.hermes.consumers.supervisor.workload.ConsumerAssignmentCache;
import pl.allegro.tech.hermes.consumers.supervisor.workload.ConsumerAssignmentRegistry;
import pl.allegro.tech.hermes.consumers.supervisor.workload.SubscriptionAssignment;
import pl.allegro.tech.hermes.consumers.supervisor.workload.SubscriptionAssignmentView;
import pl.allegro.tech.hermes.consumers.supervisor.workload.SupervisorController;
import pl.allegro.tech.hermes.domain.notifications.InternalNotificationsBus;
import pl.allegro.tech.hermes.domain.notifications.SubscriptionCallback;
import pl.allegro.tech.hermes.domain.notifications.TopicCallback;

/**
 * {@link SupervisorController} that reacts to subscription and topic notifications by
 * assigning (or unassigning) the affected subscription to the single consumer node
 * identified by {@link Configs#CONSUMER_WORKLOAD_NODE_ID}.
 *
 * <p>Registry updates and supervisor updates triggered by notifications are executed
 * asynchronously on an internal fixed-size thread pool. Failures of those background
 * tasks are logged, and the pool is released in {@link #shutdown()}.
 */
public class MirroringSupervisorController implements SupervisorController {
    private static final Logger logger = LoggerFactory.getLogger(MirroringSupervisorController.class);

    private final String consumerNodeId;
    private final ConsumersSupervisor supervisor;
    private final InternalNotificationsBus notificationsBus;
    private final ConsumerAssignmentCache assignmentCache;
    private final ConsumerAssignmentRegistry consumerAssignmentRegistry;
    private final SubscriptionsCache subscriptionsCache;
    private final ZookeeperAdminCache adminCache;
    private final ConfigFactory configFactory;
    private final ExecutorService executorService;

    /**
     * Creates the controller and its background thread pool.
     *
     * <p>The node id is read from {@link Configs#CONSUMER_WORKLOAD_NODE_ID} and the pool size
     * from {@link Configs#ZOOKEEPER_TASK_PROCESSING_THREAD_POOL_SIZE}; worker threads are named
     * {@code mirroring-supervisor-N}.
     */
    public MirroringSupervisorController(ConsumersSupervisor supervisor, InternalNotificationsBus notificationsBus, ConsumerAssignmentCache assignmentCache, ConsumerAssignmentRegistry consumerAssignmentRegistry, SubscriptionsCache subscriptionsCache, ZookeeperAdminCache adminCache, ConfigFactory configFactory) {
        this.consumerNodeId = configFactory.getStringProperty(Configs.CONSUMER_WORKLOAD_NODE_ID);
        this.supervisor = supervisor;
        this.notificationsBus = notificationsBus;
        this.assignmentCache = assignmentCache;
        this.consumerAssignmentRegistry = consumerAssignmentRegistry;
        this.subscriptionsCache = subscriptionsCache;
        this.adminCache = adminCache;
        this.configFactory = configFactory;
        this.executorService = Executors.newFixedThreadPool(
                configFactory.getIntProperty(Configs.ZOOKEEPER_TASK_PROCESSING_THREAD_POOL_SIZE),
                new ThreadFactoryBuilder().setNameFormat("mirroring-supervisor-%d").build());
    }

    /**
     * Adds an assignment of this node to the given subscription.
     *
     * <p>The current and target assignment views are computed on the calling thread; the
     * registry update itself runs on the background pool.
     *
     * @param subscription the subscription that was created
     */
    public void onSubscriptionCreated(Subscription subscription) {
        Set<SubscriptionAssignment> currentAssignments = getConsumerAssignments();
        SubscriptionAssignment addedAssignment = new SubscriptionAssignment(consumerNodeId, subscription.getQualifiedName());
        Set<SubscriptionAssignment> targetAssignments = Sets.union(currentAssignments, Collections.singleton(addedAssignment));

        SubscriptionAssignmentView currentState = SubscriptionAssignmentView.of(currentAssignments);
        SubscriptionAssignmentView targetState = SubscriptionAssignmentView.of(targetAssignments);
        submitLogged("add assignment", subscription.getQualifiedName(),
                () -> consumerAssignmentRegistry.updateAssignments(currentState, targetState));
    }

    /**
     * Removes the assignment of this node to the given subscription.
     *
     * <p>The current and target assignment views are computed on the calling thread; the
     * registry update itself runs on the background pool.
     *
     * @param subscription the subscription that was removed
     */
    public void onSubscriptionRemoved(Subscription subscription) {
        Set<SubscriptionAssignment> currentAssignments = getConsumerAssignments();
        SubscriptionAssignment deletedAssignment = new SubscriptionAssignment(consumerNodeId, subscription.getQualifiedName());
        Set<SubscriptionAssignment> targetAssignments = Sets.difference(currentAssignments, Collections.singleton(deletedAssignment));

        SubscriptionAssignmentView currentState = SubscriptionAssignmentView.of(currentAssignments);
        SubscriptionAssignmentView targetState = SubscriptionAssignmentView.of(targetAssignments);
        submitLogged("remove assignment", subscription.getQualifiedName(),
                () -> consumerAssignmentRegistry.updateAssignments(currentState, targetState));
    }

    /** Builds assignments of this node to every subscription currently held in the assignment cache. */
    private Set<SubscriptionAssignment> getConsumerAssignments() {
        return assignmentCache.getConsumerSubscriptions().stream()
                .map(name -> new SubscriptionAssignment(consumerNodeId, name))
                .collect(Collectors.toSet());
    }

    /**
     * Handles a subscription change on the background pool: a {@code PENDING} or {@code ACTIVE}
     * subscription is treated as created, a {@code SUSPENDED} one as removed, and in every case
     * the supervisor is then told to update the subscription.
     *
     * @param subscription the subscription in its new state
     */
    public void onSubscriptionChanged(Subscription subscription) {
        submitLogged("process change", subscription.getQualifiedName(), () -> {
            switch (subscription.getState()) {
                case PENDING:
                case ACTIVE: {
                    onSubscriptionCreated(subscription);
                    break;
                }
                case SUSPENDED: {
                    onSubscriptionRemoved(subscription);
                    break;
                }
                default: {
                    // Other states do not affect assignments.
                    break;
                }
            }
            supervisor.updateSubscription(subscription);
        });
    }

    /**
     * Schedules a supervisor topic update for every subscription of the changed topic that is
     * present in the subscriptions cache. Each update is a separate background task.
     *
     * @param topic the topic in its new state
     */
    public void onTopicChanged(Topic topic) {
        for (Subscription subscription : subscriptionsCache.subscriptionsOfTopic(topic.getName())) {
            submitLogged("update topic", subscription.getQualifiedName(),
                    () -> supervisor.updateTopic(subscription, topic));
        }
    }

    /**
     * Looks the subscription up in the subscriptions cache and asks the supervisor to assign a
     * consumer for it. Runs synchronously on the calling thread.
     */
    @Override
    public void onSubscriptionAssigned(SubscriptionName subscriptionName) {
        logger.info("Assigning consumer for {}", subscriptionName.getQualifiedName());
        Subscription subscription = subscriptionsCache.getSubscription(subscriptionName);
        supervisor.assignConsumerForSubscription(subscription);
    }

    /** Asks the supervisor to delete the consumer for the given subscription name. Runs synchronously. */
    @Override
    public void onAssignmentRemoved(SubscriptionName subscription) {
        logger.info("Removing assignment from consumer for {}", subscription);
        supervisor.deleteConsumerForSubscriptionName(subscription);
    }

    /**
     * Starts the admin cache, registers this controller for admin, subscription, topic and
     * assignment callbacks, and then starts the supervisor.
     *
     * @throws Exception if any of the started components fails to start
     */
    @Override
    public void start() throws Exception {
        adminCache.start();
        // Casts kept from the original: the SupervisorController declaration is not visible here.
        adminCache.addCallback((AdminOperationsCallback) this);
        notificationsBus.registerSubscriptionCallback((SubscriptionCallback) this);
        notificationsBus.registerTopicCallback((TopicCallback) this);
        assignmentCache.registerAssignmentCallback(this);
        supervisor.start();
        logger.info("Consumer boot complete. Workload config: [{}]", (Object) configFactory.print(new Configs[]{Configs.CONSUMER_WORKLOAD_NODE_ID, Configs.CONSUMER_WORKLOAD_ALGORITHM}));
    }

    /** Returns the subscriptions currently held in the assignment cache. */
    @Override
    public Set<SubscriptionName> assignedSubscriptions() {
        return assignmentCache.getConsumerSubscriptions();
    }

    /**
     * Shuts down the supervisor and then releases the background thread pool.
     *
     * <p>The pool is shut down in a {@code finally} block so its threads are released even when
     * the supervisor shutdown fails. Tasks already queued are still allowed to run; tasks
     * submitted afterwards are rejected and logged.
     *
     * @throws InterruptedException if the supervisor shutdown is interrupted
     */
    @Override
    public void shutdown() throws InterruptedException {
        try {
            supervisor.shutdown();
        } finally {
            executorService.shutdown();
        }
    }

    /**
     * Delegates a retransmission request to the supervisor.
     *
     * @throws Exception if the supervisor fails to start the retransmission
     */
    public void onRetransmissionStarts(SubscriptionName subscription) throws Exception {
        supervisor.retransmit(subscription);
    }

    /** Returns the configured node id of this consumer; never empty. */
    @Override
    public Optional<String> watchedConsumerId() {
        return Optional.of(consumerNodeId);
    }

    /**
     * Submits a task to the background pool, logging instead of silently dropping failures.
     *
     * <p>The returned {@code Future} is intentionally ignored, so without the inner catch an
     * exception thrown by the task would never be seen. A task rejected because the pool has
     * been shut down is logged and skipped rather than propagated to the notifying thread.
     *
     * @param action  short description of the work, used in log messages
     * @param subject the entity the work concerns, used in log messages
     * @param task    the work to run
     */
    private void submitLogged(String action, Object subject, Runnable task) {
        try {
            executorService.submit(() -> {
                try {
                    task.run();
                } catch (RuntimeException e) {
                    logger.error("Failed to {} for {}", action, subject, e);
                }
            });
        } catch (RejectedExecutionException e) {
            logger.warn("Could not schedule task to {} for {}; executor rejected it", action, subject, e);
        }
    }
}

