package com.hivemq.mqtt.handler.subscribe.retained;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.hivemq.bootstrap.ClientConnection;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.extension.sdk.api.annotations.Nullable;
import com.hivemq.mqtt.message.mqtt5.Mqtt5RetainHandling;
import com.hivemq.mqtt.message.subscribe.Topic;
import com.hivemq.persistence.clientsession.callback.SubscriptionResult;
import com.hivemq.persistence.retained.RetainedMessagePersistence;
import com.hivemq.util.Exceptions;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

/* loaded from: input_file:com/hivemq/mqtt/handler/subscribe/retained/SendRetainedMessagesListener.class */
public class SendRetainedMessagesListener implements ChannelFutureListener {

    @NotNull
    private final RetainedMessagePersistence retainedMessagePersistence;

    @NotNull
    private final RetainedMessagesSender retainedMessagesSender;

    @NotNull
    private final List<SubscriptionResult> subscriptions;

    @NotNull
    private final Set<Topic> ignoredTopics;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/hivemq/mqtt/handler/subscribe/retained/SendRetainedMessagesListener$RetainedMessagesHandleWildcardsCallback.class */
    public static class RetainedMessagesHandleWildcardsCallback implements FutureCallback<Set<String>> {
        static final int CONCURRENT_MESSAGES = 25;

        @NotNull
        private final Topic subscription;

        @NotNull
        private final Channel channel;

        @NotNull
        private final RetainedMessagesSender retainedMessagesSender;

        RetainedMessagesHandleWildcardsCallback(@NotNull Topic topic, @NotNull Channel channel, @NotNull RetainedMessagesSender retainedMessagesSender) {
            this.subscription = topic;
            this.channel = channel;
            this.retainedMessagesSender = retainedMessagesSender;
        }

        public void onSuccess(@Nullable Set<String> set) {
            if (set == null || set.size() == 0) {
                return;
            }
            ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue(set);
            Integer clientReceiveMaximum = ClientConnection.of(this.channel).getClientReceiveMaximum();
            int min = Math.min(clientReceiveMaximum == null ? 25 : Math.min(clientReceiveMaximum.intValue(), 25), set.size());
            Topic[] topicArr = new Topic[min];
            for (int i = 0; i < min; i++) {
                topicArr[i] = new Topic((String) concurrentLinkedQueue.poll(), this.subscription.getQoS(), this.subscription.isNoLocal(), this.subscription.isRetainAsPublished(), this.subscription.getRetainHandling(), this.subscription.getSubscriptionIdentifier());
            }
            Futures.addCallback(this.retainedMessagesSender.writeRetainedMessages(this.channel, topicArr), new SendRetainedMessageListenerAndScheduleNext(this.subscription, concurrentLinkedQueue, this.channel, this.retainedMessagesSender, min), this.channel.eventLoop());
        }

        public void onFailure(@NotNull Throwable th) {
            Exceptions.rethrowError("Unable to send retained messages on topic " + this.subscription.getTopic() + " to client " + ClientConnection.of(this.channel).getClientId() + ".", th);
            this.channel.disconnect();
        }
    }

    public SendRetainedMessagesListener(@NotNull List<SubscriptionResult> list, @NotNull Set<Topic> set, @NotNull RetainedMessagePersistence retainedMessagePersistence, @NotNull RetainedMessagesSender retainedMessagesSender) {
        Preconditions.checkNotNull(list, "Subscriptions must not be null");
        Preconditions.checkNotNull(set, "ignoredTopics must not be null");
        this.subscriptions = list;
        this.ignoredTopics = set;
        this.retainedMessagePersistence = retainedMessagePersistence;
        this.retainedMessagesSender = retainedMessagesSender;
    }

    public void operationComplete(@Nullable ChannelFuture channelFuture) throws Exception {
        Channel channel;
        if (channelFuture == null || !channelFuture.isSuccess() || (channel = channelFuture.channel()) == null || !channel.isActive()) {
            return;
        }
        List<Topic> sendExactMatches = sendExactMatches(channel);
        if (sendExactMatches.isEmpty()) {
            return;
        }
        sendMatchingWildcardSubscriptions(sendExactMatches, channel);
    }

    @NotNull
    private List<Topic> sendExactMatches(@NotNull Channel channel) {
        ArrayList arrayList = new ArrayList(this.subscriptions.size());
        for (SubscriptionResult subscriptionResult : this.subscriptions) {
            if (subscriptionResult != null) {
                Topic topic = subscriptionResult.getTopic();
                if (topic.getRetainHandling() != Mqtt5RetainHandling.DO_NOT_SEND && (topic.getRetainHandling() != Mqtt5RetainHandling.SEND_IF_SUBSCRIPTION_DOES_NOT_EXIST || !subscriptionResult.subscriptionAlreadyExisted())) {
                    if (subscriptionResult.getShareName() == null && !this.ignoredTopics.contains(topic)) {
                        String topic2 = topic.getTopic();
                        if (topic2.contains("#") || topic2.contains("+")) {
                            arrayList.add(topic);
                        } else {
                            Futures.addCallback(this.retainedMessagesSender.writeRetainedMessages(channel, topic), new SendRetainedMessageResultListener(channel, topic, this.retainedMessagesSender), channel.eventLoop());
                        }
                    }
                }
            }
        }
        return arrayList;
    }

    private void sendMatchingWildcardSubscriptions(@NotNull List<Topic> list, @NotNull Channel channel) {
        for (Topic topic : list) {
            Futures.addCallback(this.retainedMessagePersistence.getWithWildcards(topic.getTopic()), new RetainedMessagesHandleWildcardsCallback(topic, channel, this.retainedMessagesSender), channel.eventLoop());
        }
    }
}
