package com.baidu.hugegraph.computer.core.network.netty;

import com.baidu.hugegraph.computer.core.common.exception.IllegalArgException;
import com.baidu.hugegraph.computer.core.common.exception.TransportException;
import com.baidu.hugegraph.computer.core.network.TransportHandler;
import com.baidu.hugegraph.computer.core.network.TransportUtil;
import com.baidu.hugegraph.computer.core.network.message.AckMessage;
import com.baidu.hugegraph.computer.core.network.message.DataMessage;
import com.baidu.hugegraph.computer.core.network.message.FailMessage;
import com.baidu.hugegraph.computer.core.network.message.FinishMessage;
import com.baidu.hugegraph.computer.core.network.message.Message;
import com.baidu.hugegraph.computer.core.network.message.MessageType;
import com.baidu.hugegraph.computer.core.network.message.PingMessage;
import com.baidu.hugegraph.computer.core.network.message.PongMessage;
import com.baidu.hugegraph.computer.core.network.message.StartMessage;
import com.baidu.hugegraph.computer.core.network.session.TransportSession;
import com.baidu.hugegraph.util.Log;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.slf4j.Logger;

/* loaded from: input_file:com/baidu/hugegraph/computer/core/network/netty/AbstractNettyHandler.class */
/**
 * Base inbound handler that receives decoded {@link Message} objects from the
 * Netty pipeline and dispatches each one to a type-specific
 * {@code processXxxMessage} hook.
 *
 * <p>Subclasses must implement the hooks for start, finish, data and ack
 * messages, and must supply the {@link TransportSession} and
 * {@link TransportHandler} that the default ping/fail handling relies on.
 */
public abstract class AbstractNettyHandler extends SimpleChannelInboundHandler<Message> {

    private static final Logger LOG = Log.logger(AbstractNettyHandler.class);

    /**
     * Routes an incoming message to the matching {@code processXxxMessage}
     * method.
     *
     * <p>Messages whose type belongs to the {@code DATA} category are handed
     * to {@link #processDataMessage} before any other type check; all other
     * messages are dispatched by their exact {@link MessageType}.
     *
     * <p>NOTE(review): the decompiler reports that this method's access
     * modifier was widened from {@code protected}; it is kept {@code public}
     * here so existing callers keep compiling.
     *
     * @param ctx     the handler context the message arrived on
     * @param message the decoded message
     * @throws IllegalArgException if the message type is not one handled here
     */
    @Override
    public void channelRead0(ChannelHandlerContext ctx, Message message) throws Exception {
        Channel channel = ctx.channel();
        // Guarded so the remote address is only resolved when debug is on.
        if (LOG.isDebugEnabled()) {
            LOG.debug("Receive remote message from '{}', message: {}", TransportUtil.remoteAddress(channel), message);
        }

        MessageType type = message.type();
        // Data messages are identified by category, not by a single type.
        if (type.category() == MessageType.Category.DATA) {
            processDataMessage(ctx, channel, (DataMessage) message);
            return;
        }

        switch (type) {
            case START:
                processStartMessage(ctx, channel, (StartMessage) message);
                break;
            case FAIL:
                processFailMessage(ctx, channel, (FailMessage) message);
                break;
            case ACK:
                processAckMessage(ctx, channel, (AckMessage) message);
                break;
            case FINISH:
                processFinishMessage(ctx, channel, (FinishMessage) message);
                break;
            case PING:
                processPingMessage(ctx, channel, (PingMessage) message);
                break;
            case PONG:
                processPongMessage(ctx, channel, (PongMessage) message);
                break;
            default:
                throw new IllegalArgException("Unknown message type: %s", type);
        }
    }

    /**
     * Handles a {@link StartMessage} received from the remote peer.
     *
     * @param ctx          the handler context the message arrived on
     * @param channel      the channel the message arrived on
     * @param startMessage the received message
     */
    protected abstract void processStartMessage(ChannelHandlerContext ctx, Channel channel, StartMessage startMessage);

    /**
     * Handles a {@link FinishMessage} received from the remote peer.
     *
     * @param ctx           the handler context the message arrived on
     * @param channel       the channel the message arrived on
     * @param finishMessage the received message
     */
    protected abstract void processFinishMessage(ChannelHandlerContext ctx, Channel channel, FinishMessage finishMessage);

    /**
     * Handles any message whose type category is {@code DATA}.
     *
     * @param ctx         the handler context the message arrived on
     * @param channel     the channel the message arrived on
     * @param dataMessage the received message
     */
    protected abstract void processDataMessage(ChannelHandlerContext ctx, Channel channel, DataMessage dataMessage);

    /**
     * Handles an {@link AckMessage} received from the remote peer.
     *
     * @param ctx        the handler context the message arrived on
     * @param channel    the channel the message arrived on
     * @param ackMessage the received message
     */
    protected abstract void processAckMessage(ChannelHandlerContext ctx, Channel channel, AckMessage ackMessage);

    /**
     * Answers a ping by writing and flushing the shared
     * {@link PongMessage#INSTANCE}. The attached
     * {@link ChannelFutureListener#CLOSE_ON_FAILURE} listener closes the
     * channel if that write fails.
     *
     * @param ctx         the handler context the message arrived on
     * @param channel     the channel the message arrived on (unused here)
     * @param pingMessage the received ping (unused here)
     */
    protected void processPingMessage(ChannelHandlerContext ctx, Channel channel, PingMessage pingMessage) {
        ctx.writeAndFlush(PongMessage.INSTANCE)
           .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
    }

    /**
     * Handles a pong from the remote peer. The default implementation does
     * nothing; subclasses may override it.
     *
     * @param ctx         the handler context the message arrived on
     * @param channel     the channel the message arrived on
     * @param pongMessage the received pong
     */
    protected void processPongMessage(ChannelHandlerContext ctx, Channel channel, PongMessage pongMessage) {
        // Intentionally empty.
    }

    /**
     * Converts a remote {@link FailMessage} into a {@link TransportException}
     * carrying the message's error code, and reports it to
     * {@link #transportHandler()} together with the remote connection id.
     *
     * @param ctx         the handler context the message arrived on (unused here)
     * @param channel     the channel the message arrived on
     * @param failMessage the failure reported by the remote peer
     */
    public void processFailMessage(ChannelHandlerContext ctx, Channel channel, FailMessage failMessage) {
        TransportException exception = new TransportException(
                failMessage.errorCode(),
                "Remote error from '%s', cause: %s",
                TransportUtil.remoteAddress(channel),
                failMessage.message());
        transportHandler().exceptionCaught(exception, TransportUtil.remoteConnectionId(channel));
    }

    /**
     * Sends a {@link FailMessage} to the remote peer and blocks
     * uninterruptibly until the write completes or the session's configured
     * write-socket timeout elapses. The outcome of the wait is not inspected.
     *
     * <p>NOTE(review): the parameter names are inferred from the
     * {@code errorCode()} / {@code message()} accessors on
     * {@link FailMessage}; the arguments are passed to the constructor in
     * their original order. Confirm the names against that constructor.
     *
     * @param ctx       the context to write the failure message to
     * @param ackId     first constructor argument of {@link FailMessage}
     * @param errorCode second constructor argument of {@link FailMessage}
     * @param message   third constructor argument of {@link FailMessage}
     */
    public void ackFailMessage(ChannelHandlerContext ctx, int ackId, int errorCode, String message) {
        long timeout = session().conf().writeSocketTimeout();
        FailMessage failMessage = new FailMessage(ackId, errorCode, message);
        ctx.writeAndFlush(failMessage).awaitUninterruptibly(timeout);
    }

    /**
     * @return the transport session backing this handler; its configuration
     *         supplies the write-socket timeout used by {@link #ackFailMessage}
     */
    protected abstract TransportSession session();

    /**
     * @return the handler that is notified of remote failures in
     *         {@link #processFailMessage}
     */
    protected abstract TransportHandler transportHandler();
}
