/*
 * Decompiled with CFR 0.152.
 */
package org.onosproject.store.cluster.messaging.impl;

import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.Arrays;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.onlab.junit.TestTools;
import org.onlab.packet.IpAddress;
import org.onosproject.cluster.ClusterMetadata;
import org.onosproject.cluster.ClusterMetadataEventListener;
import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.HybridLogicalClockService;
import org.onosproject.core.HybridLogicalTime;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.impl.NettyMessagingManager;

public class NettyMessagingManagerTest {
    HybridLogicalClockService testClockService = new HybridLogicalClockService(){
        AtomicLong counter = new AtomicLong();

        public HybridLogicalTime timeNow() {
            return new HybridLogicalTime(this.counter.incrementAndGet(), 0L);
        }

        public void recordEventTime(HybridLogicalTime time) {
        }
    };
    NettyMessagingManager netty1;
    NettyMessagingManager netty2;
    private static final String DUMMY_NAME = "node";
    private static final String IP_STRING = "127.0.0.1";
    Endpoint ep1 = new Endpoint(IpAddress.valueOf((String)"127.0.0.1"), 5001);
    Endpoint ep2 = new Endpoint(IpAddress.valueOf((String)"127.0.0.1"), 5002);
    Endpoint invalidEndPoint = new Endpoint(IpAddress.valueOf((String)"127.0.0.1"), 5003);

    @Before
    public void setUp() throws Exception {
        this.ep1 = new Endpoint(IpAddress.valueOf((String)IP_STRING), TestTools.findAvailablePort((int)5001));
        this.netty1 = new NettyMessagingManager();
        this.netty1.clusterMetadataService = this.dummyMetadataService(DUMMY_NAME, IP_STRING, this.ep1);
        this.netty1.clockService = this.testClockService;
        this.netty1.activate();
        this.ep2 = new Endpoint(IpAddress.valueOf((String)IP_STRING), TestTools.findAvailablePort((int)5003));
        this.netty2 = new NettyMessagingManager();
        this.netty2.clusterMetadataService = this.dummyMetadataService(DUMMY_NAME, IP_STRING, this.ep2);
        this.netty2.clockService = this.testClockService;
        this.netty2.activate();
    }

    private String nextSubject() {
        return UUID.randomUUID().toString();
    }

    @After
    public void tearDown() throws Exception {
        if (this.netty1 != null) {
            this.netty1.deactivate();
        }
        if (this.netty2 != null) {
            this.netty2.deactivate();
        }
    }

    @Test
    public void testSendAsync() {
        String subject = this.nextSubject();
        CountDownLatch latch1 = new CountDownLatch(1);
        CompletableFuture response = this.netty1.sendAsync(this.ep2, subject, "hello world".getBytes());
        response.whenComplete((r, e) -> {
            Assert.assertNull((Object)e);
            latch1.countDown();
        });
        Uninterruptibles.awaitUninterruptibly((CountDownLatch)latch1);
        CountDownLatch latch2 = new CountDownLatch(1);
        response = this.netty1.sendAsync(this.invalidEndPoint, subject, "hello world".getBytes());
        response.whenComplete((r, e) -> {
            Assert.assertNotNull((Object)e);
            latch2.countDown();
        });
        Uninterruptibles.awaitUninterruptibly((CountDownLatch)latch2);
    }

    @Test
    public void testSendAndReceive() {
        String subject = this.nextSubject();
        AtomicBoolean handlerInvoked = new AtomicBoolean(false);
        AtomicReference request = new AtomicReference();
        AtomicReference sender = new AtomicReference();
        BiFunction<Endpoint, byte[], byte[]> handler = (ep, data) -> {
            handlerInvoked.set(true);
            sender.set(ep);
            request.set(data);
            return "hello there".getBytes();
        };
        this.netty2.registerHandler(subject, handler, MoreExecutors.directExecutor());
        CompletableFuture response = this.netty1.sendAndReceive(this.ep2, subject, "hello world".getBytes());
        Assert.assertTrue((boolean)Arrays.equals("hello there".getBytes(), (byte[])response.join()));
        Assert.assertTrue((boolean)handlerInvoked.get());
        Assert.assertTrue((boolean)Arrays.equals((byte[])request.get(), "hello world".getBytes()));
        Assert.assertEquals((Object)this.ep1, sender.get());
    }

    @Test
    @Ignore
    public void testSendAndReceiveWithExecutor() {
        String subject = this.nextSubject();
        ExecutorService completionExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "completion-thread"));
        ExecutorService handlerExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "handler-thread"));
        AtomicReference handlerThreadName = new AtomicReference();
        AtomicReference completionThreadName = new AtomicReference();
        CountDownLatch latch = new CountDownLatch(1);
        BiFunction<Endpoint, byte[], byte[]> handler = (ep, data) -> {
            handlerThreadName.set(Thread.currentThread().getName());
            try {
                latch.await();
            }
            catch (InterruptedException e1) {
                Thread.currentThread().interrupt();
                Assert.fail((String)"InterruptedException");
            }
            return "hello there".getBytes();
        };
        this.netty2.registerHandler(subject, handler, (Executor)handlerExecutor);
        CompletableFuture response = this.netty1.sendAndReceive(this.ep2, subject, "hello world".getBytes(), (Executor)completionExecutor);
        response.whenComplete((r, e) -> completionThreadName.set(Thread.currentThread().getName()));
        latch.countDown();
        Assert.assertTrue((boolean)Arrays.equals("hello there".getBytes(), (byte[])response.join()));
        Assert.assertEquals((Object)"completion-thread", completionThreadName.get());
        Assert.assertEquals((Object)"handler-thread", handlerThreadName.get());
    }

    private ClusterMetadataService dummyMetadataService(final String name, final String ipAddress, final Endpoint ep) {
        return new ClusterMetadataService(){

            public ClusterMetadata getClusterMetadata() {
                return new ClusterMetadata(new ProviderId(NettyMessagingManagerTest.DUMMY_NAME, NettyMessagingManagerTest.DUMMY_NAME), name, (Set)Sets.newHashSet(), (Set)Sets.newHashSet());
            }

            public ControllerNode getLocalNode() {
                return new ControllerNode(){

                    public NodeId id() {
                        return null;
                    }

                    public IpAddress ip() {
                        return IpAddress.valueOf((String)ipAddress);
                    }

                    public int tcpPort() {
                        return ep.port();
                    }
                };
            }

            public void addListener(ClusterMetadataEventListener listener) {
            }

            public void removeListener(ClusterMetadataEventListener listener) {
            }
        };
    }
}

