/*
 * Decompiled with CFR 0.152.
 */
package org.onosproject.store.primitives.impl;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import io.atomix.catalyst.concurrent.Listener;
import io.atomix.catalyst.concurrent.Listeners;
import io.atomix.catalyst.concurrent.ThreadContext;
import io.atomix.catalyst.serializer.SerializationException;
import io.atomix.catalyst.transport.Connection;
import io.atomix.catalyst.transport.TransportException;
import io.atomix.catalyst.util.reference.ReferenceCounted;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.io.IOUtils;
import org.onlab.util.Tools;
import org.onosproject.cluster.PartitionId;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.MessagingException;
import org.onosproject.store.cluster.messaging.MessagingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CopycatTransportConnection
implements Connection {
    /** Maximum size, in bytes, of a serialized message (0x100000 = 1048576). */
    private static final int MAX_MESSAGE_SIZE = 0x100000;
    /** Logger named after the runtime class of this instance. */
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    /** Identifier of this connection; part of both subject names and of log output. */
    private final long connectionId;
    /** Messaging subject this connection registers its inbound handler under. */
    private final String localSubject;
    /** Messaging subject outbound messages and close requests are sent to. */
    private final String remoteSubject;
    /** Partition this connection belongs to; part of both subject names and of log output. */
    private final PartitionId partitionId;
    /** Remote endpoint that outbound messages are addressed to. */
    private final Endpoint endpoint;
    /** Messaging service used to send frames and to (un)register the inbound handler. */
    private final MessagingService messagingService;
    /** Context supplied at construction; used to decode inbound payloads and to run remote close requests. */
    private final ThreadContext context;
    /** Registered request handlers, keyed by the request's class. */
    private final Map<Class, InternalHandler> handlers = new ConcurrentHashMap<Class, InternalHandler>();
    /** Listeners notified with the error when the connection is closed because of a failure. */
    private final Listeners<Throwable> exceptionListeners = new Listeners();
    /** Listeners notified with this connection whenever it is closed. */
    private final Listeners<Connection> closeListeners = new Listeners();
    /**
     * Creates a connection and registers its inbound handler with the messaging service.
     *
     * @param connectionId     identifier of the connection, used in the subject names
     * @param mode             decides which of the client/server subjects is local and which is remote
     * @param partitionId      partition the connection belongs to; must not be null
     * @param endpoint         remote endpoint to send to; must not be null
     * @param messagingService service used for all sends and handler registration; must not be null
     * @param context          context used to decode inbound payloads; must not be null
     */
    CopycatTransportConnection(long connectionId, Mode mode, PartitionId partitionId, Endpoint endpoint, MessagingService messagingService, ThreadContext context) {
        this.connectionId = connectionId;
        this.partitionId = Preconditions.checkNotNull(partitionId, "partitionId cannot be null");
        // Subjects are derived before the remaining arguments are validated, as before.
        this.localSubject = mode.getLocalSubject(partitionId, connectionId);
        this.remoteSubject = mode.getRemoteSubject(partitionId, connectionId);
        this.endpoint = Preconditions.checkNotNull(endpoint, "endpoint cannot be null");
        this.messagingService = Preconditions.checkNotNull(messagingService, "messagingService cannot be null");
        this.context = Preconditions.checkNotNull(context, "context cannot be null");
        // Start receiving frames addressed to the local subject.
        messagingService.registerHandler(this.localSubject, this::handle);
    }

    /**
     * Sends a one-way message to the remote side of this connection.
     *
     * <p>The frame is a single type byte ({@code 1}) followed by the message as serialized by the
     * calling thread's {@link ThreadContext}. The returned future is completed on that context's
     * executor once the messaging service reports the outcome of the send.
     *
     * @param message the message to send; released after serialization if it is {@link ReferenceCounted}
     * @return a future completed when the send finishes, or failed if serialization or sending fails
     * @throws IllegalArgumentException if the serialized frame is larger than {@code MAX_MESSAGE_SIZE}
     */
    public CompletableFuture<Void> send(Object message) {
        final ThreadContext callerContext = ThreadContext.currentContextOrThrow();
        final CompletableFuture<Void> result = new CompletableFuture<>();
        try (ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
            // Type byte 1 marks a regular message.
            buffer.write(1);
            callerContext.serializer().writeObject(message, (OutputStream) buffer);
            if (message instanceof ReferenceCounted) {
                ((ReferenceCounted<?>) message).release();
            }
            final byte[] frame = buffer.toByteArray();
            if (frame.length > MAX_MESSAGE_SIZE) {
                throw new IllegalArgumentException(message + " exceeds maximum message size " + MAX_MESSAGE_SIZE);
            }
            this.messagingService.sendAsync(this.endpoint, this.remoteSubject, frame)
                    .whenComplete((ignored, failure) ->
                            // Hop back onto the caller's context before completing the future.
                            callerContext.executor().execute(() -> {
                                if (failure == null) {
                                    result.complete(null);
                                } else {
                                    result.completeExceptionally(failure);
                                }
                            }));
        } catch (SerializationException | IOException e) {
            result.completeExceptionally(e);
        }
        return result;
    }

    /**
     * Sends a request to the remote side of this connection and awaits its response.
     *
     * <p>The frame is a single type byte ({@code 1}) followed by the message as serialized by the
     * calling thread's {@link ThreadContext}. The reply is decoded by {@code handleResponse}, which
     * is invoked through the caller context's executor handed to the messaging service.
     *
     * @param message the request to send; released after serialization if it is {@link ReferenceCounted}
     * @param <T>     request type
     * @param <U>     response type
     * @return a future holding the decoded response, or failed if serialization, sending or the remote handler fails
     * @throws IllegalArgumentException if the serialized frame is larger than {@code MAX_MESSAGE_SIZE}
     */
    public <T, U> CompletableFuture<U> sendAndReceive(T message) {
        final ThreadContext callerContext = ThreadContext.currentContextOrThrow();
        final CompletableFuture<U> result = new CompletableFuture<>();
        try (ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
            // Type byte 1 marks a regular message.
            buffer.write(1);
            callerContext.serializer().writeObject(message, (OutputStream) buffer);
            if (message instanceof ReferenceCounted) {
                ((ReferenceCounted<?>) message).release();
            }
            final byte[] frame = buffer.toByteArray();
            if (frame.length > MAX_MESSAGE_SIZE) {
                throw new IllegalArgumentException(message + " exceeds maximum message size " + MAX_MESSAGE_SIZE);
            }
            this.messagingService
                    .sendAndReceive(this.endpoint, this.remoteSubject, frame, callerContext.executor())
                    .whenComplete((reply, failure) -> this.handleResponse(reply, failure, result));
        } catch (SerializationException | IOException e) {
            result.completeExceptionally(e);
        }
        return result;
    }

    /**
     * Completes {@code future} from the outcome of a request/response exchange.
     *
     * <p>On a transport error the future is failed: a missing remote handler is wrapped in a
     * {@link TransportException} and also closes this connection, a {@link SocketException} root
     * cause is wrapped in a {@link TransportException}, and any other error is passed through.
     * Otherwise the first byte of {@code response} is a status ({@code 2} = the remainder is a
     * serialized {@link Throwable}; anything else = the remainder is the serialized result).
     *
     * <p>This method runs inside a {@code whenComplete} callback, so anything it throws would be
     * lost and leave {@code future} pending forever. Every decoding failure (null response,
     * undeserializable or non-Throwable failure payload, I/O error) therefore fails the future.
     *
     * @param response raw reply bytes; only read when {@code error} is null
     * @param error    failure reported by the messaging service, or null on success
     * @param future   the future to complete
     * @param <T>      expected result type
     */
    private <T> void handleResponse(byte[] response, Throwable error, CompletableFuture<T> future) {
        if (error != null) {
            Throwable rootCause = Throwables.getRootCause(error);
            if (rootCause instanceof MessagingException.NoRemoteHandler) {
                future.completeExceptionally(new TransportException(error));
                // The peer no longer listens on the remote subject: treat the connection as dead.
                this.close(rootCause);
            } else if (rootCause instanceof SocketException) {
                future.completeExceptionally(new TransportException(error));
            } else {
                future.completeExceptionally(error);
            }
            return;
        }
        try {
            Preconditions.checkNotNull(response, "response cannot be null");
            InputStream input = new ByteArrayInputStream(response);
            byte status = (byte) input.read();
            if (status == 2) {
                Throwable remoteFailure = (Throwable) this.context.serializer().readObject(input);
                future.completeExceptionally(remoteFailure);
            } else {
                future.complete(this.context.serializer().readObject(input));
            }
        } catch (IOException | RuntimeException e) {
            // Includes SerializationException, ClassCastException and the null checks above.
            future.completeExceptionally(e);
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    /**
     * Entry point for frames delivered to this connection's local subject.
     *
     * <p>The first byte of {@code payload} selects the action: {@code 1} dispatches the remaining
     * bytes to {@code handleMessage}, {@code 3} closes the connection via {@code handleClose}.
     *
     * <p>Failures are reported through the returned future rather than thrown, so the messaging
     * service always receives a future it can reply from: an unknown type byte fails with
     * {@link IllegalStateException}, and an unreadable (empty or null) payload fails with the
     * underlying exception.
     *
     * @param sender  endpoint the frame came from (not used)
     * @param payload raw frame bytes
     * @return a future holding the encoded reply, or a failed future; never null
     */
    private CompletableFuture<byte[]> handle(Endpoint sender, byte[] payload) {
        try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
            byte type = input.readByte();
            switch (type) {
                case 1:
                    return this.handleMessage(IOUtils.toByteArray((InputStream) input));
                case 3:
                    return this.handleClose();
                default:
                    return Tools.exceptionalFuture(new IllegalStateException("Invalid message type"));
            }
        } catch (IOException | RuntimeException e) {
            return Tools.exceptionalFuture(e);
        }
    }

    /**
     * Decodes an inbound request, dispatches it to the handler registered for its class and
     * encodes the handler's outcome as a reply frame.
     *
     * <p>The reply is a status byte ({@code 1} = success, {@code 2} = failure) followed by the
     * serialized result or error. If no handler is registered, or decoding/dispatching throws,
     * a failed future is returned instead.
     *
     * @param message serialized request, without the leading type byte
     * @return a future holding the encoded reply, or a failed future
     */
    private CompletableFuture<byte[]> handleMessage(byte[] message) {
        try {
            Object request = this.context.serializer().readObject((InputStream) new ByteArrayInputStream(message));
            Class<?> requestType = request.getClass();
            InternalHandler handler = this.handlers.get(requestType);
            if (handler == null) {
                this.log.warn("No handler registered on connection {}-{} for type {}", this.partitionId, this.connectionId, requestType);
                return Tools.exceptionalFuture(new IllegalStateException("No handler registered for " + requestType));
            }
            return handler.handle(request).handle((result, error) -> {
                boolean failed = error != null;
                try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
                    out.write(failed ? 2 : 1);
                    this.context.serializer().writeObject(failed ? error : result, (OutputStream) out);
                    byte[] encoded = out.toByteArray();
                    if (encoded.length > MAX_MESSAGE_SIZE) {
                        throw new IllegalArgumentException("response exceeds maximum message size " + MAX_MESSAGE_SIZE);
                    }
                    return encoded;
                } catch (IOException e) {
                    // propagate() always throws; the throw keyword just tells the compiler so.
                    throw Throwables.propagate(e);
                }
            });
        } catch (Exception e) {
            return Tools.exceptionalFuture(e);
        }
    }

    /**
     * Handles a close request from the remote side.
     *
     * <p>On this connection's context executor the connection is closed locally (without an
     * error) and the returned future is then completed with a single acknowledgement byte
     * {@code 1}.
     *
     * @return a future holding the one-byte acknowledgement
     */
    private CompletableFuture<byte[]> handleClose() {
        final CompletableFuture<byte[]> ack = new CompletableFuture<>();
        this.context.executor().execute(() -> {
            this.close((Throwable) null);
            ack.complete(new byte[] {1});
        });
        return ack;
    }

    /**
     * Registers a one-way handler for messages of the given type.
     *
     * <p>The consumer is adapted to the function-based overload; the adapter always returns a
     * null future.
     *
     * @param type    class of the messages to handle
     * @param handler callback invoked with each message of that type
     * @param <T>     message type
     * @param <U>     unused response type
     * @return this connection
     */
    public <T, U> Connection handler(Class<T> type, Consumer<T> handler) {
        Function<T, CompletableFuture<U>> adapter = request -> {
            handler.accept(request);
            return null;
        };
        return this.handler(type, adapter);
    }

    /**
     * Registers a request/response handler for messages of the given type, replacing any
     * handler previously registered for that type.
     *
     * <p>The handler is bound to the calling thread's {@link ThreadContext}; it is later invoked
     * on that context's executor.
     *
     * @param type    class of the requests to handle
     * @param handler function producing the response future for a request
     * @param <T>     request type
     * @param <U>     response type
     * @return this connection
     */
    public <T, U> Connection handler(Class<T> type, Function<T, CompletableFuture<U>> handler) {
        if (this.log.isTraceEnabled()) {
            this.log.trace("Registered handler on connection {}-{}: {}", this.partitionId, this.connectionId, type);
        }
        InternalHandler bound = new InternalHandler(handler, ThreadContext.currentContextOrThrow());
        this.handlers.put(type, bound);
        return this;
    }

    /**
     * Adds a listener that is notified with the error when this connection is closed because
     * of a failure.
     *
     * @param consumer callback receiving the error
     * @return the registration handle returned by the listener set
     */
    public Listener<Throwable> onException(Consumer<Throwable> consumer) {
        Listener<Throwable> registration = this.exceptionListeners.add(consumer);
        return registration;
    }

    /**
     * Adds a listener that is notified with this connection whenever it is closed.
     *
     * @param consumer callback receiving the closed connection
     * @return the registration handle returned by the listener set
     */
    public Listener<Connection> onClose(Consumer<Connection> consumer) {
        Listener<Connection> registration = this.closeListeners.add(consumer);
        return registration;
    }

    /**
     * Closes this connection by sending a close request (type byte {@code 3}) to the remote
     * side and then closing locally once the exchange finishes, whether it succeeded or not.
     *
     * <p>The returned future completes normally only if the remote side acknowledged with the
     * single byte {@code 1}. A missing remote handler is reported as a
     * {@link TransportException}; other errors are passed through. A null, empty or unexpected
     * acknowledgement fails the future with a {@link TransportException} instead of throwing
     * inside the completion callback, which would leave the future pending forever.
     *
     * @return a future completed when the remote side has acknowledged the close
     */
    public CompletableFuture<Void> close() {
        this.log.debug("Closing connection {}-{}", this.partitionId, this.connectionId);
        byte[] closeRequest = {3};
        ThreadContext context = ThreadContext.currentContextOrThrow();
        CompletableFuture<Void> future = new CompletableFuture<>();
        this.messagingService.sendAndReceive(this.endpoint, this.remoteSubject, closeRequest, context.executor()).whenComplete((payload, error) -> {
            // Local teardown happens regardless of how the remote exchange ended.
            this.close(error);
            if (error != null) {
                Throwable rootCause = Throwables.getRootCause(error);
                Throwable reported = rootCause instanceof MessagingException.NoRemoteHandler
                        ? new TransportException(error)
                        : error;
                future.completeExceptionally(reported);
            } else if (payload != null && payload.length > 0 && payload[0] == 1) {
                future.complete(null);
            } else {
                future.completeExceptionally(new TransportException("Failed to close connection"));
            }
        });
        return future;
    }

    /**
     * Tears down the local side of the connection: unregisters the inbound handler for the
     * local subject, notifies exception listeners if an error is given, then notifies close
     * listeners with this connection.
     *
     * @param error the failure that caused the close, or null for an orderly close
     */
    private void close(Throwable error) {
        this.log.debug("Connection {}-{} closed", this.partitionId, this.connectionId);
        this.messagingService.unregisterHandler(this.localSubject);
        if (error != null) {
            // Pass the typed value: an Object-typed argument does not match Listeners<Throwable>.
            this.exceptionListeners.accept(error);
        }
        this.closeListeners.accept(this);
    }

    /**
     * Pairs a registered handler function with the {@link ThreadContext} it was registered on,
     * and runs the function on that context's executor.
     */
    private static class InternalHandler {
        // Raw on purpose: handlers of many different request/response types share this holder.
        private final Function handler;
        private final ThreadContext context;

        /**
         * @param handler function mapping a request to a response future (or to null)
         * @param context context whose executor the function is run on
         */
        InternalHandler(Function handler, ThreadContext context) {
            this.handler = handler;
            this.context = context;
        }

        /**
         * Invokes the handler for {@code message} on the handler's context.
         *
         * <p>The returned future mirrors the handler's response future. If the handler returns
         * null (as the consumer-based registration does) the returned future is left pending,
         * as before. If the handler throws, the returned future is failed with that exception
         * so a waiting requester is answered rather than left hanging; the exception is then
         * rethrown so the executor still observes it.
         *
         * @param message the decoded request
         * @return a future mirroring the handler's response
         */
        @SuppressWarnings("unchecked")
        CompletableFuture<Object> handle(Object message) {
            CompletableFuture<Object> future = new CompletableFuture<>();
            this.context.executor().execute(() -> {
                CompletableFuture<Object> responseFuture;
                try {
                    responseFuture = (CompletableFuture<Object>) this.handler.apply(message);
                } catch (RuntimeException e) {
                    future.completeExceptionally(e);
                    throw e;
                }
                if (responseFuture != null) {
                    responseFuture.whenComplete((r, e) -> {
                        if (e != null) {
                            future.completeExceptionally(e);
                        } else {
                            future.complete(r);
                        }
                    });
                }
            });
            return future;
        }
    }

    /**
     * Side of the connection. Determines which of the two per-connection messaging subjects
     * (client or server) is the local one and which is the remote one.
     */
    enum Mode {
        /** Listens on the client subject and sends to the server subject. */
        CLIENT {
            @Override
            String getLocalSubject(PartitionId partitionId, long connectionId) {
                return clientSubject(partitionId, connectionId);
            }

            @Override
            String getRemoteSubject(PartitionId partitionId, long connectionId) {
                return serverSubject(partitionId, connectionId);
            }
        },
        /** Listens on the server subject and sends to the client subject. */
        SERVER {
            @Override
            String getLocalSubject(PartitionId partitionId, long connectionId) {
                return serverSubject(partitionId, connectionId);
            }

            @Override
            String getRemoteSubject(PartitionId partitionId, long connectionId) {
                return clientSubject(partitionId, connectionId);
            }
        };

        /** Returns the subject this side registers its handler under. */
        abstract String getLocalSubject(PartitionId var1, long var2);

        /** Returns the subject this side sends its messages to. */
        abstract String getRemoteSubject(PartitionId var1, long var2);

        /** Builds the client-side subject for the given partition and connection. */
        private static String clientSubject(PartitionId partitionId, long connectionId) {
            return String.format("onos-copycat-%s-%d-client", partitionId, connectionId);
        }

        /** Builds the server-side subject for the given partition and connection. */
        private static String serverSubject(PartitionId partitionId, long connectionId) {
            return String.format("onos-copycat-%s-%d-server", partitionId, connectionId);
        }
    }
}

