package com.hivemq.mqtt.handler.connect;

import com.google.common.annotations.VisibleForTesting;
import com.hivemq.bootstrap.ClientConnection;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.mqtt.handler.disconnect.MqttServerDisconnector;
import com.hivemq.mqtt.message.Message;
import com.hivemq.mqtt.message.auth.AUTH;
import com.hivemq.mqtt.message.connack.CONNACK;
import com.hivemq.mqtt.message.connect.CONNECT;
import com.hivemq.mqtt.message.reason.Mqtt5ConnAckReasonCode;
import com.hivemq.util.ChannelAttributes;
import com.hivemq.util.ChannelUtils;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/hivemq/mqtt/handler/connect/MessageBarrier.class */
public class MessageBarrier extends ChannelDuplexHandler {
    private static final Logger log = LoggerFactory.getLogger(MessageBarrier.class);
    private static final ChannelFutureListener ENABLE_AUTO_READ_LISTENER = channelFuture -> {
        if (channelFuture.isSuccess()) {
            resumeRead(channelFuture.channel());
        }
    };

    @NotNull
    private final MqttServerDisconnector serverDisconnector;

    @NotNull
    private final Queue<Message> messageQueue = new LinkedList();
    private boolean connectReceived = false;
    private boolean connackSent = false;

    public MessageBarrier(@NotNull MqttServerDisconnector mqttServerDisconnector) {
        this.serverDisconnector = mqttServerDisconnector;
    }

    public void channelRead(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull Object obj) {
        if (obj instanceof Message) {
            if (obj instanceof CONNECT) {
                this.connectReceived = true;
                suspendRead(channelHandlerContext.channel());
            } else if (!this.connectReceived) {
                this.serverDisconnector.logAndClose(channelHandlerContext.channel(), "A client (IP: {}) sent other message before CONNECT. Disconnecting client.", "Sent other message before CONNECT");
                return;
            } else if (obj instanceof AUTH) {
                suspendRead(channelHandlerContext.channel());
            } else if (!this.connackSent) {
                this.messageQueue.add((Message) obj);
                return;
            }
        }
        channelHandlerContext.fireChannelRead(obj);
    }

    public void write(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull Object obj, @NotNull ChannelPromise channelPromise) {
        if ((obj instanceof CONNACK) && ((CONNACK) obj).getReasonCode() == Mqtt5ConnAckReasonCode.SUCCESS) {
            channelPromise.addListener(channelFuture -> {
                if (channelFuture.isSuccess()) {
                    channelFuture.channel().pipeline().remove(this);
                    this.connackSent = true;
                    releaseQueuedMessages(channelHandlerContext);
                }
            });
            channelPromise.addListener(ENABLE_AUTO_READ_LISTENER);
        } else if (obj instanceof AUTH) {
            channelPromise.addListener(ENABLE_AUTO_READ_LISTENER);
        }
        channelHandlerContext.write(obj, channelPromise);
    }

    private void releaseQueuedMessages(@NotNull ChannelHandlerContext channelHandlerContext) {
        Iterator<Message> it = this.messageQueue.iterator();
        while (it.hasNext()) {
            channelHandlerContext.fireChannelRead(it.next());
        }
    }

    private static void suspendRead(@NotNull Channel channel) {
        if (log.isTraceEnabled()) {
            log.trace("Suspending read operations for MQTT client with id {} and IP {}", ((ClientConnection) channel.attr(ChannelAttributes.CLIENT_CONNECTION).get()).getClientId(), ChannelUtils.getChannelIP(channel).or("UNKNOWN"));
        }
        channel.config().setAutoRead(false);
    }

    private static void resumeRead(@NotNull Channel channel) {
        if (log.isTraceEnabled()) {
            log.trace("Restarting read operations for MQTT client with id {} and IP {}", ((ClientConnection) channel.attr(ChannelAttributes.CLIENT_CONNECTION).get()).getClientId(), ChannelUtils.getChannelIP(channel).or("UNKNOWN"));
        }
        channel.config().setAutoRead(true);
    }

    @VisibleForTesting
    boolean getConnectReceived() {
        return this.connectReceived;
    }

    @VisibleForTesting
    @NotNull
    Collection<Message> getQueue() {
        return Collections.unmodifiableCollection(this.messageQueue);
    }
}
