/*
 * Decompiled with CFR 0.152.
 */
package org.onosproject.store.cluster.messaging.impl;

import com.esotericsoftware.kryo.Serializer;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.util.Set;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.netty.Endpoint;
import org.onlab.netty.Message;
import org.onlab.netty.MessageHandler;
import org.onlab.netty.MessagingService;
import org.onlab.netty.NettyMessagingService;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.impl.ClusterMessageSerializer;
import org.onosproject.store.serializers.impl.MessageSubjectSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component(immediate=true)
@Service
public class ClusterCommunicationManager
implements ClusterCommunicationService {
    /** Logger named after the runtime class of this instance. */
    private final Logger log = LoggerFactory.getLogger(getClass());

    /** Cluster membership lookup, declared as a mandatory unary reference. */
    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    private ClusterService clusterService;

    /** Transport used for every send and handler registration; assigned in {@code activate()}. */
    private MessagingService messagingService;

    /**
     * Serializer shared by all instances for encoding and decoding {@link ClusterMessage}s.
     * Its pool is built from the {@code KryoNamespaces.API} registrations followed, from
     * id 300 onwards, by the custom serializers for {@link ClusterMessage} and
     * {@link MessageSubject}.
     */
    private static final KryoSerializer SERIALIZER = new KryoSerializer() {

        protected void setupKryoPool() {
            serializerPool = KryoNamespace.newBuilder()
                    .register(KryoNamespaces.API)
                    .nextId(300)
                    .register(new ClusterMessageSerializer(), ClusterMessage.class)
                    .register(new MessageSubjectSerializer(), MessageSubject.class)
                    .build();
        }
    };

    /**
     * Creates the Netty transport for the local node's IP address and TCP port and
     * installs it as this manager's messaging service.
     *
     * <p>A failure while activating the transport is logged at error level and is not
     * propagated; the transport instance is installed and the start message is logged
     * regardless.
     */
    @Activate
    public void activate() {
        final ControllerNode self = clusterService.getLocalNode();
        final NettyMessagingService transport =
                new NettyMessagingService(self.ip().toString(), self.tcpPort());
        try {
            transport.activate();
        } catch (Exception e) {
            log.error("NettyMessagingService#activate", e);
        }
        messagingService = transport;
        log.info("Started on {}:{}", self.ip(), self.tcpPort());
    }

    /**
     * Shuts down the Netty transport installed by {@code activate()}.
     *
     * <p>Any exception raised while shutting down (including one caused by the cast or
     * by an unset transport) is logged at error level and not propagated.
     */
    @Deactivate
    public void deactivate() {
        try {
            // Cast stays inside the try so that a failure here is logged like any other.
            final NettyMessagingService transport = (NettyMessagingService) messagingService;
            transport.deactivate();
        } catch (Exception e) {
            log.error("NettyMessagingService#deactivate", e);
        }
        log.info("Stopped");
    }

    /**
     * Sends the message to every node known to the cluster service except the local node.
     *
     * <p>Delivery is attempted for every peer even after an earlier attempt has failed.
     *
     * @param message message to send
     * @return {@code true} if every send attempt succeeded (or there were no peers),
     *         {@code false} if at least one attempt failed with an {@link IOException}
     * @throws IOException declared by the signature; I/O failures of individual sends are
     *         reported through the return value instead
     */
    public boolean broadcast(ClusterMessage message) throws IOException {
        final ControllerNode self = clusterService.getLocalNode();
        boolean allSent = true;
        for (ControllerNode peer : clusterService.getNodes()) {
            if (!peer.equals(self)) {
                // Non-short-circuit accumulation: the send is always attempted.
                allSent &= unicastUnchecked(message, peer.id());
            }
        }
        return allSent;
    }

    /**
     * Sends the message to every node known to the cluster service, the local node included.
     *
     * <p>Delivery is attempted for every node even after an earlier attempt has failed.
     *
     * @param message message to send
     * @return {@code true} if every send attempt succeeded (or there were no nodes),
     *         {@code false} if at least one attempt failed with an {@link IOException}
     * @throws IOException declared by the signature; I/O failures of individual sends are
     *         reported through the return value instead
     */
    public boolean broadcastIncludeSelf(ClusterMessage message) throws IOException {
        boolean allSent = true;
        for (ControllerNode target : clusterService.getNodes()) {
            if (!unicastUnchecked(message, target.id())) {
                allSent = false;
            }
        }
        return allSent;
    }

    /**
     * Sends the message to each of the given nodes, skipping the local node's id.
     *
     * <p>Delivery is attempted for every remaining id even after an earlier attempt has failed.
     *
     * @param message message to send
     * @param nodes   ids of the recipient nodes
     * @return {@code true} if every send attempt succeeded (or none was made),
     *         {@code false} if at least one attempt failed with an {@link IOException}
     * @throws IOException declared by the signature; I/O failures of individual sends are
     *         reported through the return value instead
     */
    public boolean multicast(ClusterMessage message, Set<NodeId> nodes) throws IOException {
        final ControllerNode self = clusterService.getLocalNode();
        boolean allSent = true;
        for (NodeId recipient : nodes) {
            if (!recipient.equals(self.id())) {
                allSent &= unicastUnchecked(message, recipient);
            }
        }
        return allSent;
    }

    /**
     * Sends the message asynchronously to a single node.
     *
     * <p>The recipient's endpoint is built from the IP address and TCP port that the
     * cluster service reports for {@code toNodeId}; the message is encoded with the
     * shared serializer and sent under the value of its subject.
     *
     * @param message  message to send
     * @param toNodeId id of the recipient node
     * @return {@code true} once the message has been handed to the messaging service
     * @throws IOException if the messaging service fails to send; the failure is logged
     *         at trace level before being rethrown
     * @throws IllegalArgumentException if the cluster service does not know {@code toNodeId}
     */
    public boolean unicast(ClusterMessage message, NodeId toNodeId) throws IOException {
        ControllerNode node = clusterService.getNode(toNodeId);
        Preconditions.checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
        Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
        try {
            messagingService.sendAsync(nodeEp, message.subject().value(), SERIALIZER.encode(message));
            return true;
        } catch (IOException e) {
            // Parameterized form: the message is only built when trace is enabled, and the
            // trailing throwable is still logged with its stack trace.
            log.trace("Failed to send cluster message to nodeId: {}", toNodeId, e);
            throw e;
        }
    }

    /**
     * Variant of {@code unicast} that reports an I/O failure as {@code false} instead of
     * propagating it. Other exceptions raised by {@code unicast} are not intercepted.
     *
     * @param message  message to send
     * @param toNodeId id of the recipient node
     * @return the result of {@code unicast}, or {@code false} if it threw an {@link IOException}
     * @throws IOException declared by the signature only; the exception is caught here
     */
    private boolean unicastUnchecked(ClusterMessage message, NodeId toNodeId) throws IOException {
        boolean sent;
        try {
            sent = unicast(message, toNodeId);
        } catch (IOException ignored) {
            // Deliberately swallowed: callers only need the success flag.
            sent = false;
        }
        return sent;
    }

    /**
     * Sends the message to a single node and returns a future for the reply bytes.
     *
     * <p>The recipient's endpoint is built from the IP address and TCP port that the
     * cluster service reports for {@code toNodeId}; the message is encoded with the
     * shared serializer and sent under the value of its subject.
     *
     * @param message  message to send
     * @param toNodeId id of the recipient node
     * @return the future returned by the messaging service for the response payload
     * @throws IOException if the messaging service fails; the failure is logged at trace
     *         level before being rethrown
     * @throws IllegalArgumentException if the cluster service does not know {@code toNodeId}
     */
    public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException {
        ControllerNode node = clusterService.getNode(toNodeId);
        Preconditions.checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
        Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
        try {
            return messagingService.sendAndReceive(nodeEp, message.subject().value(), SERIALIZER.encode(message));
        } catch (IOException e) {
            // Parameterized form: the message is only built when trace is enabled, and the
            // trailing throwable is still logged with its stack trace.
            log.trace("Failed interaction with remote nodeId: {}", toNodeId, e);
            throw e;
        }
    }

    /**
     * Registers a handler for messages sent under the given subject.
     *
     * <p>The subscriber is wrapped so that incoming transport messages are decoded into
     * {@link ClusterMessage}s before being passed to it.
     *
     * @param subject    subject whose value is used as the registration key
     * @param subscriber handler to invoke for decoded messages
     */
    public void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber) {
        final MessageHandler decodingHandler = new InternalClusterMessageHandler(subscriber);
        messagingService.registerHandler(subject.value(), decodingHandler);
    }

    /**
     * Unregisters the handler registered under the given subject's value.
     *
     * @param subject subject whose handler is removed
     */
    public void removeSubscriber(MessageSubject subject) {
        messagingService.unregisterHandler(subject.value());
    }

    /**
     * Stores the given cluster service in this manager's {@code clusterService} field.
     * NOTE(review): presumably the bind callback for the {@code @Reference} on that
     * field - confirm against the component descriptor.
     *
     * @param clusterService cluster service to use
     */
    protected void bindClusterService(ClusterService clusterService) {
        this.clusterService = clusterService;
    }

    /**
     * Clears the {@code clusterService} field, but only if it currently holds the very
     * instance passed in (identity comparison); otherwise nothing changes.
     *
     * @param clusterService cluster service being withdrawn
     */
    protected void unbindClusterService(ClusterService clusterService) {
        if (this.clusterService != clusterService) {
            return;
        }
        this.clusterService = null;
    }

    /**
     * A {@link ClusterMessage} that also keeps the transport-level {@link Message} it was
     * received in, so that a reply can be sent back through that message.
     */
    public static final class InternalClusterMessage extends ClusterMessage {

        /** Transport message used to deliver the response. */
        private final Message rawMessage;

        /**
         * Copies sender, subject and payload from the decoded message and remembers the
         * transport message.
         *
         * @param clusterMessage decoded cluster message to copy from
         * @param rawMessage     transport message the cluster message arrived in
         */
        public InternalClusterMessage(ClusterMessage clusterMessage, Message rawMessage) {
            super(clusterMessage.sender(), clusterMessage.subject(), clusterMessage.payload());
            this.rawMessage = rawMessage;
        }

        /**
         * Sends the response bytes by delegating to the transport message.
         *
         * @param response response payload
         * @throws IOException if the transport message fails to respond
         */
        public void respond(byte[] response) throws IOException {
            rawMessage.respond(response);
        }
    }

    private final class InternalClusterMessageHandler
    implements MessageHandler {
        private final ClusterMessageHandler handler;

        public InternalClusterMessageHandler(ClusterMessageHandler handler) {
            this.handler = handler;
        }

        public void handle(Message message) {
            ClusterMessage clusterMessage;
            try {
                clusterMessage = (ClusterMessage)SERIALIZER.decode(message.payload());
            }
            catch (Exception e) {
                ClusterCommunicationManager.this.log.error("Failed decoding ClusterMessage", (Throwable)e);
                throw e;
            }
            try {
                this.handler.handle((ClusterMessage)new InternalClusterMessage(clusterMessage, message));
            }
            catch (Exception e) {
                ClusterCommunicationManager.this.log.error("Exception caught handling {}", (Object)clusterMessage, (Object)e);
                throw e;
            }
        }
    }
}

