/*
 * Decompiled with CFR 0.152.
 */
package org.onosproject.store.primitives.impl;

import com.google.common.collect.Lists;
import io.atomix.catalyst.concurrent.SingleThreadContext;
import io.atomix.catalyst.concurrent.ThreadContext;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Client;
import io.atomix.catalyst.transport.Server;
import io.atomix.catalyst.transport.Transport;
import io.atomix.copycat.protocol.ConnectRequest;
import io.atomix.copycat.protocol.ConnectResponse;
import io.atomix.copycat.protocol.Response;
import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.onlab.junit.TestTools;
import org.onlab.packet.IpAddress;
import org.onlab.util.Tools;
import org.onosproject.cluster.PartitionId;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.MessagingService;
import org.onosproject.store.primitives.impl.CatalystSerializers;
import org.onosproject.store.primitives.impl.CopycatTransport;

public class CopycatTransportTest {
    private static final String IP_STRING = "127.0.0.1";
    private Endpoint endpoint1 = new Endpoint(IpAddress.valueOf((String)"127.0.0.1"), 5001);
    private Endpoint endpoint2 = new Endpoint(IpAddress.valueOf((String)"127.0.0.1"), 5002);
    private MessagingService service1;
    private MessagingService service2;
    private Transport clientTransport;
    private ThreadContext clientContext;
    private Transport serverTransport;
    private ThreadContext serverContext;

    @Before
    public void setUp() throws Exception {
        ConcurrentHashMap<Endpoint, TestMessagingService> services = new ConcurrentHashMap<Endpoint, TestMessagingService>();
        this.endpoint1 = new Endpoint(IpAddress.valueOf((String)IP_STRING), TestTools.findAvailablePort((int)5001));
        this.service1 = new TestMessagingService(this.endpoint1, services);
        this.clientTransport = new CopycatTransport(PartitionId.from((int)1), this.service1);
        this.clientContext = new SingleThreadContext("client-test-%d", CatalystSerializers.getSerializer());
        this.endpoint2 = new Endpoint(IpAddress.valueOf((String)IP_STRING), TestTools.findAvailablePort((int)5003));
        this.service2 = new TestMessagingService(this.endpoint2, services);
        this.serverTransport = new CopycatTransport(PartitionId.from((int)1), this.service2);
        this.serverContext = new SingleThreadContext("server-test-%d", CatalystSerializers.getSerializer());
    }

    @After
    public void tearDown() throws Exception {
        if (this.clientContext != null) {
            this.clientContext.close();
        }
        if (this.serverContext != null) {
            this.serverContext.close();
        }
    }

    @Test
    public void testCopycatClientConnectionSend() throws Exception {
        Client client = this.clientTransport.client();
        Server server = this.serverTransport.server();
        CountDownLatch latch = new CountDownLatch(4);
        CountDownLatch listenLatch = new CountDownLatch(1);
        CountDownLatch handlerLatch = new CountDownLatch(1);
        this.serverContext.executor().execute(() -> server.listen(new Address(IP_STRING, this.endpoint2.port()), connection -> {
            this.serverContext.checkThread();
            latch.countDown();
            connection.handler(ConnectRequest.class, request -> {
                this.serverContext.checkThread();
                latch.countDown();
                return CompletableFuture.completedFuture(((ConnectResponse.Builder)ConnectResponse.builder().withStatus(Response.Status.OK)).withLeader(new Address(IP_STRING, this.endpoint2.port())).withMembers((Collection)Lists.newArrayList((Object[])new Address[]{new Address(IP_STRING, this.endpoint2.port())})).build());
            });
            handlerLatch.countDown();
        }).thenRun(listenLatch::countDown));
        listenLatch.await(5L, TimeUnit.SECONDS);
        this.clientContext.executor().execute(() -> client.connect(new Address(IP_STRING, this.endpoint2.port())).thenAccept(connection -> {
            this.clientContext.checkThread();
            latch.countDown();
            try {
                handlerLatch.await(5L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                Assert.fail();
            }
            connection.sendAndReceive((Object)ConnectRequest.builder().withClientId(UUID.randomUUID().toString()).build()).thenAccept(response -> {
                this.clientContext.checkThread();
                Assert.assertNotNull((Object)response);
                Assert.assertEquals((Object)Response.Status.OK, (Object)response.status());
                latch.countDown();
            });
        }));
        latch.await(5L, TimeUnit.SECONDS);
        Assert.assertEquals((long)0L, (long)latch.getCount());
    }

    @Test
    public void testCopycatServerConnectionSend() throws Exception {
        Client client = this.clientTransport.client();
        Server server = this.serverTransport.server();
        CountDownLatch latch = new CountDownLatch(4);
        CountDownLatch listenLatch = new CountDownLatch(1);
        this.serverContext.executor().execute(() -> server.listen(new Address(IP_STRING, this.endpoint2.port()), connection -> {
            this.serverContext.checkThread();
            latch.countDown();
            this.serverContext.schedule(Duration.ofMillis(100L), () -> connection.sendAndReceive((Object)ConnectRequest.builder().withClientId("foo").build()).thenAccept(response -> {
                this.serverContext.checkThread();
                Assert.assertEquals((Object)Response.Status.OK, (Object)response.status());
                latch.countDown();
            }));
        }).thenRun(listenLatch::countDown));
        listenLatch.await(5L, TimeUnit.SECONDS);
        this.clientContext.executor().execute(() -> client.connect(new Address(IP_STRING, this.endpoint2.port())).thenAccept(connection -> {
            this.clientContext.checkThread();
            latch.countDown();
            connection.handler(ConnectRequest.class, request -> {
                this.clientContext.checkThread();
                latch.countDown();
                Assert.assertEquals((Object)"foo", (Object)request.client());
                return CompletableFuture.completedFuture(((ConnectResponse.Builder)ConnectResponse.builder().withStatus(Response.Status.OK)).withLeader(new Address(IP_STRING, this.endpoint2.port())).withMembers((Collection)Lists.newArrayList((Object[])new Address[]{new Address(IP_STRING, this.endpoint2.port())})).build());
            });
        }));
        latch.await(5L, TimeUnit.SECONDS);
        Assert.assertEquals((long)0L, (long)latch.getCount());
    }

    @Test
    public void testCopycatClientConnectionClose() throws Exception {
        Client client = this.clientTransport.client();
        Server server = this.serverTransport.server();
        CountDownLatch latch = new CountDownLatch(5);
        CountDownLatch listenLatch = new CountDownLatch(1);
        this.serverContext.executor().execute(() -> server.listen(new Address(IP_STRING, this.endpoint2.port()), connection -> {
            this.serverContext.checkThread();
            latch.countDown();
            connection.onClose(c -> {
                this.serverContext.checkThread();
                latch.countDown();
            });
        }).thenRun(listenLatch::countDown));
        listenLatch.await(5L, TimeUnit.SECONDS);
        this.clientContext.executor().execute(() -> client.connect(new Address(IP_STRING, this.endpoint2.port())).thenAccept(connection -> {
            this.clientContext.checkThread();
            latch.countDown();
            connection.onClose(c -> {
                this.clientContext.checkThread();
                latch.countDown();
            });
            this.clientContext.schedule(Duration.ofMillis(100L), () -> connection.close().whenComplete((result, error) -> {
                this.clientContext.checkThread();
                latch.countDown();
            }));
        }));
        latch.await(5L, TimeUnit.SECONDS);
        Assert.assertEquals((long)0L, (long)latch.getCount());
    }

    @Test
    public void testCopycatServerConnectionClose() throws Exception {
        Client client = this.clientTransport.client();
        Server server = this.serverTransport.server();
        CountDownLatch latch = new CountDownLatch(5);
        CountDownLatch listenLatch = new CountDownLatch(1);
        this.serverContext.executor().execute(() -> server.listen(new Address(IP_STRING, this.endpoint2.port()), connection -> {
            this.serverContext.checkThread();
            latch.countDown();
            connection.onClose(c -> latch.countDown());
            this.serverContext.schedule(Duration.ofMillis(100L), () -> connection.close().whenComplete((result, error) -> {
                this.serverContext.checkThread();
                latch.countDown();
            }));
        }).thenRun(listenLatch::countDown));
        listenLatch.await(5L, TimeUnit.SECONDS);
        this.clientContext.executor().execute(() -> client.connect(new Address(IP_STRING, this.endpoint2.port())).thenAccept(connection -> {
            this.clientContext.checkThread();
            latch.countDown();
            connection.onClose(c -> latch.countDown());
        }));
        latch.await(5L, TimeUnit.SECONDS);
        Assert.assertEquals((long)0L, (long)latch.getCount());
    }

    public static final class TestMessagingService
    implements MessagingService {
        private final Endpoint endpoint;
        private final Map<Endpoint, TestMessagingService> services;
        private final Map<String, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>>> handlers = new ConcurrentHashMap<String, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>>>();

        TestMessagingService(Endpoint endpoint, Map<Endpoint, TestMessagingService> services) {
            this.endpoint = endpoint;
            this.services = services;
            services.put(endpoint, this);
        }

        private CompletableFuture<byte[]> handle(Endpoint ep, String type, byte[] message, Executor executor) {
            BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler = this.handlers.get(type);
            if (handler == null) {
                return Tools.exceptionalFuture((Throwable)new IllegalStateException());
            }
            return handler.apply(ep, message).thenApplyAsync(r -> r, executor);
        }

        public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
            return null;
        }

        public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
            return null;
        }

        public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
            TestMessagingService service = this.services.get(ep);
            if (service == null) {
                return Tools.exceptionalFuture((Throwable)new IllegalStateException());
            }
            return service.handle(this.endpoint, type, payload, executor);
        }

        public void registerHandler(String type, BiConsumer<Endpoint, byte[]> handler, Executor executor) {
        }

        public void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor) {
        }

        public void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler) {
            this.handlers.put(type, handler);
        }

        public void unregisterHandler(String type) {
            this.handlers.remove(type);
        }
    }
}

