package com.baidu.hugegraph.computer.core.network.netty;

import com.baidu.hugegraph.computer.core.common.exception.TransportException;
import com.baidu.hugegraph.computer.core.network.ClientHandler;
import com.baidu.hugegraph.computer.core.network.ConnectionId;
import com.baidu.hugegraph.computer.core.network.TransportClient;
import com.baidu.hugegraph.computer.core.network.TransportConf;
import com.baidu.hugegraph.computer.core.network.TransportState;
import com.baidu.hugegraph.computer.core.network.message.Message;
import com.baidu.hugegraph.computer.core.network.message.MessageType;
import com.baidu.hugegraph.computer.core.network.session.ClientSession;
import com.baidu.hugegraph.util.E;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/* loaded from: input_file:com/baidu/hugegraph/computer/core/network/netty/NettyTransportClient.class */
public class NettyTransportClient implements TransportClient {
    private final Channel channel;
    private final ConnectionId connectionId;
    private final NettyClientFactory clientFactory;
    private final ClientHandler handler;
    private final ClientSession session;
    private final long timeoutSyncRequest;
    private final long timeoutFinishSession;
    private volatile boolean preSendAvailable;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/baidu/hugegraph/computer/core/network/netty/NettyTransportClient$ClientChannelListenerOnWrite.class */
    public class ClientChannelListenerOnWrite extends ChannelFutureListenerOnWrite {
        ClientChannelListenerOnWrite() {
            super(NettyTransportClient.this.handler);
        }

        @Override // com.baidu.hugegraph.computer.core.network.netty.ChannelFutureListenerOnWrite
        public void onSuccess(Channel channel, ChannelFuture channelFuture) {
            super.onSuccess(channel, channelFuture);
            NettyTransportClient.this.checkAndNotifySendAvailable();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public NettyTransportClient(Channel channel, ConnectionId connectionId, NettyClientFactory nettyClientFactory, ClientHandler clientHandler) {
        E.checkArgumentNotNull(clientHandler, "The clientHandler parameter can't be null", new Object[0]);
        initChannel(channel, connectionId, nettyClientFactory.protocol(), clientHandler);
        this.channel = channel;
        this.connectionId = connectionId;
        this.clientFactory = nettyClientFactory;
        this.handler = clientHandler;
        TransportConf conf = this.clientFactory.conf();
        this.timeoutSyncRequest = conf.timeoutSyncRequest();
        this.timeoutFinishSession = conf.timeoutFinishSession();
        this.session = new ClientSession(conf, createSendFunction());
        this.preSendAvailable = false;
    }

    public Channel channel() {
        return this.channel;
    }

    @Override // com.baidu.hugegraph.computer.core.network.TransportClient
    public ConnectionId connectionId() {
        return this.connectionId;
    }

    @Override // com.baidu.hugegraph.computer.core.network.TransportClient
    public InetSocketAddress remoteAddress() {
        return (InetSocketAddress) this.channel.remoteAddress();
    }

    @Override // com.baidu.hugegraph.computer.core.network.TransportClient
    public boolean active() {
        return this.channel.isActive();
    }

    @Override // com.baidu.hugegraph.computer.core.network.TransportClient
    public boolean sessionActive() {
        if (!active()) {
            return false;
        }
        TransportState state = this.session.state();
        return state == TransportState.ESTABLISHED || state == TransportState.FINISH_SENT;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ClientSession clientSession() {
        return this.session;
    }

    public ClientHandler clientHandler() {
        return this.handler;
    }

    private void initChannel(Channel channel, ConnectionId connectionId, NettyProtocol nettyProtocol, ClientHandler clientHandler) {
        nettyProtocol.replaceClientHandler(channel, this);
        clientHandler.onChannelActive(connectionId);
    }

    private Function<Message, Future<Void>> createSendFunction() {
        ClientChannelListenerOnWrite clientChannelListenerOnWrite = new ClientChannelListenerOnWrite();
        return message -> {
            return this.channel.writeAndFlush(message).addListener2((GenericFutureListener<? extends io.netty.util.concurrent.Future<? super Void>>) clientChannelListenerOnWrite);
        };
    }

    @Override // com.baidu.hugegraph.computer.core.network.TransportClient
    public CompletableFuture<Void> startSessionAsync() {
        return this.session.startAsync();
    }

    @Override // com.baidu.hugegraph.computer.core.network.TransportClient
    public void startSession() throws TransportException {
        startSession(this.timeoutSyncRequest);
    }

    private void startSession(long j) throws TransportException {
        this.session.start(j);
    }

    @Override // com.baidu.hugegraph.computer.core.network.TransportClient
    public boolean send(MessageType messageType, int i, ByteBuffer byteBuffer) throws TransportException {
        if (!checkSendAvailable()) {
            return false;
        }
        this.session.sendAsync(messageType, i, byteBuffer);
        return true;
    }

    @Override // com.baidu.hugegraph.computer.core.network.TransportClient
    public CompletableFuture<Void> finishSessionAsync() {
        return this.session.finishAsync();
    }

    @Override // com.baidu.hugegraph.computer.core.network.TransportClient
    public void finishSession() throws TransportException {
        finishSession(this.timeoutFinishSession);
    }

    private void finishSession(long j) throws TransportException {
        this.session.finish(j);
    }

    protected boolean checkSendAvailable() {
        return !this.session.flowBlocking() && this.channel.isWritable();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void checkAndNotifySendAvailable() {
        boolean checkSendAvailable = checkSendAvailable();
        if (checkSendAvailable && !this.preSendAvailable) {
            this.handler.sendAvailable(this.connectionId);
        }
        this.preSendAvailable = checkSendAvailable;
    }

    @Override // com.baidu.hugegraph.computer.core.network.TransportClient
    public void close() {
        if (this.channel != null) {
            this.channel.close().awaitUninterruptibly(this.clientFactory.conf().closeTimeout(), TimeUnit.MILLISECONDS);
        }
    }
}
