/*
 * Decompiled with CFR 0.152.
 */
package org.onosproject.store.cluster.messaging.impl;

import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.security.AppGuard;
import org.onosproject.security.AppPermission;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.cluster.messaging.MessagingService;
import org.onosproject.utils.MeteringAgent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ClusterCommunicationService} implementation that wraps every outgoing message in a
 * {@link ClusterMessage} envelope and hands it to the {@link MessagingService}, and that decodes
 * incoming envelopes before invoking the registered subscribers.
 *
 * <p>Serialization, deserialization, one-way sends and round trips are timed through
 * {@link MeteringAgent} instances.
 */
@Component(immediate=true)
@Service
public class ClusterCommunicationManager implements ClusterCommunicationService {
    private static final String PRIMITIVE_NAME = "clusterCommunication";
    private static final String SUBJECT_PREFIX = "subject";
    private static final String ENDPOINT_PREFIX = "endpoint";
    private static final String SERIALIZING = "serialization";
    private static final String DESERIALIZING = "deserialization";
    private static final String NODE_PREFIX = "node:";
    private static final String ROUND_TRIP_SUFFIX = ".rtt";
    private static final String ONE_WAY_SUFFIX = ".oneway";

    private final Logger log = LoggerFactory.getLogger(this.getClass());
    /** Meter for timers keyed by message subject (and for encode/decode timings). */
    private final MeteringAgent subjectMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, SUBJECT_PREFIX, true);
    /** Meter for round-trip timers keyed by destination node. */
    private final MeteringAgent endpointMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, ENDPOINT_PREFIX, true);

    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterService clusterService;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected MessagingService messagingService;
    /** Identifier of the local node, captured in {@link #activate()}. */
    private NodeId localNodeId;

    /**
     * Caches the local node identifier from the cluster service and logs start-up.
     */
    @Activate
    public void activate() {
        localNodeId = clusterService.getLocalNode().id();
        log.info("Started");
    }

    /**
     * Logs shutdown; no other state is released here.
     */
    @Deactivate
    public void deactivate() {
        log.info("Stopped");
    }

    /**
     * Sends a message to every node known to the cluster service except the local one.
     *
     * @param message message to send
     * @param subject subject under which the message is sent
     * @param encoder function turning the message into bytes
     * @param <M> message type
     */
    public <M> void broadcast(M message, MessageSubject subject, Function<M, byte[]> encoder) {
        AppGuard.checkPermission(AppPermission.Type.CLUSTER_WRITE);
        // Look the local node up once instead of once per cluster member.
        ControllerNode localNode = clusterService.getLocalNode();
        Set<NodeId> remoteNodeIds = clusterService.getNodes().stream()
                .filter(node -> !Objects.equal(node, localNode))
                .map(ControllerNode::id)
                .collect(Collectors.toSet());
        multicast(message, subject, encoder, remoteNodeIds);
    }

    /**
     * Sends a message to every node known to the cluster service, the local one included.
     *
     * @param message message to send
     * @param subject subject under which the message is sent
     * @param encoder function turning the message into bytes
     * @param <M> message type
     */
    public <M> void broadcastIncludeSelf(M message, MessageSubject subject, Function<M, byte[]> encoder) {
        AppGuard.checkPermission(AppPermission.Type.CLUSTER_WRITE);
        Set<NodeId> allNodeIds = clusterService.getNodes().stream()
                .map(ControllerNode::id)
                .collect(Collectors.toSet());
        multicast(message, subject, encoder, allNodeIds);
    }

    /**
     * Sends a message to a single node.
     *
     * <p>Exceptions raised while encoding the message or resolving the destination are reported
     * through the returned future instead of being thrown.
     *
     * @param message message to send
     * @param subject subject under which the message is sent
     * @param encoder function turning the message into bytes
     * @param toNodeId destination node
     * @param <M> message type
     * @return future completed by the messaging service once the send finishes
     */
    public <M> CompletableFuture<Void> unicast(M message, MessageSubject subject, Function<M, byte[]> encoder, NodeId toNodeId) {
        AppGuard.checkPermission(AppPermission.Type.CLUSTER_WRITE);
        try {
            byte[] body = timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message);
            byte[] payload = new ClusterMessage(localNodeId, subject, body).getBytes();
            return doUnicast(subject, payload, toNodeId);
        } catch (Exception e) {
            return Tools.exceptionalFuture(e);
        }
    }

    /**
     * Sends a message to each node in {@code nodes}. The message is encoded once and the same
     * payload is reused for every destination.
     *
     * @param message message to send
     * @param subject subject under which the message is sent
     * @param encoder function turning the message into bytes
     * @param nodes destination nodes
     * @param <M> message type
     * @throws IllegalArgumentException if a node id is unknown to the cluster service; nodes
     *         visited before the unknown one have already been sent to
     */
    public <M> void multicast(M message, MessageSubject subject, Function<M, byte[]> encoder, Set<NodeId> nodes) {
        AppGuard.checkPermission(AppPermission.Type.CLUSTER_WRITE);
        byte[] body = timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message);
        byte[] payload = new ClusterMessage(localNodeId, subject, body).getBytes();
        nodes.forEach(nodeId -> doUnicast(subject, payload, nodeId));
    }

    /**
     * Sends a message to a single node and decodes the reply.
     *
     * <p>Exceptions raised while encoding the message or resolving the destination are reported
     * through the returned future instead of being thrown.
     *
     * @param message message to send
     * @param subject subject under which the message is sent
     * @param encoder function turning the request into bytes
     * @param decoder function turning the reply bytes into a result
     * @param toNodeId destination node
     * @param <M> request type
     * @param <R> reply type
     * @return future holding the decoded reply
     */
    public <M, R> CompletableFuture<R> sendAndReceive(M message, MessageSubject subject, Function<M, byte[]> encoder, Function<byte[], R> decoder, NodeId toNodeId) {
        AppGuard.checkPermission(AppPermission.Type.CLUSTER_WRITE);
        try {
            byte[] body = timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message);
            ClusterMessage envelope = new ClusterMessage(clusterService.getLocalNode().id(), subject, body);
            return sendAndReceive(subject, envelope.getBytes(), toNodeId)
                    .thenApply(bytes -> timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).apply(bytes));
        } catch (Exception e) {
            return Tools.exceptionalFuture(e);
        }
    }

    /**
     * Resolves the destination endpoint and sends the payload one-way, timing the send under
     * the subject's one-way timer.
     *
     * @throws IllegalArgumentException if {@code toNodeId} is unknown to the cluster service
     */
    private CompletableFuture<Void> doUnicast(MessageSubject subject, byte[] payload, NodeId toNodeId) {
        ControllerNode node = clusterService.getNode(toNodeId);
        Preconditions.checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
        Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
        MeteringAgent.Context context = subjectMeteringAgent.startTimer(subject.toString() + ONE_WAY_SUFFIX);
        return messagingService.sendAsync(nodeEp, subject.toString(), payload)
                .whenComplete((r, e) -> context.stop(e));
    }

    /**
     * Resolves the destination endpoint and performs a request/reply exchange, timing it under
     * both the per-node and the per-subject round-trip timers.
     *
     * @throws IllegalArgumentException if {@code toNodeId} is unknown to the cluster service
     */
    private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {
        ControllerNode node = clusterService.getNode(toNodeId);
        Preconditions.checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
        Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
        MeteringAgent.Context epContext = endpointMeteringAgent.startTimer(NODE_PREFIX + toNodeId.toString() + ROUND_TRIP_SUFFIX);
        MeteringAgent.Context subjectContext = subjectMeteringAgent.startTimer(subject.toString() + ROUND_TRIP_SUFFIX);
        return messagingService.sendAndReceive(nodeEp, subject.toString(), payload)
                .whenComplete((bytes, throwable) -> {
                    subjectContext.stop(throwable);
                    epContext.stop(throwable);
                });
    }

    /**
     * Registers a handler that receives the full {@link ClusterMessage} envelope for a subject.
     *
     * @param subject subject to subscribe to
     * @param subscriber handler invoked with each decoded envelope
     * @param executor executor handed to the messaging service together with the handler
     */
    public void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber, ExecutorService executor) {
        AppGuard.checkPermission(AppPermission.Type.CLUSTER_WRITE);
        messagingService.registerHandler(subject.toString(), new InternalClusterMessageHandler(subscriber), executor);
    }

    /**
     * Unregisters the handler for a subject.
     *
     * @param subject subject to unsubscribe from
     */
    public void removeSubscriber(MessageSubject subject) {
        AppGuard.checkPermission(AppPermission.Type.CLUSTER_WRITE);
        messagingService.unregisterHandler(subject.toString());
    }

    /**
     * Registers a synchronous request/reply handler whose invocation is dispatched to
     * {@code executor}.
     *
     * @param subject subject to subscribe to
     * @param decoder function turning the request payload into a message
     * @param handler function computing the reply
     * @param encoder function turning the reply into bytes
     * @param executor executor on which {@code handler} runs
     * @param <M> request type
     * @param <R> reply type
     */
    public <M, R> void addSubscriber(MessageSubject subject, Function<byte[], M> decoder, Function<M, R> handler, Function<R, byte[]> encoder, Executor executor) {
        AppGuard.checkPermission(AppPermission.Type.CLUSTER_WRITE);
        messagingService.registerHandler(subject.toString(), new InternalMessageResponder<M, R>(decoder, encoder, m -> {
            CompletableFuture<R> responseFuture = new CompletableFuture<>();
            executor.execute(() -> {
                try {
                    responseFuture.complete(handler.apply(m));
                } catch (Exception e) {
                    // Surface handler failures through the future instead of losing them on the executor thread.
                    responseFuture.completeExceptionally(e);
                }
            });
            return responseFuture;
        }));
    }

    /**
     * Registers an asynchronous request/reply handler.
     *
     * @param subject subject to subscribe to
     * @param decoder function turning the request payload into a message
     * @param handler function producing a future reply
     * @param encoder function turning the reply into bytes
     * @param <M> request type
     * @param <R> reply type
     */
    public <M, R> void addSubscriber(MessageSubject subject, Function<byte[], M> decoder, Function<M, CompletableFuture<R>> handler, Function<R, byte[]> encoder) {
        AppGuard.checkPermission(AppPermission.Type.CLUSTER_WRITE);
        messagingService.registerHandler(subject.toString(), new InternalMessageResponder<M, R>(decoder, encoder, handler));
    }

    /**
     * Registers a handler that consumes messages without producing a reply.
     *
     * @param subject subject to subscribe to
     * @param decoder function turning the payload into a message
     * @param handler consumer invoked with each decoded message
     * @param executor executor handed to the messaging service together with the handler
     * @param <M> message type
     */
    public <M> void addSubscriber(MessageSubject subject, Function<byte[], M> decoder, Consumer<M> handler, Executor executor) {
        AppGuard.checkPermission(AppPermission.Type.CLUSTER_WRITE);
        messagingService.registerHandler(subject.toString(), new InternalMessageConsumer<M>(decoder, handler), executor);
    }

    /**
     * Wraps a function so that every invocation is timed under {@code opName}.
     *
     * <p>If the wrapped function throws, the timer is stopped with the exception. The root cause
     * is then rethrown if it is unchecked; otherwise the exception is wrapped in an
     * {@link IllegalStateException}.
     *
     * @param timedFunction function to time
     * @param meter metering agent providing the timer
     * @param opName timer name
     * @param <A> input type
     * @param <B> output type
     * @return timing wrapper around {@code timedFunction}
     */
    private <A, B> Function<A, B> timeFunction(final Function<A, B> timedFunction, final MeteringAgent meter, final String opName) {
        Preconditions.checkNotNull(timedFunction);
        Preconditions.checkNotNull(meter);
        Preconditions.checkNotNull(opName);
        return new Function<A, B>() {

            @Override
            public B apply(A a) {
                MeteringAgent.Context context = meter.startTimer(opName);
                try {
                    B result = timedFunction.apply(a);
                    context.stop(null);
                    return result;
                } catch (Exception e) {
                    context.stop(e);
                    Throwables.throwIfUnchecked(Throwables.getRootCause(e));
                    // Wrap the caught exception itself so the full chain survives even
                    // when it has no cause of its own.
                    throw new IllegalStateException(e);
                }
            }
        };
    }

    protected void bindClusterService(ClusterService clusterService) {
        this.clusterService = clusterService;
    }

    protected void unbindClusterService(ClusterService clusterService) {
        // Only clear the reference if it still points at the service being unbound.
        if (this.clusterService == clusterService) {
            this.clusterService = null;
        }
    }

    protected void bindMessagingService(MessagingService messagingService) {
        this.messagingService = messagingService;
    }

    protected void unbindMessagingService(MessagingService messagingService) {
        // Only clear the reference if it still points at the service being unbound.
        if (this.messagingService == messagingService) {
            this.messagingService = null;
        }
    }

    /**
     * Adapter that decodes the payload of an incoming envelope (timing the decode) and passes
     * the result to a consumer.
     */
    private class InternalMessageConsumer<M> implements BiConsumer<Endpoint, byte[]> {
        private final Function<byte[], M> decoder;
        private final Consumer<M> consumer;

        public InternalMessageConsumer(Function<byte[], M> decoder, Consumer<M> consumer) {
            this.decoder = decoder;
            this.consumer = consumer;
        }

        @Override
        public void accept(Endpoint sender, byte[] bytes) {
            M decoded = timeFunction(decoder, subjectMeteringAgent, DESERIALIZING)
                    .apply(ClusterMessage.fromBytes(bytes).payload());
            consumer.accept(decoded);
        }
    }

    /**
     * Adapter that decodes the payload of an incoming envelope, passes it to an asynchronous
     * handler and encodes the handler's reply; both conversions are timed.
     */
    private class InternalMessageResponder<M, R> implements BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> {
        private final Function<byte[], M> decoder;
        private final Function<R, byte[]> encoder;
        private final Function<M, CompletableFuture<R>> handler;

        public InternalMessageResponder(Function<byte[], M> decoder, Function<R, byte[]> encoder, Function<M, CompletableFuture<R>> handler) {
            this.decoder = decoder;
            this.encoder = encoder;
            this.handler = handler;
        }

        @Override
        public CompletableFuture<byte[]> apply(Endpoint sender, byte[] bytes) {
            M request = timeFunction(decoder, subjectMeteringAgent, DESERIALIZING)
                    .apply(ClusterMessage.fromBytes(bytes).payload());
            return handler.apply(request)
                    .thenApply(reply -> timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(reply));
        }
    }

    /**
     * Adapter that rebuilds the {@link ClusterMessage} envelope from raw bytes, passes it to a
     * {@link ClusterMessageHandler} and returns the envelope's response.
     */
    private static class InternalClusterMessageHandler implements BiFunction<Endpoint, byte[], byte[]> {
        private final ClusterMessageHandler handler;

        public InternalClusterMessageHandler(ClusterMessageHandler handler) {
            this.handler = handler;
        }

        @Override
        public byte[] apply(Endpoint sender, byte[] bytes) {
            ClusterMessage message = ClusterMessage.fromBytes(bytes);
            handler.handle(message);
            return message.response();
        }
    }
}

