package com.github.pgasync.impl.netty;

import com.github.pgasync.SqlException;
import com.github.pgasync.impl.PgProtocolStream;
import com.github.pgasync.impl.message.Authentication;
import com.github.pgasync.impl.message.Message;
import com.github.pgasync.impl.message.ReadyForQuery;
import com.github.pgasync.impl.message.StartupMessage;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/* loaded from: input_file:com/github/pgasync/impl/netty/NettyPgProtocolStream.class */
/**
 * {@link PgProtocolStream} implementation backed by a Netty NIO socket channel.
 *
 * <p>At most one reply handler ({@link #onReceive}) is installed at a time. It is set by
 * {@link #connect} and by both {@code send} overloads, and it clears itself once a message
 * that ends the reply has been received (see {@link #newReplyHandler(Consumer)}).
 */
public class NettyPgProtocolStream implements PgProtocolStream {
    final SocketAddress address;
    final EventLoopGroup group;
    /** Context of the active channel; stays {@code null} until the channel becomes active. */
    ChannelHandlerContext ctx;
    /** Handler for the reply currently in flight; {@code null} when no reply is awaited. */
    volatile Consumer<Message> onReceive;

    /**
     * Pseudo-message delivered to reply consumers when the channel fails, so that transport
     * errors travel the same path as regular protocol messages.
     */
    public static class ChannelError extends SqlException implements Message {
        ChannelError(String str) {
            super(str);
        }

        public ChannelError(Throwable th) {
            super(th);
        }
    }

    /**
     * @param socketAddress  remote address to connect to
     * @param eventLoopGroup event loop group used to bootstrap the channel
     */
    public NettyPgProtocolStream(SocketAddress socketAddress, EventLoopGroup eventLoopGroup) {
        this.address = socketAddress;
        this.group = eventLoopGroup;
    }

    /**
     * Opens the channel and, once it is active, writes the startup message.
     *
     * @param startupMessage message written as soon as the channel becomes active
     * @param consumer       receives the messages of the startup reply, or a single
     *                       {@link ChannelError} if the connection attempt fails
     */
    @Override // com.github.pgasync.impl.PgProtocolStream
    public void connect(final StartupMessage startupMessage, final Consumer<List<Message>> consumer) {
        ChannelHandler startupHandler = new ChannelInboundHandlerAdapter() {
            @Override
            public void channelActive(ChannelHandlerContext channelHandlerContext) {
                NettyPgProtocolStream.this.ctx = channelHandlerContext;
                // Install the reply handler before writing so no reply can arrive unhandled.
                NettyPgProtocolStream.this.onReceive = NettyPgProtocolStream.this.newReplyHandler(consumer);
                channelHandlerContext.writeAndFlush(startupMessage);
            }
        };
        new Bootstrap()
                .group(this.group)
                .channel(NioSocketChannel.class)
                .handler(newProtocolInitializer(startupHandler))
                .connect(this.address)
                .addListener(future -> {
                    if (!future.isSuccess()) {
                        consumer.accept(Arrays.<Message>asList(new ChannelError(future.cause())));
                    }
                });
    }

    /**
     * Writes and flushes a single message and registers {@code consumer} for its reply.
     *
     * @throws IllegalStateException if the channel has not been opened or is no longer open
     */
    @Override // com.github.pgasync.impl.PgProtocolStream
    public void send(Message message, Consumer<List<Message>> consumer) {
        if (!isConnected()) {
            throw new IllegalStateException("Channel is closed");
        }
        this.onReceive = newReplyHandler(consumer);
        this.ctx.writeAndFlush(message);
    }

    /**
     * Writes all messages in order, flushes once, and registers {@code consumer} for the reply.
     *
     * @throws IllegalStateException if the channel has not been opened or is no longer open
     */
    @Override // com.github.pgasync.impl.PgProtocolStream
    public void send(List<Message> list, Consumer<List<Message>> consumer) {
        if (!isConnected()) {
            throw new IllegalStateException("Channel is closed");
        }
        this.onReceive = newReplyHandler(consumer);
        ChannelHandlerContext context = this.ctx;
        list.forEach(context::write);
        context.flush();
    }

    /**
     * @return {@code true} if a channel context has been set and its channel is open;
     *         {@code false} before the channel has become active or after it was closed
     */
    @Override // com.github.pgasync.impl.PgProtocolStream
    public boolean isConnected() {
        ChannelHandlerContext context = this.ctx;
        return context != null && context.channel().isOpen();
    }

    /** Closes the channel; does nothing if the channel never became active. */
    @Override // com.github.pgasync.impl.PgProtocolStream
    public void close() {
        ChannelHandlerContext context = this.ctx;
        if (context != null) {
            context.close();
        }
    }

    /**
     * Creates a handler that buffers incoming messages and hands the whole batch to
     * {@code consumer} once a message that ends the reply arrives.
     */
    Consumer<Message> newReplyHandler(Consumer<List<Message>> consumer) {
        List<Message> messages = new ArrayList<>();
        return message -> {
            messages.add(message);
            if (endsReply(message)) {
                // Clear first so the consumer may install a new handler by sending again.
                this.onReceive = null;
                consumer.accept(messages);
            }
        };
    }

    /**
     * A reply ends on {@link ReadyForQuery}, on a {@link ChannelError}, or on an
     * {@link Authentication} message that does not report authentication OK.
     */
    private static boolean endsReply(Message message) {
        if (message instanceof ReadyForQuery || message instanceof ChannelError) {
            return true;
        }
        return message instanceof Authentication && !((Authentication) message).isAuthenticationOk();
    }

    /**
     * Builds the channel initializer: frame decoder, message decoder/encoder, the protocol
     * handler, and finally the supplied handler.
     */
    ChannelInitializer<Channel> newProtocolInitializer(final ChannelHandler channelHandler) {
        return new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel channel) throws Exception {
                // Frames: length field of 4 bytes at offset 1; the -4 adjustment accounts for
                // the length value including the length field itself. Nothing is stripped.
                channel.pipeline().addLast("frame-decoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 1, 4, -4, 0, true));
                channel.pipeline().addLast("message-decoder", new ByteBufMessageDecoder());
                channel.pipeline().addLast("message-encoder", new ByteBufMessageEncoder());
                channel.pipeline().addLast("message-handler", NettyPgProtocolStream.this.newProtocolHandler());
                channel.pipeline().addLast(channelHandler);
            }
        };
    }

    /**
     * Builds the inbound handler that forwards decoded messages, channel inactivity and
     * exceptions to the currently installed reply handler, if any.
     */
    ChannelHandler newProtocolHandler() {
        return new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
                // Read the volatile field once: the handler clears it when a reply completes.
                Consumer<Message> handler = NettyPgProtocolStream.this.onReceive;
                if (handler != null) {
                    handler.accept((Message) obj);
                }
                // No reply is awaited: the message is dropped instead of raising a
                // NullPointerException.
            }

            @Override
            public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
                Consumer<Message> handler = NettyPgProtocolStream.this.onReceive;
                if (handler != null) {
                    handler.accept(new ChannelError("Channel state changed to inactive"));
                }
            }

            @Override
            public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
                Consumer<Message> handler = NettyPgProtocolStream.this.onReceive;
                if (handler != null) {
                    handler.accept(new ChannelError(th));
                }
            }
        };
    }
}
