/*
 * Decompiled with CFR 0.152.
 */
package org.onosproject.store.consistent.impl;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipEventListener;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.AbstractListenerRegistry;
import org.onosproject.event.Event;
import org.onosproject.event.EventDeliveryService;
import org.onosproject.event.EventListener;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component(immediate=true, enabled=true)
@Service
public class DistributedLeadershipManager
implements LeadershipService {
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected StorageService storageService;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterService clusterService;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterCommunicationService clusterCommunicator;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected EventDeliveryService eventDispatcher;
    private static final MessageSubject LEADERSHIP_EVENT_MESSAGE_SUBJECT = new MessageSubject("distributed-leadership-manager-events");
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private ExecutorService messageHandlingExecutor;
    private ScheduledExecutorService retryLeaderLockExecutor;
    private ScheduledExecutorService deadLockDetectionExecutor;
    private ScheduledExecutorService leadershipStatusBroadcaster;
    private ConsistentMap<String, NodeId> lockMap;
    private AbstractListenerRegistry<LeadershipEvent, LeadershipEventListener> listenerRegistry;
    private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
    private NodeId localNodeId;
    private Set<String> activeTopics = Sets.newConcurrentHashSet();
    private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
    private static final int DEADLOCK_DETECTION_INTERVAL_SEC = 2;
    private static final int LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC = 2;
    private static final KryoSerializer SERIALIZER = new KryoSerializer(){

        protected void setupKryoPool() {
            this.serializerPool = KryoNamespace.newBuilder().register(KryoNamespaces.API).build().populate(1);
        }
    };

    @Activate
    public void activate() {
        this.lockMap = this.storageService.createConsistentMap("onos-leader-locks", new Serializer(){
            KryoNamespace kryo = new KryoNamespace.Builder().register(KryoNamespaces.API).build();

            public <T> byte[] encode(T object) {
                return this.kryo.serialize(object);
            }

            public <T> T decode(byte[] bytes) {
                return (T)this.kryo.deserialize(bytes);
            }
        });
        this.localNodeId = this.clusterService.getLocalNode().id();
        this.messageHandlingExecutor = Executors.newSingleThreadExecutor(Tools.groupedThreads((String)"onos/store/leadership", (String)"message-handler"));
        this.retryLeaderLockExecutor = Executors.newScheduledThreadPool(4, Tools.groupedThreads((String)"onos/store/leadership", (String)"election-thread-%d"));
        this.deadLockDetectionExecutor = Executors.newSingleThreadScheduledExecutor(Tools.groupedThreads((String)"onos/store/leadership", (String)"dead-lock-detector"));
        this.leadershipStatusBroadcaster = Executors.newSingleThreadScheduledExecutor(Tools.groupedThreads((String)"onos/store/leadership", (String)"peer-updater"));
        this.clusterCommunicator.addSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT, (ClusterMessageHandler)new InternalLeadershipEventListener(), this.messageHandlingExecutor);
        this.deadLockDetectionExecutor.scheduleWithFixedDelay(this::purgeStaleLocks, 0L, 2L, TimeUnit.SECONDS);
        this.leadershipStatusBroadcaster.scheduleWithFixedDelay(this::sendLeadershipStatus, 0L, 2L, TimeUnit.SECONDS);
        this.listenerRegistry = new AbstractListenerRegistry();
        this.eventDispatcher.addSink(LeadershipEvent.class, this.listenerRegistry);
        this.log.info("Started.");
    }

    @Deactivate
    public void deactivate() {
        this.leaderBoard.forEach((topic, leadership) -> {
            if (this.localNodeId.equals((Object)leadership.leader())) {
                this.withdraw((String)topic);
            }
        });
        this.eventDispatcher.removeSink(LeadershipEvent.class);
        this.clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT);
        this.messageHandlingExecutor.shutdown();
        this.retryLeaderLockExecutor.shutdown();
        this.deadLockDetectionExecutor.shutdown();
        this.leadershipStatusBroadcaster.shutdown();
        this.log.info("Stopped.");
    }

    public Map<String, Leadership> getLeaderBoard() {
        return ImmutableMap.copyOf(this.leaderBoard);
    }

    public NodeId getLeader(String path) {
        Leadership leadership = this.leaderBoard.get(path);
        return leadership != null ? leadership.leader() : null;
    }

    public Leadership getLeadership(String path) {
        Preconditions.checkArgument((path != null ? 1 : 0) != 0);
        return this.leaderBoard.get(path);
    }

    public Set<String> ownedTopics(NodeId nodeId) {
        Preconditions.checkArgument((nodeId != null ? 1 : 0) != 0);
        return this.leaderBoard.entrySet().stream().filter(entry -> nodeId.equals((Object)((Leadership)entry.getValue()).leader())).map(Map.Entry::getKey).collect(Collectors.toSet());
    }

    public void runForLeadership(String path) {
        this.log.debug("Running for leadership for topic: {}", (Object)path);
        this.activeTopics.add(path);
        this.tryLeaderLock(path);
    }

    public void withdraw(String path) {
        this.activeTopics.remove(path);
        try {
            Versioned leader = this.lockMap.get((Object)path);
            if (Objects.equals(leader.value(), this.localNodeId) && this.lockMap.remove((Object)path, leader.version())) {
                this.log.info("Gave up leadership for {}", (Object)path);
                this.notifyRemovedLeader(path, this.localNodeId, leader.version(), leader.creationTime());
            }
        }
        catch (Exception e) {
            this.log.debug("Failed to verify (and clear) any lock this node might be holding for {}", (Object)path, (Object)e);
        }
    }

    public void addListener(LeadershipEventListener listener) {
        this.listenerRegistry.addListener((EventListener)listener);
    }

    public void removeListener(LeadershipEventListener listener) {
        this.listenerRegistry.removeListener((EventListener)listener);
    }

    private void tryLeaderLock(String path) {
        if (!this.activeTopics.contains(path)) {
            return;
        }
        try {
            Versioned currentLeader = this.lockMap.get((Object)path);
            if (currentLeader != null) {
                if (this.localNodeId.equals(currentLeader.value())) {
                    this.log.info("Already has leadership for {}", (Object)path);
                    this.notifyNewLeader(path, this.localNodeId, currentLeader.version(), currentLeader.creationTime());
                } else {
                    this.retry(path);
                }
            } else if (this.lockMap.putIfAbsent((Object)path, (Object)this.localNodeId) == null) {
                this.log.info("Assumed leadership for {}", (Object)path);
                Versioned newLeader = this.lockMap.get((Object)path);
                this.notifyNewLeader(path, this.localNodeId, newLeader.version(), newLeader.creationTime());
            } else {
                this.retry(path);
            }
        }
        catch (Exception e) {
            this.log.debug("Attempt to acquire leadership lock for topic {} failed", (Object)path, (Object)e);
            this.retry(path);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void notifyNewLeader(String path, NodeId leader, long epoch, long electedTime) {
        Leadership newLeadership = new Leadership(path, leader, epoch, electedTime);
        boolean updatedLeader = false;
        Map<String, Leadership> map = this.leaderBoard;
        synchronized (map) {
            Leadership currentLeader = this.leaderBoard.get(path);
            if (currentLeader == null || currentLeader.epoch() < epoch) {
                this.leaderBoard.put(path, newLeadership);
                updatedLeader = true;
            }
        }
        if (updatedLeader) {
            LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, newLeadership);
            this.eventDispatcher.post((Event)event);
            this.clusterCommunicator.broadcast(new ClusterMessage(this.clusterService.getLocalNode().id(), LEADERSHIP_EVENT_MESSAGE_SUBJECT, SERIALIZER.encode((Object)event)));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void notifyRemovedLeader(String path, NodeId leader, long epoch, long electedTime) {
        Leadership oldLeadership = new Leadership(path, leader, epoch, electedTime);
        boolean updatedLeader = false;
        Map<String, Leadership> map = this.leaderBoard;
        synchronized (map) {
            Leadership currentLeader = this.leaderBoard.get(path);
            if (currentLeader != null && currentLeader.epoch() == oldLeadership.epoch()) {
                this.leaderBoard.remove(path);
                updatedLeader = true;
            }
        }
        if (updatedLeader) {
            LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, oldLeadership);
            this.eventDispatcher.post((Event)event);
            this.clusterCommunicator.broadcast(new ClusterMessage(this.clusterService.getLocalNode().id(), LEADERSHIP_EVENT_MESSAGE_SUBJECT, SERIALIZER.encode((Object)event)));
        }
    }

    private void retry(String path) {
        this.retryLeaderLockExecutor.schedule(() -> this.tryLeaderLock(path), 2L, TimeUnit.SECONDS);
    }

    private void purgeStaleLocks() {
        try {
            Set entries = this.lockMap.entrySet();
            entries.forEach(entry -> {
                String path = (String)entry.getKey();
                NodeId nodeId = (NodeId)((Versioned)entry.getValue()).value();
                long epoch = ((Versioned)entry.getValue()).version();
                long creationTime = ((Versioned)entry.getValue()).creationTime();
                if (this.clusterService.getState(nodeId) == ControllerNode.State.INACTIVE) {
                    this.log.info("Lock for {} is held by {} which is currently inactive", (Object)path, (Object)nodeId);
                    try {
                        if (this.lockMap.remove((Object)path, epoch)) {
                            this.log.info("Purged stale lock held by {} for {}", (Object)nodeId, (Object)path);
                            this.notifyRemovedLeader(path, nodeId, epoch, creationTime);
                        }
                    }
                    catch (Exception e) {
                        this.log.warn("Failed to purge stale lock held by {} for {}", new Object[]{nodeId, path, e});
                    }
                }
                if (this.localNodeId.equals((Object)nodeId) && !this.activeTopics.contains(path)) {
                    this.log.debug("Lock for {} is held by {} when it not running for leadership.", (Object)path, (Object)nodeId);
                    try {
                        if (this.lockMap.remove((Object)path, epoch)) {
                            this.log.info("Purged stale lock held by {} for {}", (Object)nodeId, (Object)path);
                            this.notifyRemovedLeader(path, nodeId, epoch, creationTime);
                        }
                    }
                    catch (Exception e) {
                        this.log.warn("Failed to purge stale lock held by {} for {}", new Object[]{nodeId, path, e});
                    }
                }
            });
        }
        catch (Exception e) {
            this.log.debug("Failed cleaning up stale locks", (Throwable)e);
        }
    }

    private void sendLeadershipStatus() {
        try {
            this.leaderBoard.forEach((path, leadership) -> {
                if (leadership.leader().equals((Object)this.localNodeId)) {
                    LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership);
                    this.clusterCommunicator.broadcast(new ClusterMessage(this.clusterService.getLocalNode().id(), LEADERSHIP_EVENT_MESSAGE_SUBJECT, SERIALIZER.encode((Object)event)));
                }
            });
        }
        catch (Exception e) {
            this.log.debug("Failed to send leadership updates", (Throwable)e);
        }
    }

    protected void bindStorageService(StorageService storageService) {
        this.storageService = storageService;
    }

    protected void unbindStorageService(StorageService storageService) {
        if (this.storageService == storageService) {
            this.storageService = null;
        }
    }

    protected void bindClusterService(ClusterService clusterService) {
        this.clusterService = clusterService;
    }

    protected void unbindClusterService(ClusterService clusterService) {
        if (this.clusterService == clusterService) {
            this.clusterService = null;
        }
    }

    protected void bindClusterCommunicator(ClusterCommunicationService clusterCommunicationService) {
        this.clusterCommunicator = clusterCommunicationService;
    }

    protected void unbindClusterCommunicator(ClusterCommunicationService clusterCommunicationService) {
        if (this.clusterCommunicator == clusterCommunicationService) {
            this.clusterCommunicator = null;
        }
    }

    protected void bindEventDispatcher(EventDeliveryService eventDeliveryService) {
        this.eventDispatcher = eventDeliveryService;
    }

    protected void unbindEventDispatcher(EventDeliveryService eventDeliveryService) {
        if (this.eventDispatcher == eventDeliveryService) {
            this.eventDispatcher = null;
        }
    }

    private class InternalLeadershipEventListener
    implements ClusterMessageHandler {
        private InternalLeadershipEventListener() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void handle(ClusterMessage message) {
            LeadershipEvent leadershipEvent = (LeadershipEvent)SERIALIZER.decode(message.payload());
            DistributedLeadershipManager.this.log.debug("Leadership Event: time = {} type = {} event = {}", new Object[]{leadershipEvent.time(), leadershipEvent.type(), leadershipEvent});
            Leadership leadershipUpdate = (Leadership)leadershipEvent.subject();
            LeadershipEvent.Type eventType = (LeadershipEvent.Type)leadershipEvent.type();
            String topic = leadershipUpdate.topic();
            boolean updateAccepted = false;
            Map map = DistributedLeadershipManager.this.leaderBoard;
            synchronized (map) {
                Leadership currentLeadership = (Leadership)DistributedLeadershipManager.this.leaderBoard.get(topic);
                if (eventType.equals((Object)LeadershipEvent.Type.LEADER_ELECTED)) {
                    if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
                        DistributedLeadershipManager.this.leaderBoard.put(topic, leadershipUpdate);
                        updateAccepted = true;
                    }
                } else if (eventType.equals((Object)LeadershipEvent.Type.LEADER_BOOTED)) {
                    if (currentLeadership != null && currentLeadership.epoch() == leadershipUpdate.epoch()) {
                        DistributedLeadershipManager.this.leaderBoard.remove(topic);
                        updateAccepted = true;
                    }
                } else {
                    throw new IllegalStateException("Unknown event type.");
                }
                if (updateAccepted) {
                    DistributedLeadershipManager.this.eventDispatcher.post((Event)leadershipEvent);
                }
            }
        }
    }
}

