package com.hivemq.mqtt.handler.publish;

import com.google.common.annotations.VisibleForTesting;
import com.hivemq.bootstrap.ClientConnection;
import com.hivemq.bootstrap.ClientConnectionContext;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.extensions.handler.IncomingPublishHandler;
import com.hivemq.mqtt.event.PublishDroppedEvent;
import com.hivemq.mqtt.event.PubrelDroppedEvent;
import com.hivemq.mqtt.message.MessageWithID;
import com.hivemq.mqtt.message.QoS;
import com.hivemq.mqtt.message.puback.PUBACK;
import com.hivemq.mqtt.message.pubcomp.PUBCOMP;
import com.hivemq.mqtt.message.publish.PUBLISH;
import com.hivemq.mqtt.message.pubrec.PUBREC;
import com.hivemq.mqtt.message.pubrel.PUBREL;
import com.hivemq.mqtt.message.reason.Mqtt5PubRecReasonCode;
import com.hivemq.mqtt.services.PublishPollService;
import com.hivemq.persistence.qos.IncomingMessageFlowPersistence;
import com.hivemq.persistence.util.FutureUtils;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import javax.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/hivemq/mqtt/handler/publish/PublishFlowHandler.class */
public class PublishFlowHandler extends ChannelDuplexHandler {

    @NotNull
    private static final Logger log = LoggerFactory.getLogger(PublishFlowHandler.class);

    @NotNull
    private static final AtomicLong UNACKNOWLEDGED_PUBLISHES_COUNTER = new AtomicLong();

    @NotNull
    private final IncomingMessageFlowPersistence persistence;

    @NotNull
    private final OrderedTopicService orderedTopicService;

    @NotNull
    private final PublishPollService publishPollService;

    @NotNull
    private final IncomingPublishHandler incomingPublishHandler;

    @NotNull
    private final DropOutgoingPublishesHandler dropOutgoingPublishesHandler;

    @NotNull
    private final Map<Integer, Boolean> qos1And2AlreadySentMap = new HashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hivemq/mqtt/handler/publish/PublishFlowHandler$PUBLISHFlowCompleteListener.class */
    public static class PUBLISHFlowCompleteListener implements ChannelFutureListener {
        private final int messageId;

        @NotNull
        private final String client;

        @NotNull
        private final Map<Integer, Boolean> qos1And2AlreadySentMap;

        @NotNull
        private final IncomingMessageFlowPersistence persistence;

        PUBLISHFlowCompleteListener(int i, @NotNull String str, @NotNull Map<Integer, Boolean> map, @NotNull IncomingMessageFlowPersistence incomingMessageFlowPersistence) {
            this.messageId = i;
            this.client = str;
            this.qos1And2AlreadySentMap = map;
            this.persistence = incomingMessageFlowPersistence;
        }

        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            if (channelFuture.isSuccess()) {
                PublishFlowHandler.UNACKNOWLEDGED_PUBLISHES_COUNTER.decrementAndGet();
                this.qos1And2AlreadySentMap.remove(Integer.valueOf(this.messageId));
                this.persistence.remove(this.client, this.messageId);
                PublishFlowHandler.log.trace("Client '{}' completed a PUBLISH flow with QoS 1 or 2 for packet identifier '{}'", this.client, Integer.valueOf(this.messageId));
            }
        }
    }

    @Inject
    @VisibleForTesting
    public PublishFlowHandler(@NotNull PublishPollService publishPollService, @NotNull IncomingMessageFlowPersistence incomingMessageFlowPersistence, @NotNull OrderedTopicService orderedTopicService, @NotNull IncomingPublishHandler incomingPublishHandler, @NotNull DropOutgoingPublishesHandler dropOutgoingPublishesHandler) {
        this.publishPollService = publishPollService;
        this.persistence = incomingMessageFlowPersistence;
        this.orderedTopicService = orderedTopicService;
        this.incomingPublishHandler = incomingPublishHandler;
        this.dropOutgoingPublishesHandler = dropOutgoingPublishesHandler;
    }

    public void channelRead(ChannelHandlerContext channelHandlerContext, @NotNull Object obj) throws Exception {
        if (obj instanceof PUBLISH) {
            handlePublish(channelHandlerContext, (PUBLISH) obj);
            return;
        }
        if (obj instanceof PUBACK) {
            handlePuback(channelHandlerContext, (PUBACK) obj);
            return;
        }
        if (obj instanceof PUBREC) {
            handlePubrec(channelHandlerContext, (PUBREC) obj);
            return;
        }
        if (obj instanceof PUBREL) {
            handlePubrel(channelHandlerContext, (PUBREL) obj);
        } else if (obj instanceof PUBCOMP) {
            handlePubcomp(channelHandlerContext, (PUBCOMP) obj);
        } else {
            super.channelRead(channelHandlerContext, obj);
        }
    }

    public void write(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull Object obj, @NotNull ChannelPromise channelPromise) throws Exception {
        if (!(obj instanceof PUBLISH) && !(obj instanceof PUBACK) && !(obj instanceof PUBREL)) {
            super.write(channelHandlerContext, obj, channelPromise);
            return;
        }
        if (obj instanceof PUBACK) {
            PUBACK puback = (PUBACK) obj;
            String clientId = ClientConnection.of(channelHandlerContext.channel()).getClientId();
            int packetIdentifier = puback.getPacketIdentifier();
            this.persistence.addOrReplace(clientId, packetIdentifier, puback);
            channelPromise.addListener(new PUBLISHFlowCompleteListener(packetIdentifier, clientId, this.qos1And2AlreadySentMap, this.persistence));
        }
        if (this.orderedTopicService.handlePublish(channelHandlerContext.channel(), obj, channelPromise) || this.dropOutgoingPublishesHandler.checkChannelNotWritable(channelHandlerContext, obj, channelPromise)) {
            return;
        }
        super.write(channelHandlerContext, obj, channelPromise);
    }

    public void userEventTriggered(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull Object obj) throws Exception {
        if (obj instanceof PublishDroppedEvent) {
            this.orderedTopicService.messageFlowComplete(channelHandlerContext, ((PublishDroppedEvent) obj).getMessage().getPacketIdentifier());
        } else if (obj instanceof PubrelDroppedEvent) {
            this.orderedTopicService.messageFlowComplete(channelHandlerContext, ((PubrelDroppedEvent) obj).getMessage().getPacketIdentifier());
        } else {
            super.userEventTriggered(channelHandlerContext, obj);
        }
    }

    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        String clientId;
        this.orderedTopicService.handleInactive();
        ClientConnectionContext of = ClientConnectionContext.of(channelHandlerContext.channel());
        Long clientSessionExpiryInterval = of.getClientSessionExpiryInterval();
        if (clientSessionExpiryInterval != null && clientSessionExpiryInterval.longValue() == 0 && (clientId = of.getClientId()) != null) {
            this.persistence.delete(clientId);
        }
        super.channelInactive(channelHandlerContext);
    }

    private void handlePublish(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull PUBLISH publish) throws Exception {
        String clientId = ClientConnection.of(channelHandlerContext.channel()).getClientId();
        if (publish.getQoS() == QoS.AT_MOST_ONCE) {
            this.incomingPublishHandler.interceptOrDelegate(channelHandlerContext, publish, clientId);
            return;
        }
        UNACKNOWLEDGED_PUBLISHES_COUNTER.incrementAndGet();
        int packetIdentifier = publish.getPacketIdentifier();
        if (!(this.persistence.get(clientId, packetIdentifier) instanceof PUBLISH)) {
            firstPublishForMessageIdReceived(channelHandlerContext, publish, clientId, packetIdentifier);
        } else if (publish.isDuplicateDelivery()) {
            resentWithDUPFlag(channelHandlerContext, publish, clientId);
        } else {
            resentWithoutDUPFlag(channelHandlerContext, publish, clientId);
        }
    }

    private void firstPublishForMessageIdReceived(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull PUBLISH publish, @NotNull String str, int i) throws Exception {
        this.persistence.addOrReplace(str, i, publish);
        this.incomingPublishHandler.interceptOrDelegate(channelHandlerContext, publish, str);
        this.qos1And2AlreadySentMap.put(Integer.valueOf(i), true);
        log.trace("Client {} sent a publish message with id {} which was not forwarded before. This message is processed normally", str, Integer.valueOf(i));
    }

    private void resentWithDUPFlag(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull PUBLISH publish, @NotNull String str) throws Exception {
        Boolean bool = this.qos1And2AlreadySentMap.get(Integer.valueOf(publish.getPacketIdentifier()));
        if (bool == null || !bool.booleanValue()) {
            super.channelRead(channelHandlerContext, publish);
            log.debug("Client {} sent a duplicate publish message with id {} which was not forwarded before. This message is processed normally", str, Integer.valueOf(publish.getPacketIdentifier()));
        } else {
            log.debug("Client {} sent a duplicate publish message with id {}. This message is ignored", str, Integer.valueOf(publish.getPacketIdentifier()));
        }
        this.qos1And2AlreadySentMap.put(Integer.valueOf(publish.getPacketIdentifier()), true);
    }

    private void resentWithoutDUPFlag(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull PUBLISH publish, @NotNull String str) throws Exception {
        log.debug("Client {} sent a new PUBLISH with QoS {} and a message identifier which is already in process ({}) by another flow! Starting new flow", new Object[]{str, Integer.valueOf(publish.getQoS().getQosNumber()), Integer.valueOf(publish.getPacketIdentifier())});
        this.persistence.addOrReplace(str, publish.getPacketIdentifier(), publish);
        this.incomingPublishHandler.interceptOrDelegate(channelHandlerContext, publish, str);
        this.qos1And2AlreadySentMap.put(Integer.valueOf(publish.getPacketIdentifier()), true);
    }

    private void handlePuback(@NotNull ChannelHandlerContext channelHandlerContext, PUBACK puback) {
        String clientId = ClientConnection.of(channelHandlerContext.channel()).getClientId();
        log.trace("Client {}: Received PUBACK", clientId);
        int packetIdentifier = puback.getPacketIdentifier();
        this.orderedTopicService.messageFlowComplete(channelHandlerContext, packetIdentifier);
        returnMessageId(channelHandlerContext.channel(), puback, clientId);
        if (log.isTraceEnabled()) {
            log.trace("Client {}: Received PUBACK remove message id:[{}] ", clientId, Integer.valueOf(packetIdentifier));
        }
    }

    private void handlePubrec(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull PUBREC pubrec) {
        String clientId = ClientConnection.of(channelHandlerContext.channel()).getClientId();
        log.trace("Client {}: Received pubrec", clientId);
        if (pubrec.getReasonCode() != Mqtt5PubRecReasonCode.SUCCESS && pubrec.getReasonCode() != Mqtt5PubRecReasonCode.NO_MATCHING_SUBSCRIBERS) {
            this.orderedTopicService.messageFlowComplete(channelHandlerContext, pubrec.getPacketIdentifier());
        }
        FutureUtils.addExceptionLogger(this.publishPollService.putPubrelInQueue(clientId, pubrec.getPacketIdentifier()));
        if (log.isTraceEnabled()) {
            log.trace("Client {}: Received PUBREC remove message id:[{}]", clientId, Integer.valueOf(pubrec.getPacketIdentifier()));
        }
        channelHandlerContext.channel().writeAndFlush(new PUBREL(pubrec.getPacketIdentifier()));
    }

    private void handlePubrel(ChannelHandlerContext channelHandlerContext, PUBREL pubrel) {
        String clientId = ClientConnection.of(channelHandlerContext.channel()).getClientId();
        int packetIdentifier = pubrel.getPacketIdentifier();
        this.persistence.addOrReplace(clientId, packetIdentifier, pubrel);
        channelHandlerContext.writeAndFlush(new PUBCOMP(packetIdentifier)).addListener(new PUBLISHFlowCompleteListener(packetIdentifier, clientId, this.qos1And2AlreadySentMap, this.persistence));
    }

    private void handlePubcomp(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull PUBCOMP pubcomp) {
        String clientId = ClientConnection.of(channelHandlerContext.channel()).getClientId();
        log.trace("Client {}: Received PUBCOMP", clientId);
        this.orderedTopicService.messageFlowComplete(channelHandlerContext, pubcomp.getPacketIdentifier());
        returnMessageId(channelHandlerContext.channel(), pubcomp, clientId);
        if (log.isTraceEnabled()) {
            log.trace("Client {}: Received PUBCOMP remove message id:[{}]", clientId, Integer.valueOf(pubcomp.getPacketIdentifier()));
        }
    }

    private void returnMessageId(@NotNull Channel channel, @NotNull MessageWithID messageWithID, @NotNull String str) {
        int packetIdentifier = messageWithID.getPacketIdentifier();
        if (packetIdentifier > 0) {
            ClientConnection.of(channel).getMessageIDPool().returnId(packetIdentifier);
            if (log.isTraceEnabled()) {
                log.trace("Returning Message ID {} for client {} because of a {} message was received", new Object[]{Integer.valueOf(packetIdentifier), str, messageWithID.getClass().getSimpleName()});
            }
        }
    }
}
