/*
 * Decompiled with CFR 0.152.
 */
package org.pipecraft.pipes.async.inter.distributed;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.handler.codec.compression.Lz4FrameEncoder;
import io.netty.handler.flush.FlushConsolidationHandler;
import io.netty.util.AttributeKey;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.pipecraft.infra.io.Retrier;
import org.pipecraft.pipes.async.inter.distributed.ConnectionCounter;
import org.pipecraft.pipes.async.inter.distributed.NettyUtils;
import org.pipecraft.pipes.serialization.ByteArrayEncoder;

class ShuffleClient<T> {
    private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
    private static final int SERVER_AVAILABILITY_MAX_WAIT_MS = 180000;
    private static final int MS_BETWEEN_RETRY = 1000;
    private static final Retrier RETRIER = new Retrier(1000, 1.0, 181);
    private static final byte[] CH_DONE_MESSAGE = new byte[0];
    private static final byte[] WORKER_DONE_MESSAGE = new byte[0];
    private static final AttributeKey<ConnectionCounter> BYTE_SENT = AttributeKey.valueOf((String)"byte_sent");
    private final String host;
    private final int port;
    private final ByteArrayEncoder<T> encoder;
    private final ThreadLocal<Channel> threadConnection;
    private final Bootstrap bootstrap;
    private final Map<Long, Channel> connections = new ConcurrentHashMap<Long, Channel>();

    public ShuffleClient(String host, int port, ByteArrayEncoder<T> encoder, final ChannelInboundHandlerAdapter channelInboundHandler, EventLoopGroup group) {
        this.host = host;
        this.port = port;
        this.encoder = encoder;
        this.bootstrap = new Bootstrap();
        ((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)this.bootstrap.group(group)).channel(NettyUtils.getSocketChanneClass())).option(ChannelOption.TCP_NODELAY, (Object)true)).option(ChannelOption.SO_KEEPALIVE, (Object)true)).option(ChannelOption.ALLOCATOR, (Object)PooledByteBufAllocator.DEFAULT)).option(ChannelOption.SO_SNDBUF, (Object)0x100000)).option(ChannelOption.SO_RCVBUF, (Object)0x100000)).option(ChannelOption.WRITE_BUFFER_WATER_MARK, (Object)new WriteBufferWaterMark(131072, 655360))).handler((ChannelHandler)new ChannelInitializer<Channel>(){

            protected void initChannel(Channel ch) {
                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast(new ChannelHandler[]{new Lz4FrameEncoder()});
                pipeline.addLast(new ChannelHandler[]{new FlushConsolidationHandler(256, true)});
                pipeline.addLast(new ChannelHandler[]{new BytesEncoder()});
                pipeline.addLast(new ChannelHandler[]{channelInboundHandler});
            }
        });
        this.threadConnection = ThreadLocal.withInitial(() -> {
            try {
                Channel ch = this.openNewConnection();
                this.connections.put(Thread.currentThread().getId(), ch);
                return ch;
            }
            catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }

    private Channel openNewConnection() throws IOException {
        try {
            return RETRIER.run(() -> this.bootstrap.connect(this.host, this.port).sync().channel());
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
        catch (Exception e) {
            if (e instanceof ConnectException) {
                throw new IOException(e);
            }
            throw e;
        }
    }

    public void send(T msg) throws InterruptedException, IOException {
        try {
            Channel ch = this.threadConnection.get();
            if (!ch.isWritable()) {
                ch.writeAndFlush(msg).sync();
            } else {
                ch.writeAndFlush(msg, ch.voidPromise());
            }
        }
        catch (UncheckedIOException e) {
            throw new IOException(e);
        }
    }

    public void done() throws InterruptedException, IOException {
        for (Channel ch : this.connections.values()) {
            ch.writeAndFlush((Object)CH_DONE_MESSAGE).sync();
        }
        for (Channel ch : this.connections.values()) {
            ch.closeFuture().sync();
        }
        Channel ch = this.openNewConnection();
        ch.writeAndFlush((Object)WORKER_DONE_MESSAGE).sync();
        ch.closeFuture().sync();
        this.connections.clear();
    }

    private class BytesEncoder
    extends MessageToByteEncoder<T> {
        private BytesEncoder() {
        }

        protected void encode(ChannelHandlerContext ctx, T msg, ByteBuf out) throws IOException {
            Channel ch = ctx.channel();
            ConnectionCounter counter = (ConnectionCounter)ch.attr(BYTE_SENT).get();
            if (counter == null) {
                counter = new ConnectionCounter((InetSocketAddress)ch.remoteAddress());
                ch.attr(BYTE_SENT).set((Object)counter);
            }
            if (msg == CH_DONE_MESSAGE) {
                out.writeInt(-1);
                out.writeLong(counter.getValue());
                return;
            }
            if (msg == WORKER_DONE_MESSAGE) {
                out.writeInt(-2);
                return;
            }
            int startIndex = out.writerIndex();
            out.writeBytes(LENGTH_PLACEHOLDER);
            out.writeBytes(ShuffleClient.this.encoder.encode(msg));
            int messageLength = out.writerIndex() - startIndex - LENGTH_PLACEHOLDER.length;
            out.setInt(startIndex, messageLength);
            counter.add(messageLength);
        }
    }
}

