/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.registry;

import java.io.IOException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.common.exception.InternalProcessingException;
import pl.allegro.tech.hermes.consumers.registry.ConsumerNodesRegistryPaths;

public class ConsumerNodesRegistry
extends PathChildrenCache
implements PathChildrenCacheListener {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerNodesRegistry.class);
    private final CuratorFramework curatorClient;
    private final ConsumerNodesRegistryPaths registryPaths;
    private final String consumerNodeId;
    private final LeaderLatch leaderLatch;
    private final Map<String, Long> consumersLastSeen = new HashMap<String, Long>();
    private final long deathOfConsumerAfterMillis;
    private final Clock clock;

    public ConsumerNodesRegistry(CuratorFramework curatorClient, ExecutorService executorService, ConsumerNodesRegistryPaths registryPaths, String consumerNodeId, int deathOfConsumerAfterSeconds, Clock clock) {
        super(curatorClient, registryPaths.nodesPath(), true, false, executorService);
        this.curatorClient = curatorClient;
        this.registryPaths = registryPaths;
        this.consumerNodeId = consumerNodeId;
        this.clock = clock;
        this.leaderLatch = new LeaderLatch(curatorClient, registryPaths.leaderPath(), consumerNodeId);
        this.deathOfConsumerAfterMillis = TimeUnit.SECONDS.toMillis(deathOfConsumerAfterSeconds);
    }

    public void start() throws Exception {
        this.getListenable().addListener((Object)this);
        super.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        this.leaderLatch.start();
    }

    public void stop() throws IOException {
        this.leaderLatch.close();
        this.close();
    }

    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
        switch (event.getType()) {
            case INITIALIZED: 
            case CONNECTION_RECONNECTED: {
                if (this.isRegistered(this.consumerNodeId)) break;
                this.registerConsumerNode();
            }
        }
    }

    public boolean isRegistered(String consumerNodeId) {
        try {
            return this.curatorClient.checkExists().forPath(this.registryPaths.nodePath(consumerNodeId)) != null;
        }
        catch (Exception e) {
            throw new InternalProcessingException((Throwable)e);
        }
    }

    public boolean isLeader() {
        return this.ensureRegistered() && this.leaderLatch.hasLeadership();
    }

    public List<String> listConsumerNodes() {
        return new ArrayList<String>(this.consumersLastSeen.keySet());
    }

    public void refresh() {
        logger.info("Refreshing current consumers registry");
        long currentTime = this.clock.millis();
        this.readCurrentNodes().forEach(node -> this.consumersLastSeen.put((String)node, currentTime));
        List<String> deadConsumers = this.findDeadConsumers(currentTime);
        if (!deadConsumers.isEmpty()) {
            logger.info("Considering following consumers dead: {}", deadConsumers);
        }
        deadConsumers.forEach(this.consumersLastSeen::remove);
    }

    private boolean ensureRegistered() {
        if (this.curatorClient.getZookeeperClient().isConnected()) {
            if (!this.isRegistered(this.consumerNodeId)) {
                this.registerConsumerNode();
            }
            return true;
        }
        return false;
    }

    private void registerConsumerNode() {
        try {
            String nodePath = this.registryPaths.nodePath(this.consumerNodeId);
            if (this.curatorClient.checkExists().forPath(nodePath) == null) {
                ((ACLBackgroundPathAndBytesable)this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)).forPath(nodePath);
                logger.info("Registered in consumer nodes registry as {}", (Object)this.consumerNodeId);
            }
        }
        catch (KeeperException.NodeExistsException nodePath) {
        }
        catch (Exception e) {
            throw new InternalProcessingException((Throwable)e);
        }
        this.refresh();
    }

    private List<String> findDeadConsumers(long currentTime) {
        long tooOld = currentTime - this.deathOfConsumerAfterMillis;
        return this.consumersLastSeen.entrySet().stream().filter(entry -> {
            long lastSeen = (Long)entry.getValue();
            return lastSeen < tooOld;
        }).map(Map.Entry::getKey).collect(Collectors.toList());
    }

    private List<String> readCurrentNodes() {
        return this.getCurrentData().stream().map(data -> StringUtils.substringAfterLast((String)data.getPath(), (String)"/")).collect(Collectors.toList());
    }

    public String getConsumerId() {
        return this.consumerNodeId;
    }

    public void addLeaderLatchListener(LeaderLatchListener listener) {
        this.leaderLatch.addListener(listener);
    }

    public void removeLeaderLatchListener(LeaderLatchListener listener) {
        this.leaderLatch.removeListener(listener);
    }
}

