package com.hivemq.mqtt.handler.subscribe.retained;

import com.google.common.collect.ImmutableList;
import com.google.common.primitives.ImmutableIntArray;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.hivemq.bootstrap.ClientConnection;
import com.hivemq.configuration.HivemqId;
import com.hivemq.configuration.service.MqttConfigurationService;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.extension.sdk.api.annotations.Nullable;
import com.hivemq.mqtt.handler.publish.PublishStatus;
import com.hivemq.mqtt.handler.publish.PublishWriteFailedListener;
import com.hivemq.mqtt.message.QoS;
import com.hivemq.mqtt.message.publish.PUBLISH;
import com.hivemq.mqtt.message.publish.PUBLISHFactory;
import com.hivemq.mqtt.message.publish.PublishWithFuture;
import com.hivemq.mqtt.message.subscribe.Topic;
import com.hivemq.persistence.RetainedMessage;
import com.hivemq.persistence.clientqueue.ClientQueuePersistence;
import com.hivemq.persistence.payload.PublishPayloadPersistence;
import com.hivemq.persistence.retained.RetainedMessagePersistence;
import com.hivemq.persistence.util.FutureUtils;
import com.hivemq.util.ChannelAttributes;
import com.hivemq.util.PublishUtil;
import io.netty.channel.Channel;
import java.nio.channels.ClosedChannelException;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
/* loaded from: input_file:com/hivemq/mqtt/handler/subscribe/retained/RetainedMessagesSender.class */
public class RetainedMessagesSender {
    private static final Logger log = LoggerFactory.getLogger(RetainedMessagesSender.class);
    private static final ClosedChannelException CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();

    @NotNull
    private final HivemqId hiveMQId;

    @NotNull
    private final PublishPayloadPersistence publishPayloadPersistence;

    @NotNull
    private final RetainedMessagePersistence retainedMessagePersistence;

    @NotNull
    private final ClientQueuePersistence clientQueuePersistence;

    @NotNull
    private final MqttConfigurationService mqttConfigurationService;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hivemq/mqtt/handler/subscribe/retained/RetainedMessagesSender$SendRetainedMessageCallback.class */
    public static class SendRetainedMessageCallback implements FutureCallback<List<RetainedMessage>> {

        @NotNull
        private final Topic[] subscribedTopics;

        @NotNull
        private final HivemqId hivemqId;

        @NotNull
        private final PublishPayloadPersistence payloadPersistence;

        @NotNull
        private final String clientId;

        @NotNull
        private final SettableFuture<Void> resultFuture;

        @NotNull
        private final Channel channel;

        @NotNull
        private final ClientQueuePersistence clientQueuePersistence;

        @NotNull
        private final MqttConfigurationService mqttConfigurationService;

        SendRetainedMessageCallback(@NotNull Topic[] topicArr, @NotNull HivemqId hivemqId, @NotNull PublishPayloadPersistence publishPayloadPersistence, @NotNull String str, @NotNull SettableFuture<Void> settableFuture, @NotNull Channel channel, @NotNull ClientQueuePersistence clientQueuePersistence, @NotNull MqttConfigurationService mqttConfigurationService) {
            this.subscribedTopics = topicArr;
            this.hivemqId = hivemqId;
            this.payloadPersistence = publishPayloadPersistence;
            this.clientId = str;
            this.resultFuture = settableFuture;
            this.channel = channel;
            this.clientQueuePersistence = clientQueuePersistence;
            this.mqttConfigurationService = mqttConfigurationService;
        }

        public void onSuccess(List<RetainedMessage> list) {
            ImmutableList.Builder builder = ImmutableList.builder();
            for (int i = 0; i < list.size(); i++) {
                RetainedMessage retainedMessage = list.get(i);
                if (retainedMessage != null) {
                    Topic topic = this.subscribedTopics[i];
                    QoS minQoS = PublishUtil.getMinQoS(topic.getQoS(), retainedMessage.getQos());
                    builder.add(new PUBLISHFactory.Mqtt5Builder().withTimestamp(System.currentTimeMillis()).withHivemqId(this.hivemqId.get()).withPayload(retainedMessage.getMessage()).withPublishId(retainedMessage.getPublishId()).withPersistence(this.payloadPersistence).withMessageExpiryInterval(retainedMessage.getMessageExpiryInterval()).withTopic(topic.getTopic()).withRetain(true).withDuplicateDelivery(false).withQoS(minQoS).withOnwardQos(minQoS).withUserProperties(retainedMessage.getUserProperties()).withResponseTopic(retainedMessage.getResponseTopic()).withContentType(retainedMessage.getContentType()).withCorrelationData(retainedMessage.getCorrelationData()).withPayloadFormatIndicator(retainedMessage.getPayloadFormatIndicator()).withSubscriptionIdentifiers(topic.getSubscriptionIdentifier() != null ? ImmutableIntArray.of(topic.getSubscriptionIdentifier().intValue()) : ImmutableIntArray.of()).build());
                }
            }
            sendOutMessages(builder.build());
        }

        private void sendOutMessages(@NotNull List<PUBLISH> list) {
            if (!this.channel.isActive()) {
                Iterator<PUBLISH> it = list.iterator();
                while (it.hasNext()) {
                    this.payloadPersistence.decrementReferenceCounter(it.next().getPublishId());
                }
                this.resultFuture.setException(RetainedMessagesSender.CLOSED_CHANNEL_EXCEPTION);
                return;
            }
            if (RetainedMessagesSender.log.isTraceEnabled()) {
                for (Topic topic : this.subscribedTopics) {
                    RetainedMessagesSender.log.trace("Sending retained message with topic [{}] for client [{}]", topic.getTopic(), this.clientId);
                }
            }
            ImmutableList.Builder builder = ImmutableList.builder();
            ImmutableList.Builder builder2 = ImmutableList.builder();
            for (PUBLISH publish : list) {
                if (publish.getQoS() == QoS.AT_MOST_ONCE) {
                    builder2.add(sendQos0PublishDirectly(publish));
                } else {
                    builder.add(publish);
                }
            }
            List<PUBLISH> build = builder.build();
            if (build.isEmpty()) {
                this.resultFuture.setFuture(FutureUtils.voidFutureFromList(builder2.build()));
                return;
            }
            Long queueSizeMaximum = ((ClientConnection) this.channel.attr(ChannelAttributes.CLIENT_CONNECTION).get()).getQueueSizeMaximum();
            ClientQueuePersistence clientQueuePersistence = this.clientQueuePersistence;
            String str = this.clientId;
            MqttConfigurationService mqttConfigurationService = this.mqttConfigurationService;
            Objects.requireNonNull(mqttConfigurationService);
            builder2.add(clientQueuePersistence.add(str, false, build, true, ((Long) Objects.requireNonNullElseGet(queueSizeMaximum, mqttConfigurationService::maxQueuedMessages)).longValue()));
            this.resultFuture.setFuture(FutureUtils.voidFutureFromList(builder2.build()));
        }

        private ListenableFuture<Void> sendQos0PublishDirectly(@NotNull final PUBLISH publish) {
            final SettableFuture create = SettableFuture.create();
            SettableFuture create2 = SettableFuture.create();
            Futures.addCallback(create2, new FutureCallback<PublishStatus>() { // from class: com.hivemq.mqtt.handler.subscribe.retained.RetainedMessagesSender.SendRetainedMessageCallback.1
                public void onSuccess(@Nullable PublishStatus publishStatus) {
                    if (publishStatus == PublishStatus.DELIVERED) {
                        create.set((Object) null);
                    } else {
                        create.setException(new ClosedChannelException());
                    }
                    SendRetainedMessageCallback.this.payloadPersistence.decrementReferenceCounter(publish.getPublishId());
                    if (publish.getPacketIdentifier() != 0) {
                        ((ClientConnection) SendRetainedMessageCallback.this.channel.attr(ChannelAttributes.CLIENT_CONNECTION).get()).getMessageIDPool().returnId(publish.getPacketIdentifier());
                    }
                }

                public void onFailure(@NotNull Throwable th) {
                    if (th instanceof CancellationException) {
                        return;
                    }
                    SendRetainedMessageCallback.this.payloadPersistence.decrementReferenceCounter(publish.getPublishId());
                }
            }, MoreExecutors.directExecutor());
            this.channel.writeAndFlush(new PublishWithFuture(publish, create2, false, this.payloadPersistence)).addListener(new PublishWriteFailedListener(create2));
            return create;
        }

        public void onFailure(@NotNull Throwable th) {
            this.resultFuture.setException(th);
        }
    }

    @Inject
    public RetainedMessagesSender(@NotNull HivemqId hivemqId, @NotNull PublishPayloadPersistence publishPayloadPersistence, @NotNull RetainedMessagePersistence retainedMessagePersistence, @NotNull ClientQueuePersistence clientQueuePersistence, @NotNull MqttConfigurationService mqttConfigurationService) {
        this.hiveMQId = hivemqId;
        this.publishPayloadPersistence = publishPayloadPersistence;
        this.retainedMessagePersistence = retainedMessagePersistence;
        this.clientQueuePersistence = clientQueuePersistence;
        this.mqttConfigurationService = mqttConfigurationService;
    }

    @NotNull
    public ListenableFuture<Void> writeRetainedMessages(@NotNull Channel channel, @Nullable Topic... topicArr) {
        if (topicArr == null) {
            return Futures.immediateFuture((Object) null);
        }
        String clientId = ((ClientConnection) channel.attr(ChannelAttributes.CLIENT_CONNECTION).get()).getClientId();
        ImmutableList.Builder builder = ImmutableList.builder();
        for (Topic topic : topicArr) {
            builder.add(this.retainedMessagePersistence.get(topic.getTopic()));
        }
        ListenableFuture allAsList = Futures.allAsList(builder.build());
        SettableFuture create = SettableFuture.create();
        Futures.addCallback(allAsList, new SendRetainedMessageCallback(topicArr, this.hiveMQId, this.publishPayloadPersistence, clientId, create, channel, this.clientQueuePersistence, this.mqttConfigurationService), channel.eventLoop());
        return create;
    }

    static {
        CLOSED_CHANNEL_EXCEPTION.setStackTrace(new StackTraceElement[0]);
    }
}
