/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ratis.netty.client;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.ratis.client.DataStreamClientRpc;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamPacketByteBuffer;
import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
import org.apache.ratis.datastream.impl.DataStreamRequestFilePositionCount;
import org.apache.ratis.io.StandardWriteOption;
import org.apache.ratis.io.WriteOption;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.netty.NettyDataStreamUtils;
import org.apache.ratis.netty.NettyUtils;
import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.DataStreamRequest;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.security.TlsConf;
import org.apache.ratis.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.channel.Channel;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelFutureListener;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandler;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelInboundHandler;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelInitializer;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelOption;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.ratis.thirdparty.io.netty.channel.socket.SocketChannel;
import org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.ratis.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
import org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext;
import org.apache.ratis.thirdparty.io.netty.util.concurrent.Future;
import org.apache.ratis.thirdparty.io.netty.util.concurrent.GenericFutureListener;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.NetUtils;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.TimeoutExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link DataStreamClientRpc} implemented on top of Netty.
 *
 * Each instance talks to the data stream address of a single server.
 * Requests are written to a Netty channel and every request is paired with a
 * {@link CompletableFuture} that is completed when the matching reply is read.
 * Futures are kept in per-stream queues, keyed by {@link ClientInvocationId}.
 */
public class NettyClientStreamRpc
implements DataStreamClientRpc {
    /** Logger used by this class and by its nested classes. */
    public static final Logger LOG = LoggerFactory.getLogger(NettyClientStreamRpc.class);
    /** Display name: the simple class name followed by "->" and the server; returned by toString(). */
    private final String name;
    /** The (re)connectable channel holder for the server's data stream address. */
    private final Connection connection;
    /** Request-count threshold passed to OutstandingRequests.shouldFlush when writing a request. */
    private final int flushRequestCountMin;
    /** Byte-size threshold passed to OutstandingRequests.shouldFlush when writing a request. */
    private final SizeInBytes flushRequestBytesMin;
    /** Tracks how many requests/bytes have been written since the last flush. */
    private final OutstandingRequests outstandingRequests = new OutstandingRequests();
    /** Pending reply futures, one queue per (client id, stream id) pair. */
    private final ConcurrentMap<ClientInvocationId, ReplyQueue> replies = new ConcurrentHashMap<ClientInvocationId, ReplyQueue>();
    /** Delay before an empty reply queue is checked again and, if still untouched, removed from the map. */
    private final TimeDuration replyQueueGracePeriod;
    /** Executor used to schedule the delayed empty-queue check. */
    private final TimeoutExecutor timeoutScheduler = TimeoutExecutor.getInstance();

    /**
     * Creates a client for the data stream address of the given server.
     *
     * @param server the peer to connect to; its data stream address is used.
     * @param tlsConf TLS configuration handed to {@code NettyUtils.buildSslContextForClient}.
     * @param properties source of the flush thresholds, the reply queue grace period
     *                   and the worker group settings.
     */
    public NettyClientStreamRpc(RaftPeer server, TlsConf tlsConf, RaftProperties properties) {
        name = JavaUtils.getClassSimpleName(getClass()) + "->" + server;
        replyQueueGracePeriod = NettyConfigKeys.DataStream.Client.replyQueueGracePeriod(properties);
        flushRequestCountMin = RaftClientConfigKeys.DataStream.flushRequestCountMin(properties);
        flushRequestBytesMin = RaftClientConfigKeys.DataStream.flushRequestBytesMin(properties);

        final InetSocketAddress serverAddress = NetUtils.createSocketAddr(server.getDataStreamAddress());
        final SslContext clientSslContext = NettyUtils.buildSslContextForClient(tlsConf);
        // A fresh initializer (and therefore a fresh client handler) is built for every connect attempt.
        final Supplier<ChannelInitializer<SocketChannel>> initializers =
            () -> NettyClientStreamRpc.newChannelInitializer(serverAddress, clientSslContext, getClientHandler());
        connection = new Connection(serverAddress, new WorkerGroupGetter(properties), initializers);
    }

    /**
     * Creates the inbound handler placed at the end of the client pipeline.
     *
     * The handler completes the oldest pending future of a stream with each reply read,
     * fails the pending futures when a reply is unsuccessful or an exception is caught,
     * and schedules a reconnect when the channel becomes inactive while the connection is still open.
     *
     * @return a new handler instance.
     */
    private ChannelInboundHandler getClientHandler() {
        return new ChannelInboundHandlerAdapter() {
            // Id built from the most recently read reply; exceptionCaught uses it to choose the queue to fail.
            private ClientInvocationId clientInvocationId;

            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                if (!(msg instanceof DataStreamReply)) {
                    LOG.error("{}: unexpected message {}", this, msg.getClass());
                    return;
                }
                final DataStreamReply reply = (DataStreamReply) msg;
                LOG.debug("{}: read {}", this, reply);

                // Keep the id in a final local: the delayed cleanup below must use the id of THIS reply,
                // not whatever value the mutable field holds when the timeout eventually fires.
                final ClientInvocationId id = ClientInvocationId.valueOf(reply.getClientId(), reply.getStreamId());
                this.clientInvocationId = id;

                // A failed reply ends the stream, so its queue is removed from the map right away.
                final ReplyQueue queue = reply.isSuccess() ? replies.get(id) : replies.remove(id);
                if (queue == null) {
                    return;
                }
                final CompletableFuture<DataStreamReply> f = queue.poll();
                if (f == null) {
                    return;
                }
                f.complete(reply);

                if (!reply.isSuccess() && queue.size() > 0) {
                    // The remaining requests of the stream cannot succeed after an earlier failure.
                    final IllegalStateException e = new IllegalStateException(this + ": an earlier request failed with " + reply);
                    queue.forEach(future -> future.completeExceptionally(e));
                }

                final Integer emptyId = queue.getEmptyId();
                if (emptyId != null) {
                    // Remove the queue only if the very same queue is still mapped and still reports the same empty id.
                    timeoutScheduler.onTimeout(replyQueueGracePeriod,
                        () -> replies.computeIfPresent(id, (key, q) -> q == queue && emptyId.equals(q.getEmptyId()) ? null : q),
                        LOG, () -> "Timeout check failed, clientInvocationId=" + id);
                }
            }

            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                LOG.warn(name + ": exceptionCaught", cause);
                // Fail every future still queued for the stream seen last (nothing to do if no reply was read yet).
                Optional.ofNullable(this.clientInvocationId)
                    .map(replies::remove)
                    .orElse(ReplyQueue.EMPTY)
                    .forEach(f -> f.completeExceptionally(cause));
                ctx.close();
            }

            @Override
            public void channelInactive(ChannelHandlerContext ctx) {
                if (!connection.isClosed()) {
                    connection.scheduleReconnect("channel is inactive", null);
                }
            }
        };
    }

    /**
     * Builds the initializer that assembles the client pipeline of a new channel.
     *
     * Pipeline order: an optional "ssl" handler (only when {@code sslContext} is non-null),
     * the three request encoders, the reply decoder and finally the given handler.
     *
     * @param address the remote address; its host name and port are passed to the SSL handler.
     * @param sslContext the client SSL context, or null for a plain connection.
     * @param handler the inbound handler appended last.
     * @return a new channel initializer.
     */
    static ChannelInitializer<SocketChannel> newChannelInitializer(final InetSocketAddress address, final SslContext sslContext, final ChannelInboundHandler handler) {
        return new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                final boolean secured = sslContext != null;
                if (secured) {
                    pipeline.addLast("ssl", sslContext.newHandler(ch.alloc(), address.getHostName(), address.getPort()));
                }
                pipeline.addLast(NettyClientStreamRpc.newEncoder())
                    .addLast(NettyClientStreamRpc.newEncoderDataStreamRequestFilePositionCount())
                    .addLast(NettyClientStreamRpc.newEncoderByteBuffer())
                    .addLast(NettyClientStreamRpc.newDecoder())
                    .addLast(handler);
            }
        };
    }

    /**
     * Creates the outbound encoder for {@link DataStreamRequestByteBuffer} messages.
     * The work is delegated to {@code NettyDataStreamUtils.encodeDataStreamRequestByteBuffer},
     * which receives the output list's {@code add} method and the channel's allocator.
     *
     * @return a new encoder instance.
     */
    static MessageToMessageEncoder<DataStreamRequestByteBuffer> newEncoder() {
        final MessageToMessageEncoder<DataStreamRequestByteBuffer> encoder = new MessageToMessageEncoder<DataStreamRequestByteBuffer>() {
            @Override
            protected void encode(ChannelHandlerContext ctx, DataStreamRequestByteBuffer msg, List<Object> encoded) {
                NettyDataStreamUtils.encodeDataStreamRequestByteBuffer(msg, encoded::add, ctx.alloc());
            }
        };
        return encoder;
    }

    /**
     * Creates the outbound encoder for {@link DataStreamRequestFilePositionCount} messages.
     * The work is delegated to {@code NettyDataStreamUtils.encodeDataStreamRequestFilePositionCount},
     * which receives the output list's {@code add} method and the channel's allocator.
     *
     * @return a new encoder instance.
     */
    static MessageToMessageEncoder<DataStreamRequestFilePositionCount> newEncoderDataStreamRequestFilePositionCount() {
        final MessageToMessageEncoder<DataStreamRequestFilePositionCount> encoder = new MessageToMessageEncoder<DataStreamRequestFilePositionCount>() {
            @Override
            protected void encode(ChannelHandlerContext context, DataStreamRequestFilePositionCount msg, List<Object> encoded) {
                NettyDataStreamUtils.encodeDataStreamRequestFilePositionCount(msg, encoded::add, context.alloc());
            }
        };
        return encoder;
    }

    /**
     * Creates the outbound encoder for plain {@link ByteBuffer} messages.
     * The work is delegated to {@code NettyDataStreamUtils.encodeByteBuffer},
     * which receives the output list's {@code add} method.
     *
     * @return a new encoder instance.
     */
    static MessageToMessageEncoder<ByteBuffer> newEncoderByteBuffer() {
        final MessageToMessageEncoder<ByteBuffer> encoder = new MessageToMessageEncoder<ByteBuffer>() {
            @Override
            protected void encode(ChannelHandlerContext context, ByteBuffer buffer, List<Object> encoded) {
                NettyDataStreamUtils.encodeByteBuffer(buffer, encoded::add);
            }
        };
        return encoder;
    }

    /**
     * Creates the inbound decoder that turns received bytes into data stream replies.
     * The decoder is configured with {@code ByteToMessageDecoder.COMPOSITE_CUMULATOR}.
     * A null result from {@code NettyDataStreamUtils.decodeDataStreamReplyByteBuffer}
     * adds nothing to the output list.
     *
     * @return a new decoder instance.
     */
    static ByteToMessageDecoder newDecoder() {
        return new ByteToMessageDecoder() {
            {
                setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
            }

            @Override
            protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> decoded) {
                final Object reply = NettyDataStreamUtils.decodeDataStreamReplyByteBuffer(in);
                if (reply != null) {
                    decoded.add(reply);
                }
            }
        };
    }

    /**
     * Sends the given request and returns a future for its reply.
     *
     * The future is first queued under the request's (client id, stream id) pair and then
     * the request is written to the channel; it is flushed immediately only when
     * {@code outstandingRequests.write(request)} says so.
     *
     * @param request the request to send.
     * @return a future completed with the reply, or completed exceptionally when the future
     *         cannot be queued, no channel is available, or the channel write fails.
     */
    @Override
    public CompletableFuture<DataStreamReply> streamAsync(DataStreamRequest request) {
        final CompletableFuture<DataStreamReply> replyFuture = new CompletableFuture<>();
        final ClientInvocationId id = ClientInvocationId.valueOf(request.getClientId(), request.getStreamId());
        final ReplyQueue pending = replies.computeIfAbsent(id, key -> new ReplyQueue());
        if (!pending.offer(replyFuture)) {
            replyFuture.completeExceptionally(new IllegalStateException(this + ": Failed to offer a future for " + request));
            return replyFuture;
        }

        final Channel channel = connection.getChannelUninterruptibly();
        if (channel == null) {
            // A null channel means the connection has been closed.
            replyFuture.completeExceptionally(new AlreadyClosedException(this + ": Failed to send " + request));
            return replyFuture;
        }

        LOG.debug("{}: write {}", this, request);
        final ChannelFuture written;
        if (outstandingRequests.write(request)) {
            written = channel.writeAndFlush(request);
        } else {
            written = channel.write(request);
        }
        written.addListener(result -> {
            if (result.isSuccess()) {
                return;
            }
            final IOException e = new IOException(this + ": Failed to send " + request, result.cause());
            LOG.error("Channel write failed", e);
            replyFuture.completeExceptionally(e);
        });
        return replyFuture;
    }

    /**
     * Closes this client.
     *
     * When a flush is due, an empty buffer is written and flushed first and the connection
     * is closed once that write completes; if no channel is available nothing further is done.
     * Otherwise the connection is closed directly.
     */
    @Override
    public void close() {
        final boolean flush = outstandingRequests.shouldFlush(true, 0, SizeInBytes.ZERO);
        LOG.debug("flush? {}", flush);
        if (!flush) {
            connection.close();
            return;
        }

        final Channel channel = connection.getChannelUninterruptibly();
        if (channel == null) {
            return;
        }
        final ChannelFuture emptyWrite = channel.writeAndFlush(DataStreamPacketByteBuffer.EMPTY_BYTE_BUFFER);
        if (emptyWrite != null) {
            emptyWrite.addListener(dummy -> connection.close());
        }
    }

    /** @return the display name assigned in the constructor. */
    @Override
    public String toString() {
        return name;
    }

    /**
     * Counts the requests and bytes written since the last flush and decides
     * when the next write has to be flushed. All methods are synchronized on this object.
     */
    class OutstandingRequests {
        /** Number of requests recorded since the counters were last reset. */
        private int count;
        /** Total data length recorded since the counters were last reset. */
        private long bytes;

        OutstandingRequests() {
        }

        /**
         * Records the given request and tells whether it should be flushed.
         * A request carrying the CLOSE or the FLUSH option is always flushed;
         * otherwise the configured minimum count and byte thresholds decide.
         *
         * @param request the request about to be written.
         * @return true if the request should be written with a flush.
         */
        synchronized boolean write(DataStreamRequest request) {
            count += 1;
            bytes += request.getDataLength();

            final List<WriteOption> writeOptions = request.getWriteOptionList();
            final boolean closing = writeOptions.contains(StandardWriteOption.CLOSE);
            final boolean flushing = writeOptions.contains(StandardWriteOption.FLUSH);
            final boolean forced = closing || flushing;

            final boolean flush = shouldFlush(forced, flushRequestCountMin, flushRequestBytesMin);
            LOG.debug("Stream{} outstanding: count={}, bytes={}, options={}, flush? {}", request.getStreamId(), count, bytes, writeOptions, flush);
            return flush;
        }

        /**
         * Decides whether a flush is due and, if so, resets both counters.
         *
         * @param force true to flush regardless of the counters.
         * @param countMin flush when at least this many requests are outstanding.
         * @param bytesMin flush when at least this many bytes are outstanding.
         * @return true if a flush is due.
         */
        synchronized boolean shouldFlush(boolean force, int countMin, SizeInBytes bytesMin) {
            if (!force && count < countMin && bytes < bytesMin.getSize()) {
                return false;
            }
            count = 0;
            bytes = 0L;
            return true;
        }
    }

    /**
     * Holds the current connect future for one remote address and replaces it
     * with a new connection when the channel is found not to be active.
     * A null value in {@code ref} marks the connection as closed.
     */
    static class Connection {
        /** Delay before a scheduled reconnect attempt is run. */
        static final TimeDuration RECONNECT = TimeDuration.valueOf(100L, TimeUnit.MILLISECONDS);
        private final InetSocketAddress address;
        private final WorkerGroupGetter workerGroup;
        private final Supplier<ChannelInitializer<SocketChannel>> channelInitializerSupplier;
        /** The latest connect future, or null once {@link #close()} has been called. */
        private final AtomicReference<ChannelFuture> ref;

        /**
         * Stores the arguments and immediately starts the first connect attempt.
         *
         * @param address the remote address to connect to.
         * @param workerGroup supplies the event loop group and is asked to shut it down on close.
         * @param channelInitializerSupplier invoked once per connect attempt to obtain the channel initializer.
         */
        Connection(InetSocketAddress address, WorkerGroupGetter workerGroup, Supplier<ChannelInitializer<SocketChannel>> channelInitializerSupplier) {
            this.address = address;
            this.workerGroup = workerGroup;
            this.channelInitializerSupplier = channelInitializerSupplier;
            this.ref = new AtomicReference<>(connect());
        }

        /**
         * Waits for the current connect attempt and returns its channel; when that channel
         * is not active, a reconnect is performed and waited for instead.
         *
         * @return the channel, or null if this connection has been closed.
         */
        Channel getChannelUninterruptibly() {
            final ChannelFuture future = ref.get();
            if (future == null) {
                return null;
            }
            final Channel channel = future.syncUninterruptibly().channel();
            if (channel.isActive()) {
                return channel;
            }
            final ChannelFuture f = reconnect();
            return f == null ? null : f.syncUninterruptibly().channel();
        }

        private EventLoopGroup getWorkerGroup() {
            return workerGroup.get();
        }

        /**
         * Starts a new connect attempt with SO_KEEPALIVE enabled.
         * A failed attempt schedules a reconnect; a successful one is only traced.
         *
         * @return the future of the connect attempt.
         */
        private ChannelFuture connect() {
            return new Bootstrap()
                .group(getWorkerGroup())
                .channel(NioSocketChannel.class)
                .handler(channelInitializerSupplier.get())
                .option(ChannelOption.SO_KEEPALIVE, true)
                .connect(address)
                .addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) {
                        // Inside the listener a bare "this" is the listener itself, so the
                        // enclosing connection has to be addressed as Connection.this.
                        if (!future.isSuccess()) {
                            scheduleReconnect(Connection.this + " failed", future.cause());
                        } else {
                            LOG.trace("{} succeed.", Connection.this);
                        }
                    }
                });
        }

        /**
         * Logs the reason and schedules {@link #reconnect()} on the worker group after {@link #RECONNECT}.
         *
         * @param message a short description of why a reconnect is needed.
         * @param cause the failure to log, or null if there is none.
         */
        void scheduleReconnect(String message, Throwable cause) {
            LOG.warn("{}: {}; schedule reconnecting to {} in {}", this, message, address, RECONNECT);
            if (cause != null) {
                LOG.warn("", cause);
            }
            getWorkerGroup().schedule(this::reconnect, RECONNECT.getDuration(), RECONNECT.getUnit());
        }

        /**
         * Replaces the current connect future with a new connect attempt unless the
         * current channel is active or this connection has been closed.
         *
         * @return the active or newly created connect future, or null if this connection is closed.
         */
        private synchronized ChannelFuture reconnect() {
            // Another caller may already have reconnected; re-check under the lock.
            final ChannelFuture current = ref.get();
            // NOTE(review): Netty documents syncUninterruptibly() as rethrowing the failure cause,
            // so this looks like it throws instead of reconnecting when the previous connect
            // attempt failed -- confirm whether awaitUninterruptibly() was intended here.
            if (current != null && current.syncUninterruptibly().channel().isActive()) {
                return current;
            }
            // Memoized so that connect() runs at most once even if the update function is re-applied.
            final MemoizedSupplier<ChannelFuture> supplier = MemoizedSupplier.valueOf(this::connect);
            // A null value means closed: keep it null and do not connect.
            final ChannelFuture previous = ref.getAndUpdate(prev -> prev == null ? null : supplier.get());
            if (previous != null) {
                previous.channel().close();
            }
            return supplier.isInitialized() ? supplier.get() : null;
        }

        /**
         * Marks this connection as closed, closes the current channel and, once the channel
         * is closed, asks the worker group getter to shut down. Does nothing if already closed.
         */
        void close() {
            final ChannelFuture previous = ref.getAndSet(null);
            if (previous != null) {
                previous.channel().close().addListener(future -> workerGroup.shutdownGracefully());
            }
        }

        /** @return true once {@link #close()} has been called. */
        boolean isClosed() {
            return ref.get() == null;
        }

        @Override
        public String toString() {
            return JavaUtils.getClassSimpleName(getClass()) + "-" + address;
        }
    }

    /**
     * A queue of pending reply futures that also counts successful offers.
     * The counter, exposed through {@link #getEmptyId()} only while the queue is empty,
     * changes every time a future is offered.
     * Only {@code getEmptyId} and {@code offer} are synchronized; the remaining methods
     * delegate directly to the underlying concurrent queue.
     */
    static class ReplyQueue
    implements Iterable<CompletableFuture<DataStreamReply>> {
        /** Shared instance used as the "no queue" fallback. */
        static final ReplyQueue EMPTY = new ReplyQueue();
        private final Queue<CompletableFuture<DataStreamReply>> queue = new ConcurrentLinkedQueue<>();
        /** Incremented on every successful offer. */
        private int emptyId;

        ReplyQueue() {
        }

        /** @return the current counter value if the queue is empty; otherwise null. */
        synchronized Integer getEmptyId() {
            if (!queue.isEmpty()) {
                return null;
            }
            return emptyId;
        }

        /**
         * Appends the given future and, on success, advances the counter.
         *
         * @param f the future to append.
         * @return true if the future was accepted by the underlying queue.
         */
        synchronized boolean offer(CompletableFuture<DataStreamReply> f) {
            final boolean accepted = queue.offer(f);
            if (accepted) {
                emptyId++;
            }
            return accepted;
        }

        /** @return the head future after removing it, or null if the queue is empty. */
        CompletableFuture<DataStreamReply> poll() {
            return queue.poll();
        }

        /** @return the number of queued futures. */
        int size() {
            return queue.size();
        }

        @Override
        public Iterator<CompletableFuture<DataStreamReply>> iterator() {
            return queue.iterator();
        }
    }

    /**
     * Supplies the event loop group used by a connection.
     * Depending on the configuration the group is either a process-wide shared one,
     * which is never shut down through this class, or a private one owned by this getter.
     */
    private static class WorkerGroupGetter
    implements Supplier<EventLoopGroup> {
        /** The lazily created shared group; once set it is never replaced. */
        private static final AtomicReference<EventLoopGroup> SHARED_WORKER_GROUP = new AtomicReference<>();
        private final EventLoopGroup workerGroup;
        /** True when {@code workerGroup} is the shared group, which must outlive this getter. */
        private final boolean ignoreShutdown;

        /**
         * Creates a new event loop group sized by the configured client worker group size.
         *
         * @param properties the configuration to read the size from.
         * @return a new event loop group.
         */
        static EventLoopGroup newWorkerGroup(RaftProperties properties) {
            return NettyUtils.newEventLoopGroup(JavaUtils.getClassSimpleName(NettyClientStreamRpc.class) + "-workerGroup", NettyConfigKeys.DataStream.Client.workerGroupSize(properties), false);
        }

        /**
         * Returns the shared group, creating it on first use.
         * The group is created outside of any atomic update function because such functions
         * may be re-applied under contention; a thread that loses the race shuts down
         * the group it created and uses the winner's group instead.
         *
         * @param properties the configuration used if the group has to be created.
         * @return the shared event loop group.
         */
        private static EventLoopGroup getSharedWorkerGroup(RaftProperties properties) {
            final EventLoopGroup existing = SHARED_WORKER_GROUP.get();
            if (existing != null) {
                return existing;
            }
            final EventLoopGroup created = newWorkerGroup(properties);
            if (SHARED_WORKER_GROUP.compareAndSet(null, created)) {
                return created;
            }
            // Another thread installed its group first: release ours and use theirs.
            created.shutdownGracefully();
            return SHARED_WORKER_GROUP.get();
        }

        /**
         * Selects the shared or a private worker group according to the configuration.
         *
         * @param properties the configuration to read.
         */
        WorkerGroupGetter(RaftProperties properties) {
            if (NettyConfigKeys.DataStream.Client.workerGroupShare(properties)) {
                this.workerGroup = getSharedWorkerGroup(properties);
                this.ignoreShutdown = true;
            } else {
                this.workerGroup = newWorkerGroup(properties);
                this.ignoreShutdown = false;
            }
        }

        @Override
        public EventLoopGroup get() {
            return workerGroup;
        }

        /** Shuts down the group unless it is the shared one. */
        void shutdownGracefully() {
            if (!ignoreShutdown) {
                workerGroup.shutdownGracefully();
            }
        }
    }
}

