/*
 * Decompiled with CFR 0.152.
 */
package org.onosproject.store.cluster.impl;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.hazelcast.config.TopicConfig;
import com.hazelcast.core.IAtomicLong;
import com.hazelcast.core.ILock;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipEventListener;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.AbstractListenerRegistry;
import org.onosproject.event.Event;
import org.onosproject.event.EventDeliveryService;
import org.onosproject.event.EventListener;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.hz.StoreService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component(immediate=true, enabled=false)
@Service
public class HazelcastLeadershipService
implements LeadershipService {
    private static final Logger log = LoggerFactory.getLogger(HazelcastLeadershipService.class);
    // Serializer used to encode and decode the LeadershipEvent payloads carried in cluster messages.
    private static final KryoSerializer SERIALIZER = new KryoSerializer(){

        // Builds the serializer pool from the KryoNamespaces.API registrations.
        protected void setupKryoPool() {
            this.serializerPool = KryoNamespace.newBuilder().register(KryoNamespaces.API).build().populate(1);
        }
    };
    // NOTE(review): matches the 5000 ms sleep between iterations of Topic.doPeriodicProcessing(),
    // which uses a literal in this decompiled form - confirm against the original source.
    private static final long LEADERSHIP_PERIODIC_INTERVAL_MS = 5000L;
    // NOTE(review): matches the 15000 ms heartbeat timeout literal in Topic.doPeriodicProcessing(),
    // after which a silent remote leader is treated as booted - confirm.
    private static final long LEADERSHIP_REMOTE_TIMEOUT_MS = 15000L;
    // Name of the Hazelcast topic configuration registered in activate().
    private static final String TOPIC_HZ_ID = "LeadershipService/AllTopics";
    // NOTE(review): presumably the value Topic.term() reports before a term counter exists - confirm.
    private static final long NO_TERM = 0L;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterCommunicationService clusterCommunicator;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterService clusterService;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected StoreService storeService;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected EventDeliveryService eventDispatcher;
    // Registry of leadership listeners; created in activate() and installed as the LeadershipEvent sink.
    private AbstractListenerRegistry<LeadershipEvent, LeadershipEventListener> listenerRegistry;
    // Topics known to this node, keyed by topic name (path).
    private final Map<String, Topic> topics = Maps.newConcurrentMap();
    // Identifier of the local cluster node; set in activate().
    private NodeId localNodeId;
    // Subject under which leadership events are broadcast to, and received from, the cluster.
    private static final MessageSubject LEADERSHIP_EVENT_MESSAGE_SUBJECT = new MessageSubject("hz-leadership-events");
    // Executor passed to the cluster communicator for handling incoming leadership messages.
    private ExecutorService messageHandlingExecutor;

    /**
     * Activates the service: records the local node id, installs the listener
     * registry as the sink for leadership events, registers the Hazelcast topic
     * configuration (global ordering enabled) and subscribes to leadership
     * messages from the cluster on a dedicated single-thread executor.
     */
    @Activate
    protected void activate() {
        localNodeId = clusterService.getLocalNode().id();

        listenerRegistry = new AbstractListenerRegistry<>();
        eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);

        TopicConfig hzTopicConfig = new TopicConfig();
        hzTopicConfig.setGlobalOrderingEnabled(true);
        hzTopicConfig.setName(TOPIC_HZ_ID);
        storeService.getHazelcastInstance().getConfig().addTopicConfig(hzTopicConfig);

        messageHandlingExecutor = Executors.newSingleThreadExecutor(
                Tools.groupedThreads("onos/store/leadership", "message-handler"));
        clusterCommunicator.addSubscriber(
                LEADERSHIP_EVENT_MESSAGE_SUBJECT,
                new InternalLeadershipEventListener(),
                messageHandlingExecutor);

        log.info("Hazelcast Leadership Service started");
    }

    /**
     * Deactivates the service: removes the leadership event sink, unsubscribes
     * from cluster leadership messages, shuts down the message-handling
     * executor and stops and forgets every known topic.
     */
    @Deactivate
    protected void deactivate() {
        this.eventDispatcher.removeSink(LeadershipEvent.class);
        // Unsubscribe before shutting the executor down, so that no message is
        // handed to an executor that has already stopped accepting tasks.
        this.clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT);
        this.messageHandlingExecutor.shutdown();
        for (Topic topic : this.topics.values()) {
            topic.stop();
        }
        this.topics.clear();
        log.info("Hazelcast Leadership Service stopped");
    }

    /**
     * Returns the node currently recorded as leader for the given topic.
     *
     * @param path topic name; must not be null
     * @return the recorded leader, or null if the topic is unknown to this node
     *         or no leader is currently recorded for it
     * @throws IllegalArgumentException if {@code path} is null
     */
    public NodeId getLeader(String path) {
        // Same precondition as the other path-based accessors; without it a
        // null path only surfaces as an NPE from the concurrent map lookup.
        Preconditions.checkArgument(path != null);
        Topic topic = this.topics.get(path);
        if (topic == null) {
            return null;
        }
        return topic.leader();
    }

    /**
     * Returns a leadership snapshot (topic name, leader, term) for the given topic.
     *
     * @param path topic name; must not be null
     * @return the leadership snapshot, or null if the topic is unknown to this node
     * @throws IllegalArgumentException if {@code path} is null
     */
    public Leadership getLeadership(String path) {
        Preconditions.checkArgument(path != null);
        Topic known = topics.get(path);
        if (known == null) {
            return null;
        }
        // The last constructor argument is always passed as 0 by this service.
        return new Leadership(known.topicName(), known.leader(), known.term(), 0L);
    }

    /**
     * Returns the names of all known topics whose recorded leader is the given node.
     *
     * @param nodeId node to look up; must not be null
     * @return set of topic names led by {@code nodeId}
     * @throws IllegalArgumentException if {@code nodeId} is null
     */
    public Set<String> ownedTopics(NodeId nodeId) {
        Preconditions.checkArgument(nodeId != null);
        return topics.values().stream()
                .filter(candidate -> nodeId.equals(candidate.leader()))
                .map(Topic::topicName)
                .collect(Collectors.toSet());
    }

    /**
     * Enters the local node into the leader election for the given topic,
     * registering and starting the topic first if it is not yet known.
     *
     * @param path topic name; must not be null
     * @throws IllegalArgumentException if {@code path} is null
     */
    public void runForLeadership(String path) {
        Preconditions.checkArgument(path != null);
        Topic candidate = new Topic(path);
        Topic existing = topics.putIfAbsent(path, candidate);
        if (existing != null) {
            // Another registration won; campaign on the already-known topic.
            existing.runForLeadership();
            return;
        }
        candidate.start();
        candidate.runForLeadership();
    }

    /**
     * Withdraws from the given topic: removes it from the known topics and stops it.
     * Does nothing if the topic is unknown.
     *
     * @param path topic name; must not be null
     * @throws IllegalArgumentException if {@code path} is null
     */
    public void withdraw(String path) {
        Preconditions.checkArgument(path != null);
        Topic known = topics.get(path);
        if (known == null) {
            return;
        }
        // Conditional remove: only drops the mapping if it still points at this instance.
        topics.remove(path, known);
        known.stop();
    }

    /**
     * Builds a snapshot of the leadership state of every known topic.
     *
     * @return a new map from topic name to its leadership snapshot
     */
    public Map<String, Leadership> getLeaderBoard() {
        Map<String, Leadership> board = new HashMap<>();
        topics.values().forEach(known -> {
            Leadership snapshot = new Leadership(known.topicName(), known.leader(), known.term(), 0L);
            board.put(known.topicName(), snapshot);
        });
        return board;
    }

    /**
     * Registers a listener with the leadership listener registry.
     *
     * @param listener listener to add
     */
    public void addListener(LeadershipEventListener listener) {
        listenerRegistry.addListener(listener);
    }

    /**
     * Unregisters a listener from the leadership listener registry.
     *
     * @param listener listener to remove
     */
    public void removeListener(LeadershipEventListener listener) {
        listenerRegistry.removeListener(listener);
    }

    /**
     * Stores the given cluster communication service in the {@code clusterCommunicator} reference field.
     * NOTE(review): presumably invoked by the SCR runtime when the reference is bound - confirm.
     *
     * @param clusterCommunicationService service instance to use
     */
    protected void bindClusterCommunicator(ClusterCommunicationService clusterCommunicationService) {
        this.clusterCommunicator = clusterCommunicationService;
    }

    /**
     * Clears the {@code clusterCommunicator} reference, but only if it still
     * refers to the very instance being unbound.
     *
     * @param clusterCommunicationService service instance being unbound
     */
    protected void unbindClusterCommunicator(ClusterCommunicationService clusterCommunicationService) {
        if (clusterCommunicator != clusterCommunicationService) {
            return;
        }
        clusterCommunicator = null;
    }

    /**
     * Stores the given cluster service in the {@code clusterService} reference field.
     * NOTE(review): presumably invoked by the SCR runtime when the reference is bound - confirm.
     *
     * @param clusterService service instance to use
     */
    protected void bindClusterService(ClusterService clusterService) {
        this.clusterService = clusterService;
    }

    /**
     * Clears the {@code clusterService} reference, but only if it still refers
     * to the very instance being unbound.
     *
     * @param clusterService service instance being unbound
     */
    protected void unbindClusterService(ClusterService clusterService) {
        if (this.clusterService != clusterService) {
            return;
        }
        this.clusterService = null;
    }

    /**
     * Stores the given store service in the {@code storeService} reference field.
     * NOTE(review): presumably invoked by the SCR runtime when the reference is bound - confirm.
     *
     * @param storeService service instance to use
     */
    protected void bindStoreService(StoreService storeService) {
        this.storeService = storeService;
    }

    /**
     * Clears the {@code storeService} reference, but only if it still refers to
     * the very instance being unbound.
     *
     * @param storeService service instance being unbound
     */
    protected void unbindStoreService(StoreService storeService) {
        if (this.storeService != storeService) {
            return;
        }
        this.storeService = null;
    }

    /**
     * Stores the given event delivery service in the {@code eventDispatcher} reference field.
     * NOTE(review): presumably invoked by the SCR runtime when the reference is bound - confirm.
     *
     * @param eventDeliveryService service instance to use
     */
    protected void bindEventDispatcher(EventDeliveryService eventDeliveryService) {
        this.eventDispatcher = eventDeliveryService;
    }

    /**
     * Clears the {@code eventDispatcher} reference, but only if it still refers
     * to the very instance being unbound.
     *
     * @param eventDeliveryService service instance being unbound
     */
    protected void unbindEventDispatcher(EventDeliveryService eventDeliveryService) {
        if (eventDispatcher != eventDeliveryService) {
            return;
        }
        eventDispatcher = null;
    }

    private class InternalLeadershipEventListener
    implements ClusterMessageHandler {
        private InternalLeadershipEventListener() {
        }

        public void handle(ClusterMessage message) {
            LeadershipEvent leadershipEvent = (LeadershipEvent)SERIALIZER.decode(message.payload());
            log.trace("Leadership Event: time = {} type = {} event = {}", new Object[]{leadershipEvent.time(), leadershipEvent.type(), leadershipEvent});
            String topicName = ((Leadership)leadershipEvent.subject()).topic();
            Topic topic = (Topic)HazelcastLeadershipService.this.topics.get(topicName);
            if (topic == null) {
                topic = new Topic(topicName);
                Topic oldTopic = HazelcastLeadershipService.this.topics.putIfAbsent(topicName, topic);
                if (oldTopic == null) {
                    topic.start();
                } else {
                    topic = oldTopic;
                }
            }
            topic.receivedLeadershipEvent(leadershipEvent);
            HazelcastLeadershipService.this.eventDispatcher.post((Event)leadershipEvent);
        }
    }

    private final class Topic {
        // Name of the topic; also used to derive the Hazelcast lock/term ids and the thread names.
        private final String topicName;
        // True until start() runs and again after stop(); checked by both worker loops.
        private volatile boolean isShutdown = true;
        // True while this node is campaigning for (or holding) leadership of the topic.
        private volatile boolean isRunningForLeadership = false;
        // Wall-clock time (ms) of the last LEADER_ELECTED/LEADER_REELECTED event accepted from a remote leader.
        private volatile long lastLeadershipUpdateMs = 0L;
        // Executor running the periodic-processing and leader-election tasks; created in start().
        private ExecutorService leaderElectionExecutor;
        // Hazelcast counter holding the topic's term; obtained in runForLeadership(), null before that.
        private volatile IAtomicLong term;
        // Term value assigned the last time this node was elected; updated in updateTerm().
        private long myLastLeaderTerm = 0L;
        // Currently recorded leader of the topic, or null if none is recorded.
        private volatile NodeId leader;
        // Hazelcast lock whose holder is the leader; obtained in runForLeadership().
        private ILock leaderLock;
        // Future of the leader-election task; cancelled when a remote leader announces itself while we lead.
        private Future<?> getLockFuture;
        // Future of the periodic-processing task submitted in start().
        private Future<?> periodicProcessingFuture;

        /**
         * Creates a topic in the shut-down state; nothing runs until {@code start()} is called.
         *
         * @param topicName name of the topic
         */
        private Topic(String topicName) {
            this.topicName = topicName;
        }

        /**
         * Returns the name of this topic.
         *
         * @return topic name
         */
        private String topicName() {
            return topicName;
        }

        /**
         * Returns the currently recorded leader of this topic.
         *
         * @return leader node id, or null if no leader is recorded
         */
        private NodeId leader() {
            return leader;
        }

        /**
         * Returns the current value of the topic's term counter.
         *
         * @return the counter value, or 0 if the counter has not been obtained yet
         */
        private long term() {
            IAtomicLong counter = term;
            return counter == null ? 0L : counter.get();
        }

        /**
         * Starts the topic if it is currently shut down: creates the election
         * executor and submits the periodic-processing task. Calling it on an
         * already-started topic has no effect.
         */
        private synchronized void start() {
            if (isShutdown) {
                isShutdown = false;
                String threadPoolName = "election-" + topicName + "-%d";
                leaderElectionExecutor = Executors.newScheduledThreadPool(
                        2, Tools.groupedThreads("onos/leadership", threadPoolName));
                Runnable periodicTask = () -> doPeriodicProcessing();
                periodicProcessingFuture = leaderElectionExecutor.submit(periodicTask);
            }
        }

        /**
         * Begins campaigning for leadership of this topic, starting the topic
         * first if necessary. Obtains the Hazelcast lock and term counter for
         * the topic and submits the leader-election task. Does nothing if a
         * campaign is already in progress.
         */
        private synchronized void runForLeadership() {
            if (!isRunningForLeadership) {
                if (isShutdown) {
                    start();
                }
                isRunningForLeadership = true;

                String idPrefix = "LeadershipService/" + topicName;
                leaderLock = storeService.getHazelcastInstance().getLock(idPrefix + "/lock");
                term = storeService.getHazelcastInstance().getAtomicLong(idPrefix + "/term");

                Runnable electionTask = () -> doLeaderElectionThread();
                getLockFuture = leaderElectionExecutor.submit(electionTask);
            }
        }

        /**
         * Stops the topic: marks it shut down, ends any leadership campaign and
         * interrupts the worker tasks by shutting the election executor down.
         * Safe to call on a topic that was never started.
         */
        private synchronized void stop() {
            this.isShutdown = true;
            this.isRunningForLeadership = false;
            // The executor only exists once start() has run; a topic can be
            // stopped before that if it is withdrawn right after registration.
            if (this.leaderElectionExecutor != null) {
                this.leaderElectionExecutor.shutdownNow();
            }
        }

        /**
         * Applies a leadership event received from the cluster to this topic.
         * Events for other topics, and events whose leader is the local node,
         * are ignored.
         *
         * @param leadershipEvent event to apply
         */
        private void receivedLeadershipEvent(LeadershipEvent leadershipEvent) {
            Leadership leadership = leadershipEvent.subject();
            NodeId eventLeaderId = leadership.leader();
            if (!leadership.topic().equals(topicName)) {
                return;
            }
            if (eventLeaderId.equals(localNodeId)) {
                return;
            }

            synchronized (this) {
                switch (leadershipEvent.type()) {
                    case LEADER_ELECTED:
                    case LEADER_REELECTED: {
                        boolean locallyLed = leader != null && leader.equals(localNodeId);
                        if (!locallyLed) {
                            // Adopt the remote leader and refresh its heartbeat timestamp.
                            leader = leadership.leader();
                            lastLeadershipUpdateMs = System.currentTimeMillis();
                        } else if (getLockFuture != null) {
                            // A remote node claims leadership while we believe we lead:
                            // interrupt our election task so it gives up the leadership.
                            getLockFuture.cancel(true);
                        }
                        break;
                    }
                    case LEADER_BOOTED: {
                        if (leader != null && eventLeaderId.equals(leader)) {
                            leader = null;
                        }
                        break;
                    }
                    default:
                        break;
                }
            }
        }

        /**
         * Periodic task that runs until the topic is shut down. On each pass:
         * if the local node is the recorded leader it broadcasts a
         * LEADER_REELECTED event; if a remote leader is recorded and nothing has
         * been heard from it within the remote timeout, it posts a LEADER_BOOTED
         * event locally and clears the leader. It then sleeps for the periodic
         * interval.
         */
        private void doPeriodicProcessing() {
            while (!isShutdown) {
                synchronized (this) {
                    NodeId currentLeader = leader;
                    if (currentLeader != null && currentLeader.equals(localNodeId)) {
                        LeadershipEvent heartbeat = new LeadershipEvent(
                                LeadershipEvent.Type.LEADER_REELECTED,
                                new Leadership(topicName, localNodeId, myLastLeaderTerm, 0L));
                        clusterCommunicator.broadcastIncludeSelf(new ClusterMessage(
                                clusterService.getLocalNode().id(),
                                LEADERSHIP_EVENT_MESSAGE_SUBJECT,
                                SERIALIZER.encode(heartbeat)));
                    } else if (currentLeader != null) {
                        long sinceLastUpdate = System.currentTimeMillis() - lastLeadershipUpdateMs;
                        if (sinceLastUpdate > LEADERSHIP_REMOTE_TIMEOUT_MS) {
                            log.debug("Topic {} leader {} booted due to heartbeat timeout", topicName, currentLeader);
                            LeadershipEvent booted = new LeadershipEvent(
                                    LeadershipEvent.Type.LEADER_BOOTED,
                                    new Leadership(topicName, currentLeader, myLastLeaderTerm, 0L));
                            eventDispatcher.post(booted);
                            leader = null;
                        }
                    }
                }

                try {
                    Thread.sleep(LEADERSHIP_PERIODIC_INTERVAL_MS);
                }
                catch (InterruptedException e) {
                    // The loop condition decides whether to exit.
                    log.debug("Leader Election periodic thread interrupted");
                }
            }
        }

        /**
         * Leader-election task that runs until the topic is shut down. Each
         * round blocks on the topic's Hazelcast lock; once the lock is acquired
         * the local node becomes leader, bumps the term, broadcasts
         * LEADER_ELECTED and then sleeps until interrupted. On the way out of a
         * round it clears the local leadership, broadcasts LEADER_BOOTED and
         * releases the lock.
         */
        private void doLeaderElectionThread() {
            while (!this.isShutdown) {
                log.debug("Leader Election begin for topic {}", this.topicName);
                try {
                    // Blocks until this node acquires the lock, i.e. becomes leader.
                    this.leaderLock.lockInterruptibly();
                }
                catch (InterruptedException e) {
                    // Interrupted while waiting: either shutdown or a new election round.
                    log.debug("Election interrupted for topic {}", this.topicName);
                    continue;
                }
                try {
                    synchronized (this) {
                        log.info("Leader Elected for topic {}", this.topicName);
                        this.updateTerm();
                        this.leader = HazelcastLeadershipService.this.localNodeId;
                        LeadershipEvent electedEvent = new LeadershipEvent(
                                LeadershipEvent.Type.LEADER_ELECTED,
                                new Leadership(this.topicName, HazelcastLeadershipService.this.localNodeId, this.myLastLeaderTerm, 0L));
                        HazelcastLeadershipService.this.clusterCommunicator.broadcastIncludeSelf(new ClusterMessage(
                                HazelcastLeadershipService.this.clusterService.getLocalNode().id(),
                                LEADERSHIP_EVENT_MESSAGE_SUBJECT,
                                SERIALIZER.encode(electedEvent)));
                    }
                    // Hold the leadership until interrupted.
                    Thread.sleep(Long.MAX_VALUE);
                }
                catch (InterruptedException e) {
                    log.debug("Leader Interrupted for topic {}", this.topicName);
                }
                finally {
                    synchronized (this) {
                        try {
                            log.debug("Leader Lock Released for topic {}", this.topicName);
                            if (this.leader != null && this.leader.equals(HazelcastLeadershipService.this.localNodeId)) {
                                this.leader = null;
                            }
                            LeadershipEvent bootedEvent = new LeadershipEvent(
                                    LeadershipEvent.Type.LEADER_BOOTED,
                                    new Leadership(this.topicName, HazelcastLeadershipService.this.localNodeId, this.myLastLeaderTerm, 0L));
                            HazelcastLeadershipService.this.clusterCommunicator.broadcastIncludeSelf(new ClusterMessage(
                                    HazelcastLeadershipService.this.clusterService.getLocalNode().id(),
                                    LEADERSHIP_EVENT_MESSAGE_SUBJECT,
                                    SERIALIZER.encode(bootedEvent)));
                        }
                        finally {
                            // Release the distributed lock even if the broadcast above
                            // throws; otherwise the lock would stay held by this node.
                            if (this.leaderLock.isLockedByCurrentThread()) {
                                this.leaderLock.unlock();
                            }
                        }
                    }
                }
            }
            this.isRunningForLeadership = false;
        }

        /**
         * Increments the topic's term counter and records the new value as the
         * term of this node's latest leadership.
         */
        private void updateTerm() {
            long previousTerm = term.get();
            long incrementedTerm = term.incrementAndGet();
            myLastLeaderTerm = incrementedTerm;
            log.debug("Topic {} updated term from {} to {}", topicName, previousTerm, incrementedTerm);
        }
    }
}

