/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.supervisor.workload;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.common.config.ConfigFactory;
import pl.allegro.tech.hermes.common.config.Configs;
import pl.allegro.tech.hermes.consumers.subscription.cache.SubscriptionsCache;
import pl.allegro.tech.hermes.consumers.supervisor.workload.SubscriptionAssignmentCache;
import pl.allegro.tech.hermes.consumers.supervisor.workload.SubscriptionAssignmentPathSerializer;
import pl.allegro.tech.hermes.consumers.supervisor.workload.SubscriptionAssignmentRegistry;
import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths;

public class SubscriptionAssignmentCaches
extends PathChildrenCache
implements PathChildrenCacheListener {
    private static final Logger logger = LoggerFactory.getLogger(SubscriptionAssignmentCaches.class);
    private final CuratorFramework curator;
    private final String localCluster;
    private final ZookeeperPaths zookeeperPaths;
    private final SubscriptionsCache subscriptionsCache;
    private Map<String, SubscriptionAssignmentCache> caches = new ConcurrentHashMap<String, SubscriptionAssignmentCache>();

    @Inject
    public SubscriptionAssignmentCaches(CuratorFramework curator, ConfigFactory configFactory, ZookeeperPaths zookeeperPaths, SubscriptionsCache subscriptionsCache) {
        super(curator, zookeeperPaths.consumersWorkloadPath(), true, false, SubscriptionAssignmentCaches.createExecutor());
        this.curator = curator;
        this.localCluster = configFactory.getStringProperty(Configs.KAFKA_CLUSTER_NAME);
        this.zookeeperPaths = zookeeperPaths;
        this.subscriptionsCache = subscriptionsCache;
        this.createCacheIfNeeded(this.localCluster);
    }

    private static ExecutorService createExecutor() {
        return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("cluster-cache-%d").build());
    }

    public SubscriptionAssignmentCache localClusterCache() {
        return this.caches.get(this.localCluster);
    }

    public List<SubscriptionAssignmentCache> all() {
        return new ArrayList<SubscriptionAssignmentCache>(this.caches.values());
    }

    public void start() throws Exception {
        this.getListenable().addListener((Object)this);
        this.localClusterCache().start();
        super.start();
    }

    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
        if (event.getData() == null) {
            return;
        }
        String path = event.getData().getPath();
        logger.info("Got {} event for path {}", (Object)event.getType(), (Object)path);
        try {
            String cluster = StringUtils.substringAfterLast((String)path, (String)"/");
            switch (event.getType()) {
                case CHILD_ADDED: {
                    this.createCacheIfNeeded(cluster).ifPresent(this::startCache);
                    break;
                }
                case CHILD_REMOVED: {
                    this.removeCache(cluster).ifPresent(this::stopCache);
                }
            }
        }
        catch (Exception e) {
            logger.error("Failed to process cluster update for event: {}", (Object)event);
        }
    }

    @PreDestroy
    public void stop() throws Exception {
        for (SubscriptionAssignmentCache cache : this.caches.values()) {
            cache.stop();
        }
    }

    public boolean isStarted() {
        return this.localClusterCache().isStarted();
    }

    private Optional<SubscriptionAssignmentCache> createCacheIfNeeded(String cluster) {
        if (!this.caches.containsKey(cluster)) {
            SubscriptionAssignmentCache cache = this.createCache(cluster);
            this.caches.put(cluster, cache);
            return Optional.of(cache);
        }
        return Optional.empty();
    }

    private SubscriptionAssignmentCache createCache(String cluster) {
        String runtimePath = this.zookeeperPaths.consumersRuntimePath(cluster);
        return new SubscriptionAssignmentCache(this.curator, runtimePath, this.subscriptionsCache, new SubscriptionAssignmentPathSerializer(runtimePath, SubscriptionAssignmentRegistry.AUTO_ASSIGNED_MARKER));
    }

    private Optional<SubscriptionAssignmentCache> removeCache(String cluster) throws Exception {
        return Optional.ofNullable(this.caches.remove(cluster));
    }

    private void startCache(SubscriptionAssignmentCache cache) {
        try {
            cache.start();
        }
        catch (Exception e) {
            logger.error("Failed to start cache", (Throwable)e);
        }
    }

    private void stopCache(SubscriptionAssignmentCache cache) {
        try {
            cache.stop();
        }
        catch (Exception e) {
            logger.error("Failed to stop cache", (Throwable)e);
        }
    }
}

