package com.hivemq.persistence.clientsession;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.UnmodifiableIterator;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.hivemq.bootstrap.ioc.lazysingleton.LazySingleton;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.extensions.iteration.ChunkCursor;
import com.hivemq.extensions.iteration.Chunker;
import com.hivemq.extensions.iteration.MultipleChunkResult;
import com.hivemq.logging.EventLog;
import com.hivemq.mqtt.handler.disconnect.MqttServerDisconnector;
import com.hivemq.mqtt.message.QoS;
import com.hivemq.mqtt.message.reason.Mqtt5DisconnectReasonCode;
import com.hivemq.mqtt.message.subscribe.Topic;
import com.hivemq.mqtt.services.PublishPollService;
import com.hivemq.mqtt.topic.SubscriptionFlags;
import com.hivemq.mqtt.topic.TopicFilter;
import com.hivemq.mqtt.topic.tree.LocalTopicTree;
import com.hivemq.persistence.AbstractPersistence;
import com.hivemq.persistence.ChannelPersistence;
import com.hivemq.persistence.ProducerQueues;
import com.hivemq.persistence.SingleWriterService;
import com.hivemq.persistence.clientsession.SharedSubscriptionServiceImpl;
import com.hivemq.persistence.clientsession.callback.SubscriptionResult;
import com.hivemq.persistence.local.ClientSessionLocalPersistence;
import com.hivemq.persistence.local.ClientSessionSubscriptionLocalPersistence;
import com.hivemq.util.ChannelAttributes;
import com.hivemq.util.ReasonStrings;
import io.netty.channel.Channel;
import java.util.HashSet;
import java.util.Map;
import javax.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@LazySingleton
/* loaded from: input_file:com/hivemq/persistence/clientsession/ClientSessionSubscriptionPersistenceImpl.class */
public class ClientSessionSubscriptionPersistenceImpl extends AbstractPersistence implements ClientSessionSubscriptionPersistence {
    private final Logger log = LoggerFactory.getLogger(ClientSessionSubscriptionPersistenceImpl.class);

    @NotNull
    private final ClientSessionSubscriptionLocalPersistence localPersistence;

    @NotNull
    private final LocalTopicTree topicTree;

    @NotNull
    private final SharedSubscriptionService sharedSubscriptionService;

    @NotNull
    private final ChannelPersistence channelPersistence;

    @NotNull
    private final ProducerQueues singleWriter;

    @NotNull
    private final EventLog eventLog;

    @NotNull
    private final ClientSessionLocalPersistence clientSessionLocalPersistence;

    @NotNull
    private final PublishPollService publishPollService;

    @NotNull
    private final Chunker chunker;

    @NotNull
    private final MqttServerDisconnector mqttServerDisconnector;

    @Inject
    ClientSessionSubscriptionPersistenceImpl(@NotNull ClientSessionSubscriptionLocalPersistence clientSessionSubscriptionLocalPersistence, @NotNull LocalTopicTree localTopicTree, @NotNull SharedSubscriptionService sharedSubscriptionService, @NotNull SingleWriterService singleWriterService, @NotNull ChannelPersistence channelPersistence, @NotNull EventLog eventLog, @NotNull ClientSessionLocalPersistence clientSessionLocalPersistence, @NotNull PublishPollService publishPollService, @NotNull Chunker chunker, @NotNull MqttServerDisconnector mqttServerDisconnector) {
        this.localPersistence = clientSessionSubscriptionLocalPersistence;
        this.topicTree = localTopicTree;
        this.sharedSubscriptionService = sharedSubscriptionService;
        this.channelPersistence = channelPersistence;
        this.singleWriter = singleWriterService.getSubscriptionQueue();
        this.eventLog = eventLog;
        this.clientSessionLocalPersistence = clientSessionLocalPersistence;
        this.publishPollService = publishPollService;
        this.chunker = chunker;
        this.mqttServerDisconnector = mqttServerDisconnector;
    }

    @Override // com.hivemq.persistence.clientsession.ClientSessionSubscriptionPersistence
    @NotNull
    public ImmutableSet<Topic> getSubscriptions(@NotNull String str) {
        Preconditions.checkNotNull(str, "Client id must not be null");
        return this.localPersistence.getSubscriptions(str);
    }

    @Override // com.hivemq.persistence.clientsession.ClientSessionSubscriptionPersistence
    @NotNull
    public ListenableFuture<SubscriptionResult> addSubscription(@NotNull String str, @NotNull Topic topic) {
        boolean addTopic;
        ListenableFuture submit;
        try {
            Preconditions.checkNotNull(str, "Client id must not be null");
            Preconditions.checkNotNull(topic, "Topic must not be null");
            long currentTimeMillis = System.currentTimeMillis();
            if (this.clientSessionLocalPersistence.getSession(str) == null) {
                return Futures.immediateFuture((Object) null);
            }
            SharedSubscriptionServiceImpl.SharedSubscription checkForSharedSubscription = this.sharedSubscriptionService.checkForSharedSubscription(topic.getTopic());
            if (checkForSharedSubscription == null) {
                addTopic = this.topicTree.addTopic(str, topic, SubscriptionFlags.getDefaultFlags(false, topic.isRetainAsPublished(), topic.isNoLocal()), null);
                submit = this.singleWriter.submit(str, (i, immutableList, i2) -> {
                    this.localPersistence.addSubscription(str, topic, currentTimeMillis, i);
                    return null;
                });
            } else {
                if (checkForSharedSubscription.getTopicFilter().isEmpty()) {
                    disconnectSharedSubscriberWithEmptyTopic(str);
                    return Futures.immediateFuture((Object) null);
                }
                if (topic.getQoS() == QoS.EXACTLY_ONCE) {
                    topic.setQoS(QoS.AT_LEAST_ONCE);
                }
                Topic topic2 = new Topic(checkForSharedSubscription.getTopicFilter(), topic.getQoS(), topic.isNoLocal(), topic.isRetainAsPublished(), topic.getRetainHandling(), topic.getSubscriptionIdentifier());
                addTopic = this.topicTree.addTopic(str, topic2, SubscriptionFlags.getDefaultFlags(true, topic.isRetainAsPublished(), topic.isNoLocal()), checkForSharedSubscription.getShareName());
                Subscription subscription = new Subscription(topic2, SubscriptionFlags.getDefaultFlags(true, topic.isRetainAsPublished(), topic.isNoLocal()), checkForSharedSubscription.getShareName());
                submit = this.singleWriter.submit(str, (i3, immutableList2, i4) -> {
                    this.localPersistence.addSubscription(str, topic, currentTimeMillis, i3);
                    invalidateSharedSubscriptionCacheAndPoll(str, ImmutableSet.of(subscription));
                    return null;
                });
            }
            boolean z = addTopic;
            return Futures.whenAllComplete(new ListenableFuture[]{submit}).call(() -> {
                return new SubscriptionResult(topic, z, checkForSharedSubscription == null ? null : checkForSharedSubscription.getShareName());
            }, MoreExecutors.directExecutor());
        } catch (Throwable th) {
            return Futures.immediateFailedFuture(th);
        }
    }

    @Override // com.hivemq.persistence.clientsession.ClientSessionSubscriptionPersistence
    @NotNull
    public ListenableFuture<ImmutableList<SubscriptionResult>> addSubscriptions(@NotNull String str, @NotNull ImmutableSet<Topic> immutableSet) {
        try {
            Preconditions.checkNotNull(str, "Client id must not be null");
            Preconditions.checkNotNull(immutableSet, "Topics must not be null");
            return addBatchedTopics(str, immutableSet);
        } catch (Throwable th) {
            return Futures.immediateFailedFuture(th);
        }
    }

    @Override // com.hivemq.persistence.clientsession.ClientSessionSubscriptionPersistence
    @NotNull
    public ListenableFuture<Void> removeSubscriptions(@NotNull String str, @NotNull ImmutableSet<String> immutableSet) {
        try {
            Preconditions.checkNotNull(str, "Client id must not be null");
            Preconditions.checkNotNull(immutableSet, "Topics must not be null");
            return removeBatchedTopics(str, immutableSet);
        } catch (Throwable th) {
            return Futures.immediateFailedFuture(th);
        }
    }

    @Override // com.hivemq.persistence.clientsession.ClientSessionSubscriptionPersistence
    @NotNull
    public ListenableFuture<Void> remove(@NotNull String str, @NotNull String str2) {
        try {
            Preconditions.checkNotNull(str, "Client id must not be null");
            Preconditions.checkNotNull(str2, "Topic must not be null");
            long currentTimeMillis = System.currentTimeMillis();
            SharedSubscriptionServiceImpl.SharedSubscription checkForSharedSubscription = this.sharedSubscriptionService.checkForSharedSubscription(str2);
            if (checkForSharedSubscription == null) {
                this.topicTree.removeSubscriber(str, str2, null);
            } else {
                if (checkForSharedSubscription.getTopicFilter().isEmpty()) {
                    disconnectSharedSubscriberWithEmptyTopic(str);
                    return Futures.immediateFuture((Object) null);
                }
                this.topicTree.removeSubscriber(str, checkForSharedSubscription.getTopicFilter(), checkForSharedSubscription.getShareName());
            }
            ListenableFuture submit = this.singleWriter.submit(str, (i, immutableList, i2) -> {
                this.localPersistence.remove(str, str2, currentTimeMillis, i);
                return null;
            });
            return Futures.whenAllComplete(new ListenableFuture[]{submit}).call(() -> {
                return (Void) submit.get();
            }, MoreExecutors.directExecutor());
        } catch (Throwable th) {
            return Futures.immediateFailedFuture(th);
        }
    }

    @Override // com.hivemq.persistence.clientsession.ClientSessionSubscriptionPersistence
    @NotNull
    public ListenableFuture<Void> removeAll(@NotNull String str) {
        try {
            Preconditions.checkNotNull(str, "Client id must not be null");
            ImmutableSet<Topic> subscriptions = this.localPersistence.getSubscriptions(str);
            HashSet<TopicFilter> hashSet = new HashSet();
            for (Topic topic : subscriptions) {
                SharedSubscriptionServiceImpl.SharedSubscription checkForSharedSubscription = this.sharedSubscriptionService.checkForSharedSubscription(topic.getTopic());
                if (checkForSharedSubscription == null) {
                    hashSet.add(new TopicFilter(topic.getTopic(), null));
                } else {
                    hashSet.add(new TopicFilter(checkForSharedSubscription.getTopicFilter(), checkForSharedSubscription.getShareName()));
                }
            }
            for (TopicFilter topicFilter : hashSet) {
                this.topicTree.removeSubscriber(str, topicFilter.getTopic(), topicFilter.getSharedName());
            }
            return removeAllLocally(str);
        } catch (Throwable th) {
            return Futures.immediateFailedFuture(th);
        }
    }

    @Override // com.hivemq.persistence.clientsession.ClientSessionSubscriptionPersistence
    @NotNull
    public ListenableFuture<Void> removeAllLocally(@NotNull String str) {
        return this.singleWriter.submit(str, (i, immutableList, i2) -> {
            this.localPersistence.removeAll(str, System.currentTimeMillis(), i);
            return null;
        });
    }

    @NotNull
    private ListenableFuture<ImmutableList<SubscriptionResult>> addBatchedTopics(@NotNull String str, @NotNull ImmutableSet<Topic> immutableSet) {
        long currentTimeMillis = System.currentTimeMillis();
        if (this.clientSessionLocalPersistence.getSession(str) == null) {
            return Futures.immediateFuture((Object) null);
        }
        ImmutableSet.Builder builder = new ImmutableSet.Builder();
        HashSet<Subscription> hashSet = new HashSet();
        UnmodifiableIterator it = immutableSet.iterator();
        while (it.hasNext()) {
            Topic topic = (Topic) it.next();
            SharedSubscriptionServiceImpl.SharedSubscription checkForSharedSubscription = this.sharedSubscriptionService.checkForSharedSubscription(topic.getTopic());
            if (checkForSharedSubscription == null) {
                hashSet.add(new Subscription(topic, SubscriptionFlags.getDefaultFlags(false, topic.isRetainAsPublished(), topic.isNoLocal()), null));
            } else {
                if (checkForSharedSubscription.getTopicFilter().isEmpty()) {
                    disconnectSharedSubscriberWithEmptyTopic(str);
                    return Futures.immediateFuture((Object) null);
                }
                if (topic.getQoS() == QoS.EXACTLY_ONCE) {
                    topic.setQoS(QoS.AT_LEAST_ONCE);
                }
                Subscription subscription = new Subscription(new Topic(checkForSharedSubscription.getTopicFilter(), topic.getQoS(), topic.isNoLocal(), topic.isRetainAsPublished(), topic.getRetainHandling(), topic.getSubscriptionIdentifier()), SubscriptionFlags.getDefaultFlags(true, topic.isRetainAsPublished(), topic.isNoLocal()), checkForSharedSubscription.getShareName());
                builder.add(subscription);
                hashSet.add(subscription);
            }
        }
        ImmutableList.Builder builder2 = ImmutableList.builder();
        for (Subscription subscription2 : hashSet) {
            builder2.add(new SubscriptionResult(subscription2.getTopic(), this.topicTree.addTopic(str, subscription2.getTopic(), subscription2.getFlags(), subscription2.getSharedGroup()), subscription2.getSharedGroup()));
        }
        ListenableFuture submit = this.singleWriter.submit(str, (i, immutableList, i2) -> {
            this.localPersistence.addSubscriptions(str, immutableSet, currentTimeMillis, i);
            return null;
        });
        invalidateSharedSubscriptionCacheAndPoll(str, builder.build());
        return Futures.whenAllComplete(new ListenableFuture[]{submit}).call(() -> {
            return builder2.build();
        }, MoreExecutors.directExecutor());
    }

    @Override // com.hivemq.persistence.clientsession.ClientSessionSubscriptionPersistence
    public void invalidateSharedSubscriptionCacheAndPoll(@NotNull String str, @NotNull ImmutableSet<Subscription> immutableSet) {
        Channel channel;
        Preconditions.checkNotNull(str, "Client id must never be null");
        Preconditions.checkNotNull(immutableSet, "Subscriptions must never be null");
        ClientSession session = this.clientSessionLocalPersistence.getSession(str);
        if ((session == null || session.isConnected()) && !immutableSet.isEmpty() && (channel = this.channelPersistence.get(str)) != null && channel.isActive()) {
            UnmodifiableIterator it = immutableSet.iterator();
            while (it.hasNext()) {
                Subscription subscription = (Subscription) it.next();
                Topic topic = subscription.getTopic();
                String str2 = subscription.getSharedGroup() + "/" + topic.getTopic();
                this.publishPollService.pollSharedPublishesForClient(str, str2, topic.getQoS().getQosNumber(), topic.isRetainAsPublished(), topic.getSubscriptionIdentifier(), channel);
                this.sharedSubscriptionService.invalidateSharedSubscriptionCache(str);
                this.sharedSubscriptionService.invalidateSharedSubscriberCache(str2);
                channel.attr(ChannelAttributes.NO_SHARED_SUBSCRIPTION).set(false);
                this.log.trace("Invalidated cache and polled for shared subscription '{}' and client '{}'", str2, str);
            }
        }
    }

    @Override // com.hivemq.persistence.clientsession.ClientSessionSubscriptionPersistence
    @NotNull
    public ListenableFuture<MultipleChunkResult<Map<String, ImmutableSet<Topic>>>> getAllLocalSubscribersChunk(@NotNull ChunkCursor chunkCursor) {
        return this.chunker.getAllLocalChunk(chunkCursor, 2000, (i, str, i2) -> {
            return this.singleWriter.submit(i, (i, immutableList, i2) -> {
                return this.localPersistence.getAllSubscribersChunk(i, str, i2);
            });
        });
    }

    @NotNull
    private ListenableFuture<Void> removeBatchedTopics(@NotNull String str, @NotNull ImmutableSet<String> immutableSet) {
        long currentTimeMillis = System.currentTimeMillis();
        ImmutableSet.Builder builder = new ImmutableSet.Builder();
        UnmodifiableIterator it = immutableSet.iterator();
        while (it.hasNext()) {
            String str2 = (String) it.next();
            SharedSubscriptionServiceImpl.SharedSubscription checkForSharedSubscription = this.sharedSubscriptionService.checkForSharedSubscription(str2);
            if (checkForSharedSubscription == null) {
                builder.add(new TopicFilter(str2, null));
            } else {
                builder.add(new TopicFilter(checkForSharedSubscription.getTopicFilter(), checkForSharedSubscription.getShareName()));
            }
        }
        UnmodifiableIterator it2 = builder.build().iterator();
        while (it2.hasNext()) {
            TopicFilter topicFilter = (TopicFilter) it2.next();
            this.topicTree.removeSubscriber(str, topicFilter.getTopic(), topicFilter.getSharedName());
        }
        return this.singleWriter.submit(str, (i, immutableList, i2) -> {
            this.localPersistence.removeSubscriptions(str, immutableSet, currentTimeMillis, i);
            return null;
        });
    }

    private void disconnectSharedSubscriberWithEmptyTopic(@NotNull String str) {
        Channel channel = this.channelPersistence.get(str);
        if (channel != null) {
            this.mqttServerDisconnector.disconnect(channel, "A client (IP: {}) sent a shared subscription with an empty topic. Disconnecting client.", "Sent shared subscription with empty topic", Mqtt5DisconnectReasonCode.TOPIC_FILTER_INVALID, ReasonStrings.DISCONNECT_TOPIC_NAME_INVALID_SHARED_EMPTY);
        } else {
            this.log.debug("Client {} sent a shared subscription with empty topic.", str);
        }
    }

    @Override // com.hivemq.persistence.clientsession.ClientSessionSubscriptionPersistence
    @NotNull
    public ImmutableSet<Topic> getSharedSubscriptions(@NotNull String str) {
        Preconditions.checkNotNull(str, "Client id must not be null");
        ImmutableSet<Topic> subscriptions = getSubscriptions(str);
        ImmutableSet.Builder builder = ImmutableSet.builder();
        UnmodifiableIterator it = subscriptions.iterator();
        while (it.hasNext()) {
            Topic topic = (Topic) it.next();
            if (this.sharedSubscriptionService.checkForSharedSubscription(topic.getTopic()) != null) {
                builder.add(topic);
            }
        }
        return builder.build();
    }

    @Override // com.hivemq.persistence.clientsession.ClientSessionSubscriptionPersistence
    @NotNull
    public ListenableFuture<Void> cleanUp(int i) {
        return this.singleWriter.submit(i, (i2, immutableList, i3) -> {
            this.localPersistence.cleanUp(i2);
            return null;
        });
    }

    @Override // com.hivemq.persistence.clientsession.ClientSessionSubscriptionPersistence
    @NotNull
    public ListenableFuture<Void> closeDB() {
        return closeDB(this.localPersistence, this.singleWriter);
    }
}
