/*
 * Decompiled with CFR 0.152.
 */
package org.onosproject.store.cluster.messaging.impl;

import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.cluster.messaging.MessagingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component(immediate=true)
@Service
public class ClusterCommunicationManager
implements ClusterCommunicationService {
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    private ClusterService clusterService;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected MessagingService messagingService;
    private NodeId localNodeId;

    @Activate
    public void activate() {
        this.localNodeId = this.clusterService.getLocalNode().id();
        this.log.info("Started");
    }

    @Deactivate
    public void deactivate() {
        this.log.info("Stopped");
    }

    public <M> void broadcast(M message, MessageSubject subject, Function<M, byte[]> encoder) {
        this.multicast(message, subject, encoder, this.clusterService.getNodes().stream().filter(node -> !Objects.equal((Object)node, (Object)this.clusterService.getLocalNode())).map(ControllerNode::id).collect(Collectors.toSet()));
    }

    public <M> void broadcastIncludeSelf(M message, MessageSubject subject, Function<M, byte[]> encoder) {
        this.multicast(message, subject, encoder, this.clusterService.getNodes().stream().map(ControllerNode::id).collect(Collectors.toSet()));
    }

    public <M> CompletableFuture<Void> unicast(M message, MessageSubject subject, Function<M, byte[]> encoder, NodeId toNodeId) {
        try {
            byte[] payload = new ClusterMessage(this.localNodeId, subject, encoder.apply(message)).getBytes();
            return this.doUnicast(subject, payload, toNodeId);
        }
        catch (Exception e) {
            return Tools.exceptionalFuture((Throwable)e);
        }
    }

    public <M> void multicast(M message, MessageSubject subject, Function<M, byte[]> encoder, Set<NodeId> nodes) {
        byte[] payload = new ClusterMessage(this.localNodeId, subject, encoder.apply(message)).getBytes();
        nodes.forEach(nodeId -> this.doUnicast(subject, payload, (NodeId)nodeId));
    }

    public <M, R> CompletableFuture<R> sendAndReceive(M message, MessageSubject subject, Function<M, byte[]> encoder, Function<byte[], R> decoder, NodeId toNodeId) {
        try {
            ClusterMessage envelope = new ClusterMessage(this.clusterService.getLocalNode().id(), subject, encoder.apply(message));
            return this.sendAndReceive(subject, envelope.getBytes(), toNodeId).thenApply(decoder);
        }
        catch (Exception e) {
            return Tools.exceptionalFuture((Throwable)e);
        }
    }

    private CompletableFuture<Void> doUnicast(MessageSubject subject, byte[] payload, NodeId toNodeId) {
        ControllerNode node = this.clusterService.getNode(toNodeId);
        Preconditions.checkArgument((node != null ? 1 : 0) != 0, (String)"Unknown nodeId: %s", (Object[])new Object[]{toNodeId});
        Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
        return this.messagingService.sendAsync(nodeEp, subject.value(), payload);
    }

    private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {
        ControllerNode node = this.clusterService.getNode(toNodeId);
        Preconditions.checkArgument((node != null ? 1 : 0) != 0, (String)"Unknown nodeId: %s", (Object[])new Object[]{toNodeId});
        Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
        return this.messagingService.sendAndReceive(nodeEp, subject.value(), payload);
    }

    public void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber, ExecutorService executor) {
        this.messagingService.registerHandler(subject.value(), (Function)new InternalClusterMessageHandler(subscriber), (Executor)executor);
    }

    public void removeSubscriber(MessageSubject subject) {
        this.messagingService.unregisterHandler(subject.value());
    }

    public <M, R> void addSubscriber(MessageSubject subject, Function<byte[], M> decoder, Function<M, R> handler, Function<R, byte[]> encoder, Executor executor) {
        this.messagingService.registerHandler(subject.value(), new InternalMessageResponder<Object, R>(decoder, encoder, m -> {
            CompletableFuture responseFuture = new CompletableFuture();
            executor.execute(() -> {
                try {
                    responseFuture.complete(handler.apply(m));
                }
                catch (Exception e) {
                    responseFuture.completeExceptionally(e);
                }
            });
            return responseFuture;
        }));
    }

    public <M, R> void addSubscriber(MessageSubject subject, Function<byte[], M> decoder, Function<M, CompletableFuture<R>> handler, Function<R, byte[]> encoder) {
        this.messagingService.registerHandler(subject.value(), new InternalMessageResponder<M, R>(decoder, encoder, handler));
    }

    public <M> void addSubscriber(MessageSubject subject, Function<byte[], M> decoder, Consumer<M> handler, Executor executor) {
        this.messagingService.registerHandler(subject.value(), new InternalMessageConsumer<M>(decoder, handler), executor);
    }

    protected void bindClusterService(ClusterService clusterService) {
        this.clusterService = clusterService;
    }

    protected void unbindClusterService(ClusterService clusterService) {
        if (this.clusterService == clusterService) {
            this.clusterService = null;
        }
    }

    protected void bindMessagingService(MessagingService messagingService) {
        this.messagingService = messagingService;
    }

    protected void unbindMessagingService(MessagingService messagingService) {
        if (this.messagingService == messagingService) {
            this.messagingService = null;
        }
    }

    private class InternalMessageConsumer<M>
    implements Consumer<byte[]> {
        private final Function<byte[], M> decoder;
        private final Consumer<M> consumer;

        public InternalMessageConsumer(Function<byte[], M> decoder, Consumer<M> consumer) {
            this.decoder = decoder;
            this.consumer = consumer;
        }

        @Override
        public void accept(byte[] bytes) {
            this.consumer.accept(this.decoder.apply(ClusterMessage.fromBytes((byte[])bytes).payload()));
        }
    }

    private class InternalMessageResponder<M, R>
    implements Function<byte[], CompletableFuture<byte[]>> {
        private final Function<byte[], M> decoder;
        private final Function<R, byte[]> encoder;
        private final Function<M, CompletableFuture<R>> handler;

        public InternalMessageResponder(Function<byte[], M> decoder, Function<R, byte[]> encoder, Function<M, CompletableFuture<R>> handler) {
            this.decoder = decoder;
            this.encoder = encoder;
            this.handler = handler;
        }

        @Override
        public CompletableFuture<byte[]> apply(byte[] bytes) {
            return this.handler.apply(this.decoder.apply(ClusterMessage.fromBytes((byte[])bytes).payload())).thenApply(this.encoder);
        }
    }

    private class InternalClusterMessageHandler
    implements Function<byte[], byte[]> {
        private ClusterMessageHandler handler;

        public InternalClusterMessageHandler(ClusterMessageHandler handler) {
            this.handler = handler;
        }

        @Override
        public byte[] apply(byte[] bytes) {
            ClusterMessage message = ClusterMessage.fromBytes((byte[])bytes);
            this.handler.handle(message);
            return message.response();
        }
    }
}

