package com.github.netty.protocol.mqtt;

import com.github.netty.core.util.LoggerFactoryX;
import com.github.netty.core.util.LoggerX;
import com.github.netty.protocol.mqtt.MqttSession;
import com.github.netty.protocol.mqtt.interception.BrokerInterceptor;
import com.github.netty.protocol.mqtt.security.IAuthorizatorPolicy;
import com.github.netty.protocol.mqtt.subscriptions.ISubscriptionsDirectory;
import com.github.netty.protocol.mqtt.subscriptions.Subscription;
import com.github.netty.protocol.mqtt.subscriptions.Topic;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;

/* loaded from: input_file:com/github/netty/protocol/mqtt/MqttPostOffice.class */
public class MqttPostOffice {
    private static final LoggerX LOG = LoggerFactoryX.getLogger(MqttPostOffice.class);
    private final IAuthorizatorPolicy authorizatorPolicy;
    private final ISubscriptionsDirectory subscriptions;
    private final IRetainedRepository retainedRepository;
    private MqttSessionRegistry sessionRegistry;
    private BrokerInterceptor interceptor;

    public MqttPostOffice(ISubscriptionsDirectory iSubscriptionsDirectory, IAuthorizatorPolicy iAuthorizatorPolicy, IRetainedRepository iRetainedRepository, MqttSessionRegistry mqttSessionRegistry, BrokerInterceptor brokerInterceptor) {
        this.authorizatorPolicy = iAuthorizatorPolicy;
        this.subscriptions = iSubscriptionsDirectory;
        this.retainedRepository = iRetainedRepository;
        this.sessionRegistry = mqttSessionRegistry;
        this.interceptor = brokerInterceptor;
    }

    public void fireWill(MqttSession.Will will) {
        publish2Subscribers(will.payload, new Topic(will.topic), will.qos);
    }

    public void subscribeClientToTopics(MqttSubscribeMessage mqttSubscribeMessage, String str, String str2, MqttConnection mqttConnection) {
        int messageId = MqttUtil.messageId(mqttSubscribeMessage);
        List<MqttTopicSubscription> verifyTopicsReadAccess = verifyTopicsReadAccess(str, str2, mqttSubscribeMessage);
        MqttSubAckMessage doAckMessageFromValidateFilters = doAckMessageFromValidateFilters(verifyTopicsReadAccess, messageId);
        List<Subscription> list = (List) verifyTopicsReadAccess.stream().filter(mqttTopicSubscription -> {
            return mqttTopicSubscription.qualityOfService() != MqttQoS.FAILURE;
        }).map(mqttTopicSubscription2 -> {
            return new Subscription(str, new Topic(mqttTopicSubscription2.topicName()), mqttTopicSubscription2.qualityOfService());
        }).collect(Collectors.toList());
        Iterator<Subscription> it = list.iterator();
        while (it.hasNext()) {
            this.subscriptions.add(it.next());
        }
        this.sessionRegistry.retrieve(str).addSubscriptions(list);
        mqttConnection.sendSubAckMessage(messageId, doAckMessageFromValidateFilters);
        publishRetainedMessagesForSubscriptions(str, list);
        Iterator<Subscription> it2 = list.iterator();
        while (it2.hasNext()) {
            this.interceptor.notifyTopicSubscribed(it2.next(), str2);
        }
    }

    List<MqttTopicSubscription> verifyTopicsReadAccess(String str, String str2, MqttSubscribeMessage mqttSubscribeMessage) {
        MqttQoS mqttQoS;
        ArrayList arrayList = new ArrayList();
        int messageId = MqttUtil.messageId(mqttSubscribeMessage);
        for (MqttTopicSubscription mqttTopicSubscription : mqttSubscribeMessage.payload().topicSubscriptions()) {
            Topic topic = new Topic(mqttTopicSubscription.topicName());
            if (this.authorizatorPolicy.canRead(topic, str2, str)) {
                if (topic.isValid()) {
                    LOG.debug("Client will be subscribed to the topic CId={}, username: {}, messageId: {}, topic: {}", str, str2, Integer.valueOf(messageId), topic);
                    mqttQoS = mqttTopicSubscription.qualityOfService();
                } else {
                    LOG.warn("Topic filter is not valid CId={}, username: {}, messageId: {}, topic: {}", str, str2, Integer.valueOf(messageId), topic);
                    mqttQoS = MqttQoS.FAILURE;
                }
                arrayList.add(new MqttTopicSubscription(topic.toString(), mqttQoS));
            } else {
                LOG.warn("Client does not have read permissions on the topic CId={}, username: {}, messageId: {}, topic: {}", str, str2, Integer.valueOf(messageId), topic);
                arrayList.add(new MqttTopicSubscription(topic.toString(), MqttQoS.FAILURE));
            }
        }
        return arrayList;
    }

    private void publishRetainedMessagesForSubscriptions(String str, List<Subscription> list) {
        MqttSession retrieve = this.sessionRegistry.retrieve(str);
        for (Subscription subscription : list) {
            List<MqttRetainedMessage> retainedOnTopic = this.retainedRepository.retainedOnTopic(subscription.getTopicFilter().toString());
            if (!retainedOnTopic.isEmpty()) {
                for (MqttRetainedMessage mqttRetainedMessage : retainedOnTopic) {
                    retrieve.sendRetainedPublishOnSessionAtQos(subscription.getTopicFilter(), lowerQosToTheSubscriptionDesired(subscription, mqttRetainedMessage.qosLevel()), Unpooled.wrappedBuffer(mqttRetainedMessage.getPayload()));
                }
            }
        }
    }

    private MqttSubAckMessage doAckMessageFromValidateFilters(List<MqttTopicSubscription> list, int i) {
        ArrayList arrayList = new ArrayList();
        Iterator<MqttTopicSubscription> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(Integer.valueOf(it.next().qualityOfService().value()));
        }
        return new MqttSubAckMessage(new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0), MqttMessageIdVariableHeader.from(i), new MqttSubAckPayload(arrayList));
    }

    public void unsubscribe(List<String> list, MqttConnection mqttConnection, int i) {
        String clientId = mqttConnection.getClientId();
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            Topic topic = new Topic(it.next());
            if (!topic.isValid()) {
                mqttConnection.dropConnection();
                LOG.warn("Topic filter is not valid. CId={}, topics: {}, offending topic filter: {}", clientId, list, topic);
                return;
            } else {
                LOG.trace("Removing subscription. CId={}, topic={}", clientId, topic);
                this.subscriptions.removeSubscription(topic, clientId);
                this.interceptor.notifyTopicUnsubscribed(topic.toString(), clientId, MqttUtil.userName(mqttConnection.channel));
            }
        }
        mqttConnection.sendUnsubAckMessage(list, clientId, i);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void receivedPublishQos0(Topic topic, String str, String str2, ByteBuf byteBuf, boolean z, MqttPublishMessage mqttPublishMessage) {
        if (!this.authorizatorPolicy.canWrite(topic, str, str2)) {
            LOG.error("MQTT client: {} is not authorized to publish on topic: {}", str2, topic);
            return;
        }
        publish2Subscribers(byteBuf, topic, MqttQoS.AT_MOST_ONCE);
        if (z) {
            this.retainedRepository.cleanRetained(topic);
        }
        this.interceptor.notifyTopicPublished(mqttPublishMessage, str2, str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void receivedPublishQos1(MqttConnection mqttConnection, Topic topic, String str, ByteBuf byteBuf, int i, boolean z, MqttPublishMessage mqttPublishMessage) {
        topic.getTokens();
        if (!topic.isValid()) {
            LOG.warn("Invalid topic format, force close the connection");
            mqttConnection.dropConnection();
            return;
        }
        String clientId = mqttConnection.getClientId();
        if (!this.authorizatorPolicy.canWrite(topic, str, clientId)) {
            LOG.error("MQTT client: {} is not authorized to publish on topic: {}", clientId, topic);
            return;
        }
        publish2Subscribers(byteBuf, topic, MqttQoS.AT_LEAST_ONCE);
        mqttConnection.sendPubAck(i);
        if (z) {
            if (byteBuf.isReadable()) {
                this.retainedRepository.retain(topic, mqttPublishMessage);
            } else {
                this.retainedRepository.cleanRetained(topic);
            }
        }
        this.interceptor.notifyTopicPublished(mqttPublishMessage, clientId, str);
    }

    private void publish2Subscribers(ByteBuf byteBuf, Topic topic, MqttQoS mqttQoS) {
        for (Subscription subscription : this.subscriptions.matchQosSharpening(topic)) {
            MqttQoS lowerQosToTheSubscriptionDesired = lowerQosToTheSubscriptionDesired(subscription, mqttQoS);
            MqttSession retrieve = this.sessionRegistry.retrieve(subscription.getClientId());
            if (retrieve != null) {
                LOG.debug("Sending PUBLISH message to active subscriber CId: {}, topicFilter: {}, qos: {}", subscription.getClientId(), subscription.getTopicFilter(), lowerQosToTheSubscriptionDesired);
                if (!this.authorizatorPolicy.canRead(topic, "TODO", subscription.getClientId())) {
                    LOG.debug("Authorizator prohibit Client {} to be notified on {}", subscription.getClientId(), topic);
                    return;
                }
                retrieve.sendPublishOnSessionAtQos(topic, lowerQosToTheSubscriptionDesired, byteBuf.retainedDuplicate());
            } else {
                LOG.debug("PUBLISH to not yet present session. CId: {}, topicFilter: {}, qos: {}", subscription.getClientId(), subscription.getTopicFilter(), lowerQosToTheSubscriptionDesired);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void receivedPublishQos2(MqttConnection mqttConnection, MqttPublishMessage mqttPublishMessage, String str) {
        LOG.trace("Processing PUBREL message on connection: {}", mqttConnection);
        Topic topic = new Topic(mqttPublishMessage.variableHeader().topicName());
        ByteBuf payload = mqttPublishMessage.payload();
        String clientId = mqttConnection.getClientId();
        if (!this.authorizatorPolicy.canWrite(topic, str, clientId)) {
            LOG.error("MQTT client is not authorized to publish on topic. CId={}, topic: {}", clientId, topic);
            return;
        }
        publish2Subscribers(payload, topic, MqttQoS.EXACTLY_ONCE);
        if (mqttPublishMessage.fixedHeader().isRetain()) {
            if (payload.isReadable()) {
                this.retainedRepository.retain(topic, mqttPublishMessage);
            } else {
                this.retainedRepository.cleanRetained(topic);
            }
        }
        this.interceptor.notifyTopicPublished(mqttPublishMessage, mqttConnection.getClientId(), str);
    }

    static MqttQoS lowerQosToTheSubscriptionDesired(Subscription subscription, MqttQoS mqttQoS) {
        if (mqttQoS.value() > subscription.getRequestedQos().value()) {
            mqttQoS = subscription.getRequestedQos();
        }
        return mqttQoS;
    }

    public void internalPublish(MqttPublishMessage mqttPublishMessage) {
        MqttQoS qosLevel = mqttPublishMessage.fixedHeader().qosLevel();
        Topic topic = new Topic(mqttPublishMessage.variableHeader().topicName());
        ByteBuf payload = mqttPublishMessage.payload();
        LOG.info("Sending internal PUBLISH message Topic={}, qos={}", topic, qosLevel);
        publish2Subscribers(payload, topic, qosLevel);
        if (mqttPublishMessage.fixedHeader().isRetain()) {
            if (qosLevel == MqttQoS.AT_MOST_ONCE || mqttPublishMessage.payload().readableBytes() == 0) {
                this.retainedRepository.cleanRetained(topic);
            } else {
                this.retainedRepository.retain(topic, mqttPublishMessage);
            }
        }
    }
}
