package io.simplesource.kafka.internal.cluster;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.pool.AbstractChannelPoolHandler;
import io.netty.channel.pool.AbstractChannelPoolMap;
import io.netty.channel.pool.ChannelHealthChecker;
import io.netty.channel.pool.ChannelPool;
import io.netty.channel.pool.FixedChannelPool;
import org.apache.kafka.streams.state.HostInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/simplesource/kafka/internal/cluster/ClientImpl.class */
/**
 * {@link Client} implementation that sends messages to remote hosts over Netty channels.
 *
 * <p>One {@link FixedChannelPool} is kept per {@link HostInfo}; pools are created on first use by
 * the pool map and are all closed by {@link #stop()}.
 */
public class ClientImpl implements Client {
    private static final Logger logger = LoggerFactory.getLogger(ClientImpl.class);
    private final ClusterConfig clusterConfig;
    private final EventLoopGroup workGroup;
    private final Class<? extends Channel> channelClass;
    private final PipelineInitializer pipelineInitializer;
    // Initialised before the constructor body runs. This is safe because the map only reads the
    // fields above from newPool(), which is first invoked by send() on a fully built instance.
    private final AbstractChannelPoolMap<HostInfo, ChannelPool> poolMap = createPoolMap();

    /**
     * Creates a client whose connections are configured by the given collaborators.
     *
     * @param clusterConfig       supplies the acquire timeout, max connections per host and max
     *                            pending acquires used for every pool
     * @param eventLoopGroup      event loop group the client bootstraps run on
     * @param cls                 Netty channel implementation class to bootstrap; kept as a raw
     *                            {@code Class} in the signature for caller compatibility
     * @param pipelineInitializer configures the pipeline of every newly created channel
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    public ClientImpl(ClusterConfig clusterConfig, EventLoopGroup eventLoopGroup, Class cls, PipelineInitializer pipelineInitializer) {
        this.clusterConfig = clusterConfig;
        this.workGroup = eventLoopGroup;
        // Unchecked narrowing of the raw parameter; the bootstrap requires a Channel subtype.
        this.channelClass = cls;
        this.pipelineInitializer = pipelineInitializer;
    }

    /**
     * Builds the map that creates one fixed-size channel pool per remote host.
     *
     * @return a pool map keyed by {@link HostInfo}
     */
    private AbstractChannelPoolMap<HostInfo, ChannelPool> createPoolMap() {
        return new AbstractChannelPoolMap<HostInfo, ChannelPool>() {
            @Override
            protected ChannelPool newPool(HostInfo hostInfo) {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(workGroup);
                bootstrap.channel(channelClass);
                bootstrap.remoteAddress(hostInfo.host(), hostInfo.port());

                AbstractChannelPoolHandler poolHandler = new AbstractChannelPoolHandler() {
                    @Override
                    public void channelCreated(Channel channel) {
                        pipelineInitializer.init(channel.pipeline());
                    }
                };

                // Acquires that exceed the timeout fail rather than opening a new connection.
                // The trailing 'true' is FixedChannelPool's releaseHealthCheck flag.
                return new FixedChannelPool(
                        bootstrap,
                        poolHandler,
                        ChannelHealthChecker.ACTIVE,
                        FixedChannelPool.AcquireTimeoutAction.FAIL,
                        clusterConfig.acquireTimeoutMillis(),
                        clusterConfig.maxConnectionsPerHost(),
                        clusterConfig.maxPendingAcquires(),
                        true);
            }
        };
    }

    /**
     * Asynchronously sends {@code message} to {@code hostInfo} over a pooled channel.
     *
     * <p>The call returns immediately. Failures to acquire a channel or to write the message are
     * logged at error level and not propagated to the caller.
     *
     * @param hostInfo remote host to send to
     * @param message  message to write
     */
    @Override // io.simplesource.kafka.internal.cluster.Client
    public void send(HostInfo hostInfo, Message message) {
        logger.trace("Sending message:{} to {}", message, hostInfo);
        ChannelPool channelPool = this.poolMap.get(hostInfo);
        channelPool.acquire().addListener(future -> {
            if (!future.isSuccess()) {
                logger.error("Failed to connect to remote system:" + hostInfo, future.cause());
                return;
            }
            Channel channel = (Channel) future.getNow();
            try {
                // Observe the write result so a failed send is logged instead of being dropped.
                channel.writeAndFlush(message).addListener(writeFuture -> {
                    if (!writeFuture.isSuccess()) {
                        logger.error("Failed to send message to remote system:" + hostInfo, writeFuture.cause());
                    }
                });
            } finally {
                // Always hand the channel back, even if the write path throws synchronously.
                channelPool.release(channel);
            }
        });
    }

    /** Closes every channel pool created by this client. */
    public void stop() {
        this.poolMap.close();
    }
}
