/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ratis.netty.server;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ratis.client.DataStreamClient;
import org.apache.ratis.client.DataStreamOutputRpc;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
import org.apache.ratis.io.CloseAsync;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.netty.NettyDataStreamUtils;
import org.apache.ratis.netty.NettyUtils;
import org.apache.ratis.netty.metrics.NettyServerStreamRpcMetrics;
import org.apache.ratis.netty.server.DataStreamManagement;
import org.apache.ratis.netty.server.DataStreamRequestByteBuf;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.DataStreamPacket;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.security.TlsConf;
import org.apache.ratis.server.DataStreamServerRpc;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.thirdparty.io.netty.bootstrap.ServerBootstrap;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandler;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelInboundHandler;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelInitializer;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelOption;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.ratis.thirdparty.io.netty.channel.epoll.EpollEventLoopGroup;
import org.apache.ratis.thirdparty.io.netty.channel.epoll.EpollServerSocketChannel;
import org.apache.ratis.thirdparty.io.netty.channel.socket.SocketChannel;
import org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.ratis.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
import org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
import org.apache.ratis.thirdparty.io.netty.handler.logging.LogLevel;
import org.apache.ratis.thirdparty.io.netty.handler.logging.LoggingHandler;
import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext;
import org.apache.ratis.util.ConcurrentUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.PeerProxyMap;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.UncheckedAutoCloseable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NettyServerStreamRpc
implements DataStreamServerRpc {
    public static final Logger LOG = LoggerFactory.getLogger(NettyServerStreamRpc.class);
    private final String name;
    private final EventLoopGroup bossGroup;
    private final EventLoopGroup workerGroup;
    private final ChannelFuture channelFuture;
    private final DataStreamManagement requests;
    private final ProxiesPool proxies;
    private final NettyServerStreamRpcMetrics metrics;

    public NettyServerStreamRpc(RaftServer server, Parameters parameters) {
        this.name = server.getId() + "-" + JavaUtils.getClassSimpleName(this.getClass());
        this.metrics = new NettyServerStreamRpcMetrics(this.name);
        this.requests = new DataStreamManagement(server, this.metrics);
        RaftProperties properties = server.getProperties();
        this.proxies = new ProxiesPool(this.name, properties, parameters);
        boolean useEpoll = NettyConfigKeys.DataStream.Server.useEpoll(properties);
        this.bossGroup = NettyUtils.newEventLoopGroup(this.name + "-bossGroup", NettyConfigKeys.DataStream.Server.bossGroupSize(properties), useEpoll);
        this.workerGroup = NettyUtils.newEventLoopGroup(this.name + "-workerGroup", NettyConfigKeys.DataStream.Server.workerGroupSize(properties), useEpoll);
        TlsConf tlsConf = NettyConfigKeys.DataStream.Server.tlsConf(parameters);
        SslContext sslContext = NettyUtils.buildSslContextForServer(tlsConf);
        String host = NettyConfigKeys.DataStream.host(server.getProperties());
        int port = NettyConfigKeys.DataStream.port(properties);
        InetSocketAddress socketAddress = host == null || host.isEmpty() ? new InetSocketAddress(port) : new InetSocketAddress(host, port);
        this.channelFuture = ((ServerBootstrap)((ServerBootstrap)new ServerBootstrap().group(this.bossGroup, this.workerGroup).channel(this.bossGroup instanceof EpollEventLoopGroup ? EpollServerSocketChannel.class : NioServerSocketChannel.class)).handler(new LoggingHandler(LogLevel.INFO))).childHandler(this.newChannelInitializer(sslContext)).childOption(ChannelOption.SO_KEEPALIVE, true).childOption(ChannelOption.TCP_NODELAY, true).bind(socketAddress);
    }

    static DataStreamClient newClient(RaftPeer peer, RaftProperties properties, Parameters parameters) {
        return DataStreamClient.newBuilder().setClientId(ClientId.randomId()).setDataStreamServer(peer).setProperties(properties).setParameters(parameters).build();
    }

    @Override
    public void addRaftPeers(Collection<RaftPeer> newPeers) {
        this.proxies.addRaftPeers(newPeers);
    }

    private ChannelInboundHandler newChannelInboundHandlerAdapter() {
        return new ChannelInboundHandlerAdapter(){
            private final RequestRef requestRef = new RequestRef();

            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                NettyServerStreamRpc.this.metrics.onRequestCreate(NettyServerStreamRpcMetrics.RequestType.CHANNEL_READ);
                if (!(msg instanceof DataStreamRequestByteBuf)) {
                    LOG.error("Unexpected message class {}, ignoring ...", (Object)msg.getClass().getName());
                    return;
                }
                DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
                try (UncheckedAutoCloseable autoReset = this.requestRef.set(request);){
                    NettyServerStreamRpc.this.requests.read(request, ctx, NettyServerStreamRpc.this.proxies.get(request)::getDataStreamOutput);
                }
            }

            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable throwable) {
                Optional.ofNullable(this.requestRef.getAndSetNull()).ifPresent(request -> NettyServerStreamRpc.this.requests.replyDataStreamException(throwable, (DataStreamRequestByteBuf)request, ctx));
            }
        };
    }

    private ChannelInitializer<SocketChannel> newChannelInitializer(final SslContext sslContext) {
        return new ChannelInitializer<SocketChannel>(){

            @Override
            public void initChannel(SocketChannel ch) {
                ChannelPipeline p = ch.pipeline();
                if (sslContext != null) {
                    p.addLast("ssl", (ChannelHandler)sslContext.newHandler(ch.alloc()));
                }
                p.addLast(NettyServerStreamRpc.newDecoder());
                p.addLast(NettyServerStreamRpc.newEncoder());
                p.addLast(NettyServerStreamRpc.this.newChannelInboundHandlerAdapter());
            }
        };
    }

    static ByteToMessageDecoder newDecoder() {
        return new ByteToMessageDecoder(){
            {
                this.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
            }

            @Override
            protected void decode(ChannelHandlerContext context, ByteBuf buf, List<Object> out) {
                Optional.ofNullable(NettyDataStreamUtils.decodeDataStreamRequestByteBuf(buf)).ifPresent(out::add);
            }
        };
    }

    static MessageToMessageEncoder<DataStreamReplyByteBuffer> newEncoder() {
        return new MessageToMessageEncoder<DataStreamReplyByteBuffer>(){

            @Override
            protected void encode(ChannelHandlerContext context, DataStreamReplyByteBuffer reply, List<Object> out) {
                NettyDataStreamUtils.encodeDataStreamReplyByteBuffer(reply, out::add, context.alloc());
            }
        };
    }

    @Override
    public void start() {
        this.channelFuture.syncUninterruptibly();
    }

    @Override
    public InetSocketAddress getInetSocketAddress() {
        this.channelFuture.awaitUninterruptibly();
        return (InetSocketAddress)this.channelFuture.channel().localAddress();
    }

    @Override
    public void close() {
        try {
            this.channelFuture.channel().close().sync();
            this.bossGroup.shutdownGracefully(0L, 100L, TimeUnit.MILLISECONDS);
            this.workerGroup.shutdownGracefully(0L, 100L, TimeUnit.MILLISECONDS);
            ConcurrentUtils.shutdownAndWait(TimeDuration.ONE_SECOND, this.bossGroup, timeout -> LOG.warn("{}: bossGroup shutdown timeout in " + timeout, (Object)this));
            ConcurrentUtils.shutdownAndWait(TimeDuration.ONE_SECOND, this.workerGroup, timeout -> LOG.warn("{}: workerGroup shutdown timeout in " + timeout, (Object)this));
        }
        catch (InterruptedException e) {
            LOG.error(this + ": Interrupted close()", e);
        }
        this.proxies.close();
    }

    public String toString() {
        return this.name;
    }

    static class RequestRef {
        private final AtomicReference<DataStreamRequestByteBuf> ref = new AtomicReference();

        RequestRef() {
        }

        UncheckedAutoCloseable set(DataStreamRequestByteBuf current) {
            DataStreamRequestByteBuf previous = this.ref.getAndUpdate(p -> p == null ? current : p);
            Preconditions.assertNull((Object)previous, () -> "previous = " + previous + " != null, current = " + current);
            return () -> Preconditions.assertSame(current, this.getAndSetNull(), "RequestRef");
        }

        DataStreamRequestByteBuf getAndSetNull() {
            return this.ref.getAndSet(null);
        }
    }

    static class ProxiesPool {
        private final List<Proxies> list;

        ProxiesPool(String name, RaftProperties properties, Parameters parameters) {
            int clientPoolSize = RaftServerConfigKeys.DataStream.clientPoolSize(properties);
            ArrayList<Proxies> proxies = new ArrayList<Proxies>(clientPoolSize);
            for (int i = 0; i < clientPoolSize; ++i) {
                proxies.add(new Proxies(new PeerProxyMap<DataStreamClient>(name, peer -> NettyServerStreamRpc.newClient(peer, properties, parameters))));
            }
            this.list = Collections.unmodifiableList(proxies);
        }

        void addRaftPeers(Collection<RaftPeer> newPeers) {
            this.list.forEach(proxy -> proxy.addPeers(newPeers));
        }

        Proxies get(DataStreamPacket p) {
            long hash = Integer.toUnsignedLong(Objects.hash(p.getClientId(), p.getStreamId()));
            return this.list.get(Math.toIntExact(hash % (long)this.list.size()));
        }

        void close() {
            this.list.forEach(Proxies::close);
        }
    }

    static class Proxies {
        private final PeerProxyMap<DataStreamClient> map;

        Proxies(PeerProxyMap<DataStreamClient> map) {
            this.map = map;
        }

        void addPeers(Collection<RaftPeer> newPeers) {
            this.map.addRaftPeers(newPeers);
        }

        Set<DataStreamOutputRpc> getDataStreamOutput(RaftClientRequest request, Set<RaftPeer> peers) throws IOException {
            HashSet<DataStreamOutputRpc> outs = new HashSet<DataStreamOutputRpc>();
            try {
                this.getDataStreamOutput(request, peers, outs);
            }
            catch (IOException e) {
                outs.forEach(CloseAsync::closeAsync);
                throw e;
            }
            return outs;
        }

        private void getDataStreamOutput(RaftClientRequest request, Set<RaftPeer> peers, Set<DataStreamOutputRpc> outs) throws IOException {
            for (RaftPeer peer : peers) {
                try {
                    outs.add((DataStreamOutputRpc)this.map.computeIfAbsent(peer).get().stream(request));
                }
                catch (IOException e) {
                    this.map.handleException(peer.getId(), e, true);
                    throw new IOException(this.map.getName() + ": Failed to getDataStreamOutput for " + peer, e);
                }
            }
        }

        void close() {
            this.map.close();
        }
    }
}

