/*
 * Decompiled with CFR 0.152.
 */
package org.onosproject.store.primitives.impl;

import com.google.common.base.MoreObjects;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterEvent;
import org.onosproject.cluster.ClusterEventListener;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.EventListener;
import org.onosproject.security.AppGuard;
import org.onosproject.security.AppPermission;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.MutexExecutionService;
import org.onosproject.store.service.MutexTask;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component(immediate=true)
@Service
public class MutexExecutionManager
implements MutexExecutionService {
    // Logger named after the runtime class of this component.
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    // Map of exclusion path -> mutex state; built in activate() under the name "onos-mutexes".
    protected ConsistentMap<String, MutexState> lockMap;
    // Identifier of the local cluster node; assigned in activate() from clusterService.getLocalNode().
    protected NodeId localNodeId;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterService clusterService;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected StorageService storageService;
    // Listener registered on lockMap; completes pending futures when the local node becomes holder.
    private final MapEventListener<String, MutexState> mapEventListener = new InternalLockMapEventListener();
    // Listener registered on clusterService; evicts deactivated/removed nodes from lock states.
    private final ClusterEventListener clusterEventListener = new InternalClusterEventListener();
    // Exclusion path -> future that is completed once the local node holds the lock for that path.
    private Map<String, CompletableFuture<MutexState>> pending = Maps.newConcurrentMap();
    // Exclusion path -> wrapped task registered by execute() for that path.
    private Map<String, InnerMutexTask> activeTasks = Maps.newConcurrentMap();

    /**
     * Component activation hook.
     *
     * <p>Resolves the local node id, builds the "onos-mutexes" consistent map with
     * partitions disabled, registers the map and cluster listeners, and finally
     * releases any lock entries that still name the local node as holder.
     */
    @Activate
    public void activate() {
        this.localNodeId = this.clusterService.getLocalNode().id();

        // Serializer covers the API namespace plus the private MutexState value type.
        Serializer mutexSerializer =
                Serializer.using(Arrays.asList(KryoNamespaces.API), MutexState.class);
        this.lockMap = this.storageService.<String, MutexState>consistentMapBuilder()
                .withName("onos-mutexes")
                .withSerializer(mutexSerializer)
                .withPartitionsDisabled()
                .build();

        this.lockMap.addListener(this.mapEventListener);
        this.clusterService.addListener(this.clusterEventListener);

        // Entries held by this node at startup are left over from a previous run.
        this.releaseOldLocks();
        this.log.info("Started");
    }

    /**
     * Component deactivation hook.
     *
     * <p>Unregisters the map listener, cancels every pending lock future, stops each
     * registered task and releases its lock, then unregisters the cluster listener.
     */
    @Deactivate
    public void deactivate() {
        this.lockMap.removeListener(this.mapEventListener);

        for (CompletableFuture<MutexState> waiting : this.pending.values()) {
            waiting.cancel(true);
        }

        for (Map.Entry<String, InnerMutexTask> running : this.activeTasks.entrySet()) {
            running.getValue().stop();
            this.unlock(running.getKey());
        }

        this.clusterService.removeListener(this.clusterEventListener);
        this.log.info("Stopped");
    }

    /**
     * Runs {@code task} once the local node holds the mutex identified by
     * {@code exclusionPath}.
     *
     * <p>The returned future completes after the task's {@code start()} has returned
     * (or failed) and the lock release has been attempted.
     *
     * @param task          task to run while holding the mutex
     * @param exclusionPath key identifying the mutex
     * @param executor      executor on which {@code task.start()} is invoked
     * @return future tracking the task execution
     */
    @Override
    public CompletableFuture<Void> execute(MutexTask task, String exclusionPath, Executor executor) {
        AppGuard.checkPermission(AppPermission.Type.MUTEX_WRITE);
        return this.lock(exclusionPath)
                .thenApply(state -> this.activeTasks.computeIfAbsent(
                        exclusionPath,
                        k -> new InnerMutexTask(exclusionPath, task, state.term())))
                .thenAcceptAsync(t -> t.start(), executor)
                .whenComplete((r, e) -> {
                    // Drop the finished task BEFORE releasing the lock. Otherwise the entry
                    // lingers forever: a later execute() on the same path would get the stale
                    // wrapper back from computeIfAbsent (running the old task again), and the
                    // map listener would call stop() on an already finished task as soon as
                    // the term advances.
                    this.activeTasks.remove(exclusionPath);
                    this.unlock(exclusionPath);
                });
    }

    /**
     * Requests the mutex for {@code exclusionPath} on behalf of the local node.
     *
     * <p>A single future per path is kept in {@code pending}; repeated calls for the
     * same path while one is outstanding receive the same future. The future is
     * completed by the lock map listener when the local node becomes holder.
     *
     * @param exclusionPath key identifying the mutex
     * @return future yielding the mutex state under which the local node holds the lock
     */
    protected CompletableFuture<MutexState> lock(String exclusionPath) {
        CompletableFuture<MutexState> acquired =
                this.pending.computeIfAbsent(exclusionPath, path -> new CompletableFuture<>());
        // Register the local node as holder or waiter in the shared map.
        this.tryLock(exclusionPath);
        return acquired;
    }

    /**
     * Admits the local node to the mutex state stored under {@code exclusionPath},
     * making it holder when the mutex is free and appending it to the wait list otherwise.
     *
     * <p>The update is wrapped with {@code Tools.retryable} for
     * {@code ConsistentMapException.ConcurrentModification}, passing
     * {@code Integer.MAX_VALUE} and {@code 100} as its retry parameters
     * (presumably retry count and maximum delay between retries — confirm against
     * {@code Tools.retryable}).
     *
     * @param exclusionPath key identifying the mutex
     */
    protected void tryLock(String exclusionPath) {
        Tools.retryable(
                () -> this.lockMap.asJavaMap().compute(
                        exclusionPath,
                        (path, current) -> MutexState.admit(current, this.localNodeId)),
                ConsistentMapException.ConcurrentModification.class,
                Integer.MAX_VALUE,
                100
        ).get();
    }

    /**
     * Evicts the local node from the mutex state stored under {@code exclusionPath},
     * whether it is the current holder or a waiter.
     *
     * <p>The update is wrapped with {@code Tools.retryable} for
     * {@code ConsistentMapException.ConcurrentModification}, passing
     * {@code Integer.MAX_VALUE} and {@code 100} as its retry parameters
     * (presumably retry count and maximum delay between retries — confirm against
     * {@code Tools.retryable}).
     *
     * @param exclusionPath key identifying the mutex
     */
    protected void unlock(String exclusionPath) {
        Tools.retryable(
                () -> this.lockMap.asJavaMap().compute(
                        exclusionPath,
                        (path, current) -> MutexState.evict(current, this.localNodeId)),
                ConsistentMapException.ConcurrentModification.class,
                Integer.MAX_VALUE,
                100
        ).get();
    }

    /**
     * Releases every lock whose recorded holder is the local node.
     *
     * <p>Called from {@code activate()}, before any task has been started by this
     * instance, so such entries are treated as leftovers and unlocked.
     */
    private void releaseOldLocks() {
        // Filtered view of the lock map: only entries held by this node.
        Map<String, MutexState> heldLocally = Maps.filterValues(
                this.lockMap.asJavaMap(),
                state -> this.localNodeId.equals(state.holder()));
        for (String path : heldLocally.keySet()) {
            this.log.info("Detected zombie task still holding lock for {}. Releasing lock.", path);
            this.unlock(path);
        }
    }

    /**
     * Stores the given cluster service in the {@code clusterService} field.
     * Presumably invoked by the SCR runtime for the {@code @Reference} field — confirm.
     *
     * @param clusterService cluster service to use
     */
    protected void bindClusterService(ClusterService clusterService) {
        this.clusterService = clusterService;
    }

    /**
     * Clears the {@code clusterService} field, but only when it currently refers to
     * the very instance being unbound.
     *
     * @param clusterService cluster service being withdrawn
     */
    protected void unbindClusterService(ClusterService clusterService) {
        if (this.clusterService != clusterService) {
            // A different instance is bound; leave it in place.
            return;
        }
        this.clusterService = null;
    }

    /**
     * Stores the given storage service in the {@code storageService} field.
     * Presumably invoked by the SCR runtime for the {@code @Reference} field — confirm.
     *
     * @param storageService storage service to use
     */
    protected void bindStorageService(StorageService storageService) {
        this.storageService = storageService;
    }

    /**
     * Clears the {@code storageService} field, but only when it currently refers to
     * the very instance being unbound.
     *
     * @param storageService storage service being withdrawn
     */
    protected void unbindStorageService(StorageService storageService) {
        if (this.storageService != storageService) {
            // A different instance is bound; leave it in place.
            return;
        }
        this.storageService = null;
    }

    private class InnerMutexTask
    implements MutexTask {
        private final MutexTask task;
        private final String mutexPath;
        private final long term;

        public InnerMutexTask(String mutexPath, MutexTask task, long term) {
            this.mutexPath = mutexPath;
            this.term = term;
            this.task = task;
        }

        public long term() {
            return this.term;
        }

        public void start() {
            MutexExecutionManager.this.log.debug("Starting execution for mutex task guarded by {}", (Object)this.mutexPath);
            this.task.start();
            MutexExecutionManager.this.log.debug("Finished execution for mutex task guarded by {}", (Object)this.mutexPath);
        }

        public void stop() {
            MutexExecutionManager.this.log.debug("Stopping execution for mutex task guarded by {}", (Object)this.mutexPath);
            this.task.stop();
        }
    }

    /**
     * Immutable value stored in the lock map for one exclusion path: the current
     * holder (or {@code null} when free), the ordered list of waiting nodes, and a
     * term that is incremented each time ownership passes to a node.
     */
    private static final class MutexState {
        private final NodeId holder;
        private final List<NodeId> waitList;
        private final long term;

        /**
         * Computes the state that results from {@code nodeId} requesting the mutex.
         *
         * @param state  current state, or {@code null} when no entry exists yet
         * @param nodeId requesting node
         * @return a new state with {@code nodeId} as holder (free mutex) or appended to
         *         the wait list; {@code state} itself when the node is already present
         */
        public static MutexState admit(MutexState state, NodeId nodeId) {
            if (state == null) {
                return new MutexState(nodeId, 1L, new ArrayList<>());
            }
            if (state.holder() == null) {
                // Mutex is free: take ownership under a new term.
                return new MutexState(nodeId, state.term() + 1L, new ArrayList<>());
            }
            if (!state.contains(nodeId)) {
                List<NodeId> newWaitList = new ArrayList<>(state.waitList());
                newWaitList.add(nodeId);
                return new MutexState(state.holder(), state.term(), newWaitList);
            }
            return state;
        }

        /**
         * Null-safe variant of {@link #evict(NodeId)} for use as a map remapping function.
         *
         * @param state  current state, or {@code null} when the map holds no entry
         * @param nodeId node to remove
         * @return the state after eviction, or {@code null} when {@code state} is
         *         {@code null} (nothing to evict from; previously this threw a
         *         {@code NullPointerException})
         */
        public static MutexState evict(MutexState state, NodeId nodeId) {
            return state == null ? null : state.evict(nodeId);
        }

        /**
         * Computes the state that results from removing {@code nodeId}.
         *
         * <p>When the node is the holder, ownership passes to the head of the wait list
         * under an incremented term, or the mutex becomes free (holder {@code null},
         * same term) when nobody is waiting. Otherwise the node is removed from the
         * wait list and holder and term are kept.
         *
         * @param nodeId node to remove
         * @return the resulting state (always a new instance)
         */
        public MutexState evict(NodeId nodeId) {
            if (nodeId.equals(this.holder)) {
                if (this.waitList.isEmpty()) {
                    return new MutexState(null, this.term, this.waitList);
                }
                List<NodeId> newWaitList = new ArrayList<>(this.waitList);
                NodeId newHolder = newWaitList.remove(0);
                return new MutexState(newHolder, this.term + 1L, newWaitList);
            }
            List<NodeId> newWaitList = new ArrayList<>(this.waitList);
            newWaitList.remove(nodeId);
            return new MutexState(this.holder, this.term, newWaitList);
        }

        /**
         * @return the node currently holding the mutex, or {@code null} when it is free
         */
        public NodeId holder() {
            return this.holder;
        }

        /**
         * @return the internal list of waiting nodes, in arrival order
         */
        public List<NodeId> waitList() {
            return this.waitList;
        }

        /**
         * @return the ownership term of this state
         */
        public long term() {
            return this.term;
        }

        /**
         * @return {@code true} when {@code nodeId} is the holder or is on the wait list
         */
        private boolean contains(NodeId nodeId) {
            return nodeId.equals(this.holder) || this.waitList.contains(nodeId);
        }

        private MutexState(NodeId holder, long term, List<NodeId> waitList) {
            this.holder = holder;
            this.term = term;
            // Defensive copy so the state never shares a list with its creator.
            this.waitList = new ArrayList<>(waitList);
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(this.getClass())
                    .add("holder", this.holder)
                    .add("term", this.term)
                    .add("waitList", this.waitList)
                    .toString();
        }
    }

    private class InternalClusterEventListener
    implements ClusterEventListener {
        private InternalClusterEventListener() {
        }

        public void event(ClusterEvent event) {
            if (event.type() == ClusterEvent.Type.INSTANCE_DEACTIVATED || event.type() == ClusterEvent.Type.INSTANCE_REMOVED) {
                NodeId nodeId = ((ControllerNode)event.subject()).id();
                MutexExecutionManager.this.log.debug("{} is no longer active. Attemping to clean up its locks.", (Object)nodeId);
                MutexExecutionManager.this.lockMap.asJavaMap().forEach((k, v) -> {
                    if (((MutexState)v).contains(nodeId)) {
                        MutexExecutionManager.this.lockMap.compute(k, (path, state) -> MutexState.evict(v, nodeId));
                    }
                });
            }
            long activeNodes = MutexExecutionManager.this.clusterService.getNodes().stream().map(node -> MutexExecutionManager.this.clusterService.getState(node.id())).filter(arg_0 -> ControllerNode.State.ACTIVE.equals(arg_0)).count();
            if (MutexExecutionManager.this.clusterService.getNodes().size() > 1 && activeNodes == 1L) {
                MutexExecutionManager.this.log.info("This node is partitioned away from the cluster. Stopping all inflight executions");
                MutexExecutionManager.this.activeTasks.forEach((k, v) -> v.stop());
            }
        }
    }

    private class InternalLockMapEventListener
    implements MapEventListener<String, MutexState> {
        private InternalLockMapEventListener() {
        }

        public void event(MapEvent<String, MutexState> event) {
            MutexExecutionManager.this.log.debug("Received {}", event);
            if (event.type() == MapEvent.Type.UPDATE || event.type() == MapEvent.Type.INSERT) {
                MutexExecutionManager.this.pending.computeIfPresent(event.key(), (k, future) -> {
                    MutexState state = (MutexState)Versioned.valueOrElse((Versioned)event.value(), null);
                    if (state != null && MutexExecutionManager.this.localNodeId.equals((Object)state.holder())) {
                        MutexExecutionManager.this.log.debug("Local node is now owner for {}", event.key());
                        future.complete(state);
                        return null;
                    }
                    return future;
                });
                InnerMutexTask task = (InnerMutexTask)MutexExecutionManager.this.activeTasks.get(event.key());
                if (task != null && task.term() < ((MutexState)Versioned.valueOrElse((Versioned)event.value(), null)).term()) {
                    task.stop();
                }
            }
        }
    }
}

