/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.supervisor.workload;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;
import org.apache.curator.framework.CuratorFramework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.consumers.subscription.cache.SubscriptionsCache;
import pl.allegro.tech.hermes.consumers.supervisor.workload.ClusterAssignmentCache;
import pl.allegro.tech.hermes.consumers.supervisor.workload.ConsumerAssignmentCache;
import pl.allegro.tech.hermes.consumers.supervisor.workload.HierarchicalConsumerAssignmentRegistry;
import pl.allegro.tech.hermes.consumers.supervisor.workload.SubscriptionAssignment;
import pl.allegro.tech.hermes.consumers.supervisor.workload.SubscriptionAssignmentAware;
import pl.allegro.tech.hermes.consumers.supervisor.workload.SubscriptionAssignmentPathSerializer;
import pl.allegro.tech.hermes.consumers.supervisor.workload.SubscriptionAssignmentView;
import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths;
import pl.allegro.tech.hermes.infrastructure.zookeeper.cache.HierarchicalCache;

public class HierarchicalConsumerAssignmentCache
implements ConsumerAssignmentCache,
ClusterAssignmentCache {
    private static final int SUBSCRIPTION_LEVEL = 0;
    private static final int ASSIGNMENT_LEVEL = 1;
    private static final Logger logger = LoggerFactory.getLogger(HierarchicalConsumerAssignmentCache.class);
    private final String basePath;
    private final CuratorFramework curator;
    private final String consumerId;
    private final Set<SubscriptionAssignment> assignments = Collections.newSetFromMap(new ConcurrentHashMap());
    private final Set<SubscriptionAssignmentAware> callbacks = Collections.newSetFromMap(new ConcurrentHashMap());
    private final HierarchicalCache cache;
    private final SubscriptionsCache subscriptionsCache;
    private final SubscriptionAssignmentPathSerializer pathSerializer;
    private volatile boolean started = false;

    public HierarchicalConsumerAssignmentCache(CuratorFramework curator, String consumerId, String clusterName, ZookeeperPaths zookeeperPaths, SubscriptionsCache subscriptionsCache) {
        this.curator = curator;
        this.consumerId = consumerId;
        this.basePath = zookeeperPaths.consumersRuntimePath(clusterName);
        this.subscriptionsCache = subscriptionsCache;
        this.pathSerializer = new SubscriptionAssignmentPathSerializer(this.basePath, HierarchicalConsumerAssignmentRegistry.AUTO_ASSIGNED_MARKER);
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("subscription-assignment-cache-%d").build();
        this.cache = new HierarchicalCache(curator, (ExecutorService)Executors.newSingleThreadScheduledExecutor(threadFactory), this.basePath, 2, Collections.emptyList(), false);
        this.cache.registerCallback(1, e -> {
            SubscriptionAssignment assignment = this.pathSerializer.deserialize(e.getData().getPath(), e.getData().getData());
            switch (e.getType()) {
                case CHILD_ADDED: {
                    this.onAssignmentAdded(assignment);
                    break;
                }
                case CHILD_REMOVED: {
                    this.onAssignmentRemoved(assignment);
                }
            }
        });
    }

    @Override
    public void start() throws Exception {
        long startNanos = System.nanoTime();
        logger.info("Starting assignment cache for {}", (Object)this.basePath);
        List<SubscriptionAssignment> currentAssignments = this.readExistingAssignments();
        currentAssignments.forEach(this::onAssignmentAdded);
        this.cache.start();
        this.started = true;
        long elapsedMillis = Duration.ofNanos(System.nanoTime() - startNanos).toMillis();
        logger.info("Started assignment cache for {}. Read {} assignments. Took {}ms", new Object[]{this.basePath, currentAssignments.size(), elapsedMillis});
    }

    @Override
    public void stop() throws Exception {
        logger.info("Stopping assignment cache for {}", (Object)this.basePath);
        this.cache.stop();
        logger.info("Stopped assignment cache for {}", (Object)this.basePath);
    }

    @Override
    public boolean isStarted() {
        return this.started;
    }

    @Override
    public boolean isReady() {
        return this.isStarted();
    }

    @Override
    public void refresh() {
    }

    @Override
    public SubscriptionAssignmentView createSnapshot() {
        return SubscriptionAssignmentView.of(this.assignments);
    }

    @Override
    public boolean isAssignedTo(SubscriptionName subscription) {
        return this.isAssignedTo(this.consumerId, subscription);
    }

    @Override
    public boolean isAssignedTo(String nodeId, SubscriptionName subscription) {
        return this.assignments.stream().anyMatch(a -> a.getSubscriptionName().equals((Object)subscription) && a.getConsumerNodeId().equals(nodeId));
    }

    @Override
    public void registerAssignmentCallback(SubscriptionAssignmentAware callback) {
        this.callbacks.add(callback);
    }

    private List<SubscriptionAssignment> readExistingAssignments() {
        ArrayList<SubscriptionAssignment> existingAssignments = new ArrayList<SubscriptionAssignment>();
        for (SubscriptionName subscriptionName : this.subscriptionsCache.listActiveSubscriptionNames()) {
            try {
                String path = this.pathSerializer.serialize(subscriptionName);
                List nodes = (List)this.curator.getChildren().forPath(path);
                for (String node : nodes) {
                    String fullPath = path + "/" + node;
                    existingAssignments.add(this.pathSerializer.deserialize(fullPath, (byte[])this.curator.getData().forPath(fullPath)));
                }
            }
            catch (Exception e) {
                logger.info("Exception occurred when initializing cache with subscription {}", (Object)subscriptionName, (Object)e);
            }
        }
        return existingAssignments;
    }

    private void onAssignmentAdded(SubscriptionAssignment assignment) {
        try {
            if (this.assignments.add(assignment)) {
                this.callbacks.stream().filter(callback -> this.shouldNotify((SubscriptionAssignmentAware)callback, assignment)).forEach(callback -> callback.onSubscriptionAssigned(assignment.getSubscriptionName()));
            }
        }
        catch (Exception e) {
            logger.error("Exception while adding assignment {}", (Object)assignment, (Object)e);
        }
    }

    private void onAssignmentRemoved(SubscriptionAssignment assignment) {
        try {
            if (this.assignments.remove(assignment)) {
                this.callbacks.stream().filter(callback -> this.shouldNotify((SubscriptionAssignmentAware)callback, assignment)).forEach(callback -> callback.onAssignmentRemoved(assignment.getSubscriptionName()));
            }
        }
        catch (Exception e) {
            logger.error("Exception while removing assignment {}", (Object)assignment, (Object)e);
        }
    }

    private boolean shouldNotify(SubscriptionAssignmentAware callback, SubscriptionAssignment assignment) {
        return !callback.watchedConsumerId().isPresent() || callback.watchedConsumerId().get().equals(assignment.getConsumerNodeId());
    }

    @Override
    public Map<SubscriptionName, Set<String>> getSubscriptionConsumers() {
        return this.assignments.stream().collect(Collectors.groupingBy(SubscriptionAssignment::getSubscriptionName, Collectors.mapping(SubscriptionAssignment::getConsumerNodeId, Collectors.toSet())));
    }

    @Override
    public Set<SubscriptionName> getConsumerSubscriptions() {
        return this.getConsumerSubscriptions(this.consumerId);
    }

    @Override
    public Set<SubscriptionName> getConsumerSubscriptions(String consumerId) {
        return this.assignments.stream().filter(assignment -> consumerId.equals(assignment.getConsumerNodeId())).map(SubscriptionAssignment::getSubscriptionName).collect(Collectors.toSet());
    }

    @Override
    public Set<String> getAssignedConsumers() {
        return this.assignments.stream().map(SubscriptionAssignment::getConsumerNodeId).collect(Collectors.toSet());
    }
}

