package com.hivemq.mqtt.handler.subscribe;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.collect.UnmodifiableIterator;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.hivemq.bootstrap.ClientConnection;
import com.hivemq.configuration.service.MqttConfigurationService;
import com.hivemq.configuration.service.RestrictionsConfigurationService;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.extension.sdk.api.annotations.Nullable;
import com.hivemq.extension.sdk.api.packets.auth.DefaultAuthorizationBehaviour;
import com.hivemq.extension.sdk.api.packets.auth.ModifiableDefaultPermissions;
import com.hivemq.extensions.packets.general.ModifiableDefaultPermissionsImpl;
import com.hivemq.mqtt.handler.disconnect.MqttServerDisconnector;
import com.hivemq.mqtt.handler.publish.DefaultPermissionsEvaluator;
import com.hivemq.mqtt.handler.subscribe.retained.RetainedMessagesSender;
import com.hivemq.mqtt.handler.subscribe.retained.SendRetainedMessagesListener;
import com.hivemq.mqtt.message.ProtocolVersion;
import com.hivemq.mqtt.message.QoS;
import com.hivemq.mqtt.message.reason.Mqtt5DisconnectReasonCode;
import com.hivemq.mqtt.message.reason.Mqtt5SubAckReasonCode;
import com.hivemq.mqtt.message.suback.SUBACK;
import com.hivemq.mqtt.message.subscribe.SUBSCRIBE;
import com.hivemq.mqtt.message.subscribe.Topic;
import com.hivemq.persistence.clientsession.ClientSessionSubscriptionPersistence;
import com.hivemq.persistence.clientsession.SharedSubscriptionService;
import com.hivemq.persistence.clientsession.callback.SubscriptionResult;
import com.hivemq.persistence.retained.RetainedMessagePersistence;
import com.hivemq.persistence.util.FutureUtils;
import com.hivemq.security.auth.ClientData;
import com.hivemq.security.auth.ClientToken;
import com.hivemq.util.ChannelAttributes;
import com.hivemq.util.ChannelUtils;
import com.hivemq.util.Exceptions;
import com.hivemq.util.ReasonStrings;
import com.hivemq.util.Topics;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
/* loaded from: input_file:com/hivemq/mqtt/handler/subscribe/IncomingSubscribeService.class */
public class IncomingSubscribeService {

    // Class-wide logger; the nested callback classes also write to it via IncomingSubscribeService.log.
    @NotNull
    private static final Logger log = LoggerFactory.getLogger(IncomingSubscribeService.class);

    /**
     * Orders topics by their topic filter string first and by their QoS number second.
     * Two topics compare as equal only when both the filter and the QoS number match.
     */
    @NotNull
    private static final Comparator<Topic> TOPIC_AND_QOS_COMPARATOR = new Comparator<Topic>() {
        @Override
        public int compare(Topic topic, Topic topic2) {
            final int byFilter = topic.getTopic().compareTo(topic2.getTopic());
            if (byFilter != 0) {
                return byFilter;
            }
            // Integer.compare instead of subtraction: same sign, no reliance on the value range.
            return Integer.compare(topic.getQoS().getQosNumber(), topic2.getQoS().getQosNumber());
        }
    };

    // Stores subscriptions for a client (addSubscription / addSubscriptions).
    @NotNull
    private final ClientSessionSubscriptionPersistence clientSessionSubscriptionPersistence;

    // Handed to the SendRetainedMessagesListener that is attached to the SUBACK write.
    @NotNull
    private final RetainedMessagePersistence retainedMessagePersistence;

    // Queried per topic filter to detect shared subscriptions before persisting.
    @NotNull
    private final SharedSubscriptionService sharedSubscriptionService;

    // Handed to the SendRetainedMessagesListener that is attached to the SUBACK write.
    @NotNull
    private final RetainedMessagesSender retainedMessagesSender;

    // Source of the "wildcard subscriptions enabled" and "shared subscriptions enabled" switches.
    @NotNull
    private final MqttConfigurationService mqttConfigurationService;

    // Source of the maximum topic length enforced on subscription topic filters.
    @NotNull
    private final RestrictionsConfigurationService restrictionsConfigurationService;

    // Used to disconnect clients that send invalid, disabled or unauthorized subscriptions.
    @NotNull
    private final MqttServerDisconnector mqttServerDisconnector;

    /**
     * Callback for a batched subscription persistence call that forwards the outcome to a
     * {@link SettableFuture}.
     * <p>
     * On success the future completes with the persistence result. On failure the behaviour depends
     * on the protocol version: for {@link ProtocolVersion#MQTTv3_1_1} every entry of the shared
     * answer-code array is overwritten with {@link Mqtt5SubAckReasonCode#UNSPECIFIED_ERROR} and the
     * future completes with {@code null}; for every other version the future fails with the
     * original throwable.
     */
    public static class SubscribePersistenceBatchedCallback implements FutureCallback<ImmutableList<SubscriptionResult>> {

        @NotNull
        private final SettableFuture<ImmutableList<SubscriptionResult>> settableFuture;

        @NotNull
        private final String clientId;

        @NotNull
        private final SUBSCRIBE msg;

        @NotNull
        private final ProtocolVersion mqttVersion;

        // Shared with the caller: failures are reported by mutating this array in place.
        @NotNull
        private final Mqtt5SubAckReasonCode[] answerCodes;

        /**
         * @param settableFuture           future that is completed with the outcome of the persistence call
         * @param str                      identifier of the subscribing client
         * @param subscribe                the SUBSCRIBE message whose topics are being persisted
         * @param protocolVersion          protocol version of the client connection
         * @param mqtt5SubAckReasonCodeArr answer codes of the pending SUBACK, modified in place on failure
         */
        public SubscribePersistenceBatchedCallback(@NotNull SettableFuture<ImmutableList<SubscriptionResult>> settableFuture, @NotNull String str, @NotNull SUBSCRIBE subscribe, @NotNull ProtocolVersion protocolVersion, @NotNull Mqtt5SubAckReasonCode[] mqtt5SubAckReasonCodeArr) {
            this.settableFuture = settableFuture;
            this.clientId = str;
            this.msg = subscribe;
            this.mqttVersion = protocolVersion;
            this.answerCodes = mqtt5SubAckReasonCodeArr;
        }

        /**
         * Completes the future with the persistence result and traces the added subscriptions.
         */
        @Override
        public void onSuccess(@Nullable ImmutableList<SubscriptionResult> immutableList) {
            this.settableFuture.set(immutableList);
            IncomingSubscribeService.log.trace("Adding subscriptions for client [{}] and topics [{}]", this.clientId, this.msg.getTopics());
        }

        /**
         * Reports a persistence failure, either through the answer codes (MQTT 3.1.1) or by failing
         * the future (all other protocol versions).
         */
        @Override
        public void onFailure(@NotNull Throwable th) {
            if (this.mqttVersion != ProtocolVersion.MQTTv3_1_1) {
                this.settableFuture.setException(th);
                return;
            }
            // NOTE(review): presumably reports the failure and rethrows when th is an Error - confirm
            // against Exceptions.rethrowError; if it throws, the statements below are skipped.
            Exceptions.rethrowError("Unable to persist subscription to topics " + this.msg.getTopics() + " for client " + this.clientId + ".", th);
            for (int i = 0; i < this.answerCodes.length; i++) {
                this.answerCodes[i] = Mqtt5SubAckReasonCode.UNSPECIFIED_ERROR;
            }
            // The failure is carried by the answer codes, so the future itself completes normally.
            this.settableFuture.set(null);
        }
    }

    /**
     * Callback for the persistence call of a single subscription that forwards the outcome to a
     * {@link SettableFuture}.
     * <p>
     * On success the future completes with the persistence result. On failure the behaviour depends
     * on the protocol version: for {@link ProtocolVersion#MQTTv3_1_1} the answer code at the
     * configured index is overwritten with {@link Mqtt5SubAckReasonCode#UNSPECIFIED_ERROR} and the
     * future completes with {@code null}; for every other version the future fails with the
     * original throwable.
     */
    public static class SubscribePersistenceCallback implements FutureCallback<SubscriptionResult> {

        @NotNull
        private final SettableFuture<SubscriptionResult> settableFuture;

        @NotNull
        private final String clientId;

        @NotNull
        private final Topic topic;

        @NotNull
        private final ProtocolVersion mqttVersion;

        // Shared with the caller: a failure is reported by mutating the slot at 'index' in place.
        @NotNull
        private final Mqtt5SubAckReasonCode[] answerCodes;
        private final int index;

        /**
         * @param settableFuture           future that is completed with the outcome of the persistence call
         * @param str                      identifier of the subscribing client
         * @param topic                    the subscription being persisted
         * @param protocolVersion          protocol version of the client connection
         * @param mqtt5SubAckReasonCodeArr answer codes of the pending SUBACK, modified in place on failure
         * @param i                        position of {@code topic}'s answer code inside the array
         */
        public SubscribePersistenceCallback(@NotNull SettableFuture<SubscriptionResult> settableFuture, @NotNull String str, @NotNull Topic topic, @NotNull ProtocolVersion protocolVersion, @NotNull Mqtt5SubAckReasonCode[] mqtt5SubAckReasonCodeArr, int i) {
            this.settableFuture = settableFuture;
            this.clientId = str;
            this.topic = topic;
            this.mqttVersion = protocolVersion;
            this.answerCodes = mqtt5SubAckReasonCodeArr;
            this.index = i;
        }

        /**
         * Completes the future with the persistence result and traces the added subscription.
         */
        @Override
        public void onSuccess(@Nullable SubscriptionResult subscriptionResult) {
            this.settableFuture.set(subscriptionResult);
            IncomingSubscribeService.log.trace("Adding subscriptions for client [{}] and topic [{}] with qos [{}]", this.clientId, this.topic.getTopic(), this.topic.getQoS());
        }

        /**
         * Reports a persistence failure, either through the answer code (MQTT 3.1.1) or by failing
         * the future (all other protocol versions).
         */
        @Override
        public void onFailure(@NotNull Throwable th) {
            if (this.mqttVersion != ProtocolVersion.MQTTv3_1_1) {
                this.settableFuture.setException(th);
                return;
            }
            // NOTE(review): presumably reports the failure and rethrows when th is an Error - confirm
            // against Exceptions.rethrowError; if it throws, the statements below are skipped.
            Exceptions.rethrowError("Unable to persist subscription to topic " + this.topic + " for client " + this.clientId + ".", th);
            this.answerCodes[this.index] = Mqtt5SubAckReasonCode.UNSPECIFIED_ERROR;
            // The failure is carried by the answer code, so the future itself completes normally.
            this.settableFuture.set(null);
        }
    }

    /**
     * Creates the service with its injected collaborators. The arguments are only stored; no
     * other work happens at construction time.
     *
     * @param clientSessionSubscriptionPersistence persistence the accepted subscriptions are written to
     * @param retainedMessagePersistence           persistence handed to the retained-message listener
     * @param sharedSubscriptionService            service used to recognise shared subscriptions
     * @param retainedMessagesSender               sender handed to the retained-message listener
     * @param mqttConfigurationService             provides the wildcard / shared subscription switches
     * @param restrictionsConfigurationService     provides the maximum topic length
     * @param mqttServerDisconnector               used to disconnect clients on rejected SUBSCRIBEs
     */
    @Inject
    IncomingSubscribeService(@NotNull ClientSessionSubscriptionPersistence clientSessionSubscriptionPersistence, @NotNull RetainedMessagePersistence retainedMessagePersistence, @NotNull SharedSubscriptionService sharedSubscriptionService, @NotNull RetainedMessagesSender retainedMessagesSender, @NotNull MqttConfigurationService mqttConfigurationService, @NotNull RestrictionsConfigurationService restrictionsConfigurationService, @NotNull MqttServerDisconnector mqttServerDisconnector) {
        // Persistence layers
        this.clientSessionSubscriptionPersistence = clientSessionSubscriptionPersistence;
        this.retainedMessagePersistence = retainedMessagePersistence;

        // Configuration
        this.mqttConfigurationService = mqttConfigurationService;
        this.restrictionsConfigurationService = restrictionsConfigurationService;

        // Services
        this.sharedSubscriptionService = sharedSubscriptionService;
        this.retainedMessagesSender = retainedMessagesSender;
        this.mqttServerDisconnector = mqttServerDisconnector;
    }

    /**
     * Processes a SUBSCRIBE for which no reason codes or reason strings have been decided yet.
     * Allocates one empty reason-code slot and one empty reason-string slot per requested topic
     * and delegates to the five-argument overload.
     *
     * @param channelHandlerContext context of the channel the SUBSCRIBE arrived on
     * @param subscribe             the SUBSCRIBE message to process
     * @param z                     forwarded unchanged to the five-argument overload
     */
    public void processSubscribe(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull SUBSCRIBE subscribe, boolean z) {
        final int topicCount = subscribe.getTopics().size();
        final Mqtt5SubAckReasonCode[] emptyReasonCodes = new Mqtt5SubAckReasonCode[topicCount];
        final String[] emptyReasonStrings = new String[topicCount];
        processSubscribe(channelHandlerContext, subscribe, emptyReasonCodes, emptyReasonStrings, z);
    }

    /**
     * Validates the topic filters of a SUBSCRIBE and, if all of them are acceptable, continues
     * with authorization. When validation fails nothing further happens here; the validation
     * step has already disconnected the client.
     *
     * @param channelHandlerContext    context of the channel the SUBSCRIBE arrived on
     * @param subscribe                the SUBSCRIBE message to process
     * @param mqtt5SubAckReasonCodeArr reason codes per topic; {@code null} entries are still undecided
     * @param strArr                   reason strings per topic, same indexing as the reason codes
     * @param z                        when {@code true}, undecided topics are rejected as not
     *                                 authorized if the connection carries no permission entries and
     *                                 its default behaviour is an un-overridden ALLOW
     */
    public void processSubscribe(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull SUBSCRIBE subscribe, @NotNull Mqtt5SubAckReasonCode[] mqtt5SubAckReasonCodeArr, @NotNull String[] strArr, boolean z) {
        if (!hasOnlyValidSubscriptions(channelHandlerContext, subscribe)) {
            return;
        }
        final ClientToken clientToken = ChannelUtils.tokenFromChannel(channelHandlerContext.channel());
        authorizeSubscriptions(channelHandlerContext, clientToken, subscribe, mqtt5SubAckReasonCodeArr, strArr, z);
    }

    /**
     * Decides, for every topic that has no reason code yet, whether the client may subscribe, and
     * then hands the SUBSCRIBE over to persistence.
     * <p>
     * Topics that already carry a reason code are left untouched. For the remaining topics:
     * <ul>
     *   <li>if {@code z} is set and the connection has no usable permissions (none at all, or an
     *       empty list with an un-overridden default behaviour of ALLOW) the topic is marked
     *       {@code NOT_AUTHORIZED} without a reason string;</li>
     *   <li>otherwise the connection's permissions are evaluated, and a denied topic is marked
     *       {@code NOT_AUTHORIZED} together with a reason string.</li>
     * </ul>
     * All non-empty reason strings are concatenated (each followed by {@code ". "}) into the single
     * reason string passed on; {@code null} is passed when there is none.
     *
     * @param channelHandlerContext    context of the channel the SUBSCRIBE arrived on
     * @param clientToken              token of the subscribing client, passed on to persistence
     * @param subscribe                the SUBSCRIBE message being processed
     * @param mqtt5SubAckReasonCodeArr reason codes per topic, filled in place
     * @param strArr                   reason strings per topic, filled in place
     * @param z                        see the list above
     */
    private void authorizeSubscriptions(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull ClientToken clientToken, @NotNull SUBSCRIBE subscribe, @NotNull Mqtt5SubAckReasonCode[] mqtt5SubAckReasonCodeArr, @NotNull String[] strArr, boolean z) {
        final ClientConnection connection = (ClientConnection) channelHandlerContext.channel().attr(ChannelAttributes.CLIENT_CONNECTION).get();
        final ModifiableDefaultPermissions permissions = connection.getAuthPermissions();
        final ModifiableDefaultPermissionsImpl permissionsImpl = (ModifiableDefaultPermissionsImpl) permissions;

        for (int index = 0; index < subscribe.getTopics().size(); index++) {
            if (mqtt5SubAckReasonCodeArr[index] != null) {
                // Already decided elsewhere.
                continue;
            }

            if (z && (permissionsImpl == null
                    || (permissionsImpl.asList().size() < 1
                    && permissionsImpl.getDefaultBehaviour() == DefaultAuthorizationBehaviour.ALLOW
                    && !permissionsImpl.isDefaultAuthorizationBehaviourOverridden()))) {
                mqtt5SubAckReasonCodeArr[index] = Mqtt5SubAckReasonCode.NOT_AUTHORIZED;
                continue;
            }

            final Topic requested = subscribe.getTopics().get(index);
            if (DefaultPermissionsEvaluator.checkSubscription(permissions, requested)) {
                continue;
            }
            mqtt5SubAckReasonCodeArr[index] = Mqtt5SubAckReasonCode.NOT_AUTHORIZED;
            strArr[index] = "Not authorized to subscribe to topic '" + requested.getTopic() + "' with QoS '" + requested.getQoS().getQosNumber() + "'";
        }

        final StringBuilder combinedReasons = new StringBuilder();
        for (final String reason : strArr) {
            if (reason == null || reason.isEmpty()) {
                continue;
            }
            combinedReasons.append(reason).append(". ");
        }

        final String reasonString;
        if (combinedReasons.length() > 0) {
            reasonString = combinedReasons.toString();
        } else {
            reasonString = null;
        }
        persistSubscriptionForClient(channelHandlerContext, clientToken, subscribe, mqtt5SubAckReasonCodeArr, reasonString);
    }

    /**
     * Checks every topic filter of the SUBSCRIBE, in order. The first filter that is not valid to
     * subscribe to, or that is longer than the configured maximum topic length, causes the client
     * to be disconnected with {@code TOPIC_FILTER_INVALID}; later filters are not inspected.
     *
     * @param channelHandlerContext context of the channel the SUBSCRIBE arrived on
     * @param subscribe             the SUBSCRIBE message to validate
     * @return {@code true} if all filters passed, {@code false} if the client was disconnected
     */
    private boolean hasOnlyValidSubscriptions(ChannelHandlerContext channelHandlerContext, SUBSCRIBE subscribe) {
        log.trace("Checking SUBSCRIBE message of client '{}' if topics are valid", ChannelUtils.getClientId(channelHandlerContext.channel()));
        final int lengthLimit = this.restrictionsConfigurationService.maxTopicLength();

        for (final Topic subscription : subscribe.getTopics()) {
            final String filter = subscription.getTopic();

            if (!Topics.isValidToSubscribe(filter)) {
                this.mqttServerDisconnector.disconnect(
                        channelHandlerContext.channel(),
                        "Disconnecting client '" + ChannelUtils.getClientId(channelHandlerContext.channel()) + "'  (IP: {}) because it sent an invalid subscription: '" + filter + "'",
                        "Invalid subscription topic " + filter,
                        Mqtt5DisconnectReasonCode.TOPIC_FILTER_INVALID,
                        ReasonStrings.DISCONNECT_SUBSCRIBE_TOPIC_FILTER_INVALID);
                return false;
            }

            if (filter.length() > lengthLimit) {
                this.mqttServerDisconnector.disconnect(
                        channelHandlerContext.channel(),
                        "Disconnecting client '" + ChannelUtils.getClientId(channelHandlerContext.channel()) + "'  (IP: {}) because it sent a subscription to a topic exceeding the maximum topic length: '" + filter + "'",
                        "Sent SUBSCRIBE for topic that exceeds maximum topic length",
                        Mqtt5DisconnectReasonCode.TOPIC_FILTER_INVALID,
                        ReasonStrings.DISCONNECT_SUBSCRIBE_TOPIC_FILTER_INVALID);
                return false;
            }
        }
        return true;
    }

    /**
     * Finalises the answer codes of a SUBSCRIBE, persists the accepted subscriptions and sends the
     * SUBACK.
     * <p>
     * Steps, in order:
     * <ol>
     *   <li>Shared subscriptions with a QoS number above 1 are downgraded (see
     *       {@code downgradeSharedSubscriptions}).</li>
     *   <li>For every topic whose answer code is undecided or below 128: the client is disconnected
     *       (and processing stops) if the topic uses a wildcard or is a shared subscription while
     *       the respective feature is disabled; otherwise the answer code is derived from the
     *       topic's QoS number.</li>
     *   <li>Topics whose answer code is 128 or above are excluded from persistence. For an
     *       MQTT 3.1 connection the first such topic leads to a disconnect instead.</li>
     *   <li>Duplicate topics are collapsed; the remaining subscriptions are persisted either in one
     *       batched call or one by one.</li>
     *   <li>If nothing had to be persisted the SUBACK is written immediately; otherwise it is
     *       written once persistence has completed.</li>
     * </ol>
     *
     * @param channelHandlerContext    context of the channel the SUBSCRIBE arrived on
     * @param clientData               data of the subscribing client, used for the MQTT 3.1 disconnect log
     * @param subscribe                the SUBSCRIBE message being processed
     * @param mqtt5SubAckReasonCodeArr answer codes decided so far, or {@code null} if there are none;
     *                                 a non-null array is modified in place
     * @param str                      reason string for the SUBACK, may be {@code null}
     */
    private void persistSubscriptionForClient(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull ClientData clientData, @NotNull SUBSCRIBE subscribe, @Nullable Mqtt5SubAckReasonCode[] mqtt5SubAckReasonCodeArr, @Nullable String str) {
        final ClientConnection clientConnection = (ClientConnection) channelHandlerContext.channel().attr(ChannelAttributes.CLIENT_CONNECTION).get();
        final String clientId = clientConnection.getClientId();
        downgradeSharedSubscriptions(subscribe);
        final ProtocolVersion protocolVersion = clientConnection.getProtocolVersion();
        final Mqtt5SubAckReasonCode[] answerCodes = mqtt5SubAckReasonCodeArr != null ? mqtt5SubAckReasonCodeArr : new Mqtt5SubAckReasonCode[subscribe.getTopics().size()];
        final ImmutableList.Builder<ListenableFuture<SubscriptionResult>> singleAddFutures = ImmutableList.builder();
        // True as soon as at least one persistence call has been issued.
        boolean persistenceRequested = false;
        // Topics that were rejected and therefore must not be persisted.
        final Set<Topic> ignoredTopics = new HashSet<>();

        for (int i = 0; i < subscribe.getTopics().size(); i++) {
            final Topic topic = subscribe.getTopics().get(i);
            if (answerCodes[i] == null || answerCodes[i].getCode() < 128) {
                if (!this.mqttConfigurationService.wildcardSubscriptionsEnabled() && Topics.containsWildcard(topic.getTopic())) {
                    this.mqttServerDisconnector.disconnect(channelHandlerContext.channel(), "Client '" + clientId + "' (IP: {}) sent a SUBSCRIBE with a wildcard character in the topic filter '" + topic.getTopic() + "', although wildcard subscriptions are disabled. Disconnecting client.", "Sent a SUBSCRIBE with a wildcard character in the topic filter '" + topic.getTopic() + "', although wildcard subscriptions are disabled", Mqtt5DisconnectReasonCode.WILDCARD_SUBSCRIPTION_NOT_SUPPORTED, ReasonStrings.DISCONNECT_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED);
                    return;
                }
                if (!this.mqttConfigurationService.sharedSubscriptionsEnabled() && Topics.isSharedSubscriptionTopic(topic.getTopic())) {
                    this.mqttServerDisconnector.disconnect(channelHandlerContext.channel(), "Client '" + clientId + "' (IP: {}) sent a SUBSCRIBE, which matches a shared subscription '" + topic.getTopic() + "', although shared subscriptions are disabled. Disconnecting client.", "Sent a SUBSCRIBE, which matches a shared subscription '" + topic.getTopic() + "', although shared subscriptions are disabled", Mqtt5DisconnectReasonCode.SHARED_SUBSCRIPTION_NOT_SUPPORTED, ReasonStrings.DISCONNECT_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED);
                    return;
                }
                // The answer code for an accepted topic is looked up by its QoS number.
                final Mqtt5SubAckReasonCode grantedCode = Mqtt5SubAckReasonCode.fromCode(topic.getQoS().getQosNumber());
                answerCodes[i] = grantedCode != null ? grantedCode : Mqtt5SubAckReasonCode.UNSPECIFIED_ERROR;
            }
            // Answer codes of 128 and above are handled as rejections.
            if (answerCodes[i].getCode() >= 128) {
                if (protocolVersion == ProtocolVersion.MQTTv3_1) {
                    handleInsufficientPermissionsV31(channelHandlerContext, clientData, topic);
                    return;
                }
                ignoredTopics.add(topic);
                log.trace("Ignoring subscription for client [{}] and topic [{}] with qos [{}] because the client is not permitted", clientId, topic.getTopic(), topic.getQoS());
            }
        }

        final Set<Topic> cleanedSubscriptions = getCleanedSubscriptions(subscribe);
        // Lookup by topic filter AND QoS; used below to pick, among the entries of the SUBSCRIBE,
        // those that match a subscription kept by getCleanedSubscriptions.
        final Set<Topic> cleanedByTopicAndQos = Sets.newTreeSet(TOPIC_AND_QOS_COMPARATOR);
        cleanedByTopicAndQos.addAll(cleanedSubscriptions);

        ListenableFuture<ImmutableList<SubscriptionResult>> batchedFuture = null;
        if (batch(cleanedSubscriptions)) {
            cleanedSubscriptions.removeAll(ignoredTopics);
            batchedFuture = persistBatchedSubscriptions(clientId, subscribe, cleanedSubscriptions, protocolVersion, answerCodes);
            persistenceRequested = true;
        } else {
            for (int i = 0; i < subscribe.getTopics().size(); i++) {
                final Topic topic = subscribe.getTopics().get(i);
                if (ignoredTopics.contains(topic) || !cleanedByTopicAndQos.contains(topic)) {
                    continue;
                }
                answerCodes[i] = Mqtt5SubAckReasonCode.fromCode(topic.getQoS().getQosNumber());
                final SettableFuture<SubscriptionResult> singleAddFuture = SettableFuture.create();
                singleAddFutures.add(singleAddFuture);
                persistenceRequested = true;
                FutureUtils.addPersistenceCallback(this.clientSessionSubscriptionPersistence.addSubscription(clientId, topic), new SubscribePersistenceCallback(singleAddFuture, clientId, topic, protocolVersion, answerCodes, i));
            }
        }
        log.trace("Applied all subscriptions for client [{}]", clientId);

        if (!persistenceRequested) {
            // Nothing to wait for: answer right away, without a retained-message listener.
            channelHandlerContext.channel().writeAndFlush(new SUBACK(subscribe.getPacketIdentifier(), ImmutableList.copyOf(answerCodes), str));
            return;
        }

        final SettableFuture<List<SubscriptionResult>> allPersisted = SettableFuture.create();
        if (batchedFuture != null) {
            allPersisted.setFuture(batchedFuture);
        } else {
            allPersisted.setFuture(Futures.allAsList(singleAddFutures.build()));
        }
        sendSubackAndRetainedMessages(channelHandlerContext, subscribe, answerCodes, allPersisted, ignoredTopics, str);
    }

    /**
     * Registers a callback on the persistence future, executed on the channel's executor.
     * <p>
     * When persistence succeeds, the SUBACK is written and flushed; if a result list is available,
     * a {@link SendRetainedMessagesListener} is attached to that write. When persistence fails,
     * the channel is disconnected.
     *
     * @param channelHandlerContext    context of the channel the SUBSCRIBE arrived on
     * @param subscribe                the SUBSCRIBE message being answered
     * @param mqtt5SubAckReasonCodeArr answer codes; copied into the SUBACK at the time it is written
     * @param settableFuture           future that completes when persistence has finished
     * @param set                      topics that were rejected, handed to the retained-message listener
     * @param str                      reason string for the SUBACK, may be {@code null}
     */
    private void sendSubackAndRetainedMessages(final ChannelHandlerContext channelHandlerContext, @NotNull final SUBSCRIBE subscribe, @NotNull final Mqtt5SubAckReasonCode[] mqtt5SubAckReasonCodeArr, @NotNull SettableFuture<List<SubscriptionResult>> settableFuture, @NotNull final Set<Topic> set, @Nullable final String str) {
        final FutureCallback<List<SubscriptionResult>> afterPersistence = new FutureCallback<List<SubscriptionResult>>() {
            @Override
            public void onSuccess(@Nullable List<SubscriptionResult> list) {
                final SUBACK suback = new SUBACK(subscribe.getPacketIdentifier(), ImmutableList.copyOf(mqtt5SubAckReasonCodeArr), str);
                final ChannelFuture subackWritten = channelHandlerContext.channel().writeAndFlush(suback);
                if (list == null) {
                    return;
                }
                subackWritten.addListener(new SendRetainedMessagesListener(list, set, IncomingSubscribeService.this.retainedMessagePersistence, IncomingSubscribeService.this.retainedMessagesSender));
            }

            @Override
            public void onFailure(@NotNull Throwable th) {
                channelHandlerContext.channel().disconnect();
            }
        };
        Futures.addCallback(settableFuture, afterPersistence, channelHandlerContext.executor());
    }

    /**
     * Collapses the topics of a SUBSCRIBE into a mutable set. When several topics are equal to
     * each other, the one that appears last in the message is the one kept in the set.
     *
     * @param subscribe the SUBSCRIBE message whose topics are collected
     * @return a new, modifiable set containing one entry per distinct topic
     */
    @NotNull
    private Set<Topic> getCleanedSubscriptions(SUBSCRIBE subscribe) {
        final ImmutableList<Topic> requested = subscribe.getTopics();
        final int requestedCount = requested.size();
        if (requestedCount <= 1) {
            // Zero or one topic: there is nothing that could be a duplicate.
            return Sets.newHashSet(requested);
        }

        final Set<Topic> lastOccurrences = Sets.newHashSetWithExpectedSize(requestedCount);
        for (final Topic candidate : requested) {
            // Set.add keeps the element that is already present, so an equal earlier entry has to
            // be evicted first for the later one to win.
            lastOccurrences.remove(candidate);
            lastOccurrences.add(candidate);
        }
        return lastOccurrences;
    }

    /**
     * Tells whether the given subscriptions should be persisted with one batched call.
     *
     * @param set the subscriptions that are about to be persisted
     * @return {@code true} if the set holds more than one subscription
     */
    @VisibleForTesting
    boolean batch(@NotNull Set<Topic> set) {
        final int subscriptionCount = set.size();
        return subscriptionCount > 1;
    }

    /**
     * Persists all given subscriptions with a single call and exposes the outcome as a future.
     * The returned future is completed by a {@link SubscribePersistenceBatchedCallback}.
     *
     * @param str                      identifier of the subscribing client
     * @param subscribe                the SUBSCRIBE message the subscriptions came from
     * @param set                      the subscriptions to persist; an immutable copy is passed on
     * @param protocolVersion          protocol version of the client connection
     * @param mqtt5SubAckReasonCodeArr answer codes of the pending SUBACK, handed to the callback
     * @return future carrying the persistence results
     */
    @NotNull
    private ListenableFuture<ImmutableList<SubscriptionResult>> persistBatchedSubscriptions(@NotNull String str, @NotNull SUBSCRIBE subscribe, @NotNull Set<Topic> set, @NotNull ProtocolVersion protocolVersion, @NotNull Mqtt5SubAckReasonCode[] mqtt5SubAckReasonCodeArr) {
        final SettableFuture<ImmutableList<SubscriptionResult>> resultFuture = SettableFuture.create();
        final SubscribePersistenceBatchedCallback completion = new SubscribePersistenceBatchedCallback(resultFuture, str, subscribe, protocolVersion, mqtt5SubAckReasonCodeArr);
        FutureUtils.addPersistenceCallback(
                this.clientSessionSubscriptionPersistence.addSubscriptions(str, ImmutableSet.copyOf(set)),
                completion);
        return resultFuture;
    }

    /**
     * Handles a rejected subscription on an MQTT 3.1 connection: logs the rejection at debug level
     * and disconnects the client with {@code NOT_AUTHORIZED}.
     *
     * @param channelHandlerContext context of the channel the SUBSCRIBE arrived on
     * @param clientData            data of the subscribing client; supplies the client id for the log
     * @param topic                 the subscription that was rejected
     */
    private void handleInsufficientPermissionsV31(ChannelHandlerContext channelHandlerContext, ClientData clientData, Topic topic) {
        log.debug(
                "MQTT v3.1 Client '{}' (IP: {}) is not authorized to subscribe to topic '{}' with QoS '{}'. Disconnecting client.",
                clientData.getClientId(),
                ChannelUtils.getChannelIP(channelHandlerContext.channel()).orElse("UNKNOWN"),
                topic.getTopic(),
                topic.getQoS().getQosNumber());

        final String disconnectReason = "Not authorized to subscribe to topic '" + topic.getTopic() + "', QoS '" + topic.getQoS() + "'";
        this.mqttServerDisconnector.disconnect(
                channelHandlerContext.channel(),
                null,
                disconnectReason,
                Mqtt5DisconnectReasonCode.NOT_AUTHORIZED,
                null);
    }

    /**
     * Caps the QoS of shared subscriptions: every topic of the SUBSCRIBE that the shared
     * subscription service recognises and whose QoS number is above 1 is set to
     * {@link QoS#AT_LEAST_ONCE}. The topics are modified in place.
     *
     * @param subscribe the SUBSCRIBE message whose topics are inspected
     */
    private void downgradeSharedSubscriptions(@NotNull SUBSCRIBE subscribe) {
        for (final Topic subscription : subscribe.getTopics()) {
            final boolean isShared = this.sharedSubscriptionService.checkForSharedSubscription(subscription.getTopic()) != null;
            if (!isShared) {
                continue;
            }
            if (subscription.getQoS().getQosNumber() > 1) {
                subscription.setQoS(QoS.AT_LEAST_ONCE);
            }
        }
    }
}
