package io.netty5.handler.flow;

import io.netty5.bootstrap.Bootstrap;
import io.netty5.bootstrap.ServerBootstrap;
import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.DefaultBufferAllocators;
import io.netty5.channel.Channel;
import io.netty5.channel.ChannelHandler;
import io.netty5.channel.ChannelHandlerContext;
import io.netty5.channel.ChannelInitializer;
import io.netty5.channel.ChannelOption;
import io.netty5.channel.ChannelPipeline;
import io.netty5.channel.EventLoop;
import io.netty5.channel.EventLoopGroup;
import io.netty5.channel.MultithreadEventLoopGroup;
import io.netty5.channel.embedded.EmbeddedChannel;
import io.netty5.channel.nio.NioHandler;
import io.netty5.channel.socket.nio.NioServerSocketChannel;
import io.netty5.channel.socket.nio.NioSocketChannel;
import io.netty5.handler.codec.ByteToMessageDecoder;
import io.netty5.handler.timeout.IdleStateEvent;
import io.netty5.handler.timeout.IdleStateHandler;
import io.netty5.util.Resource;
import java.net.SocketAddress;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:io/netty5/handler/flow/FlowControlHandlerTest.class */
public class FlowControlHandlerTest {
    private static EventLoopGroup eventLoopGroup;

    /* loaded from: input_file:io/netty5/handler/flow/FlowControlHandlerTest$OneByteToThreeStringsDecoder.class */
    private static final class OneByteToThreeStringsDecoder extends ByteToMessageDecoder {
        private OneByteToThreeStringsDecoder() {
        }

        protected void decode(ChannelHandlerContext channelHandlerContext, Buffer buffer) {
            for (int i = 0; i < buffer.readableBytes(); i++) {
                channelHandlerContext.fireChannelRead("1");
                channelHandlerContext.fireChannelRead("2");
                channelHandlerContext.fireChannelRead("3");
            }
            buffer.skipReadableBytes(buffer.readableBytes());
        }
    }

    @BeforeAll
    public static void init() {
        eventLoopGroup = new MultithreadEventLoopGroup(NioHandler.newFactory());
    }

    @AfterAll
    public static void destroy() {
        eventLoopGroup.shutdownGracefully();
    }

    private static Buffer newOneMessage() {
        return DefaultBufferAllocators.preferredAllocator().allocate(1).writeByte((byte) 1);
    }

    private static Channel newServer(boolean z, final ChannelHandler... channelHandlerArr) throws Exception {
        Assertions.assertTrue(channelHandlerArr.length >= 1);
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(eventLoopGroup).channel(NioServerSocketChannel.class).childOption(ChannelOption.AUTO_READ, Boolean.valueOf(z)).childHandler(new ChannelInitializer<Channel>() { // from class: io.netty5.handler.flow.FlowControlHandlerTest.1
            protected void initChannel(Channel channel) {
                ChannelPipeline pipeline = channel.pipeline();
                pipeline.addLast(new ChannelHandler[]{new OneByteToThreeStringsDecoder()});
                pipeline.addLast(channelHandlerArr);
            }
        });
        return (Channel) serverBootstrap.bind(0).asStage().get();
    }

    private static Channel newClient(SocketAddress socketAddress) throws Exception {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000).handler(new ChannelHandler() { // from class: io.netty5.handler.flow.FlowControlHandlerTest.2
            public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
                Assertions.fail("In this test the client is never receiving a message from the server.");
            }
        });
        return (Channel) bootstrap.connect(socketAddress).asStage().get();
    }

    @Test
    public void testAutoReadingOn() throws Exception {
        final CountDownLatch countDownLatch = new CountDownLatch(3);
        Channel newServer = newServer(true, new ChannelHandler() { // from class: io.netty5.handler.flow.FlowControlHandlerTest.3
            public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
                Resource.dispose(obj);
                channelHandlerContext.channel().setOption(ChannelOption.AUTO_READ, false);
                countDownLatch.countDown();
            }
        });
        Channel newClient = newClient(newServer.localAddress());
        try {
            newClient.writeAndFlush(newOneMessage()).asStage().sync();
            Assertions.assertTrue(countDownLatch.await(1L, TimeUnit.SECONDS));
            newClient.close();
            newServer.close();
        } catch (Throwable th) {
            newClient.close();
            newServer.close();
            throw th;
        }
    }

    @Test
    public void testAutoReadingOff() throws Exception {
        final Exchanger exchanger = new Exchanger();
        final CountDownLatch countDownLatch = new CountDownLatch(3);
        Channel newServer = newServer(false, new ChannelHandler() { // from class: io.netty5.handler.flow.FlowControlHandlerTest.4
            public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
                exchanger.exchange(channelHandlerContext.channel(), 1L, TimeUnit.SECONDS);
                channelHandlerContext.fireChannelActive();
            }

            public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
                Resource.dispose(obj);
                countDownLatch.countDown();
            }
        });
        Channel newClient = newClient(newServer.localAddress());
        try {
            Channel channel = (Channel) exchanger.exchange(null, 1L, TimeUnit.SECONDS);
            newClient.writeAndFlush(newOneMessage()).asStage().sync();
            channel.read();
            Assertions.assertTrue(countDownLatch.await(1L, TimeUnit.SECONDS));
            newClient.close();
            newServer.close();
        } catch (Throwable th) {
            newClient.close();
            newServer.close();
            throw th;
        }
    }

    @Test
    public void testFlowAutoReadOn() throws Exception {
        final CountDownLatch countDownLatch = new CountDownLatch(3);
        final Exchanger exchanger = new Exchanger();
        ChannelHandler channelHandler = new ChannelHandler() { // from class: io.netty5.handler.flow.FlowControlHandlerTest.5
            public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
                exchanger.exchange(channelHandlerContext.channel(), 1L, TimeUnit.SECONDS);
                channelHandlerContext.fireChannelActive();
            }

            public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
                Resource.dispose(obj);
                countDownLatch.countDown();
            }
        };
        ChannelHandler flowControlHandler = new FlowControlHandler();
        Channel newServer = newServer(true, flowControlHandler, channelHandler);
        Channel newClient = newClient(newServer.localAddress());
        try {
            Channel channel = (Channel) exchanger.exchange(null, 1L, TimeUnit.SECONDS);
            newClient.writeAndFlush(newOneMessage()).asStage().sync();
            Assertions.assertTrue(countDownLatch.await(1L, TimeUnit.SECONDS));
            EventLoop executor = channel.executor();
            Objects.requireNonNull(flowControlHandler);
            Assertions.assertTrue(((Boolean) executor.submit(flowControlHandler::isQueueEmpty).asStage().get()).booleanValue());
            newClient.close();
            newServer.close();
        } catch (Throwable th) {
            newClient.close();
            newServer.close();
            throw th;
        }
    }

    @Test
    public void testFlowToggleAutoRead() throws Exception {
        final Exchanger exchanger = new Exchanger();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final CountDownLatch countDownLatch2 = new CountDownLatch(1);
        final CountDownLatch countDownLatch3 = new CountDownLatch(1);
        final CountDownLatch countDownLatch4 = new CountDownLatch(1);
        final CountDownLatch countDownLatch5 = new CountDownLatch(1);
        ChannelHandler channelHandler = new ChannelHandler() { // from class: io.netty5.handler.flow.FlowControlHandlerTest.6
            private int msgRcvCount;
            private int expectedMsgCount;

            public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
                exchanger.exchange(channelHandlerContext.channel(), 1L, TimeUnit.SECONDS);
                channelHandlerContext.fireChannelActive();
            }

            public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws InterruptedException {
                Resource.dispose(obj);
                channelHandlerContext.channel().setOption(ChannelOption.AUTO_READ, false);
                int i = this.msgRcvCount;
                this.msgRcvCount = i + 1;
                if (i != this.expectedMsgCount) {
                    return;
                }
                switch (this.msgRcvCount) {
                    case 1:
                        countDownLatch.countDown();
                        if (countDownLatch4.await(1L, TimeUnit.SECONDS)) {
                            this.expectedMsgCount++;
                            return;
                        }
                        return;
                    case 2:
                        countDownLatch2.countDown();
                        if (countDownLatch5.await(1L, TimeUnit.SECONDS)) {
                            this.expectedMsgCount++;
                            return;
                        }
                        return;
                    default:
                        countDownLatch3.countDown();
                        return;
                }
            }
        };
        ChannelHandler flowControlHandler = new FlowControlHandler();
        Channel newServer = newServer(true, flowControlHandler, channelHandler);
        Channel newClient = newClient(newServer.localAddress());
        try {
            Channel channel = (Channel) exchanger.exchange(null, 1L, TimeUnit.SECONDS);
            newClient.writeAndFlush(newOneMessage()).asStage().sync();
            Assertions.assertTrue(countDownLatch.await(1L, TimeUnit.SECONDS));
            channel.setOption(ChannelOption.AUTO_READ, true);
            countDownLatch4.countDown();
            Assertions.assertTrue(countDownLatch.await(1L, TimeUnit.SECONDS));
            channel.setOption(ChannelOption.AUTO_READ, true);
            countDownLatch5.countDown();
            Assertions.assertTrue(countDownLatch3.await(1L, TimeUnit.SECONDS));
            EventLoop executor = channel.executor();
            Objects.requireNonNull(flowControlHandler);
            Assertions.assertTrue(((Boolean) executor.submit(flowControlHandler::isQueueEmpty).asStage().get()).booleanValue());
            newClient.close();
            newServer.close();
        } catch (Throwable th) {
            newClient.close();
            newServer.close();
            throw th;
        }
    }

    @Test
    public void testFlowAutoReadOff() throws Exception {
        final Exchanger exchanger = new Exchanger();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final CountDownLatch countDownLatch2 = new CountDownLatch(2);
        final CountDownLatch countDownLatch3 = new CountDownLatch(3);
        ChannelHandler channelHandler = new ChannelHandler() { // from class: io.netty5.handler.flow.FlowControlHandlerTest.7
            public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
                channelHandlerContext.fireChannelActive();
                exchanger.exchange(channelHandlerContext.channel(), 1L, TimeUnit.SECONDS);
            }

            public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
                countDownLatch.countDown();
                countDownLatch2.countDown();
                countDownLatch3.countDown();
            }
        };
        ChannelHandler flowControlHandler = new FlowControlHandler();
        Channel newServer = newServer(false, flowControlHandler, channelHandler);
        Channel newClient = newClient(newServer.localAddress());
        try {
            Channel channel = (Channel) exchanger.exchange(null, 1L, TimeUnit.SECONDS);
            newClient.writeAndFlush(newOneMessage()).asStage().sync();
            channel.read();
            Assertions.assertTrue(countDownLatch.await(1L, TimeUnit.SECONDS));
            channel.read();
            Assertions.assertTrue(countDownLatch2.await(1L, TimeUnit.SECONDS));
            channel.read();
            Assertions.assertTrue(countDownLatch3.await(1L, TimeUnit.SECONDS));
            EventLoop executor = channel.executor();
            Objects.requireNonNull(flowControlHandler);
            Assertions.assertTrue(((Boolean) executor.submit(flowControlHandler::isQueueEmpty).asStage().get()).booleanValue());
            newClient.close();
            newServer.close();
        } catch (Throwable th) {
            newClient.close();
            newServer.close();
            throw th;
        }
    }

    @Test
    public void testReentranceNotCausesNPE() throws Throwable {
        final Exchanger exchanger = new Exchanger();
        final CountDownLatch countDownLatch = new CountDownLatch(3);
        final AtomicReference atomicReference = new AtomicReference();
        ChannelHandler channelHandler = new ChannelHandler() { // from class: io.netty5.handler.flow.FlowControlHandlerTest.8
            public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
                channelHandlerContext.fireChannelActive();
                exchanger.exchange(channelHandlerContext.channel(), 1L, TimeUnit.SECONDS);
            }

            public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
                countDownLatch.countDown();
                channelHandlerContext.read();
            }

            public void channelExceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
                atomicReference.set(th);
            }
        };
        ChannelHandler flowControlHandler = new FlowControlHandler();
        Channel newServer = newServer(false, flowControlHandler, channelHandler);
        Channel newClient = newClient(newServer.localAddress());
        try {
            Channel channel = (Channel) exchanger.exchange(null, 1L, TimeUnit.SECONDS);
            newClient.writeAndFlush(newOneMessage()).asStage().sync();
            channel.read();
            Assertions.assertTrue(countDownLatch.await(1L, TimeUnit.SECONDS));
            EventLoop executor = channel.executor();
            Objects.requireNonNull(flowControlHandler);
            Assertions.assertTrue(((Boolean) executor.submit(flowControlHandler::isQueueEmpty).asStage().get()).booleanValue());
            Throwable th = (Throwable) atomicReference.get();
            if (th != null) {
                throw th;
            }
        } finally {
            newClient.close();
            newServer.close();
        }
    }

    @Test
    public void testSwallowedReadComplete() throws Exception {
        final LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        EmbeddedChannel embeddedChannel = new EmbeddedChannel(false, false, new ChannelHandler[]{new FlowControlHandler(), new IdleStateHandler(100L, 0L, 0L, TimeUnit.MILLISECONDS), new ChannelHandler() { // from class: io.netty5.handler.flow.FlowControlHandlerTest.9
            public void channelActive(ChannelHandlerContext channelHandlerContext) {
                channelHandlerContext.fireChannelActive();
                channelHandlerContext.read();
            }

            public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
                channelHandlerContext.fireChannelRead(obj);
                channelHandlerContext.read();
            }

            public void channelReadComplete(ChannelHandlerContext channelHandlerContext) {
                channelHandlerContext.fireChannelReadComplete();
                channelHandlerContext.read();
            }

            public void channelInboundEvent(ChannelHandlerContext channelHandlerContext, Object obj) {
                if (obj instanceof IdleStateEvent) {
                    linkedBlockingQueue.add((IdleStateEvent) obj);
                }
                channelHandlerContext.fireChannelInboundEvent(obj);
            }
        }});
        embeddedChannel.setOption(ChannelOption.AUTO_READ, false);
        Assertions.assertFalse(((Boolean) embeddedChannel.getOption(ChannelOption.AUTO_READ)).booleanValue());
        embeddedChannel.register();
        Assertions.assertTrue(embeddedChannel.writeInbound(new Object[]{embeddedChannel.bufferAllocator().allocate(0)}));
        embeddedChannel.flushInbound();
        Buffer buffer = (Buffer) embeddedChannel.readInbound();
        try {
            org.assertj.core.api.Assertions.assertThat(buffer.readableBytes()).isZero();
            if (buffer != null) {
                buffer.close();
            }
            embeddedChannel.flushInbound();
            Assertions.assertNull(embeddedChannel.readInbound());
            Thread.sleep(120L);
            embeddedChannel.runPendingTasks();
            Assertions.assertEquals(IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT, linkedBlockingQueue.poll());
            Assertions.assertFalse(embeddedChannel.finish());
        } catch (Throwable th) {
            if (buffer != null) {
                try {
                    buffer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testRemoveFlowControl() throws Exception {
        final Exchanger exchanger = new Exchanger();
        final CountDownLatch countDownLatch = new CountDownLatch(3);
        ChannelHandler channelHandler = new ChannelHandler() { // from class: io.netty5.handler.flow.FlowControlHandlerTest.10
            public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
                exchanger.exchange(channelHandlerContext.channel(), 1L, TimeUnit.SECONDS);
                channelHandlerContext.read();
                channelHandlerContext.fireChannelActive();
            }

            public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
                countDownLatch.countDown();
                channelHandlerContext.fireChannelRead(obj);
            }
        };
        ChannelHandler channelHandler2 = new FlowControlHandler() { // from class: io.netty5.handler.flow.FlowControlHandlerTest.11
            private int num;

            public void channelRead(final ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
                super.channelRead(channelHandlerContext, obj);
                this.num++;
                if (this.num >= 3) {
                    channelHandlerContext.channel().executor().execute(new Runnable() { // from class: io.netty5.handler.flow.FlowControlHandlerTest.11.1
                        @Override // java.lang.Runnable
                        public void run() {
                            channelHandlerContext.pipeline().remove(this);
                        }
                    });
                }
            }
        };
        Channel newServer = newServer(false, channelHandler2, channelHandler, new ChannelHandler() { // from class: io.netty5.handler.flow.FlowControlHandlerTest.12
            public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
                Resource.dispose(obj);
            }
        });
        Channel newClient = newClient(newServer.localAddress());
        try {
            Channel channel = (Channel) exchanger.exchange(null, 1L, TimeUnit.SECONDS);
            newClient.writeAndFlush(newOneMessage()).asStage().sync();
            Assertions.assertTrue(countDownLatch.await(1L, TimeUnit.SECONDS));
            EventLoop executor = channel.executor();
            Objects.requireNonNull(channelHandler2);
            Assertions.assertTrue(((Boolean) executor.submit(channelHandler2::isQueueEmpty).asStage().get()).booleanValue());
            newClient.close();
            newServer.close();
        } catch (Throwable th) {
            newClient.close();
            newServer.close();
            throw th;
        }
    }
}
