/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ratis.netty;

import java.io.Closeable;
import java.io.IOException;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.netty.NettyClient;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.netty.NettyProtos;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelInitializer;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.ratis.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.ratis.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.ratis.thirdparty.io.netty.channel.socket.SocketChannel;
import org.apache.ratis.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder;
import org.apache.ratis.thirdparty.io.netty.handler.codec.protobuf.ProtobufEncoder;
import org.apache.ratis.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.ratis.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.PeerProxyMap;
import org.apache.ratis.util.ProtoUtils;
import org.apache.ratis.util.TimeDuration;

/**
 * Client-side proxy that sends {@link NettyProtos.RaftNettyServerRequestProto}
 * messages to a single {@link RaftPeer} through a {@link NettyClient} and
 * blocks until the corresponding reply arrives.
 *
 * <p>Replies are paired with requests purely by position: every request
 * enqueues a future, and every inbound reply completes the future at the head
 * of that queue. The pairing is therefore only correct when replies arrive in
 * the same order in which the requests were written.
 */
public class NettyRpcProxy implements Closeable {
    private final RaftPeer peer;
    private final Connection connection;
    private final TimeDuration requestTimeoutDuration;

    /**
     * Extracts the call id carried by whichever reply is set in the given proto.
     *
     * @param proto the server reply to inspect
     * @return the call id of the contained reply
     * @throws IllegalArgumentException if no reply case is set
     * @throws UnsupportedOperationException if the reply case is not handled here
     */
    public static long getCallId(NettyProtos.RaftNettyServerReplyProto proto) {
        switch (proto.getRaftNettyServerReplyCase()) {
            case REQUESTVOTEREPLY:
                return proto.getRequestVoteReply().getServerReply().getCallId();
            case STARTLEADERELECTIONREPLY:
                return proto.getStartLeaderElectionReply().getServerReply().getCallId();
            case APPENDENTRIESREPLY:
                return proto.getAppendEntriesReply().getServerReply().getCallId();
            case INSTALLSNAPSHOTREPLY:
                return proto.getInstallSnapshotReply().getServerReply().getCallId();
            case RAFTCLIENTREPLY:
                return proto.getRaftClientReply().getRpcReply().getCallId();
            case EXCEPTIONREPLY:
                return proto.getExceptionReply().getRpcReply().getCallId();
            case RAFTNETTYSERVERREPLY_NOT_SET:
                throw new IllegalArgumentException("Reply case not set in proto: " + proto.getRaftNettyServerReplyCase());
            default:
                throw new UnsupportedOperationException("Reply case not supported: " + proto.getRaftNettyServerReplyCase());
        }
    }

    /**
     * Converts the payload of an exception reply into the {@link Throwable}
     * used to fail the waiting request. Never throws: a payload that cannot be
     * decoded, or that is not a {@link Throwable}, is itself reported as the
     * failure so that the caller is released instead of waiting for a timeout.
     */
    private static Throwable toFailure(NettyProtos.RaftNettyServerReplyProto proto) {
        try {
            final Object payload = ProtoUtils.toObject(proto.getExceptionReply().getException());
            if (payload instanceof Throwable) {
                return (Throwable) payload;
            }
            return new IOException("Unexpected exception reply payload for request #"
                    + getCallId(proto) + ": " + payload);
        } catch (RuntimeException e) {
            // NOTE(review): the failure modes of ProtoUtils.toObject are not visible
            // here; any unchecked failure is forwarded to the caller as the cause.
            return e;
        }
    }

    /**
     * Creates a proxy for the given peer and connects to it.
     *
     * @param peer       the remote peer; its address is used for the connection
     * @param properties source of the client request timeout
     * @param group      event loop group passed to the underlying client
     * @throws InterruptedException if connecting is interrupted
     */
    public NettyRpcProxy(RaftPeer peer, RaftProperties properties, EventLoopGroup group) throws InterruptedException {
        // peer must be assigned first: the Connection constructor reads it.
        this.peer = peer;
        this.connection = new Connection(group);
        this.requestTimeoutDuration = RaftClientConfigKeys.Rpc.requestTimeout(properties);
    }

    /** Closes the underlying connection and fails all requests still waiting for a reply. */
    @Override
    public void close() {
        connection.close();
    }

    /**
     * Writes {@code proto} to the peer and blocks until its reply arrives.
     *
     * <p>The wait is bounded by the configured request timeout plus the
     * timeout carried by {@code request}.
     *
     * @param request the RPC header of the request; supplies the extra timeout
     *                and is used in error messages
     * @param proto   the message actually written to the channel
     * @return the reply received from the peer
     * @throws IOException if the wait is interrupted, times out, or the reply
     *                     completes exceptionally
     */
    public NettyProtos.RaftNettyServerReplyProto send(RaftProtos.RaftRpcRequestProto request, NettyProtos.RaftNettyServerRequestProto proto) throws IOException {
        final TimeDuration timeout = requestTimeoutDuration.add(request.getTimeoutMs(), TimeUnit.MILLISECONDS);
        final CompletableFuture<NettyProtos.RaftNettyServerReplyProto> reply = new CompletableFuture<>();
        final ChannelFuture channelFuture = connection.offer(proto, reply);
        try {
            // Wait for the write to finish before starting to wait for the reply.
            channelFuture.sync();
            return reply.get(timeout.getDuration(), timeout.getUnit());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw IOUtils.toInterruptedIOException(ProtoUtils.toString(request) + " sending from " + peer + " is interrupted.", e);
        } catch (ExecutionException e) {
            throw IOUtils.toIOException(e);
        } catch (TimeoutException e) {
            // The TimeoutException raised by CompletableFuture.get carries no message,
            // so build one that identifies the request instead of forwarding null.
            // The future is intentionally left in the queue: removing it would shift
            // the positional pairing of all later replies.
            throw new TimeoutIOException(ProtoUtils.toString(request) + " to " + peer
                    + " timed out after " + timeout.getDuration() + " " + timeout.getUnit(), e);
        }
    }

    /**
     * The channel to the peer together with the FIFO queue of futures waiting
     * for replies. All queue accesses are synchronized on this object.
     */
    class Connection implements Closeable {
        private final NettyClient client = new NettyClient();
        /** Futures of requests that were written but not yet answered, oldest first. */
        private final Queue<CompletableFuture<NettyProtos.RaftNettyServerReplyProto>> replies = new LinkedList<>();

        /**
         * Builds the channel pipeline and connects to the enclosing proxy's peer.
         *
         * @param group event loop group passed to the client
         * @throws InterruptedException if connecting is interrupted
         */
        Connection(EventLoopGroup group) throws InterruptedException {
            final SimpleChannelInboundHandler<NettyProtos.RaftNettyServerReplyProto> inboundHandler = new SimpleChannelInboundHandler<NettyProtos.RaftNettyServerReplyProto>() {

                /** Completes the oldest pending future with the received reply. */
                @Override
                protected void channelRead0(ChannelHandlerContext ctx, NettyProtos.RaftNettyServerReplyProto proto) {
                    final CompletableFuture<NettyProtos.RaftNettyServerReplyProto> future = Connection.this.pollReply();
                    if (future == null) {
                        throw new IllegalStateException("Request #" + NettyRpcProxy.getCallId(proto) + " not found");
                    }
                    if (proto.getRaftNettyServerReplyCase() == NettyProtos.RaftNettyServerReplyProto.RaftNettyServerReplyCase.EXCEPTIONREPLY) {
                        // The future has already been removed from the queue, so it must
                        // be completed on every path; otherwise the sender would block
                        // until its timeout expires.
                        future.completeExceptionally(NettyRpcProxy.toFailure(proto));
                    } else {
                        future.complete(proto);
                    }
                }
            };
            final ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {

                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    final ChannelPipeline p = ch.pipeline();
                    // Inbound: varint32 length-delimited frames decoded into reply protos.
                    p.addLast(new ProtobufVarint32FrameDecoder());
                    p.addLast(new ProtobufDecoder(NettyProtos.RaftNettyServerReplyProto.getDefaultInstance()));
                    // Outbound: protos encoded and prefixed with a varint32 length.
                    p.addLast(new ProtobufVarint32LengthFieldPrepender());
                    p.addLast(new ProtobufEncoder());
                    p.addLast(inboundHandler);
                }
            };
            client.connect(NettyRpcProxy.this.peer.getAddress(), group, initializer);
        }

        /**
         * Enqueues {@code reply} and writes {@code request} to the channel.
         * Both steps happen under the same lock so that the queue order
         * matches the write order.
         *
         * @return the future of the write operation
         */
        synchronized ChannelFuture offer(NettyProtos.RaftNettyServerRequestProto request, CompletableFuture<NettyProtos.RaftNettyServerReplyProto> reply) {
            replies.offer(reply);
            return client.writeAndFlush(request);
        }

        /**
         * Removes and returns the oldest pending future.
         *
         * @return the oldest pending future, or {@code null} if none is pending
         */
        synchronized CompletableFuture<NettyProtos.RaftNettyServerReplyProto> pollReply() {
            return replies.poll();
        }

        /** Closes the client and fails every pending future with an {@link IOException}. */
        @Override
        public synchronized void close() {
            client.close();
            if (!replies.isEmpty()) {
                final IOException e = new IOException("Connection to " + NettyRpcProxy.this.peer + " is closed.");
                replies.forEach(f -> f.completeExceptionally(e));
                replies.clear();
            }
        }
    }

    /**
     * A {@link PeerProxyMap} of {@link NettyRpcProxy} instances that all share
     * one {@link NioEventLoopGroup} owned by this map.
     */
    public static class PeerMap extends PeerProxyMap<NettyRpcProxy> {
        private final EventLoopGroup group = new NioEventLoopGroup();
        private final RaftProperties properties;

        /**
         * @param name       name passed to the {@link PeerProxyMap} constructor
         * @param properties properties handed to every proxy created by this map
         */
        public PeerMap(String name, RaftProperties properties) {
            super(name);
            this.properties = properties;
        }

        /**
         * Creates and connects a proxy for the given peer.
         *
         * @throws IOException if connecting is interrupted; the interrupt status
         *                     of the current thread is restored first
         */
        @Override
        public NettyRpcProxy createProxyImpl(RaftPeer peer) throws IOException {
            try {
                return new NettyRpcProxy(peer, properties, group);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw IOUtils.toInterruptedIOException("Failed connecting to " + peer, e);
            }
        }

        /** Closes the proxies held by the superclass, then shuts down the shared event loop group. */
        @Override
        public void close() {
            try {
                super.close();
            } finally {
                // Release the event loop group even if closing the proxies fails.
                group.shutdownGracefully();
            }
        }
    }
}

