package com.hivemq.mqtt.services;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.UnmodifiableIterator;
import com.google.common.primitives.ImmutableIntArray;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.hivemq.bootstrap.ClientConnection;
import com.hivemq.bootstrap.ClientState;
import com.hivemq.bootstrap.ioc.lazysingleton.LazySingleton;
import com.hivemq.configuration.service.InternalConfigurations;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.extension.sdk.api.annotations.Nullable;
import com.hivemq.mqtt.callback.PublishStatusFutureCallback;
import com.hivemq.mqtt.handler.publish.PublishStatus;
import com.hivemq.mqtt.message.MessageWithID;
import com.hivemq.mqtt.message.QoS;
import com.hivemq.mqtt.message.dropping.MessageDroppedService;
import com.hivemq.mqtt.message.pool.MessageIDPool;
import com.hivemq.mqtt.message.pool.exception.NoMessageIdAvailableException;
import com.hivemq.mqtt.message.publish.PUBLISH;
import com.hivemq.mqtt.message.publish.PUBLISHFactory;
import com.hivemq.mqtt.message.publish.PublishWithFuture;
import com.hivemq.mqtt.message.publish.PubrelWithFuture;
import com.hivemq.mqtt.message.pubrel.PUBREL;
import com.hivemq.mqtt.message.subscribe.Topic;
import com.hivemq.mqtt.topic.SubscriberWithQoS;
import com.hivemq.persistence.SingleWriterService;
import com.hivemq.persistence.clientqueue.ClientQueuePersistence;
import com.hivemq.persistence.clientsession.ClientSessionSubscriptionPersistence;
import com.hivemq.persistence.clientsession.SharedSubscriptionService;
import com.hivemq.persistence.connection.ConnectionPersistence;
import com.hivemq.persistence.payload.PayloadPersistenceException;
import com.hivemq.persistence.payload.PublishPayloadPersistence;
import com.hivemq.persistence.util.FutureUtils;
import com.hivemq.util.Exceptions;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import javax.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@LazySingleton
/* loaded from: input_file:com/hivemq/mqtt/services/PublishPollServiceImpl.class */
public class PublishPollServiceImpl implements PublishPollService {

    /** Class logger; it is registered under the {@link PublishPollService} interface, not this implementation class. */
    @NotNull
    private static final Logger log = LoggerFactory.getLogger(PublishPollService.class);

    /** Client queue store: read for new, in-flight and shared messages, and used to remove entries, add PUBRELs and clear in-flight markers. */
    @NotNull
    private final ClientQueuePersistence clientQueuePersistence;

    /** Looks up the {@link ClientConnection} that belongs to a client id. */
    @NotNull
    private final ConnectionPersistence connectionPersistence;

    /** Payload store handed to every {@link PublishWithFuture} and {@link PublishStatusFutureCallback} created here. */
    @NotNull
    private final PublishPayloadPersistence payloadPersistence;

    /** Notified whenever a message cannot be handed to a client (payload failure or no packet id available). */
    @NotNull
    private final MessageDroppedService messageDroppedService;

    /** Resolves the shared subscriptions of a client and the subscribers of a shared subscription. */
    @NotNull
    private final SharedSubscriptionService sharedSubscriptionService;

    /** Supplies the per-client executor on which the persistence read callbacks run. */
    @NotNull
    private final SingleWriterService singleWriterService;

    /** Used as the loader when the shared subscriptions of a client are requested from {@link #sharedSubscriptionService}. */
    @NotNull
    private final ClientSessionSubscriptionPersistence clientSessionSubscriptionPersistence;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hivemq/mqtt/services/PublishPollServiceImpl$PubrelResendCallback.class */
    public class PubrelResendCallback implements FutureCallback<PublishStatus> {

        @NotNull
        private final String client;

        @NotNull
        private final MessageWithID message;

        @NotNull
        private final MessageIDPool messageIDPool;

        @NotNull
        private final Channel channel;

        PubrelResendCallback(@NotNull String str, @NotNull MessageWithID messageWithID, @NotNull MessageIDPool messageIDPool, @NotNull Channel channel) {
            this.client = str;
            this.message = messageWithID;
            this.messageIDPool = messageIDPool;
            this.channel = channel;
        }

        public void onSuccess(@NotNull PublishStatus publishStatus) {
            this.messageIDPool.returnId(this.message.getPacketIdentifier());
            if (publishStatus != PublishStatus.NOT_CONNECTED) {
                FutureUtils.addExceptionLogger(PublishPollServiceImpl.this.removeMessageFromQueue(this.client, this.message.getPacketIdentifier()));
            }
            AtomicInteger inFlightMessageCount = ClientConnection.of(this.channel).getInFlightMessageCount();
            if (inFlightMessageCount == null || inFlightMessageCount.decrementAndGet() <= 0) {
                PublishPollServiceImpl.this.pollMessages(this.client, this.channel);
            }
        }

        public void onFailure(Throwable th) {
            Exceptions.rethrowError("Pubrel delivery failed", th);
            this.messageIDPool.returnId(this.message.getPacketIdentifier());
            AtomicInteger inFlightMessageCount = ClientConnection.of(this.channel).getInFlightMessageCount();
            if (inFlightMessageCount != null) {
                inFlightMessageCount.decrementAndGet();
            }
        }
    }

    /**
     * Creates the poll service; every collaborator is supplied by dependency injection and stored as-is.
     *
     * @param clientQueuePersistence               queue store the messages are read from and removed from
     * @param connectionPersistence                lookup of client connections by client id
     * @param publishPayloadPersistence            payload store passed on to the outgoing publishes
     * @param messageDroppedService                receiver of dropped-message notifications
     * @param sharedSubscriptionService            shared subscription lookup
     * @param singleWriterService                  provider of the per-client callback executor
     * @param clientSessionSubscriptionPersistence source of a client's shared subscriptions
     */
    @Inject
    public PublishPollServiceImpl(
            @NotNull ClientQueuePersistence clientQueuePersistence,
            @NotNull ConnectionPersistence connectionPersistence,
            @NotNull PublishPayloadPersistence publishPayloadPersistence,
            @NotNull MessageDroppedService messageDroppedService,
            @NotNull SharedSubscriptionService sharedSubscriptionService,
            @NotNull SingleWriterService singleWriterService,
            @NotNull ClientSessionSubscriptionPersistence clientSessionSubscriptionPersistence) {
        // persistences
        this.clientQueuePersistence = clientQueuePersistence;
        this.connectionPersistence = connectionPersistence;
        this.clientSessionSubscriptionPersistence = clientSessionSubscriptionPersistence;
        this.payloadPersistence = publishPayloadPersistence;
        // services
        this.singleWriterService = singleWriterService;
        this.sharedSubscriptionService = sharedSubscriptionService;
        this.messageDroppedService = messageDroppedService;
    }

    /**
     * Entry point for polling a client's messages.
     * <p>
     * While the connection has not yet flagged its in-flight messages as sent, only the in-flight
     * messages are polled. Afterwards new messages are polled and, unless the connection is flagged
     * as having no shared subscription, every shared subscription of the client is polled as well.
     * A client found to have no shared subscriptions gets that flag set so the lookup is skipped next time.
     *
     * @param str     the client id, must not be null
     * @param channel the client's channel, must not be null
     */
    @Override // com.hivemq.mqtt.services.PublishPollService
    public void pollMessages(@NotNull String str, @NotNull Channel channel) {
        Preconditions.checkNotNull(str, "Client must not be null");
        Preconditions.checkNotNull(channel, "Channel must not be null");
        final ClientConnection connection = ClientConnection.of(channel);

        if (connection.isInFlightMessagesSent()) {
            pollNewMessages(str, channel);
            if (!connection.getNoSharedSubscription()) {
                try {
                    final ImmutableSet<Topic> sharedTopics = this.sharedSubscriptionService.getSharedSubscriptions(
                            str, () -> this.clientSessionSubscriptionPersistence.getSharedSubscriptions(str));
                    if (sharedTopics.isEmpty()) {
                        connection.setNoSharedSubscription(true);
                    } else {
                        for (final Topic sharedTopic : sharedTopics) {
                            final String sharedSubscription = SharedSubscriptionService.removePrefix(sharedTopic.getTopic());
                            final int subscribedQos = sharedTopic.getQoS().getQosNumber();
                            pollSharedPublishesForClient(str, sharedSubscription, subscribedQos, sharedTopic.isRetainAsPublished(), sharedTopic.getSubscriptionIdentifier(), channel);
                        }
                    }
                } catch (ExecutionException e) {
                    log.error("Exception while reading shared subscriptions for client {}", str, e);
                }
            }
        } else {
            pollInflightMessages(str, channel);
        }
    }

    /**
     * Polls new messages for a client identified only by its id.
     * <p>
     * Nothing happens when no connection is registered for the client, or when the connection is
     * disconnecting or already disconnected.
     *
     * @param str the client id
     */
    @Override // com.hivemq.mqtt.services.PublishPollService
    public void pollNewMessages(@NotNull String str) {
        final ClientConnection connection = this.connectionPersistence.get(str);
        if (connection == null) {
            return;
        }
        if (connection.getClientState() == ClientState.DISCONNECTING) {
            return;
        }
        if (connection.getClientState().disconnected()) {
            return;
        }
        pollNewMessages(str, connection.getChannel());
    }

    /**
     * Reads not-yet-sent messages from the client's queue and hands them to the publish flush handler.
     * <p>
     * One packet identifier per possible message is reserved up front and passed to the queue read.
     * If the pool cannot supply them, an error is logged and nothing is read. If the read itself
     * fails, the channel is disconnected.
     *
     * @param str     the client id
     * @param channel the client's channel
     */
    @Override // com.hivemq.mqtt.services.PublishPollService
    public void pollNewMessages(@NotNull final String str, @NotNull final Channel channel) {
        final MessageIDPool messageIDPool = ClientConnection.of(channel).getMessageIDPool();
        try {
            final ImmutableIntArray reservedIds = createMessageIds(messageIDPool, pollMessageLimit(channel));
            // 5242880 = 5 * 1024 * 1024; presumably a byte limit for one read - confirm against ClientQueuePersistence
            Futures.addCallback(this.clientQueuePersistence.readNew(str, false, reservedIds, 5242880L), new FutureCallback<ImmutableList<PUBLISH>>() {
                @Override
                public void onSuccess(ImmutableList<PUBLISH> immutableList) {
                    // Only publishes above QoS 0 carry a packet identifier; the reserved ids beyond
                    // that count were not needed and go back to the pool.
                    int idsInUse = 0;
                    for (final PUBLISH candidate : immutableList) {
                        if (candidate.getQoS() != QoS.AT_MOST_ONCE) {
                            idsInUse++;
                        }
                    }
                    for (int idx = idsInUse; idx < reservedIds.length(); idx++) {
                        messageIDPool.returnId(reservedIds.get(idx));
                    }

                    final ArrayList<PublishWithFuture> toSend = new ArrayList<>(immutableList.size());
                    final AtomicInteger inFlight = inFlightMessageCount(channel);
                    inFlight.addAndGet(immutableList.size());
                    for (final PUBLISH publish : immutableList) {
                        try {
                            final SettableFuture<PublishStatus> statusFuture = SettableFuture.create();
                            Futures.addCallback(statusFuture, new PublishStatusFutureCallback(payloadPersistence, PublishPollServiceImpl.this, false, str, publish, messageIDPool, channel, str), MoreExecutors.directExecutor());
                            toSend.add(new PublishWithFuture(publish, statusFuture, false, payloadPersistence));
                        } catch (PayloadPersistenceException e) {
                            // One broken payload must not block the remaining messages of the batch.
                            log.error("Payload reference error for publish on topic: " + publish.getTopic(), e);
                            if (publish.getQoS().getQosNumber() > 0) {
                                // The status future of this publish is never completed, so its callback
                                // cannot free the packet identifier; release it here to avoid leaking it.
                                messageIDPool.returnId(publish.getPacketIdentifier());
                                removeMessageFromQueue(str, publish.getPacketIdentifier());
                            }
                            inFlight.decrementAndGet();
                            messageDroppedService.failed(str, publish.getTopic(), publish.getQoS().getQosNumber());
                        }
                    }
                    ClientConnection.of(channel).mo1getPublishFlushHandler().sendPublishes(toSend);
                }

                @Override
                public void onFailure(Throwable th) {
                    Exceptions.rethrowError("Exception in new messages handling", th);
                    channel.disconnect();
                }
            }, this.singleWriterService.callbackExecutor(str));
        } catch (NoMessageIdAvailableException e) {
            log.error("No message id available for client " + str, e);
        }
    }

    /**
     * Re-sends the messages that were in flight for the client (PUBLISH and PUBREL).
     * <p>
     * When the queue reports no in-flight messages, the connection is flagged accordingly and a
     * regular {@link #pollMessages(String, Channel)} is scheduled on the channel's event loop.
     * Otherwise each message's packet identifier is claimed from the pool, PUBLISHes are collected
     * and passed to the publish flush handler, and PUBRELs are written to the channel directly.
     *
     * @param str     the client id
     * @param channel the client's channel
     */
    @Override // com.hivemq.mqtt.services.PublishPollService
    public void pollInflightMessages(@NotNull final String str, @NotNull final Channel channel) {
        // 5242880 = 5 * 1024 * 1024; presumably a byte limit for one read - confirm against ClientQueuePersistence
        Futures.addCallback(this.clientQueuePersistence.readInflight(str, 5242880L, pollMessageLimit(channel)), new FutureCallback<ImmutableList<MessageWithID>>() {
            @Override
            public void onSuccess(ImmutableList<MessageWithID> immutableList) {
                final ClientConnection connection = ClientConnection.of(channel);
                if (immutableList.isEmpty()) {
                    connection.setInFlightMessagesSent(true);
                    channel.eventLoop().submit(() -> {
                        PublishPollServiceImpl.this.pollMessages(str, channel);
                    });
                    return;
                }

                final ArrayList<PublishWithFuture> toSend = new ArrayList<>(immutableList.size());
                final AtomicInteger inFlight = inFlightMessageCount(channel);
                inFlight.addAndGet(immutableList.size());
                final MessageIDPool messageIDPool = connection.getMessageIDPool();

                for (final MessageWithID message : immutableList) {
                    try {
                        // Claim the identifier the message already carries; if the pool hands out a
                        // different one, that one is not needed and is given back immediately.
                        final int claimedId = messageIDPool.takeIfAvailable(message.getPacketIdentifier());
                        if (message.getPacketIdentifier() != claimedId) {
                            messageIDPool.returnId(claimedId);
                        }

                        if (message instanceof PUBLISH) {
                            final PUBLISH publish = (PUBLISH) message;
                            try {
                                final SettableFuture<PublishStatus> statusFuture = SettableFuture.create();
                                Futures.addCallback(statusFuture, new PublishStatusFutureCallback(payloadPersistence, PublishPollServiceImpl.this, false, str, publish, messageIDPool, channel, str), MoreExecutors.directExecutor());
                                toSend.add(new PublishWithFuture(publish, statusFuture, false, payloadPersistence));
                            } catch (PayloadPersistenceException payloadError) {
                                // One broken payload must not block the remaining messages of the batch.
                                log.error("Payload reference error for publish on topic: " + publish.getTopic(), payloadError);
                                if (publish.getQoS().getQosNumber() > 0) {
                                    // The status future is never completed, so its callback cannot free
                                    // the packet identifier; release it here to avoid leaking it.
                                    messageIDPool.returnId(publish.getPacketIdentifier());
                                    removeMessageFromQueue(str, publish.getPacketIdentifier());
                                }
                                inFlight.decrementAndGet();
                                messageDroppedService.failed(str, publish.getTopic(), publish.getQoS().getQosNumber());
                            }
                        } else if (message instanceof PUBREL) {
                            final SettableFuture<PublishStatus> pubrelFuture = SettableFuture.create();
                            channel.writeAndFlush(new PubrelWithFuture((PUBREL) message, pubrelFuture));
                            Futures.addCallback(pubrelFuture, new PubrelResendCallback(str, message, messageIDPool, channel), MoreExecutors.directExecutor());
                        }
                    } catch (NoMessageIdAvailableException noIdError) {
                        log.error("No message id available for client " + str, noIdError);
                        if (message instanceof PUBLISH) {
                            final PUBLISH dropped = (PUBLISH) message;
                            messageDroppedService.queueFull(str, dropped.getTopic(), dropped.getQoS().getQosNumber());
                        }
                        // NOTE(review): this aborts the whole batch - publishes already collected are not
                        // sent and the in-flight counter keeps the full batch size. Behaviour kept as found;
                        // confirm whether that is intended.
                        return;
                    }
                }
                connection.mo1getPublishFlushHandler().sendPublishes(toSend);
            }

            @Override
            public void onFailure(Throwable th) {
                Exceptions.rethrowError("Exception in inflight messages handling", th);
            }
        }, this.singleWriterService.callbackExecutor(str));
    }

    /**
     * Returns the in-flight message counter of the channel's connection, installing a fresh
     * zero-valued counter first when the connection has none yet.
     *
     * @param channel the client's channel
     * @return the counter currently stored on the connection
     */
    private AtomicInteger inFlightMessageCount(@NotNull Channel channel) {
        final ClientConnection connection = ClientConnection.of(channel);
        final boolean counterMissing = connection.getInFlightMessageCount() == null;
        if (counterMissing) {
            connection.setInFlightMessageCount(new AtomicInteger(0));
        }
        // re-read from the connection so the stored instance is the one handed out
        return connection.getInFlightMessageCount();
    }

    /**
     * Polls the given shared subscription once for each of its subscribers that currently has an
     * active channel, visiting the subscribers in random order.
     * <p>
     * The order is produced by drawing without replacement: a random element of the not-yet-visited
     * prefix is picked, and the last element of that prefix is moved into its slot before the prefix
     * shrinks. The move happens for every pick (not only for inactive subscribers), so no subscriber
     * is polled twice in one round and none is dropped unvisited.
     *
     * @param str the shared subscription to poll
     */
    @Override // com.hivemq.mqtt.services.PublishPollService
    public void pollSharedPublishes(@NotNull String str) {
        final ArrayList<SubscriberWithQoS> candidates = new ArrayList<>(this.sharedSubscriptionService.getSharedSubscriber(str));
        for (int remaining = candidates.size(); remaining > 0; remaining--) {
            final int picked = ThreadLocalRandom.current().nextInt(remaining);
            final SubscriberWithQoS subscriber = candidates.get(picked);
            // keep the element that would fall out of range on the next iteration
            candidates.set(picked, candidates.get(remaining - 1));

            final ClientConnection connection = this.connectionPersistence.get(subscriber.getSubscriber());
            if (connection == null || !connection.getChannel().isActive()) {
                continue;
            }
            pollSharedPublishesForClient(subscriber.getSubscriber(), str, subscriber.getQos(), subscriber.isRetainAsPublished(), subscriber.getSubscriptionIdentifier(), connection.getChannel());
        }
    }

    /**
     * Reads messages of a shared subscription and sends them to one specific subscriber.
     * <p>
     * Nothing is read while the connection reports messages in flight. Each message read is rebuilt
     * for this subscriber: the QoS is the minimum of the subscription QoS and the message's onward
     * QoS, a packet identifier is taken only for QoS above 0, the retain flag is kept only when
     * {@code z} is set, and the subscription identifier (if any) is attached.
     *
     * @param str     the client id of the subscriber
     * @param str2    the shared subscription the messages are read from
     * @param i       the QoS number of the subscription
     * @param z       whether the retain flag of the original publish is kept
     * @param num     the subscription identifier, or {@code null} if there is none
     * @param channel the subscriber's channel
     */
    @Override // com.hivemq.mqtt.services.PublishPollService
    public void pollSharedPublishesForClient(@NotNull final String str, @NotNull final String str2, final int i, final boolean z, @Nullable final Integer num, @NotNull final Channel channel) {
        final ClientConnection connection = ClientConnection.of(channel);
        if (connection.isMessagesInFlight()) {
            return;
        }
        // 5242880 = 5 * 1024 * 1024; presumably a byte limit for one read - confirm against ClientQueuePersistence
        Futures.addCallback(this.clientQueuePersistence.readShared(str2, pollMessageLimit(channel), 5242880L), new FutureCallback<ImmutableList<PUBLISH>>() {
            @Override
            public void onSuccess(@NotNull ImmutableList<PUBLISH> immutableList) {
                if (immutableList.isEmpty()) {
                    return;
                }
                final MessageIDPool messageIDPool = connection.getMessageIDPool();
                final ArrayList<PublishWithFuture> toSend = new ArrayList<>(immutableList.size());
                final AtomicInteger inFlight = inFlightMessageCount(channel);
                inFlight.addAndGet(immutableList.size());
                // identical for every message of the batch
                final ImmutableIntArray subscriptionIds = num != null ? ImmutableIntArray.of(num.intValue()) : ImmutableIntArray.of();

                for (final PUBLISH publish : immutableList) {
                    try {
                        final int onwardQos = publish.getOnwardQoS().getQosNumber();
                        if (onwardQos > 0 && i == 0) {
                            // Downgraded to QoS 0 for this subscriber: the entry is removed from the
                            // shared queue right away instead of after delivery.
                            removeMessageFromSharedQueue(str2, publish.getUniqueId());
                        }
                        final QoS effectiveQos = Preconditions.checkNotNull(QoS.valueOf(Math.min(i, onwardQos)));
                        int packetId = 0;
                        if (effectiveQos.getQosNumber() > 0) {
                            packetId = messageIDPool.takeNextId();
                        }
                        final PUBLISH outgoing = new PUBLISHFactory.Mqtt5Builder()
                                .fromPublish(publish)
                                .withPacketIdentifier(packetId)
                                .withQoS(effectiveQos)
                                .withOnwardQos(effectiveQos)
                                .withRetain(publish.isRetain() && z)
                                .withSubscriptionIdentifiers(subscriptionIds)
                                .build();
                        try {
                            final SettableFuture<PublishStatus> statusFuture = SettableFuture.create();
                            Futures.addCallback(statusFuture, new PublishStatusFutureCallback(payloadPersistence, PublishPollServiceImpl.this, true, str2, outgoing, messageIDPool, channel, str), MoreExecutors.directExecutor());
                            toSend.add(new PublishWithFuture(outgoing, statusFuture, false, payloadPersistence));
                        } catch (PayloadPersistenceException payloadError) {
                            // One broken payload must not block the remaining messages of the batch.
                            log.error("Payload reference error for publish on topic: " + outgoing.getTopic(), payloadError);
                            if (packetId > 0) {
                                // The status future is never completed, so its callback cannot free the
                                // identifier taken above; release it here to avoid leaking it.
                                messageIDPool.returnId(packetId);
                            }
                            if (outgoing.getQoS().getQosNumber() > 0) {
                                removeMessageFromSharedQueue(str2, outgoing.getUniqueId());
                            }
                            inFlight.decrementAndGet();
                            messageDroppedService.failed(str, outgoing.getTopic(), outgoing.getQoS().getQosNumber());
                        }
                    } catch (NoMessageIdAvailableException noIdError) {
                        log.error("No message id available for client: " + str + ", shared subscription " + str2, noIdError);
                        messageDroppedService.queueFullShared(str2, publish.getTopic(), publish.getQoS().getQosNumber());
                        inFlight.decrementAndGet();
                        // NOTE(review): this aborts the whole batch - publishes already collected are not
                        // sent and the counter is reduced by one only. Behaviour kept as found; confirm
                        // whether that is intended.
                        return;
                    }
                }
                connection.mo1getPublishFlushHandler().sendPublishes(toSend);
            }

            @Override
            public void onFailure(@NotNull Throwable th) {
                Exceptions.rethrowError("Exception in shared publishes poll handling for client " + str + " for shared subscription " + str2, th);
            }
        }, this.singleWriterService.callbackExecutor(str));
    }

    /**
     * Removes the entry with the given packet identifier from the client's queue.
     *
     * @param str the client id
     * @param i   the packet identifier of the entry
     * @return the future returned by the queue persistence for this removal
     */
    @Override // com.hivemq.mqtt.services.PublishPollService
    @NotNull
    public ListenableFuture<Void> removeMessageFromQueue(@NotNull String str, int i) {
        final ListenableFuture<Void> removal = this.clientQueuePersistence.remove(str, i);
        return removal;
    }

    /**
     * Removes an entry from a shared subscription queue.
     *
     * @param str  the shared subscription
     * @param str2 the unique id of the publish to remove
     * @return the future returned by the queue persistence for this removal
     */
    @Override // com.hivemq.mqtt.services.PublishPollService
    @NotNull
    public ListenableFuture<Void> removeMessageFromSharedQueue(@NotNull String str, @NotNull String str2) {
        final ListenableFuture<Void> removal = this.clientQueuePersistence.removeShared(str, str2);
        return removal;
    }

    /**
     * Stores a PUBREL with the given packet identifier in the client's queue.
     *
     * @param str the client id
     * @param i   the packet identifier of the PUBREL
     * @return the future returned by the queue persistence for this write
     */
    @Override // com.hivemq.mqtt.services.PublishPollService
    @NotNull
    public ListenableFuture<Void> putPubrelInQueue(@NotNull String str, int i) {
        final ListenableFuture<Void> stored = this.clientQueuePersistence.putPubrel(str, i);
        return stored;
    }

    /**
     * Delegates to the queue persistence to remove the in-flight marker identified by the two arguments.
     *
     * @param str  first argument, passed through unchanged (presumably the queue or shared subscription id - confirm)
     * @param str2 second argument, passed through unchanged (presumably the unique id of the publish - confirm)
     * @return the future returned by the queue persistence for this operation
     */
    @Override // com.hivemq.mqtt.services.PublishPollService
    @NotNull
    public ListenableFuture<Void> removeInflightMarker(@NotNull String str, @NotNull String str2) {
        final ListenableFuture<Void> cleared = this.clientQueuePersistence.removeInFlightMarker(str, str2);
        return cleared;
    }

    /**
     * Reserves {@code i} packet identifiers from the pool.
     * <p>
     * The reservation is all-or-nothing: when the pool runs out part-way through, the identifiers
     * taken so far are handed back before the exception is propagated, so a failed call does not
     * leave identifiers blocked.
     *
     * @param messageIDPool the pool to take the identifiers from
     * @param i             the number of identifiers to reserve
     * @return the reserved identifiers, in the order they were taken
     * @throws NoMessageIdAvailableException if the pool cannot supply {@code i} identifiers
     */
    @NotNull
    private ImmutableIntArray createMessageIds(@NotNull MessageIDPool messageIDPool, int i) throws NoMessageIdAvailableException {
        final ImmutableIntArray.Builder reserved = ImmutableIntArray.builder(i);
        try {
            for (int taken = 0; taken < i; taken++) {
                reserved.add(messageIDPool.takeNextId());
            }
        } catch (NoMessageIdAvailableException e) {
            // roll back the partial reservation; the caller never sees these ids
            final ImmutableIntArray partial = reserved.build();
            for (int idx = 0; idx < partial.length(); idx++) {
                messageIDPool.returnId(partial.get(idx));
            }
            throw e;
        }
        return reserved.build();
    }

    /**
     * Computes how many messages a single poll may read for the channel: the larger of the
     * configured poll batch size and the connection's maximum in-flight window. The configured
     * default window is used when the channel has no connection attached.
     *
     * @param channel the client's channel
     * @return the message limit for one poll
     */
    private int pollMessageLimit(@NotNull Channel channel) {
        final ClientConnection connection = ClientConnection.of(channel);
        final int inflightWindow;
        if (connection != null) {
            inflightWindow = connection.getMaxInflightWindow(InternalConfigurations.MAX_INFLIGHT_WINDOW_SIZE_MESSAGES);
        } else {
            inflightWindow = InternalConfigurations.MAX_INFLIGHT_WINDOW_SIZE_MESSAGES;
        }
        return Math.max(InternalConfigurations.PUBLISH_POLL_BATCH_SIZE, inflightWindow);
    }
}
