package com.baidu.hugegraph.computer.core.network.netty;

import com.baidu.hugegraph.computer.core.common.exception.TransportException;
import com.baidu.hugegraph.computer.core.network.ClientFactory;
import com.baidu.hugegraph.computer.core.network.ClientHandler;
import com.baidu.hugegraph.computer.core.network.ConnectionId;
import com.baidu.hugegraph.computer.core.network.TransportClient;
import com.baidu.hugegraph.computer.core.network.TransportConf;
import com.baidu.hugegraph.computer.core.network.TransportUtil;
import com.baidu.hugegraph.structure.constant.Traverser;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.Log;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.socket.SocketChannel;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;

/**
 * {@link ClientFactory} implementation that opens connections through a
 * Netty {@link Bootstrap}.
 *
 * <p>Lifecycle: construct, call {@link #init()} once to build the event loop
 * group and the bootstrap, create clients with
 * {@link #createClient(ConnectionId, ClientHandler)}, then call
 * {@link #close()} to shut the event loop group down.
 */
public class NettyClientFactory implements ClientFactory {

    private static final Logger LOG = Log.logger(NettyClientFactory.class);

    private final TransportConf conf;
    private final ByteBufAllocator bufAllocator;
    private final NettyProtocol protocol;

    /** Created by {@link #init()}, cleared by {@link #close()}. */
    private EventLoopGroup workerGroup;
    /** Created by {@link #init()}, cleared by {@link #close()}. */
    private Bootstrap bootstrap;
    /** Connect timeout in milliseconds, read from the conf in {@link #init()}. */
    private int connectTimeoutMs;

    /**
     * Creates a factory that uses the allocator returned by
     * {@code BufAllocatorFactory.createBufAllocator()}.
     *
     * @param transportConf transport configuration used by this factory
     */
    public NettyClientFactory(TransportConf transportConf) {
        this(transportConf, BufAllocatorFactory.createBufAllocator());
    }

    /**
     * Creates a factory with an explicit buffer allocator.
     *
     * @param transportConf    transport configuration used by this factory
     * @param byteBufAllocator allocator installed on every channel created
     *                         by this factory
     */
    public NettyClientFactory(TransportConf transportConf,
                              ByteBufAllocator byteBufAllocator) {
        this.conf = transportConf;
        this.bufAllocator = byteBufAllocator;
        this.protocol = new NettyProtocol(this.conf);
    }

    /**
     * Builds the event loop group and the {@link Bootstrap} from the
     * configuration. Must be called before any client is created and must
     * not be called again while the factory is initialized.
     */
    @Override
    public synchronized void init() {
        E.checkArgument(this.bootstrap == null,
                        "The NettyClientFactory has already been initialized");
        this.connectTimeoutMs = Math.toIntExact(
                                this.conf.clientConnectionTimeout());
        this.workerGroup = NettyEventLoopUtil.createEventLoop(
                           this.conf.ioMode(), this.conf.clientThreads(),
                           TransportConf.CLIENT_THREAD_GROUP_NAME);

        this.bootstrap = new Bootstrap();
        this.bootstrap.group(this.workerGroup)
                      .channel(NettyEventLoopUtil.clientChannelClass(
                               this.conf.ioMode()))
                      .option(ChannelOption.ALLOCATOR, this.bufAllocator)
                      .option(ChannelOption.TCP_NODELAY, true)
                      .option(ChannelOption.SO_KEEPALIVE,
                              this.conf.tcpKeepAlive())
                      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
                              this.connectTimeoutMs);

        // Socket buffer sizes are only set when configured with a positive
        // value; otherwise no option is set for them
        if (this.conf.sizeReceiveBuffer() > 0) {
            this.bootstrap.option(ChannelOption.SO_RCVBUF,
                                  this.conf.sizeReceiveBuffer());
        }
        if (this.conf.sizeSendBuffer() > 0) {
            this.bootstrap.option(ChannelOption.SO_SNDBUF,
                                  this.conf.sizeSendBuffer());
        }

        this.bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK,
                              new WriteBufferWaterMark(
                                  this.conf.writeBufferLowMark(),
                                  this.conf.writeBufferHighMark()));

        this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel socketChannel) {
                NettyClientFactory.this.protocol
                                  .initializeClientPipeline(socketChannel);
            }
        });
    }

    /**
     * Connects to the address of {@code connectionId} and wraps the resulting
     * channel in a {@link NettyTransportClient}.
     *
     * @param connectionId  identifies the remote endpoint to connect to
     * @param clientHandler handler passed to the created client
     * @return the newly created client
     * @throws TransportException if the connection cannot be established
     *                            within the configured number of retries
     */
    @Override
    public TransportClient createClient(ConnectionId connectionId,
                                        ClientHandler clientHandler)
                                        throws TransportException {
        InetSocketAddress address = connectionId.socketAddress();
        LOG.debug("Creating new client connection to '{}'", connectionId);

        Channel channel = this.doConnectWithRetries(address,
                                                    this.conf.networkRetries(),
                                                    this.connectTimeoutMs);
        NettyTransportClient client = new NettyTransportClient(channel,
                                                               connectionId,
                                                               this,
                                                               clientHandler);
        LOG.debug("Successfully created a new client to '{}'", connectionId);
        return client;
    }

    /**
     * Performs a single connection attempt and blocks (uninterruptibly) for
     * at most {@code connectTimeoutMs} milliseconds.
     *
     * @param address          remote address to connect to
     * @param connectTimeoutMs maximum time to wait, in milliseconds
     * @return the connected channel
     * @throws TransportException if the attempt timed out, was cancelled or
     *                            failed
     */
    protected Channel doConnect(InetSocketAddress address,
                                int connectTimeoutMs)
                                throws TransportException {
        // Read the field once so a concurrent close() can't null it between
        // the check and the use
        Bootstrap bootstrap = this.bootstrap;
        E.checkArgumentNotNull(bootstrap,
                               "The NettyClientFactory has not been " +
                               "initialized yet");
        long startTime = System.nanoTime();
        String formatAddress = TransportUtil.formatAddress(address);
        LOG.debug("ConnectTimeout of address [{}] is [{}]",
                  formatAddress, connectTimeoutMs);

        ChannelFuture future = bootstrap.connect(address);
        // The returned flag is deliberately ignored: the future may complete
        // right after the wait expires, so only its final state is trusted
        future.awaitUninterruptibly(connectTimeoutMs, TimeUnit.MILLISECONDS);

        if (!future.isDone()) {
            // Abandon the pending attempt, otherwise it could still connect
            // later and leave an open channel nobody owns
            future.cancel(true);
            future.channel().close();
            throw new TransportException(
                      "Create connection to '%s' timeout!", formatAddress);
        }
        if (future.isCancelled()) {
            throw new TransportException(
                      "Create connection to '%s' cancelled by user!",
                      formatAddress);
        }
        Throwable cause = future.cause();
        if (cause != null) {
            throw new TransportException(
                      "Failed to create connection to '%s', caused by: %s",
                      cause, formatAddress, cause.getMessage());
        }
        if (!future.isSuccess()) {
            throw new TransportException(
                      "Failed to create connection to '%s'", formatAddress);
        }

        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
                                                       startTime);
        LOG.info("Successfully created connection to '{}' after {} ms",
                 formatAddress, elapsedMs);
        return future.channel();
    }

    /**
     * Calls {@link #doConnect(InetSocketAddress, int)} until it succeeds or
     * the retry budget is used up. The first attempt is not counted as a
     * retry, so at most {@code retryNumber + 1} attempts are made. Retries
     * happen immediately, without any delay in between.
     *
     * @param address     remote address to connect to
     * @param retryNumber number of retries allowed after the first failure
     * @param timeoutMs   timeout of each single attempt, in milliseconds
     * @return the connected channel
     * @throws TransportException the failure of the last attempt once the
     *                            retries are exhausted
     */
    protected Channel doConnectWithRetries(InetSocketAddress address,
                                           int retryNumber, int timeoutMs)
                                           throws TransportException {
        String formatAddress = TransportUtil.formatAddress(address);
        int tried = 0;
        while (true) {
            try {
                return this.doConnect(address, timeoutMs);
            } catch (IOException e) {
                tried++;
                if (tried > retryNumber) {
                    LOG.warn("Failed to connect to '{}', Giving up after {} retries",
                             formatAddress, retryNumber, e);
                    throw e;
                }
                LOG.debug("Failed to connect to '{}' with retries times {}, Retrying...",
                          formatAddress, tried, e);
            }
        }
    }

    /**
     * @return the transport configuration this factory was created with
     */
    public TransportConf conf() {
        return this.conf;
    }

    /**
     * @return the protocol used to initialize client channel pipelines
     */
    public NettyProtocol protocol() {
        return this.protocol;
    }

    /**
     * Starts a graceful shutdown of the event loop group, unless it is
     * already shutting down, and drops the bootstrap so that the factory is
     * uninitialized again.
     */
    @Override
    public void close() {
        if (this.workerGroup != null && !this.workerGroup.isShuttingDown()) {
            this.workerGroup.shutdownGracefully();
            this.workerGroup = null;
        }
        this.bootstrap = null;
    }
}
