/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.supervisor.workload;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.consumers.subscription.id.SubscriptionIds;
import pl.allegro.tech.hermes.consumers.supervisor.workload.ConsumerWorkloadDecoder;
import pl.allegro.tech.hermes.consumers.supervisor.workload.SubscriptionAssignmentAware;
import pl.allegro.tech.hermes.consumers.supervisor.workload.WorkloadRegistryPaths;
import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths;

public class ConsumerAssignmentCache
implements NodeCacheListener {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerAssignmentCache.class);
    private final WorkloadRegistryPaths paths;
    private final String consumerId;
    private final NodeCache workloadNodeCache;
    private final ConsumerWorkloadDecoder consumerWorkloadDecoder;
    private final Set<SubscriptionName> currentlyAssignedSubscriptions = Collections.newSetFromMap(new ConcurrentHashMap());
    private final Set<SubscriptionAssignmentAware> callbacks = Collections.newSetFromMap(new ConcurrentHashMap());

    public ConsumerAssignmentCache(CuratorFramework curator, String consumerId, String clusterName, ZookeeperPaths zookeeperPaths, SubscriptionIds subscriptionIds) {
        this.paths = new WorkloadRegistryPaths(zookeeperPaths, clusterName);
        this.consumerId = consumerId;
        String path = this.paths.consumerWorkloadPath(consumerId);
        this.workloadNodeCache = new NodeCache(curator, path);
        this.workloadNodeCache.getListenable().addListener((Object)this);
        this.consumerWorkloadDecoder = new ConsumerWorkloadDecoder(subscriptionIds);
    }

    public void start() throws Exception {
        try {
            logger.info("Starting binary workload assignment cache at {}, watching current consumer path at {}", (Object)this.paths.consumersWorkloadCurrentClusterRuntimeBinaryPath(), (Object)this.paths.consumerWorkloadPath(this.consumerId));
            this.workloadNodeCache.start(true);
        }
        catch (Exception e) {
            throw new IllegalStateException("Could not start node cache for consumer workload", e);
        }
        this.refreshConsumerWorkload();
    }

    private void refreshConsumerWorkload() {
        ChildData nodeData = this.workloadNodeCache.getCurrentData();
        if (nodeData != null) {
            byte[] data = nodeData.getData();
            Set<SubscriptionName> subscriptions = this.consumerWorkloadDecoder.decode(data);
            this.updateAssignedSubscriptions(subscriptions);
        } else {
            logger.info("No workload data available for consumer");
        }
    }

    private void updateAssignedSubscriptions(Set<SubscriptionName> targetAssignments) {
        ImmutableSet assignmentDeletions = Sets.difference(this.currentlyAssignedSubscriptions, targetAssignments).immutableCopy();
        ImmutableSet assignmentsAdditions = Sets.difference(targetAssignments, this.currentlyAssignedSubscriptions).immutableCopy();
        assignmentDeletions.forEach(s -> logger.info("Assignment deletion for subscription {}", (Object)s.getQualifiedName()));
        assignmentsAdditions.forEach(s -> logger.info("Assignment addition for subscription {}", (Object)s.getQualifiedName()));
        this.currentlyAssignedSubscriptions.clear();
        this.currentlyAssignedSubscriptions.addAll(targetAssignments);
        this.callbacks.forEach(callback -> {
            block3: {
                block2: {
                    if (!callback.watchedConsumerId().isPresent()) break block2;
                    if (!callback.watchedConsumerId().get().equals(this.consumerId)) break block3;
                }
                assignmentDeletions.forEach(callback::onAssignmentRemoved);
                assignmentsAdditions.forEach(callback::onSubscriptionAssigned);
            }
        });
    }

    public void stop() throws Exception {
        try {
            logger.info("Stopping binary workload assignment cache");
            this.workloadNodeCache.close();
        }
        catch (IOException e) {
            throw new RuntimeException("Could not stop node cache for consumer workload", e);
        }
    }

    public boolean isAssignedTo(SubscriptionName subscription) {
        return this.currentlyAssignedSubscriptions.contains(subscription);
    }

    public void registerAssignmentCallback(SubscriptionAssignmentAware callback) {
        this.callbacks.add(callback);
    }

    public Set<SubscriptionName> getConsumerSubscriptions() {
        return ImmutableSet.copyOf(this.currentlyAssignedSubscriptions);
    }

    public void nodeChanged() {
        this.refreshConsumerWorkload();
    }
}

