/*
 * Decompiled with CFR 0.152.
 */
package org.onosproject.store.packet.impl;

import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.Event;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.packet.OutboundPacket;
import org.onosproject.net.packet.PacketEvent;
import org.onosproject.net.packet.PacketRequest;
import org.onosproject.net.packet.PacketStore;
import org.onosproject.net.packet.PacketStoreDelegate;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component(immediate=true)
@Service
public class DistributedPacketStore
extends AbstractStore<PacketEvent, PacketStoreDelegate>
implements PacketStore {
    private final Logger log = LoggerFactory.getLogger(((Object)((Object)this)).getClass());
    private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected MastershipService mastershipService;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterService clusterService;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterCommunicationService communicationService;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected StorageService storageService;
    private PacketRequestTracker tracker;
    private static final MessageSubject PACKET_OUT_SUBJECT = new MessageSubject("packet-out");
    private static final KryoSerializer SERIALIZER = new KryoSerializer(){

        protected void setupKryoPool() {
            this.serializerPool = KryoNamespace.newBuilder().register(KryoNamespaces.API).nextId(300).build();
        }
    };
    private ExecutorService messageHandlingExecutor;

    @Activate
    public void activate() {
        this.messageHandlingExecutor = Executors.newFixedThreadPool(4, Tools.groupedThreads((String)"onos/store/packet", (String)"message-handlers"));
        this.communicationService.addSubscriber(PACKET_OUT_SUBJECT, (ClusterMessageHandler)new InternalClusterMessageHandler(), this.messageHandlingExecutor);
        this.tracker = new PacketRequestTracker();
        this.log.info("Started");
    }

    @Deactivate
    public void deactivate() {
        this.communicationService.removeSubscriber(PACKET_OUT_SUBJECT);
        this.messageHandlingExecutor.shutdown();
        this.log.info("Stopped");
    }

    public void emit(OutboundPacket packet) {
        NodeId myId = this.clusterService.getLocalNode().id();
        NodeId master = this.mastershipService.getMasterFor(packet.sendThrough());
        if (master == null) {
            return;
        }
        if (myId.equals((Object)master)) {
            this.notifyDelegate((Event)new PacketEvent(PacketEvent.Type.EMIT, packet));
            return;
        }
        this.communicationService.unicast((Object)packet, PACKET_OUT_SUBJECT, arg_0 -> ((KryoSerializer)SERIALIZER).encode(arg_0), master);
    }

    public boolean requestPackets(PacketRequest request) {
        return this.tracker.add(request);
    }

    public Set<PacketRequest> existingRequests() {
        return this.tracker.requests();
    }

    protected void bindMastershipService(MastershipService mastershipService) {
        this.mastershipService = mastershipService;
    }

    protected void unbindMastershipService(MastershipService mastershipService) {
        if (this.mastershipService == mastershipService) {
            this.mastershipService = null;
        }
    }

    protected void bindClusterService(ClusterService clusterService) {
        this.clusterService = clusterService;
    }

    protected void unbindClusterService(ClusterService clusterService) {
        if (this.clusterService == clusterService) {
            this.clusterService = null;
        }
    }

    protected void bindCommunicationService(ClusterCommunicationService clusterCommunicationService) {
        this.communicationService = clusterCommunicationService;
    }

    protected void unbindCommunicationService(ClusterCommunicationService clusterCommunicationService) {
        if (this.communicationService == clusterCommunicationService) {
            this.communicationService = null;
        }
    }

    protected void bindStorageService(StorageService storageService) {
        this.storageService = storageService;
    }

    protected void unbindStorageService(StorageService storageService) {
        if (this.storageService == storageService) {
            this.storageService = null;
        }
    }

    private class PacketRequestTracker {
        private ConsistentMap<PacketRequest, Boolean> requests;

        public PacketRequestTracker() {
            this.requests = DistributedPacketStore.this.storageService.consistentMapBuilder().withName("packet-requests").withPartitionsDisabled().withSerializer(Serializer.using((KryoNamespace)new KryoNamespace.Builder().register(KryoNamespaces.API).build())).withSerializer(new Serializer(){
                KryoNamespace kryo = new KryoNamespace.Builder().register(KryoNamespaces.API).build();

                public <T> byte[] encode(T object) {
                    return this.kryo.serialize(object);
                }

                public <T> T decode(byte[] bytes) {
                    return (T)this.kryo.deserialize(bytes);
                }
            }).build();
        }

        public boolean add(PacketRequest request) {
            return this.requests.putIfAbsent((Object)request, (Object)true) == null;
        }

        public boolean remove(PacketRequest request) {
            return this.requests.remove((Object)request) != null;
        }

        public Set<PacketRequest> requests() {
            return this.requests.keySet();
        }
    }

    private class InternalClusterMessageHandler
    implements ClusterMessageHandler {
        private InternalClusterMessageHandler() {
        }

        public void handle(ClusterMessage message) {
            if (!message.subject().equals((Object)PACKET_OUT_SUBJECT)) {
                DistributedPacketStore.this.log.warn("Received message with wrong subject: {}", (Object)message);
            }
            OutboundPacket packet = (OutboundPacket)SERIALIZER.decode(message.payload());
            DistributedPacketStore.this.notifyDelegate((Event)new PacketEvent(PacketEvent.Type.EMIT, packet));
        }
    }
}

