/*
 * Decompiled with CFR 0.152.
 */
package org.onosproject.store.packet.impl;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.Event;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.packet.OutboundPacket;
import org.onosproject.net.packet.PacketEvent;
import org.onosproject.net.packet.PacketStore;
import org.onosproject.net.packet.PacketStoreDelegate;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component(immediate=true)
@Service
public class DistributedPacketStore
extends AbstractStore<PacketEvent, PacketStoreDelegate>
implements PacketStore {
    private final Logger log = LoggerFactory.getLogger(((Object)((Object)this)).getClass());
    private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected MastershipService mastershipService;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterService clusterService;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterCommunicationService communicationService;
    private static final MessageSubject PACKET_OUT_SUBJECT = new MessageSubject("packet-out");
    private static final KryoSerializer SERIALIZER = new KryoSerializer(){

        protected void setupKryoPool() {
            this.serializerPool = KryoNamespace.newBuilder().register(KryoNamespaces.API).nextId(300).build();
        }
    };
    private ExecutorService messageHandlingExecutor;

    @Activate
    public void activate() {
        this.messageHandlingExecutor = Executors.newFixedThreadPool(4, Tools.groupedThreads((String)"onos/store/packet", (String)"message-handlers"));
        this.communicationService.addSubscriber(PACKET_OUT_SUBJECT, (ClusterMessageHandler)new InternalClusterMessageHandler(), this.messageHandlingExecutor);
        this.log.info("Started");
    }

    @Deactivate
    public void deactivate() {
        this.communicationService.removeSubscriber(PACKET_OUT_SUBJECT);
        this.messageHandlingExecutor.shutdown();
        this.log.info("Stopped");
    }

    public void emit(OutboundPacket packet) {
        NodeId myId = this.clusterService.getLocalNode().id();
        NodeId master = this.mastershipService.getMasterFor(packet.sendThrough());
        if (master == null) {
            return;
        }
        if (myId.equals((Object)master)) {
            this.notifyDelegate((Event)new PacketEvent(PacketEvent.Type.EMIT, packet));
            return;
        }
        this.communicationService.unicast(new ClusterMessage(myId, PACKET_OUT_SUBJECT, SERIALIZER.encode((Object)packet)), master);
    }

    protected void bindMastershipService(MastershipService mastershipService) {
        this.mastershipService = mastershipService;
    }

    protected void unbindMastershipService(MastershipService mastershipService) {
        if (this.mastershipService == mastershipService) {
            this.mastershipService = null;
        }
    }

    protected void bindClusterService(ClusterService clusterService) {
        this.clusterService = clusterService;
    }

    protected void unbindClusterService(ClusterService clusterService) {
        if (this.clusterService == clusterService) {
            this.clusterService = null;
        }
    }

    protected void bindCommunicationService(ClusterCommunicationService clusterCommunicationService) {
        this.communicationService = clusterCommunicationService;
    }

    protected void unbindCommunicationService(ClusterCommunicationService clusterCommunicationService) {
        if (this.communicationService == clusterCommunicationService) {
            this.communicationService = null;
        }
    }

    private class InternalClusterMessageHandler
    implements ClusterMessageHandler {
        private InternalClusterMessageHandler() {
        }

        public void handle(ClusterMessage message) {
            if (!message.subject().equals((Object)PACKET_OUT_SUBJECT)) {
                DistributedPacketStore.this.log.warn("Received message with wrong subject: {}", (Object)message);
            }
            OutboundPacket packet = (OutboundPacket)SERIALIZER.decode(message.payload());
            DistributedPacketStore.this.notifyDelegate((Event)new PacketEvent(PacketEvent.Type.EMIT, packet));
        }
    }
}

