package com.hivemq.client.internal.mqtt.handler.publish.outgoing;

import com.hivemq.client.internal.annotations.CallByThread;
import com.hivemq.client.internal.logging.InternalLogger;
import com.hivemq.client.internal.logging.InternalLoggerFactory;
import com.hivemq.client.internal.mqtt.MqttClientConfig;
import com.hivemq.client.internal.mqtt.MqttClientConnectionConfig;
import com.hivemq.client.internal.mqtt.advanced.interceptor.MqttClientInterceptors;
import com.hivemq.client.internal.mqtt.exceptions.MqttClientStateExceptions;
import com.hivemq.client.internal.mqtt.handler.MqttSessionAwareHandler;
import com.hivemq.client.internal.mqtt.handler.disconnect.MqttDisconnectUtil;
import com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttPubRelWithFlow;
import com.hivemq.client.internal.mqtt.ioc.ClientScope;
import com.hivemq.client.internal.mqtt.message.publish.MqttPublish;
import com.hivemq.client.internal.mqtt.message.publish.MqttPublishResult;
import com.hivemq.client.internal.mqtt.message.publish.puback.MqttPubAck;
import com.hivemq.client.internal.mqtt.message.publish.pubcomp.MqttPubComp;
import com.hivemq.client.internal.mqtt.message.publish.pubrec.MqttPubRec;
import com.hivemq.client.internal.mqtt.message.publish.pubrel.MqttPubRel;
import com.hivemq.client.internal.mqtt.message.publish.pubrel.MqttPubRelBuilder;
import com.hivemq.client.internal.util.Ranges;
import com.hivemq.client.internal.util.collections.ChunkedIntArrayQueue;
import com.hivemq.client.internal.util.collections.IntMap;
import com.hivemq.client.internal.util.netty.ContextFuture;
import com.hivemq.client.internal.util.netty.DefaultContextPromise;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.advanced.interceptor.qos1.Mqtt5OutgoingQos1Interceptor;
import com.hivemq.client.mqtt.mqtt5.advanced.interceptor.qos2.Mqtt5OutgoingQos2Interceptor;
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5PubAckException;
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5PubRecException;
import com.hivemq.client.mqtt.mqtt5.message.disconnect.Mqtt5DisconnectReasonCode;
import com.hivemq.client.mqtt.mqtt5.message.publish.puback.Mqtt5PubAckReasonCode;
import com.hivemq.client.mqtt.mqtt5.message.publish.pubrec.Mqtt5PubRecReasonCode;
import com.hivemq.shaded.io.netty.channel.ChannelHandlerContext;
import com.hivemq.shaded.io.netty.util.concurrent.Future;
import com.hivemq.shaded.io.netty.util.concurrent.GenericFutureListener;
import com.hivemq.shaded.javax.inject.Inject;
import com.hivemq.shaded.org.jctools.queues.SpscUnboundedArrayQueue;
import com.hivemq.shaded.org.jetbrains.annotations.NotNull;
import com.hivemq.shaded.org.jetbrains.annotations.Nullable;
import io.reactivex.FlowableSubscriber;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Subscription;

@ClientScope
/* loaded from: input_file:com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttOutgoingQosHandler.class */
public class MqttOutgoingQosHandler extends MqttSessionAwareHandler implements FlowableSubscriber<MqttPublishWithFlow>, Runnable, ContextFuture.Listener<MqttPublishWithFlow> {

    @NotNull
    public static final String NAME = "qos.outgoing";

    @NotNull
    private static final InternalLogger LOGGER;
    private static final int MAX_CONCURRENT_PUBLISH_FLOWABLES = 64;
    private static final boolean QOS_2_COMPLETE_RESULT = false;

    @NotNull
    private final MqttClientConfig clientConfig;

    @NotNull
    private final MqttPublishFlowables publishFlowables;
    private int sendMaximum;

    @Nullable
    private Ranges packetIdentifiers;

    @Nullable
    private IntMap<MqttPubOrRelWithFlow> qos1Or2Map;

    @Nullable
    private MqttTopicAliasMapping topicAliasMapping;
    private int shrinkIds;
    private int shrinkRequests;

    @Nullable
    private Subscription subscription;
    static final /* synthetic */ boolean $assertionsDisabled;

    @NotNull
    private final SpscUnboundedArrayQueue<MqttPublishWithFlow> queue = new SpscUnboundedArrayQueue<>(32);

    @NotNull
    private final AtomicInteger queuedCounter = new AtomicInteger();

    @NotNull
    private final ChunkedIntArrayQueue qos1Or2Queue = new ChunkedIntArrayQueue(32);
    private int currentWrite = -1;

    /* JADX INFO: Access modifiers changed from: package-private */
    @Inject
    public MqttOutgoingQosHandler(@NotNull MqttClientConfig mqttClientConfig, @NotNull MqttPublishFlowables mqttPublishFlowables) {
        this.clientConfig = mqttClientConfig;
        this.publishFlowables = mqttPublishFlowables;
    }

    @Override // com.hivemq.client.internal.mqtt.handler.MqttSessionAwareHandler
    public void onSessionStartOrResume(@NotNull MqttClientConnectionConfig mqttClientConnectionConfig) {
        super.onSessionStartOrResume(mqttClientConnectionConfig);
        int i = this.sendMaximum;
        int min = Math.min(mqttClientConnectionConfig.getSendMaximum(), 65525);
        this.sendMaximum = min;
        if (i == 0) {
            this.publishFlowables.flatMap(flowable -> {
                return flowable;
            }, true, MAX_CONCURRENT_PUBLISH_FLOWABLES).subscribe(this);
            if (!$assertionsDisabled && this.subscription == null) {
                throw new AssertionError();
            }
            this.packetIdentifiers = new Ranges(1, min);
            this.qos1Or2Map = IntMap.range(1, min);
            this.subscription.request(min);
        } else {
            if (!$assertionsDisabled && this.packetIdentifiers == null) {
                throw new AssertionError();
            }
            if (!$assertionsDisabled && this.qos1Or2Map == null) {
                throw new AssertionError();
            }
            if (!$assertionsDisabled && this.subscription == null) {
                throw new AssertionError();
            }
            resize();
            int i2 = (min - i) - this.shrinkRequests;
            if (i2 > 0) {
                this.subscription.request(i2);
                this.shrinkRequests = 0;
            } else {
                this.shrinkRequests = -i2;
            }
        }
        this.topicAliasMapping = mqttClientConnectionConfig.getSendTopicAliasMapping();
    }

    private void resize() {
        if (!$assertionsDisabled && this.packetIdentifiers == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && this.qos1Or2Map == null) {
            throw new AssertionError();
        }
        this.shrinkIds = this.packetIdentifiers.resize(this.sendMaximum);
        if (this.shrinkIds == 0) {
            this.qos1Or2Map = IntMap.resize(this.qos1Or2Map, this.sendMaximum);
        }
    }

    public void onSubscribe(@NotNull Subscription subscription) {
        this.subscription = subscription;
    }

    public void onNext(@NotNull MqttPublishWithFlow mqttPublishWithFlow) {
        this.queue.offer(mqttPublishWithFlow);
        if (this.queuedCounter.getAndIncrement() == 0) {
            mqttPublishWithFlow.getIncomingAckFlow().getEventLoop().execute(this);
        }
    }

    public void onComplete() {
        LOGGER.error("MqttPublishFlowables is global and must never complete. This must not happen and is a bug.");
    }

    public void onError(@NotNull Throwable th) {
        LOGGER.error("MqttPublishFlowables is global and must never error. This must not happen and is a bug.", th);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @CallByThread("Netty EventLoop")
    public void request(long j) {
        if (!$assertionsDisabled && this.subscription == null) {
            throw new AssertionError();
        }
        if (this.shrinkRequests == 0) {
            this.subscription.request(j);
        } else if (j <= this.shrinkRequests) {
            this.shrinkRequests = (int) (this.shrinkRequests - j);
        } else {
            this.subscription.request(j - this.shrinkRequests);
            this.shrinkRequests = 0;
        }
    }

    @Override // java.lang.Runnable
    @CallByThread("Netty EventLoop")
    public void run() {
        if (!this.hasSession) {
            clearQueued(MqttClientStateExceptions.notConnected());
            return;
        }
        ChannelHandlerContext channelHandlerContext = this.ctx;
        if (channelHandlerContext == null) {
            return;
        }
        int min = Math.min(this.queuedCounter.get(), MAX_CONCURRENT_PUBLISH_FLOWABLES);
        for (int i = 0; i < min; i++) {
            MqttPublishWithFlow mqttPublishWithFlow = (MqttPublishWithFlow) this.queue.poll();
            if (!$assertionsDisabled && mqttPublishWithFlow == null) {
                throw new AssertionError();
            }
            writePublish(channelHandlerContext, mqttPublishWithFlow);
        }
        channelHandlerContext.flush();
        if (this.queuedCounter.addAndGet(-min) > 0) {
            channelHandlerContext.channel().eventLoop().execute(this);
        }
    }

    private void writePublish(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull MqttPublishWithFlow mqttPublishWithFlow) {
        if (mqttPublishWithFlow.getPublish().getQos() == MqttQos.AT_MOST_ONCE) {
            writeQos0Publish(channelHandlerContext, mqttPublishWithFlow);
        } else {
            writeQos1Or2Publish(channelHandlerContext, mqttPublishWithFlow);
        }
    }

    private void writeQos0Publish(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull MqttPublishWithFlow mqttPublishWithFlow) {
        channelHandlerContext.write(mqttPublishWithFlow.getPublish().createStateful(-1, false, this.topicAliasMapping), new DefaultContextPromise(channelHandlerContext.channel(), mqttPublishWithFlow)).addListener2((GenericFutureListener<? extends Future<? super Void>>) this);
    }

    @Override // com.hivemq.shaded.io.netty.util.concurrent.GenericFutureListener
    public void operationComplete(@NotNull ContextFuture<? extends MqttPublishWithFlow> contextFuture) {
        Throwable cause = contextFuture.cause();
        if (cause instanceof IOException) {
            contextFuture.channel().pipeline().fireExceptionCaught(cause);
        } else {
            MqttPublishWithFlow context = contextFuture.getContext();
            context.getIncomingAckFlow().onNext(new MqttPublishResult(context.getPublish(), cause));
        }
    }

    private void writeQos1Or2Publish(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull MqttPublishWithFlow mqttPublishWithFlow) {
        if (!$assertionsDisabled && this.packetIdentifiers == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && this.qos1Or2Map == null) {
            throw new AssertionError();
        }
        int id = this.packetIdentifiers.getId();
        if (id < 0) {
            LOGGER.error("No Packet Identifier available for QoS 1 or 2 PUBLISH. This must not happen and is a bug.");
            return;
        }
        this.qos1Or2Map.put(id, mqttPublishWithFlow);
        this.qos1Or2Queue.offer(id);
        this.currentWrite = id;
        channelHandlerContext.write(mqttPublishWithFlow.getPublish().createStateful(id, false, this.topicAliasMapping), channelHandlerContext.voidPromise());
        this.currentWrite = -1;
    }

    @Override // com.hivemq.shaded.io.netty.channel.ChannelInboundHandlerAdapter, com.hivemq.shaded.io.netty.channel.ChannelInboundHandler
    public void channelRead(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull Object obj) {
        if (obj instanceof MqttPubAck) {
            readPubAck(channelHandlerContext, (MqttPubAck) obj);
            return;
        }
        if (obj instanceof MqttPubRec) {
            readPubRec(channelHandlerContext, (MqttPubRec) obj);
        } else if (obj instanceof MqttPubComp) {
            readPubComp(channelHandlerContext, (MqttPubComp) obj);
        } else {
            channelHandlerContext.fireChannelRead(obj);
        }
    }

    private void readPubAck(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull MqttPubAck mqttPubAck) {
        if (!$assertionsDisabled && this.qos1Or2Map == null) {
            throw new AssertionError();
        }
        int packetIdentifier = mqttPubAck.getPacketIdentifier();
        MqttPubOrRelWithFlow remove = this.qos1Or2Map.remove(packetIdentifier);
        if (remove == null) {
            error(channelHandlerContext, "PUBACK contained unknown packet identifier");
            return;
        }
        if (!(remove instanceof MqttPublishWithFlow)) {
            this.qos1Or2Map.put(packetIdentifier, remove);
            error(channelHandlerContext, "PUBACK must not be received for a PUBREL");
            return;
        }
        MqttPublishWithFlow mqttPublishWithFlow = (MqttPublishWithFlow) remove;
        MqttPublish publish = mqttPublishWithFlow.getPublish();
        if (publish.getQos() != MqttQos.AT_LEAST_ONCE) {
            this.qos1Or2Map.put(packetIdentifier, remove);
            error(channelHandlerContext, "PUBACK must not be received for a QoS 2 PUBLISH");
        } else {
            removed(packetIdentifier);
            onPubAck(publish, mqttPubAck);
            mqttPublishWithFlow.getIncomingAckFlow().onNext(new MqttPublishResult.MqttQos1Result(publish, ((Mqtt5PubAckReasonCode) mqttPubAck.getReasonCode()).isError() ? new Mqtt5PubAckException(mqttPubAck, "PUBACK contained an Error Code") : null, mqttPubAck));
        }
    }

    private void readPubRec(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull MqttPubRec mqttPubRec) {
        if (!$assertionsDisabled && this.qos1Or2Map == null) {
            throw new AssertionError();
        }
        int packetIdentifier = mqttPubRec.getPacketIdentifier();
        MqttPubOrRelWithFlow mqttPubOrRelWithFlow = this.qos1Or2Map.get(packetIdentifier);
        if (mqttPubOrRelWithFlow == null) {
            error(channelHandlerContext, "PUBREC contained unknown packet identifier");
            return;
        }
        if (!(mqttPubOrRelWithFlow instanceof MqttPublishWithFlow)) {
            error(channelHandlerContext, "PUBREC must not be received when the PUBREL has already been sent");
            return;
        }
        MqttPublishWithFlow mqttPublishWithFlow = (MqttPublishWithFlow) mqttPubOrRelWithFlow;
        MqttPublish publish = mqttPublishWithFlow.getPublish();
        if (publish.getQos() != MqttQos.EXACTLY_ONCE) {
            error(channelHandlerContext, "PUBREC must not be received for a QoS 1 PUBLISH");
            return;
        }
        MqttIncomingAckFlow incomingAckFlow = mqttPublishWithFlow.getIncomingAckFlow();
        if (((Mqtt5PubRecReasonCode) mqttPubRec.getReasonCode()).isError()) {
            this.qos1Or2Map.remove(packetIdentifier);
            removed(packetIdentifier);
            onPubRecError(publish, mqttPubRec);
            incomingAckFlow.onNext(new MqttPublishResult.MqttQos2Result(publish, new Mqtt5PubRecException(mqttPubRec, "PUBREC contained an Error Code"), mqttPubRec));
            return;
        }
        MqttPubRel buildPubRel = buildPubRel(publish, mqttPubRec);
        MqttPubRelWithFlow.MqttQos2IntermediateWithFlow mqttQos2IntermediateWithFlow = new MqttPubRelWithFlow.MqttQos2IntermediateWithFlow(buildPubRel, incomingAckFlow);
        this.qos1Or2Map.put(packetIdentifier, mqttQos2IntermediateWithFlow);
        incomingAckFlow.onNext(new MqttPublishResult.MqttQos2IntermediateResult(publish, mqttPubRec, mqttQos2IntermediateWithFlow));
        channelHandlerContext.writeAndFlush(buildPubRel, channelHandlerContext.voidPromise());
    }

    private void readPubComp(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull MqttPubComp mqttPubComp) {
        if (!$assertionsDisabled && this.qos1Or2Map == null) {
            throw new AssertionError();
        }
        int packetIdentifier = mqttPubComp.getPacketIdentifier();
        MqttPubOrRelWithFlow remove = this.qos1Or2Map.remove(packetIdentifier);
        if (remove == null) {
            error(channelHandlerContext, "PUBCOMP contained unknown packet identifier");
            return;
        }
        if (!(remove instanceof MqttPubRelWithFlow)) {
            this.qos1Or2Map.put(packetIdentifier, remove);
            if (((MqttPublishWithFlow) remove).getPublish().getQos() == MqttQos.AT_LEAST_ONCE) {
                error(channelHandlerContext, "PUBCOMP must not be received for a QoS 1 PUBLISH");
                return;
            } else {
                error(channelHandlerContext, "PUBCOMP must not be received when the PUBREL has not been sent yet");
                return;
            }
        }
        MqttPubRelWithFlow mqttPubRelWithFlow = (MqttPubRelWithFlow) remove;
        MqttPubRel pubRel = mqttPubRelWithFlow.getPubRel();
        MqttIncomingAckFlow incomingAckFlow = mqttPubRelWithFlow.getIncomingAckFlow();
        removed(packetIdentifier);
        onPubComp(pubRel, mqttPubComp);
        if (((MqttPubRelWithFlow.MqttQos2IntermediateWithFlow) mqttPubRelWithFlow).getAsBoolean()) {
            incomingAckFlow.acknowledged(1L);
        }
    }

    private void removed(int i) {
        if (!$assertionsDisabled && this.packetIdentifiers == null) {
            throw new AssertionError();
        }
        this.qos1Or2Queue.removeFirst(i);
        this.packetIdentifiers.returnId(i);
        if (i > this.sendMaximum) {
            int i2 = this.shrinkIds - 1;
            this.shrinkIds = i2;
            if (i2 == 0) {
                resize();
            }
        }
    }

    @Override // com.hivemq.shaded.io.netty.channel.ChannelInboundHandlerAdapter, com.hivemq.shaded.io.netty.channel.ChannelHandlerAdapter, com.hivemq.shaded.io.netty.channel.ChannelHandler, com.hivemq.shaded.io.netty.channel.ChannelInboundHandler
    public void exceptionCaught(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull Throwable th) {
        if ((th instanceof IOException) || this.currentWrite == -1) {
            channelHandlerContext.fireExceptionCaught(th);
            return;
        }
        if (!$assertionsDisabled && this.qos1Or2Map == null) {
            throw new AssertionError();
        }
        MqttPublishWithFlow mqttPublishWithFlow = (MqttPublishWithFlow) this.qos1Or2Map.remove(this.currentWrite);
        if (!$assertionsDisabled && mqttPublishWithFlow == null) {
            throw new AssertionError();
        }
        mqttPublishWithFlow.getIncomingAckFlow().onError(th);
        removed(this.currentWrite);
        this.currentWrite = -1;
    }

    @Override // com.hivemq.client.internal.mqtt.handler.MqttSessionAwareHandler
    public void onSessionEnd(@NotNull Throwable th) {
        super.onSessionEnd(th);
        int size = this.qos1Or2Queue.size();
        if (size > 0) {
            if (!$assertionsDisabled && this.packetIdentifiers == null) {
                throw new AssertionError();
            }
            if (!$assertionsDisabled && this.qos1Or2Map == null) {
                throw new AssertionError();
            }
            for (int i = 0; i < size; i++) {
                int poll = this.qos1Or2Queue.poll(-1);
                if (!$assertionsDisabled && poll == -1) {
                    throw new AssertionError();
                }
                this.packetIdentifiers.returnId(poll);
                MqttPubOrRelWithFlow remove = this.qos1Or2Map.remove(poll);
                if (!$assertionsDisabled && remove == null) {
                    throw new AssertionError();
                }
                remove.getIncomingAckFlow().onError(th);
            }
            request(size);
        }
        clearQueued(th);
    }

    private void clearQueued(@NotNull Throwable th) {
        int i = 0;
        while (true) {
            MqttPublishWithFlow mqttPublishWithFlow = (MqttPublishWithFlow) this.queue.poll();
            if (mqttPublishWithFlow == null) {
                if (i > 0) {
                    request(i);
                }
                if (this.queuedCounter.addAndGet(-i) == 0) {
                    return;
                } else {
                    i = 0;
                }
            } else {
                mqttPublishWithFlow.getIncomingAckFlow().onError(th);
                i++;
            }
        }
    }

    private static void error(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull String str) {
        MqttDisconnectUtil.disconnect(channelHandlerContext.channel(), Mqtt5DisconnectReasonCode.PROTOCOL_ERROR, str);
    }

    private void onPubAck(@NotNull MqttPublish mqttPublish, @NotNull MqttPubAck mqttPubAck) {
        Mqtt5OutgoingQos1Interceptor outgoingQos1Interceptor;
        MqttClientInterceptors interceptors = this.clientConfig.getAdvancedConfig().getInterceptors();
        if (interceptors == null || (outgoingQos1Interceptor = interceptors.getOutgoingQos1Interceptor()) == null) {
            return;
        }
        outgoingQos1Interceptor.onPubAck(this.clientConfig, mqttPublish, mqttPubAck);
    }

    private void onPubRecError(@NotNull MqttPublish mqttPublish, @NotNull MqttPubRec mqttPubRec) {
        Mqtt5OutgoingQos2Interceptor outgoingQos2Interceptor;
        MqttClientInterceptors interceptors = this.clientConfig.getAdvancedConfig().getInterceptors();
        if (interceptors == null || (outgoingQos2Interceptor = interceptors.getOutgoingQos2Interceptor()) == null) {
            return;
        }
        outgoingQos2Interceptor.onPubRecError(this.clientConfig, mqttPublish, mqttPubRec);
    }

    @NotNull
    private MqttPubRel buildPubRel(@NotNull MqttPublish mqttPublish, @NotNull MqttPubRec mqttPubRec) {
        Mqtt5OutgoingQos2Interceptor outgoingQos2Interceptor;
        MqttPubRelBuilder mqttPubRelBuilder = new MqttPubRelBuilder(mqttPubRec);
        MqttClientInterceptors interceptors = this.clientConfig.getAdvancedConfig().getInterceptors();
        if (interceptors != null && (outgoingQos2Interceptor = interceptors.getOutgoingQos2Interceptor()) != null) {
            outgoingQos2Interceptor.onPubRec(this.clientConfig, mqttPublish, mqttPubRec, mqttPubRelBuilder);
        }
        return mqttPubRelBuilder.build();
    }

    private void onPubComp(@NotNull MqttPubRel mqttPubRel, @NotNull MqttPubComp mqttPubComp) {
        Mqtt5OutgoingQos2Interceptor outgoingQos2Interceptor;
        MqttClientInterceptors interceptors = this.clientConfig.getAdvancedConfig().getInterceptors();
        if (interceptors == null || (outgoingQos2Interceptor = interceptors.getOutgoingQos2Interceptor()) == null) {
            return;
        }
        outgoingQos2Interceptor.onPubComp(this.clientConfig, mqttPubRel, mqttPubComp);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @NotNull
    public MqttPublishFlowables getPublishFlowables() {
        return this.publishFlowables;
    }

    static {
        $assertionsDisabled = !MqttOutgoingQosHandler.class.desiredAssertionStatus();
        LOGGER = InternalLoggerFactory.getLogger(MqttOutgoingQosHandler.class);
    }
}
