package com.hivemq.mqtt.services;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.UnmodifiableIterator;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.hivemq.bootstrap.ioc.lazysingleton.LazySingleton;
import com.hivemq.configuration.service.InternalConfigurations;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.extension.sdk.api.annotations.Nullable;
import com.hivemq.mqtt.handler.publish.PublishReturnCode;
import com.hivemq.mqtt.message.publish.PUBLISH;
import com.hivemq.mqtt.topic.SubscriberWithIdentifiers;
import com.hivemq.mqtt.topic.tree.LocalTopicTree;
import com.hivemq.mqtt.topic.tree.TopicSubscribers;
import com.hivemq.persistence.RetainedMessage;
import com.hivemq.persistence.retained.RetainedMessagePersistence;
import com.hivemq.persistence.util.FutureUtils;
import com.hivemq.util.Exceptions;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import javax.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link InternalPublishService} implementation that, for every PUBLISH it is given,
 * <ol>
 *   <li>persists or removes the retained message for the topic when the retain flag is set, and</li>
 *   <li>looks up the matching subscribers in the {@link LocalTopicTree} and hands the PUBLISH to the
 *       {@link PublishDistributor}.</li>
 * </ol>
 * Whether the returned future waits for persistence/distribution to finish is controlled by
 * {@code InternalConfigurations.ACKNOWLEDGE_INCOMING_PUBLISH_AFTER_PERSISTING_ENABLED}, which is read once
 * when the instance is created.
 */
@LazySingleton
public class InternalPublishServiceImpl implements InternalPublishService {
    private static final Logger log = LoggerFactory.getLogger(InternalPublishServiceImpl.class);
    private final RetainedMessagePersistence retainedMessagePersistence;
    private final LocalTopicTree topicTree;
    private final PublishDistributor publishDistributor;
    /** When {@code true}, returned futures complete only after the persistence/distribution futures are done. */
    private final boolean acknowledgeAfterPersist = InternalConfigurations.ACKNOWLEDGE_INCOMING_PUBLISH_AFTER_PERSISTING_ENABLED.get();

    /**
     * @param retainedMessagePersistence store used to persist and remove retained messages
     * @param localTopicTree             topic tree used to find the subscribers of a topic
     * @param publishDistributor         distributor that receives the PUBLISH for the matching subscribers
     */
    @Inject
    public InternalPublishServiceImpl(RetainedMessagePersistence retainedMessagePersistence, LocalTopicTree localTopicTree, PublishDistributor publishDistributor) {
        this.retainedMessagePersistence = retainedMessagePersistence;
        this.topicTree = localTopicTree;
        this.publishDistributor = publishDistributor;
    }

    /**
     * Handles the retained-message side of the PUBLISH and distributes it to all matching subscribers.
     * The duplicate-delivery flag of the PUBLISH is reset to {@code false} before processing.
     *
     * @param publish         the message to publish, must not be {@code null}
     * @param executorService executor on which the future callbacks and the combining step run,
     *                        must not be {@code null}
     * @param str             identifier compared against subscriber ids to honour the No Local option
     *                        (presumably the publishing client's id - confirm against callers); may be {@code null}
     * @return a future that yields the distribution result once both the retained-message future and the
     *         distribution future have completed
     * @throws NullPointerException if {@code publish} or {@code executorService} is {@code null}
     */
    @Override // com.hivemq.mqtt.services.InternalPublishService
    @NotNull
    public ListenableFuture<PublishReturnCode> publish(@NotNull PUBLISH publish, @NotNull ExecutorService executorService, @Nullable String str) {
        Preconditions.checkNotNull(publish, "PUBLISH can not be null");
        Preconditions.checkNotNull(executorService, "executorService can not be null");
        publish.setDuplicateDelivery(false);
        final ListenableFuture<Void> persistFuture = persistRetainedMessage(publish, executorService);
        final ListenableFuture<PublishReturnCode> deliveryFuture = handlePublish(publish, executorService, str);
        // The combiner only runs once both futures are done, so get() does not block here.
        return Futures.whenAllComplete(deliveryFuture, persistFuture).call(deliveryFuture::get, executorService);
    }

    /**
     * Persists the PUBLISH as retained message, or removes the retained message for the topic when the
     * payload is empty. Does nothing for a PUBLISH without the retain flag.
     *
     * @return a future that never fails: it completes immediately unless {@code acknowledgeAfterPersist}
     *         is set, in which case it completes when the persistence operation has finished
     *         (successfully or not)
     */
    @NotNull
    private ListenableFuture<Void> persistRetainedMessage(@NotNull final PUBLISH publish, @NotNull final ExecutorService executorService) {
        if (!publish.isRetain()) {
            return Futures.immediateFuture(null);
        }

        final ListenableFuture<Void> persistenceFuture;
        if (publish.getPayload().length > 0) {
            final RetainedMessage retainedMessage = new RetainedMessage(publish, publish.getMessageExpiryInterval());
            log.trace("Adding retained message on topic {}", publish.getTopic());
            persistenceFuture = this.retainedMessagePersistence.persist(publish.getTopic(), retainedMessage);
        } else {
            // An empty payload on a retained PUBLISH clears the retained message of the topic.
            log.trace("Deleting retained message on topic {}", publish.getTopic());
            persistenceFuture = this.retainedMessagePersistence.remove(publish.getTopic());
        }

        if (!this.acknowledgeAfterPersist) {
            // Fire and forget: the caller is not held back by the persistence operation.
            return Futures.immediateFuture(null);
        }

        final SettableFuture<Void> persistedFuture = SettableFuture.create();
        Futures.addCallback(persistenceFuture, new FutureCallback<Void>() {
            @Override
            public void onSuccess(@Nullable Void result) {
                persistedFuture.set(null);
            }

            @Override
            public void onFailure(@NotNull Throwable th) {
                try {
                    Exceptions.rethrowError("Unable to store retained message for topic " + publish.getTopic() + " with message id " + publish.getUniqueId() + ".", th);
                } finally {
                    // Complete the future even if rethrowError throws, otherwise publish() would never finish.
                    persistedFuture.set(null);
                }
            }
        }, executorService);
        return persistedFuture;
    }

    /**
     * Looks up the subscribers for the topic of the PUBLISH and starts the distribution.
     *
     * @return {@code NO_MATCHING_SUBSCRIBERS} immediately when the topic tree yields neither subscribers nor
     *         shared subscriptions; {@code DELIVERED} immediately when {@code acknowledgeAfterPersist} is
     *         off; otherwise a future completed with {@code DELIVERED} or {@code FAILED} when the
     *         distribution has finished
     */
    @NotNull
    private ListenableFuture<PublishReturnCode> handlePublish(@NotNull PUBLISH publish, @NotNull ExecutorService executorService, @Nullable String sender) {
        final TopicSubscribers topicSubscribers = this.topicTree.findTopicSubscribers(publish.getTopic());
        final ImmutableSet<SubscriberWithIdentifiers> subscribers = topicSubscribers.getSubscribers();
        final ImmutableSet<String> sharedSubscriptions = topicSubscribers.getSharedSubscriptions();
        if (subscribers.isEmpty() && sharedSubscriptions.isEmpty()) {
            return Futures.immediateFuture(PublishReturnCode.NO_MATCHING_SUBSCRIBERS);
        }

        if (!this.acknowledgeAfterPersist) {
            deliverPublish(topicSubscribers, sender, publish, executorService, null);
            return Futures.immediateFuture(PublishReturnCode.DELIVERED);
        }

        final SettableFuture<PublishReturnCode> returnCodeFuture = SettableFuture.create();
        deliverPublish(topicSubscribers, sender, publish, executorService, returnCodeFuture);
        return returnCodeFuture;
    }

    /**
     * Hands the PUBLISH to the distributor for all non-shared subscribers and all shared subscriptions.
     * Subscribers flagged as No Local are skipped when their id equals {@code sender}.
     *
     * @param settableFuture optional future that is completed with {@code DELIVERED} when the merged
     *                       distribution future succeeds and with {@code FAILED} when it fails;
     *                       ignored when {@code null}
     */
    private void deliverPublish(@NotNull TopicSubscribers topicSubscribers, @Nullable String sender, @NotNull final PUBLISH publish, @NotNull ExecutorService executorService, @Nullable final SettableFuture<PublishReturnCode> settableFuture) {
        final Set<String> sharedSubscriptions = topicSubscribers.getSharedSubscriptions();
        final Map<String, SubscriberWithIdentifiers> nonSharedSubscribers = new HashMap<>(topicSubscribers.getSubscribers().size());
        for (final SubscriberWithIdentifiers subscriber : topicSubscribers.getSubscribers()) {
            if (subscriber.isSharedSubscription()) {
                continue;
            }
            if (subscriber.isNoLocal() && sender != null && sender.equals(subscriber.getSubscriber())) {
                // No Local is set and the subscriber id equals the sender: do not echo the message back.
                continue;
            }
            nonSharedSubscribers.put(subscriber.getSubscriber(), subscriber);
        }

        final ListenableFuture<Void> nonSharedFuture = this.publishDistributor.distributeToNonSharedSubscribers(nonSharedSubscribers, publish, executorService);
        final ListenableFuture<Void> sharedFuture;
        if (sharedSubscriptions != null) {
            sharedFuture = this.publishDistributor.distributeToSharedSubscribers(sharedSubscriptions, publish, executorService);
        } else {
            sharedFuture = Futures.immediateFuture(null);
        }

        Futures.addCallback(FutureUtils.mergeVoidFutures(nonSharedFuture, sharedFuture), new FutureCallback<Void>() {
            @Override
            public void onSuccess(@Nullable Void result) {
                if (settableFuture != null) {
                    settableFuture.set(PublishReturnCode.DELIVERED);
                }
            }

            @Override
            public void onFailure(@NotNull Throwable th) {
                try {
                    Exceptions.rethrowError("Unable to publish message for topic " + publish.getTopic() + " with message id " + publish.getUniqueId() + ".", th);
                } finally {
                    // Report the failure even if rethrowError throws, so the caller's future cannot hang.
                    if (settableFuture != null) {
                        settableFuture.set(PublishReturnCode.FAILED);
                    }
                }
            }
        }, executorService);
    }
}
