/*
 * Decompiled with CFR 0.152.
 */
package org.pipecraft.pipes.async.inter.distributed;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.compression.Lz4FrameDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.AttributeKey;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.pipecraft.pipes.async.inter.distributed.ConnectionCounter;
import org.pipecraft.pipes.async.inter.distributed.NettyUtils;
import org.pipecraft.pipes.exceptions.PipeException;

/**
 * Netty-based TCP server that receives LZ4-framed, length-prefixed binary messages and hands
 * each decoded payload to a caller-supplied {@link CheckedConsumer}.
 *
 * <p>Wire format handled by the internal decoder (after LZ4 frame decoding):
 * <ul>
 *   <li>A 4-byte int length {@code n >= 0}, followed by {@code n} payload bytes: the payload is
 *       passed to the handler and {@code n} is added to the connection's byte counter.</li>
 *   <li>The int {@code -1}, followed by an 8-byte long: the long is compared with the number of
 *       payload bytes counted on this connection; a mismatch raises a {@link RuntimeException}.
 *       The channel is then closed.</li>
 *   <li>Any other negative int: the done latch is counted down and the channel is closed.</li>
 * </ul>
 *
 * <p>The boss event loop group is created by {@link #start()} and shut down by {@link #close()}.
 * The worker group is supplied by the caller and is never shut down by this class.
 */
class ShuffleServer implements Closeable {
    private static final int PROCESSORS = Runtime.getRuntime().availableProcessors();

    /** Per-channel attribute holding the running count of payload bytes received on that channel. */
    private static final AttributeKey<ConnectionCounter> BYTE_SENT = AttributeKey.valueOf("byte_sent");

    /** Size requested for both the send and receive socket buffers of accepted connections (1 MiB). */
    private static final int SOCKET_BUFFER_BYTES = 0x100000;

    /** Low write-buffer water mark for accepted connections (128 KiB). */
    private static final int WRITE_BUFFER_LOW_WATER_MARK = 131072;

    /** High write-buffer water mark for accepted connections (640 KiB). */
    private static final int WRITE_BUFFER_HIGH_WATER_MARK = 655360;

    private final int port;
    private final ChannelInboundHandlerAdapter channelInboundHandler;
    private final EventLoopGroup workerGroup;
    private final CountDownLatch doneLatch;
    private final CheckedConsumer<byte[]> handler;

    /** Created in {@link #start()}; remains {@code null} until then. */
    private EventLoopGroup bossGroup;

    /**
     * Creates a server. Nothing is bound until {@link #start()} is called.
     *
     * @param port the TCP port to bind
     * @param doneLatch latch counted down when a negative length marker other than {@code -1} is received
     * @param channelInboundHandler handler appended last to every accepted channel's pipeline
     * @param workerGroup event loop group serving accepted connections; owned by the caller
     * @param handler receives the payload of every decoded message
     */
    public ShuffleServer(int port, CountDownLatch doneLatch, ChannelInboundHandlerAdapter channelInboundHandler, EventLoopGroup workerGroup, CheckedConsumer<byte[]> handler) {
        this.port = port;
        this.channelInboundHandler = channelInboundHandler;
        this.workerGroup = workerGroup;
        this.doneLatch = doneLatch;
        this.handler = handler;
    }

    /**
     * Creates the boss event loop group, configures the server bootstrap and binds the port,
     * blocking until the bind operation completes.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting for the bind
     */
    public void start() throws InterruptedException {
        this.bossGroup = NettyUtils.newEventLoopGroup(PROCESSORS);
        ServerBootstrap b = new ServerBootstrap();
        b.group(this.bossGroup, this.workerGroup)
            .channel(NettyUtils.getServerSocketChanneClass())
            .handler(new LoggingHandler(LogLevel.INFO))
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.SO_REUSEADDR, true)
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childOption(ChannelOption.SO_SNDBUF, SOCKET_BUFFER_BYTES)
            .childOption(ChannelOption.SO_RCVBUF, SOCKET_BUFFER_BYTES)
            .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WRITE_BUFFER_LOW_WATER_MARK, WRITE_BUFFER_HIGH_WATER_MARK))
            .childHandler(new ChannelInitializer<SocketChannel>() {

                public void initChannel(SocketChannel ch) {
                    ChannelPipeline pipeline = ch.pipeline();
                    // Order matters: decompress first, then split into messages, then the caller's handler.
                    pipeline.addLast(new Lz4FrameDecoder());
                    pipeline.addLast(new MessageDecoder());
                    // NOTE(review): the same handler instance is added to every accepted channel;
                    // presumably it is sharable - confirm against the caller.
                    pipeline.addLast(ShuffleServer.this.channelInboundHandler);
                }
            });
        b.bind(this.port).sync();
    }

    /**
     * Gracefully shuts down the boss event loop group (2 second quiet period, 10 second timeout)
     * and waits for the shutdown to complete. Does nothing if {@link #start()} was never called
     * or if the group is already shutting down. The worker group is left untouched.
     */
    @Override
    public void close() {
        EventLoopGroup group = this.bossGroup;
        // The group only exists once start() has run; closing an unstarted server must be a no-op.
        if (group == null || group.isShuttingDown()) {
            return;
        }
        group.shutdownGracefully(2L, 10L, TimeUnit.SECONDS).syncUninterruptibly();
    }

    /**
     * Splits the inbound byte stream into length-prefixed messages and delivers each payload to
     * the enclosing server's handler. Nothing is added to the decoder's output list, so decoded
     * payloads are not forwarded through the pipeline by this class.
     */
    private class MessageDecoder
    extends ByteToMessageDecoder {
        private MessageDecoder() {
        }

        /**
         * Consumes as many complete messages as the buffer currently holds.
         *
         * @param ctx the channel handler context
         * @param buf the accumulated inbound bytes
         * @param out unused; payloads go directly to the server's handler
         * @throws IOException if the handler fails with an I/O error
         * @throws PipeException if the handler fails with a pipe error
         * @throws InterruptedException if the handler is interrupted
         */
        protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws IOException, PipeException, InterruptedException {
            Channel ch = ctx.channel();
            // Lazily attach a byte counter to the channel the first time data arrives on it.
            ConnectionCounter counter = ch.attr(BYTE_SENT).get();
            if (counter == null) {
                counter = new ConnectionCounter((InetSocketAddress)ch.remoteAddress());
                ch.attr(BYTE_SENT).set(counter);
            }
            while (buf.readableBytes() >= 4) {
                // Remember where this message starts so an incomplete one can be retried later.
                buf.markReaderIndex();
                int dataLength = buf.readInt();
                if (dataLength < 0) {
                    // Negative lengths are control markers rather than payload sizes.
                    if (dataLength == -1) {
                        // -1 is followed by an 8-byte total that must match what was counted here.
                        if (buf.readableBytes() < 8) {
                            buf.resetReaderIndex();
                            return;
                        }
                        long totalSent = buf.readLong();
                        long counterValue = counter.getValue();
                        if (counterValue != totalSent) {
                            throw new RuntimeException("Got only " + counterValue + " but should've got " + totalSent + " from " + counter.getAddress());
                        }
                        // NOTE(review): this path does not count down doneLatch; only the other
                        // negative markers do. Confirm this matches the sender's protocol.
                    } else {
                        ShuffleServer.this.doneLatch.countDown();
                    }
                    ctx.close();
                    return;
                }
                if (buf.readableBytes() < dataLength) {
                    // Payload not fully received yet: rewind to the length prefix and wait for more bytes.
                    buf.resetReaderIndex();
                    return;
                }
                byte[] msg = new byte[dataLength];
                buf.readBytes(msg);
                ShuffleServer.this.handler.accept(msg);
                // Only payload bytes are counted; the 4-byte length prefix is excluded.
                counter.add(dataLength);
            }
        }
    }

    /**
     * A consumer whose {@code accept} method may throw the checked exceptions raised while
     * processing a received message.
     *
     * @param <T> the type of the consumed value
     */
    @FunctionalInterface
    public static interface CheckedConsumer<T> {
        /**
         * Processes the given value.
         *
         * @param var1 the value to process
         * @throws IOException on an I/O failure
         * @throws PipeException on a pipe failure
         * @throws InterruptedException if the processing thread is interrupted
         */
        public void accept(T var1) throws IOException, PipeException, InterruptedException;
    }
}

