package com.hivemq.mqtt.handler.publish;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.hivemq.bootstrap.ClientConnection;
import com.hivemq.configuration.service.MqttConfigurationService;
import com.hivemq.configuration.service.RestrictionsConfigurationService;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.extension.sdk.api.annotations.Nullable;
import com.hivemq.extension.sdk.api.packets.auth.DefaultAuthorizationBehaviour;
import com.hivemq.extension.sdk.api.packets.auth.ModifiableDefaultPermissions;
import com.hivemq.extension.sdk.api.packets.publish.AckReasonCode;
import com.hivemq.extensions.handler.tasks.PublishAuthorizerResult;
import com.hivemq.extensions.packets.general.ModifiableDefaultPermissionsImpl;
import com.hivemq.mqtt.handler.disconnect.MqttServerDisconnector;
import com.hivemq.mqtt.message.ProtocolVersion;
import com.hivemq.mqtt.message.mqtt5.Mqtt5UserProperties;
import com.hivemq.mqtt.message.puback.PUBACK;
import com.hivemq.mqtt.message.publish.PUBLISH;
import com.hivemq.mqtt.message.pubrec.PUBREC;
import com.hivemq.mqtt.message.reason.Mqtt5DisconnectReasonCode;
import com.hivemq.mqtt.message.reason.Mqtt5PubAckReasonCode;
import com.hivemq.mqtt.message.reason.Mqtt5PubRecReasonCode;
import com.hivemq.mqtt.services.InternalPublishService;
import com.hivemq.util.ReasonStrings;
import io.netty.channel.ChannelHandlerContext;
import javax.inject.Inject;
import javax.inject.Singleton;

@Singleton
/* loaded from: input_file:com/hivemq/mqtt/handler/publish/IncomingPublishService.class */
public class IncomingPublishService {

    @NotNull
    private final InternalPublishService publishService;

    @NotNull
    private final MqttConfigurationService mqttConfigurationService;

    @NotNull
    private final RestrictionsConfigurationService restrictionsConfigurationService;

    @NotNull
    private final MqttServerDisconnector mqttServerDisconnector;

    @Inject
    IncomingPublishService(@NotNull InternalPublishService internalPublishService, @NotNull MqttConfigurationService mqttConfigurationService, @NotNull RestrictionsConfigurationService restrictionsConfigurationService, @NotNull MqttServerDisconnector mqttServerDisconnector) {
        this.publishService = internalPublishService;
        this.mqttConfigurationService = mqttConfigurationService;
        this.restrictionsConfigurationService = restrictionsConfigurationService;
        this.mqttServerDisconnector = mqttServerDisconnector;
    }

    public void processPublish(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull PUBLISH publish, @Nullable PublishAuthorizerResult publishAuthorizerResult) {
        ClientConnection of = ClientConnection.of(channelHandlerContext.channel());
        ProtocolVersion protocolVersion = of.getProtocolVersion();
        int qosNumber = this.mqttConfigurationService.maximumQos().getQosNumber();
        int qosNumber2 = publish.getQoS().getQosNumber();
        if (qosNumber2 > qosNumber) {
            this.mqttServerDisconnector.disconnect(channelHandlerContext.channel(), "Client '" + of.getClientId() + "' (IP: {}) sent a PUBLISH with QoS exceeding the maximum configured QoS. Got QoS " + publish.getQoS() + ", maximum: " + this.mqttConfigurationService.maximumQos() + ". Disconnecting client.", "Sent PUBLISH with QoS (" + qosNumber2 + ") higher than the allowed maximum (" + qosNumber + ")", Mqtt5DisconnectReasonCode.QOS_NOT_SUPPORTED, String.format(ReasonStrings.CONNACK_QOS_NOT_SUPPORTED_PUBLISH, Integer.valueOf(qosNumber2), Integer.valueOf(qosNumber)));
            return;
        }
        String topic = publish.getTopic();
        int maxTopicLength = this.restrictionsConfigurationService.maxTopicLength();
        if (topic.length() > maxTopicLength) {
            this.mqttServerDisconnector.disconnect(channelHandlerContext.channel(), "Client '" + of.getClientId() + "' (IP: {}) sent a PUBLISH with a topic that exceeds the maximum configured length of '" + maxTopicLength + "' . Disconnecting client.", "Sent PUBLISH for a topic that exceeds maximum topic length", Mqtt5DisconnectReasonCode.TOPIC_NAME_INVALID, ReasonStrings.DISCONNECT_MAXIMUM_TOPIC_LENGTH_EXCEEDED);
            return;
        }
        if (ProtocolVersion.MQTTv3_1 == protocolVersion || ProtocolVersion.MQTTv3_1_1 == protocolVersion) {
            Long maxPacketSizeSend = of.getMaxPacketSizeSend();
            if (!isMessageSizeAllowed(maxPacketSizeSend, publish)) {
                this.mqttServerDisconnector.disconnect(channelHandlerContext.channel(), "Client '" + of.getClientId() + "' (IP: {}) sent a PUBLISH with " + publish.getPayload().length + " bytes payload its max allowed size is " + maxPacketSizeSend + " bytes. Disconnecting client.", "Sent PUBLISH with a payload that is bigger than the allowed message size", Mqtt5DisconnectReasonCode.PACKET_TOO_LARGE, "Sent PUBLISH with a payload that is bigger than the allowed message size");
                return;
            }
        }
        authorizePublish(channelHandlerContext, publish, publishAuthorizerResult);
    }

    private void authorizePublish(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull PUBLISH publish, @Nullable PublishAuthorizerResult publishAuthorizerResult) {
        ClientConnection of = ClientConnection.of(channelHandlerContext.channel());
        if (publishAuthorizerResult != null && publishAuthorizerResult.getAckReasonCode() != null) {
            if (of.isIncomingPublishesDefaultFailedSkipRest()) {
                finishUnauthorizedPublish(channelHandlerContext, publish, null, null);
                return;
            } else if (publishAuthorizerResult.getAckReasonCode() == AckReasonCode.SUCCESS) {
                publishMessage(channelHandlerContext, publish);
                return;
            } else {
                finishUnauthorizedPublish(channelHandlerContext, publish, publishAuthorizerResult.getAckReasonCode(), publishAuthorizerResult.getReasonString());
                return;
            }
        }
        ModifiableDefaultPermissions authPermissions = of.getAuthPermissions();
        ModifiableDefaultPermissionsImpl modifiableDefaultPermissionsImpl = (ModifiableDefaultPermissionsImpl) authPermissions;
        if (publishAuthorizerResult != null && publishAuthorizerResult.isAuthorizerPresent() && (modifiableDefaultPermissionsImpl == null || (modifiableDefaultPermissionsImpl.asList().size() < 1 && modifiableDefaultPermissionsImpl.getDefaultBehaviour() == DefaultAuthorizationBehaviour.ALLOW && !modifiableDefaultPermissionsImpl.isDefaultAuthorizationBehaviourOverridden()))) {
            finishUnauthorizedPublish(channelHandlerContext, publish, null, null);
        } else if (DefaultPermissionsEvaluator.checkPublish(authPermissions, publish)) {
            publishMessage(channelHandlerContext, publish);
        } else {
            finishUnauthorizedPublish(channelHandlerContext, publish, null, null);
        }
    }

    private void finishUnauthorizedPublish(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull PUBLISH publish, @Nullable AckReasonCode ackReasonCode, @Nullable String str) {
        ClientConnection of = ClientConnection.of(channelHandlerContext.channel());
        of.setIncomingPublishesDefaultFailedSkipRest(true);
        if (channelHandlerContext.channel().isActive()) {
            String str2 = "Not authorized to publish on topic '" + publish.getTopic() + "' with QoS '" + publish.getQoS().getQosNumber() + "' and retain '" + publish.isRetain() + "'";
            if (of.getProtocolVersion() != ProtocolVersion.MQTTv5) {
                this.mqttServerDisconnector.disconnect(channelHandlerContext.channel(), "Client '" + of.getClientId() + "' (IP: {}) is not authorized to publish on topic '" + publish.getTopic() + "' with QoS '" + publish.getQoS().getQosNumber() + "' and retain '" + publish.isRetain() + "'. Disconnecting client.", str2, Mqtt5DisconnectReasonCode.NOT_AUTHORIZED, str2);
                return;
            }
            switch (publish.getQoS()) {
                case AT_LEAST_ONCE:
                    channelHandlerContext.pipeline().writeAndFlush(new PUBACK(publish.getPacketIdentifier(), ackReasonCode != null ? Mqtt5PubAckReasonCode.from(ackReasonCode) : Mqtt5PubAckReasonCode.NOT_AUTHORIZED, str != null ? str : str2, Mqtt5UserProperties.NO_USER_PROPERTIES));
                    break;
                case EXACTLY_ONCE:
                    channelHandlerContext.pipeline().writeAndFlush(new PUBREC(publish.getPacketIdentifier(), ackReasonCode != null ? Mqtt5PubRecReasonCode.from(ackReasonCode) : Mqtt5PubRecReasonCode.NOT_AUTHORIZED, str != null ? str : str2, Mqtt5UserProperties.NO_USER_PROPERTIES));
                    break;
            }
            this.mqttServerDisconnector.disconnect(channelHandlerContext.channel(), "Client '" + of.getClientId() + "' (IP: {}) is not authorized to publish on topic '" + publish.getTopic() + "' with QoS '" + publish.getQoS().getQosNumber() + "' and retain '" + publish.isRetain() + "'. Disconnecting client.", str2, Mqtt5DisconnectReasonCode.NOT_AUTHORIZED, str2);
        }
    }

    private void publishMessage(final ChannelHandlerContext channelHandlerContext, @NotNull final PUBLISH publish) {
        Futures.addCallback(this.publishService.publish(publish, channelHandlerContext.channel().eventLoop(), ClientConnection.of(channelHandlerContext.channel()).getClientId()), new FutureCallback<PublishReturnCode>() { // from class: com.hivemq.mqtt.handler.publish.IncomingPublishService.1
            public void onSuccess(@Nullable PublishReturnCode publishReturnCode) {
                IncomingPublishService.this.sendAck(channelHandlerContext, publish, publishReturnCode);
            }

            public void onFailure(@NotNull Throwable th) {
                IncomingPublishService.this.sendAck(channelHandlerContext, publish, PublishReturnCode.FAILED);
            }
        }, channelHandlerContext.channel().eventLoop());
    }

    private void sendAck(@NotNull ChannelHandlerContext channelHandlerContext, PUBLISH publish, @Nullable PublishReturnCode publishReturnCode) {
        switch (publish.getQoS()) {
            case AT_MOST_ONCE:
            default:
                return;
            case AT_LEAST_ONCE:
                if (publishReturnCode == PublishReturnCode.NO_MATCHING_SUBSCRIBERS) {
                    channelHandlerContext.pipeline().writeAndFlush(new PUBACK(publish.getPacketIdentifier(), Mqtt5PubAckReasonCode.NO_MATCHING_SUBSCRIBERS, null, Mqtt5UserProperties.NO_USER_PROPERTIES));
                    return;
                } else {
                    channelHandlerContext.pipeline().writeAndFlush(new PUBACK(publish.getPacketIdentifier()));
                    return;
                }
            case EXACTLY_ONCE:
                if (publishReturnCode == PublishReturnCode.NO_MATCHING_SUBSCRIBERS) {
                    channelHandlerContext.pipeline().writeAndFlush(new PUBREC(publish.getPacketIdentifier(), Mqtt5PubRecReasonCode.NO_MATCHING_SUBSCRIBERS, null, Mqtt5UserProperties.NO_USER_PROPERTIES));
                    return;
                } else {
                    channelHandlerContext.pipeline().writeAndFlush(new PUBREC(publish.getPacketIdentifier()));
                    return;
                }
        }
    }

    private boolean isMessageSizeAllowed(@Nullable Long l, @NotNull PUBLISH publish) {
        return l == null || publish.getPayload() == null || l.longValue() >= ((long) publish.getPayload().length);
    }
}
