/*
 * Decompiled with CFR 0.152.
 */
package org.onosproject.store.packet.impl;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.Event;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.packet.OutboundPacket;
import org.onosproject.net.packet.PacketEvent;
import org.onosproject.net.packet.PacketRequest;
import org.onosproject.net.packet.PacketStore;
import org.onosproject.net.packet.PacketStoreDelegate;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component(immediate=true)
@Service
public class DistributedPacketStore
extends AbstractStore<PacketEvent, PacketStoreDelegate>
implements PacketStore {
    private final Logger log = LoggerFactory.getLogger(((Object)((Object)this)).getClass());
    private static final int MAX_BACKOFF = 50;
    private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected MastershipService mastershipService;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterService clusterService;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterCommunicationService communicationService;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected StorageService storageService;
    private PacketRequestTracker tracker;
    private static final MessageSubject PACKET_OUT_SUBJECT = new MessageSubject("packet-out");
    private static final KryoSerializer SERIALIZER = new KryoSerializer(){

        protected void setupKryoPool() {
            this.serializerPool = KryoNamespace.newBuilder().register(KryoNamespaces.API).nextId(300).build();
        }
    };
    private ExecutorService messageHandlingExecutor;

    @Activate
    public void activate() {
        this.messageHandlingExecutor = Executors.newFixedThreadPool(4, Tools.groupedThreads((String)"onos/store/packet", (String)"message-handlers"));
        this.communicationService.addSubscriber(PACKET_OUT_SUBJECT, arg_0 -> ((KryoSerializer)SERIALIZER).decode(arg_0), packet -> this.notifyDelegate((Event)new PacketEvent(PacketEvent.Type.EMIT, packet)), (Executor)this.messageHandlingExecutor);
        this.tracker = new PacketRequestTracker();
        this.log.info("Started");
    }

    @Deactivate
    public void deactivate() {
        this.communicationService.removeSubscriber(PACKET_OUT_SUBJECT);
        this.messageHandlingExecutor.shutdown();
        this.tracker = null;
        this.log.info("Stopped");
    }

    public void emit(OutboundPacket packet) {
        NodeId myId = this.clusterService.getLocalNode().id();
        NodeId master = this.mastershipService.getMasterFor(packet.sendThrough());
        if (master == null) {
            return;
        }
        if (myId.equals((Object)master)) {
            this.notifyDelegate((Event)new PacketEvent(PacketEvent.Type.EMIT, packet));
            return;
        }
        this.communicationService.unicast((Object)packet, PACKET_OUT_SUBJECT, arg_0 -> ((KryoSerializer)SERIALIZER).encode(arg_0), master).whenComplete((r, error) -> {
            if (error != null) {
                this.log.warn("Failed to send packet-out to {}", (Object)master, error);
            }
        });
    }

    public void requestPackets(PacketRequest request) {
        this.tracker.add(request);
    }

    public void cancelPackets(PacketRequest request) {
        this.tracker.remove(request);
    }

    public List<PacketRequest> existingRequests() {
        return this.tracker.requests();
    }

    protected void bindMastershipService(MastershipService mastershipService) {
        this.mastershipService = mastershipService;
    }

    protected void unbindMastershipService(MastershipService mastershipService) {
        if (this.mastershipService == mastershipService) {
            this.mastershipService = null;
        }
    }

    protected void bindClusterService(ClusterService clusterService) {
        this.clusterService = clusterService;
    }

    protected void unbindClusterService(ClusterService clusterService) {
        if (this.clusterService == clusterService) {
            this.clusterService = null;
        }
    }

    protected void bindCommunicationService(ClusterCommunicationService clusterCommunicationService) {
        this.communicationService = clusterCommunicationService;
    }

    protected void unbindCommunicationService(ClusterCommunicationService clusterCommunicationService) {
        if (this.communicationService == clusterCommunicationService) {
            this.communicationService = null;
        }
    }

    protected void bindStorageService(StorageService storageService) {
        this.storageService = storageService;
    }

    protected void unbindStorageService(StorageService storageService) {
        if (this.storageService == storageService) {
            this.storageService = null;
        }
    }

    private final class PacketRequestTracker {
        private ConsistentMap<TrafficSelector, Set<PacketRequest>> requests;

        private PacketRequestTracker() {
            this.requests = DistributedPacketStore.this.storageService.consistentMapBuilder().withName("onos-packet-requests").withPartitionsDisabled().withSerializer(Serializer.using((KryoNamespace)KryoNamespaces.API)).build();
        }

        private void add(PacketRequest request) {
            AtomicBoolean firstRequest = (AtomicBoolean)Tools.retryable(this::addInternal, ConsistentMapException.ConcurrentModification.class, (int)Integer.MAX_VALUE, (int)50).apply(request);
            if (firstRequest.get() && DistributedPacketStore.this.delegate != null) {
                ((PacketStoreDelegate)DistributedPacketStore.this.delegate).requestPackets(request);
            }
        }

        private AtomicBoolean addInternal(PacketRequest request) {
            AtomicBoolean firstRequest = new AtomicBoolean(false);
            this.requests.compute((Object)request.selector(), (s, existingRequests) -> {
                if (existingRequests == null) {
                    firstRequest.set(true);
                    return ImmutableSet.of((Object)request);
                }
                if (!existingRequests.contains(request)) {
                    return ImmutableSet.builder().addAll((Iterable)existingRequests).add((Object)request).build();
                }
                return existingRequests;
            });
            return firstRequest;
        }

        private void remove(PacketRequest request) {
            AtomicBoolean removedLast = (AtomicBoolean)Tools.retryable(this::removeInternal, ConsistentMapException.ConcurrentModification.class, (int)Integer.MAX_VALUE, (int)50).apply(request);
            if (removedLast.get() && DistributedPacketStore.this.delegate != null) {
                ((PacketStoreDelegate)DistributedPacketStore.this.delegate).cancelPackets(request);
            }
        }

        private AtomicBoolean removeInternal(PacketRequest request) {
            AtomicBoolean removedLast = new AtomicBoolean(false);
            this.requests.computeIfPresent((Object)request.selector(), (s, existingRequests) -> {
                if (existingRequests.contains(request)) {
                    HashSet newRequests = Sets.newHashSet((Iterable)existingRequests);
                    newRequests.remove(request);
                    if (newRequests.size() > 0) {
                        return ImmutableSet.copyOf((Collection)newRequests);
                    }
                    removedLast.set(true);
                    return null;
                }
                return existingRequests;
            });
            return removedLast;
        }

        private List<PacketRequest> requests() {
            ArrayList list = Lists.newArrayList();
            this.requests.values().forEach(v -> list.addAll((Collection)v.value()));
            list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue());
            return list;
        }
    }
}

