/*
 * Decompiled with CFR 0.152.
 */
package org.onosproject.store.primitives.impl;

import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Connection;
import io.atomix.catalyst.transport.MessageHandler;
import io.atomix.catalyst.transport.TransportException;
import io.atomix.catalyst.util.Assert;
import io.atomix.catalyst.util.Listener;
import io.atomix.catalyst.util.Listeners;
import io.atomix.catalyst.util.ReferenceCounted;
import io.atomix.catalyst.util.concurrent.ThreadContext;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.commons.io.IOUtils;
import org.onlab.util.Tools;
import org.onosproject.cluster.PartitionId;
import org.onosproject.store.cluster.messaging.MessagingService;
import org.onosproject.store.primitives.impl.CopycatTransport;

/**
 * {@link Connection} implementation that exchanges serialized messages through a
 * {@link MessagingService}.
 *
 * <p>Wire format, as produced and consumed by this class:
 * <ul>
 *   <li>Request: an 8-byte connection id (written with {@link DataOutputStream#writeLong})
 *       followed by the serialized message.</li>
 *   <li>Reply: a single status byte ({@link #SUCCESS} or {@link #FAILURE}) followed by the
 *       serialized result, or the serialized {@link Throwable} on failure.</li>
 * </ul>
 *
 * <p>In {@code CLIENT} mode the connection sends on the partition-wide subject and listens on a
 * subject suffixed with the connection id; in any other mode the two subjects are swapped.
 *
 * <p>Two connections are considered equal when their connection ids are equal.
 */
public class CopycatTransportConnection
implements Connection {
    /** Status byte prefixing a reply that carries a serialized result. */
    static final byte SUCCESS = 3;
    /** Status byte prefixing a reply that carries a serialized {@link Throwable}. */
    static final byte FAILURE = 4;

    // NOTE(review): listeners can be registered here, but nothing in this class ever invokes them.
    private final Listeners<Throwable> exceptionListeners = new Listeners<>();
    private final Listeners<Connection> closeListeners = new Listeners<>();
    private final long connectionId;
    private final CopycatTransport.Mode mode;
    private final Address remoteAddress;
    private final MessagingService messagingService;
    private final String outboundMessageSubject;
    private final String inboundMessageSubject;
    private final ThreadContext context;
    /** Registered handlers keyed by the exact runtime class of the request they accept. */
    private final Map<Class<?>, InternalHandler> handlers = Maps.newConcurrentMap();
    private final AtomicInteger messagesSent = new AtomicInteger(0);
    private final AtomicInteger sendFailures = new AtomicInteger(0);
    private final AtomicInteger messagesReceived = new AtomicInteger(0);
    private final AtomicInteger receiveFailures = new AtomicInteger(0);

    /**
     * Creates a connection and derives its inbound/outbound message subjects.
     *
     * @param connectionId identifier written in front of every outgoing request and used for
     *                     {@link #equals(Object)} / {@link #hashCode()}
     * @param mode connection mode; must not be null
     * @param partitionId partition whose string form is embedded in the message subjects
     * @param address remote address that requests are sent to; must not be null
     * @param messagingService service used to send requests and register handlers; must not be null
     * @param context thread context whose serializer decodes requests and encodes replies;
     *                must not be null
     * @throws NullPointerException if {@code mode}, {@code address}, {@code messagingService} or
     *                              {@code context} is null
     */
    CopycatTransportConnection(long connectionId, CopycatTransport.Mode mode, PartitionId partitionId, Address address, MessagingService messagingService, ThreadContext context) {
        this.connectionId = connectionId;
        this.mode = Preconditions.checkNotNull(mode);
        this.remoteAddress = Preconditions.checkNotNull(address);
        this.messagingService = Preconditions.checkNotNull(messagingService);
        if (mode == CopycatTransport.Mode.CLIENT) {
            this.outboundMessageSubject = String.format("onos-copycat-%s", partitionId);
            this.inboundMessageSubject = String.format("onos-copycat-%s-%d", partitionId, connectionId);
        } else {
            this.outboundMessageSubject = String.format("onos-copycat-%s-%d", partitionId, connectionId);
            this.inboundMessageSubject = String.format("onos-copycat-%s", partitionId);
        }
        this.context = Preconditions.checkNotNull(context);
    }

    /**
     * Registers a handler on the inbound subject so that this connection can also receive requests.
     *
     * <p>The registered handler reads the leading connection id from the payload, rejects the
     * message with an {@link IllegalStateException} if it does not match this connection, and
     * otherwise passes the remaining bytes to {@link #handle(byte[])}.
     */
    public void setBidirectional() {
        this.messagingService.registerHandler(this.inboundMessageSubject, (sender, payload) -> {
            try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
                if (input.readLong() != this.connectionId) {
                    throw new IllegalStateException("Invalid connection Id");
                }
                return this.handle(IOUtils.toByteArray(input));
            }
            catch (IOException e) {
                throw Throwables.propagate(e);
            }
        });
    }

    /**
     * Serializes {@code message}, sends it to the remote address and returns a future for the reply.
     *
     * <p>Must be called from a thread that has a current {@link ThreadContext}; that context's
     * serializer encodes the request and decodes the reply, and the returned future is completed
     * through that context. If the message is {@link ReferenceCounted} it is released once it has
     * been serialized.
     *
     * @param message the request to send
     * @param <T> request type
     * @param <U> response type
     * @return a future completed with the decoded response, or completed exceptionally with the
     *         remote failure, the transport error, or a {@link TransportException} when the
     *         request could not be serialized or dispatched
     */
    public <T, U> CompletableFuture<U> send(T message) {
        ThreadContext callerContext = ThreadContext.currentContextOrThrow();
        CompletableFuture<U> result = new CompletableFuture<>();
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
            new DataOutputStream(baos).writeLong(this.connectionId);
            callerContext.serializer().writeObject(message, baos);
            if (message instanceof ReferenceCounted) {
                ((ReferenceCounted<?>) message).release();
            }
            this.messagingService
                    .sendAndReceive(CopycatTransport.toEndpoint(this.remoteAddress), this.outboundMessageSubject, baos.toByteArray(), callerContext.executor())
                    .whenComplete((response, error) -> {
                        if (error == null) {
                            this.messagesSent.incrementAndGet();
                        } else {
                            this.sendFailures.incrementAndGet();
                        }
                        this.handleResponse(response, error, result, callerContext);
                    });
        }
        catch (Exception e) {
            result.completeExceptionally(new TransportException("Failed to send request", e));
        }
        return result;
    }

    /**
     * Decodes a reply and completes {@code future} through {@code context}.
     *
     * <p>This method runs inside a {@code whenComplete} callback whose own result is discarded, so
     * an exception escaping from here would be lost and {@code future} would never complete. Every
     * failure path therefore ends in {@code completeExceptionally}.
     *
     * @param response raw reply bytes (status byte followed by the serialized body); may only be
     *                 null when {@code error} is non-null
     * @param error transport-level failure, or null if a reply was received
     * @param future the future handed back to the caller of {@link #send(Object)}
     * @param context context used to deserialize the reply and to complete the future
     * @param <T> response type
     */
    private <T> void handleResponse(byte[] response, Throwable error, CompletableFuture<T> future, ThreadContext context) {
        if (error != null) {
            context.execute(() -> future.completeExceptionally(error));
            return;
        }
        try {
            Preconditions.checkNotNull(response, "Response payload is null");
            InputStream input = new ByteArrayInputStream(response);
            byte status = (byte) input.read();
            if (status == FAILURE) {
                Throwable remoteFailure = (Throwable) context.serializer().readObject(input);
                context.execute(() -> future.completeExceptionally(remoteFailure));
            } else {
                context.execute(() -> {
                    try {
                        future.complete(context.serializer().readObject(input));
                    }
                    catch (RuntimeException decodeFailure) {
                        // Deserialization failed on the context thread; surface it to the caller.
                        future.completeExceptionally(decodeFailure);
                    }
                });
            }
        }
        catch (IOException | RuntimeException failure) {
            context.execute(() -> future.completeExceptionally(failure));
        }
    }

    /**
     * Registers {@code handler} for requests whose runtime class is exactly {@code type}.
     *
     * <p>The handler is bound to the calling thread's current {@link ThreadContext} and is later
     * invoked through that context.
     *
     * @param type request class to handle; must not be null
     * @param handler handler invoked for matching requests
     * @param <T> request type
     * @param <U> response type
     * @return this connection, so that calls can be chained
     */
    public <T, U> Connection handler(Class<T> type, MessageHandler<T, U> handler) {
        Assert.notNull(type, "type");
        this.handlers.put(type, new InternalHandler(handler, ThreadContext.currentContextOrThrow()));
        return this;
    }

    /**
     * Deserializes an incoming request, dispatches it to the registered handler and encodes the
     * reply.
     *
     * @param message serialized request, without the leading connection id
     * @return a future holding the encoded reply ({@link #SUCCESS} + result or {@link #FAILURE} +
     *         throwable); completed exceptionally if the request cannot be decoded, no handler is
     *         registered for its class, or the reply cannot be serialized
     */
    public CompletableFuture<byte[]> handle(byte[] message) {
        try {
            Object request = this.context.serializer().readObject(new ByteArrayInputStream(message));
            InternalHandler handler = this.handlers.get(request.getClass());
            if (handler == null) {
                return Tools.exceptionalFuture(new IllegalStateException("No handler registered for " + request.getClass()));
            }
            return handler.handle(request).handle((result, error) -> {
                boolean failed = error != null;
                if (failed) {
                    this.receiveFailures.incrementAndGet();
                } else {
                    this.messagesReceived.incrementAndGet();
                }
                try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
                    baos.write(failed ? FAILURE : SUCCESS);
                    this.context.serializer().writeObject(failed ? error : result, baos);
                    return baos.toByteArray();
                }
                catch (IOException e) {
                    throw Throwables.propagate(e);
                }
            });
        }
        catch (Exception e) {
            return Tools.exceptionalFuture(e);
        }
    }

    /**
     * Adds an exception listener.
     *
     * @param listener callback to register
     * @return the registration handle returned by the listener registry
     */
    public Listener<Throwable> exceptionListener(Consumer<Throwable> listener) {
        return this.exceptionListeners.add(listener);
    }

    /**
     * Adds a listener that is notified when {@link #close()} is called.
     *
     * @param listener callback to register
     * @return the registration handle returned by the listener registry
     */
    public Listener<Connection> closeListener(Consumer<Connection> listener) {
        return this.closeListeners.add(listener);
    }

    /**
     * Notifies close listeners and, in {@code CLIENT} mode, unregisters the inbound handler.
     *
     * <p>Only the {@code CLIENT} inbound subject contains the connection id; presumably the
     * subject used in other modes is shared across connections and must stay registered -
     * confirm against {@code CopycatTransport}.
     *
     * @return an already-completed future
     */
    public CompletableFuture<Void> close() {
        this.closeListeners.forEach(listener -> listener.accept(this));
        if (this.mode == CopycatTransport.Mode.CLIENT) {
            this.messagingService.unregisterHandler(this.inboundMessageSubject);
        }
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public int hashCode() {
        return Objects.hash(this.connectionId);
    }

    @Override
    public boolean equals(Object other) {
        if (!(other instanceof CopycatTransportConnection)) {
            return false;
        }
        return this.connectionId == ((CopycatTransportConnection) other).connectionId;
    }

    @Override
    public String toString() {
        return MoreObjects.toStringHelper(this.getClass())
                .add("id", this.connectionId)
                .add("sent", this.messagesSent.get())
                .add("received", this.messagesReceived.get())
                .add("sendFailures", this.sendFailures.get())
                .add("receiveFailures", this.receiveFailures.get())
                .toString();
    }

    /**
     * Pairs a message handler with the thread context it was registered from, so the handler is
     * always invoked through that context.
     */
    private static final class InternalHandler {
        private final MessageHandler<Object, Object> handler;
        private final ThreadContext context;

        // The handler is only ever looked up by the exact request class it was registered for,
        // so treating it as MessageHandler<Object, Object> is safe.
        @SuppressWarnings("unchecked")
        private <T, U> InternalHandler(MessageHandler<T, U> handler, ThreadContext context) {
            this.handler = (MessageHandler<Object, Object>) handler;
            this.context = context;
        }

        /**
         * Invokes the wrapped handler through the registration context.
         *
         * @param message the decoded request
         * @return a future mirroring the handler's outcome; completed exceptionally if the handler
         *         throws synchronously or returns a null future
         */
        public CompletableFuture<Object> handle(Object message) {
            CompletableFuture<Object> answer = new CompletableFuture<>();
            this.context.execute(() -> {
                try {
                    this.handler.handle(message).whenComplete((response, error) -> {
                        if (error != null) {
                            answer.completeExceptionally(error);
                        } else {
                            answer.complete(response);
                        }
                    });
                }
                catch (RuntimeException invocationFailure) {
                    // Without this the caller would wait forever on a future nobody completes.
                    answer.completeExceptionally(invocationFailure);
                }
            });
            return answer;
        }
    }
}

