/*
 * Decompiled with CFR 0.152.
 */
package org.dromara.mica.mqtt.core.client;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.dromara.mica.mqtt.codec.MqttConnAckMessage;
import org.dromara.mica.mqtt.codec.MqttConnAckVariableHeader;
import org.dromara.mica.mqtt.codec.MqttConnectReasonCode;
import org.dromara.mica.mqtt.codec.MqttFixedHeader;
import org.dromara.mica.mqtt.codec.MqttMessage;
import org.dromara.mica.mqtt.codec.MqttMessageBuilders;
import org.dromara.mica.mqtt.codec.MqttMessageIdVariableHeader;
import org.dromara.mica.mqtt.codec.MqttMessageType;
import org.dromara.mica.mqtt.codec.MqttPubAckMessage;
import org.dromara.mica.mqtt.codec.MqttPublishMessage;
import org.dromara.mica.mqtt.codec.MqttPublishVariableHeader;
import org.dromara.mica.mqtt.codec.MqttQoS;
import org.dromara.mica.mqtt.codec.MqttSubAckMessage;
import org.dromara.mica.mqtt.codec.MqttSubAckPayload;
import org.dromara.mica.mqtt.codec.MqttSubscribeMessage;
import org.dromara.mica.mqtt.codec.MqttTopicSubscription;
import org.dromara.mica.mqtt.codec.MqttUnsubAckMessage;
import org.dromara.mica.mqtt.core.client.IMqttClientConnectListener;
import org.dromara.mica.mqtt.core.client.IMqttClientGlobalMessageListener;
import org.dromara.mica.mqtt.core.client.IMqttClientMessageIdGenerator;
import org.dromara.mica.mqtt.core.client.IMqttClientMessageListener;
import org.dromara.mica.mqtt.core.client.IMqttClientProcessor;
import org.dromara.mica.mqtt.core.client.IMqttClientSession;
import org.dromara.mica.mqtt.core.client.MqttClientCreator;
import org.dromara.mica.mqtt.core.client.MqttClientSubscription;
import org.dromara.mica.mqtt.core.client.MqttPendingSubscription;
import org.dromara.mica.mqtt.core.client.MqttPendingUnSubscription;
import org.dromara.mica.mqtt.core.common.MqttPendingPublish;
import org.dromara.mica.mqtt.core.common.MqttPendingQos2Publish;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.Node;
import org.tio.core.Tio;
import org.tio.core.intf.Packet;
import org.tio.utils.hutool.CollUtil;
import org.tio.utils.timer.TimerTaskService;

/**
 * Default {@link IMqttClientProcessor}: reacts to the MQTT control packets received by the client.
 *
 * <p>All collaborators (session, listeners, message id generator, timer service and executor)
 * are read once from the {@link MqttClientCreator} passed to the constructor. User listener
 * callbacks are always handed to the configured executor, wrapped in a catch-all so that a
 * failing listener is logged instead of propagating.
 */
public class DefaultMqttClientProcessor
implements IMqttClientProcessor {
    private static final Logger logger = LoggerFactory.getLogger(DefaultMqttClientProcessor.class);
    private final MqttClientCreator mqttClientCreator;
    private final IMqttClientSession clientSession;
    private final IMqttClientConnectListener connectListener;
    private final IMqttClientGlobalMessageListener globalMessageListener;
    private final IMqttClientMessageIdGenerator messageIdGenerator;
    private final TimerTaskService taskService;
    private final ExecutorService executor;

    /**
     * Creates a processor wired to the collaborators exposed by the given creator.
     *
     * @param mqttClientCreator source of the session, listeners, id generator, timer service and executor
     */
    public DefaultMqttClientProcessor(MqttClientCreator mqttClientCreator) {
        this.mqttClientCreator = mqttClientCreator;
        this.clientSession = mqttClientCreator.getClientSession();
        this.connectListener = mqttClientCreator.getConnectListener();
        this.globalMessageListener = mqttClientCreator.getGlobalMessageListener();
        this.messageIdGenerator = mqttClientCreator.getMessageIdGenerator();
        this.taskService = mqttClientCreator.getTaskService();
        this.executor = mqttClientCreator.getMqttExecutor();
    }

    /**
     * Logs a decode failure; no further action is taken here.
     *
     * @param context channel the failure occurred on (unused)
     * @param message the partially decoded message (unused)
     * @param ex      the decode error
     */
    @Override
    public void processDecodeFailure(ChannelContext context, MqttMessage message, Throwable ex) {
        logger.error(ex.getMessage(), ex);
    }

    /**
     * Handles CONNACK: on acceptance marks the context accepted, notifies the connect listener
     * and re-sends subscriptions; on any other return code closes the channel.
     *
     * @param context channel the CONNACK arrived on
     * @param message the CONNACK message
     */
    @Override
    public void processConAck(ChannelContext context, MqttConnAckMessage message) {
        MqttConnAckVariableHeader connAckVariableHeader = message.variableHeader();
        MqttConnectReasonCode returnCode = connAckVariableHeader.connectReturnCode();
        // Equality check instead of a switch: a null return code then falls through to the
        // close path rather than throwing NullPointerException.
        if (returnCode == MqttConnectReasonCode.CONNECTION_ACCEPTED) {
            context.setAccepted(true);
            if (logger.isInfoEnabled()) {
                Node node = context.getServerNode();
                logger.info("MqttClient contextId:{} connection:{}:{} succeeded!", context.getId(), node.getIp(), node.getPort());
            }
            this.publishConnectEvent(context);
            this.reSendSubscription(context);
            return;
        }
        String remark = "MqttClient connect error error ReturnCode:" + returnCode;
        Tio.close(context, remark);
    }

    /**
     * Submits the {@code onConnected} callback to the executor, if a connect listener is configured.
     */
    private void publishConnectEvent(ChannelContext context) {
        if (this.connectListener == null) {
            return;
        }
        this.executor.submit(() -> {
            try {
                this.connectListener.onConnected(context, context.isReconnect());
            }
            catch (Throwable e) {
                logger.error(e.getMessage(), e);
            }
        });
    }

    /**
     * Re-sends the global subscriptions (if any) and then the session subscriptions,
     * splitting the latter into batches of at most the configured re-subscribe batch size.
     */
    private void reSendSubscription(ChannelContext context) {
        Set<MqttTopicSubscription> globalSubscribe = this.mqttClientCreator.getGlobalSubscribe();
        if (globalSubscribe != null && !globalSubscribe.isEmpty()) {
            this.globalReSendSubscription(context, globalSubscribe);
        }
        // The session hands over its subscriptions and clears them; they are added back
        // to the session in processSubAck once the broker acknowledges them.
        List<MqttClientSubscription> reSubscriptionList = this.clientSession.getAndCleanSubscription();
        if (reSubscriptionList.isEmpty()) {
            return;
        }
        int reSubscribeBatchSize = this.mqttClientCreator.getReSubscribeBatchSize();
        if (reSubscriptionList.size() <= reSubscribeBatchSize) {
            this.reSendSubscription(context, reSubscriptionList);
            return;
        }
        List<List<MqttClientSubscription>> partitionList = CollUtil.partition(reSubscriptionList, reSubscribeBatchSize);
        for (List<MqttClientSubscription> partition : partitionList) {
            this.reSendSubscription(context, partition);
        }
    }

    /**
     * Sends one SUBSCRIBE containing all global subscriptions. Unlike the per-session variant,
     * no pending subscription is registered for this packet.
     */
    private void globalReSendSubscription(ChannelContext context, Set<MqttTopicSubscription> globalReSubscriptionList) {
        int packetId = this.messageIdGenerator.getId();
        MqttSubscribeMessage message = MqttMessageBuilders.subscribe()
            .addSubscriptions(globalReSubscriptionList)
            .messageId(packetId)
            .build();
        boolean result = Tio.send(context, message);
        logger.info("MQTT globalReSubscriptionList:{} packetId:{} resubscribing result:{}", globalReSubscriptionList, packetId, result);
    }

    /**
     * Sends one SUBSCRIBE for the given subscriptions and registers it as pending
     * (with a retransmit timer) under the generated packet id.
     */
    private void reSendSubscription(ChannelContext context, List<MqttClientSubscription> reSubscriptionList) {
        List<MqttTopicSubscription> topicSubscriptionList = reSubscriptionList.stream()
            .map(MqttClientSubscription::toTopicSubscription)
            .collect(Collectors.toList());
        int packetId = this.messageIdGenerator.getId();
        MqttSubscribeMessage message = MqttMessageBuilders.subscribe()
            .addSubscriptions(topicSubscriptionList)
            .messageId(packetId)
            .build();
        // Register the pending entry before sending so a SUBACK can always be matched.
        MqttPendingSubscription pendingSubscription = new MqttPendingSubscription(reSubscriptionList, message);
        pendingSubscription.startRetransmitTimer(this.taskService, context);
        this.clientSession.addPaddingSubscribe(packetId, pendingSubscription);
        boolean result = Tio.send(context, message);
        logger.info("MQTT subscriptionList:{} packetId:{} resubscribing result:{}", reSubscriptionList, packetId, result);
    }

    /**
     * Handles SUBACK: matches it to the pending SUBSCRIBE, keeps the subscriptions whose reason
     * code is 0..2, stores them in the session and fires {@code onSubscribed} for each on the executor.
     *
     * <p>A SUBACK without a matching pending entry, or with an empty reason code list, is ignored
     * (the pending entry, if any, is left in place in the latter case).
     *
     * @param context channel the SUBACK arrived on
     * @param message the SUBACK message
     */
    @Override
    public void processSubAck(ChannelContext context, MqttSubAckMessage message) {
        int packetId = message.variableHeader().messageId();
        logger.debug("MqttClient SubAck packetId:{}", packetId);
        MqttPendingSubscription paddingSubscribe = this.clientSession.getPaddingSubscribe(packetId);
        if (paddingSubscribe == null) {
            return;
        }
        List<MqttClientSubscription> subscriptionList = paddingSubscribe.getSubscriptionList();
        MqttSubAckPayload subAckPayload = message.payload();
        List<?> reasonCodeList = subAckPayload.reasonCodes();
        if (reasonCodeList.isEmpty()) {
            logger.error("MqttClient subscriptionList:{} subscribe failed reasonCodes is empty packetId:{}", subscriptionList, packetId);
            return;
        }
        int reasonCodeCount = reasonCodeList.size();
        List<MqttClientSubscription> subscribedList = new ArrayList<>();
        for (int i = 0; i < subscriptionList.size(); i++) {
            MqttClientSubscription subscription = subscriptionList.get(i);
            String topicFilter = subscription.getTopicFilter();
            // Reason codes are positional. If the broker returned fewer codes than filters,
            // treat the missing ones as failures instead of throwing IndexOutOfBoundsException.
            Integer reasonCode = i < reasonCodeCount ? (Integer) reasonCodeList.get(i) : null;
            if (reasonCode == null || reasonCode < 0 || reasonCode > 2) {
                logger.error("MqttClient topicFilter:{} subscribe failed reasonCodes:{} packetId:{}", topicFilter, reasonCode, packetId);
                continue;
            }
            subscribedList.add(subscription);
        }
        logger.info("MQTT subscriptionList:{} subscribed successfully packetId:{}", subscribedList, packetId);
        paddingSubscribe.onSubAckReceived();
        this.clientSession.removePaddingSubscribe(packetId);
        this.clientSession.addSubscriptionList(subscribedList);
        for (MqttClientSubscription clientSubscription : subscribedList) {
            String topicFilter = clientSubscription.getTopicFilter();
            MqttQoS mqttQoS = clientSubscription.getMqttQoS();
            IMqttClientMessageListener subscriptionListener = clientSubscription.getListener();
            this.executor.execute(() -> {
                try {
                    subscriptionListener.onSubscribed(context, topicFilter, mqttQoS, message);
                }
                catch (Throwable e) {
                    // Log the filter whose callback failed (not the whole subscribed list).
                    logger.error("MQTT topicFilter:{} subscribed onSubscribed event error.", topicFilter, e);
                }
            });
        }
    }

    /**
     * Handles an inbound PUBLISH according to its QoS level.
     *
     * <ul>
     *   <li>QoS 0: listeners are invoked.</li>
     *   <li>QoS 1: listeners are invoked, then a PUBACK is sent.</li>
     *   <li>QoS 2: the message is parked in the session and a PUBREC is sent; listeners are
     *       invoked later, when the matching PUBREL arrives.</li>
     * </ul>
     * Any other QoS value is ignored.
     *
     * @param context channel the PUBLISH arrived on
     * @param message the PUBLISH message
     */
    @Override
    public void processPublish(ChannelContext context, MqttPublishMessage message) {
        MqttFixedHeader mqttFixedHeader = message.fixedHeader();
        MqttPublishVariableHeader variableHeader = message.variableHeader();
        String topicName = variableHeader.topicName();
        MqttQoS mqttQoS = mqttFixedHeader.qosLevel();
        int packetId = variableHeader.packetId();
        logger.debug("MqttClient received publish topic:{} qoS:{} packetId:{}", topicName, mqttQoS, packetId);
        switch (mqttQoS) {
            case QOS0: {
                this.invokeListenerForPublish(context, topicName, message);
                break;
            }
            case QOS1: {
                this.invokeListenerForPublish(context, topicName, message);
                // NOTE(review): -1 is presumably the codec's "no packet id" marker - confirm.
                if (packetId == -1) {
                    break;
                }
                MqttMessage messageAck = MqttMessageBuilders.pubAck().packetId(packetId).build();
                boolean resultPubAck = Tio.send(context, messageAck);
                logger.debug("Publish - PubAck send topicName:{} mqttQoS:{} packetId:{} result:{}", topicName, mqttQoS, packetId, resultPubAck);
                break;
            }
            case QOS2: {
                if (packetId == -1) {
                    break;
                }
                MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.QOS0, false, 0);
                MqttMessage pubRecMessage = new MqttMessage(fixedHeader, MqttMessageIdVariableHeader.from(packetId));
                MqttPendingQos2Publish pendingQos2Publish = new MqttPendingQos2Publish(message, pubRecMessage);
                this.clientSession.addPendingQos2Publish(packetId, pendingQos2Publish);
                pendingQos2Publish.startPubRecRetransmitTimer(this.taskService, context);
                boolean resultPubRec = Tio.send(context, pubRecMessage);
                logger.debug("Publish - PubRec send topicName:{} mqttQoS:{} packetId:{} result:{}", topicName, mqttQoS, packetId, resultPubRec);
                break;
            }
            default: {
                // Other QoS values are intentionally ignored.
                break;
            }
        }
    }

    /**
     * Handles UNSUBACK: completes the matching pending unsubscribe and removes its topics
     * from the session. An UNSUBACK without a pending entry is ignored.
     *
     * @param message the UNSUBACK message
     */
    @Override
    public void processUnSubAck(MqttUnsubAckMessage message) {
        int packetId = message.variableHeader().messageId();
        logger.debug("MqttClient UnSubAck packetId:{}", packetId);
        MqttPendingUnSubscription pendingUnSubscription = this.clientSession.getPaddingUnSubscribe(packetId);
        if (pendingUnSubscription == null) {
            return;
        }
        List<String> unSubscriptionTopics = pendingUnSubscription.getTopics();
        logger.info("MQTT Topic:{} successfully unSubscribed packetId:{}", unSubscriptionTopics, packetId);
        pendingUnSubscription.onUnSubAckReceived();
        this.clientSession.removePaddingUnSubscribe(packetId);
        this.clientSession.removeSubscriptions(unSubscriptionTopics);
    }

    /**
     * Handles PUBACK (QoS 1 completion): completes and removes the matching pending publish.
     * A PUBACK without a pending entry is ignored.
     *
     * @param message the PUBACK message
     */
    @Override
    public void processPubAck(MqttPubAckMessage message) {
        int packetId = message.variableHeader().messageId();
        logger.debug("MqttClient PubAck packetId:{}", packetId);
        MqttPendingPublish pendingPublish = this.clientSession.getPendingPublish(packetId);
        if (pendingPublish == null) {
            return;
        }
        if (logger.isInfoEnabled()) {
            String topicName = pendingPublish.getMessage().variableHeader().topicName();
            logger.info("MQTT Topic:{} successfully PubAck packetId:{}", topicName, packetId);
        }
        pendingPublish.onPubAckReceived();
        this.clientSession.removePendingPublish(packetId);
    }

    /**
     * Handles PUBREC (QoS 2, step 2): answers with a PUBREL reusing the received variable header
     * and starts the PUBREL retransmission timer. A PUBREC without a pending publish is ignored.
     *
     * @param context channel the PUBREC arrived on
     * @param message the PUBREC message
     */
    @Override
    public void processPubRec(ChannelContext context, MqttMessage message) {
        MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) message.variableHeader();
        int packetId = variableHeader.messageId();
        logger.debug("MqttClient PubRec packetId:{}", packetId);
        MqttPendingPublish pendingPublish = this.clientSession.getPendingPublish(packetId);
        if (pendingPublish == null) {
            return;
        }
        // NOTE(review): presumably stops the PUBLISH retransmit timer now that the broker
        // has the message - confirm in MqttPendingPublish.
        pendingPublish.onPubAckReceived();
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.QOS1, false, 0);
        MqttMessage pubRelMessage = new MqttMessage(fixedHeader, variableHeader);
        pendingPublish.setPubRelMessage(pubRelMessage);
        pendingPublish.startPubRelRetransmissionTimer(this.taskService, context);
        boolean result = Tio.send(context, pubRelMessage);
        // The packet sent here is PUBREL; the previous text said "PubRec send".
        logger.debug("Publish - PubRel send packetId:{} result:{}", packetId, result);
    }

    /**
     * Handles PUBREL (inbound QoS 2, step 3): delivers the parked PUBLISH to the listeners, if one
     * is still pending, and always answers with PUBCOMP.
     *
     * @param context channel the PUBREL arrived on
     * @param message the PUBREL message
     */
    @Override
    public void processPubRel(ChannelContext context, MqttMessage message) {
        int packetId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
        logger.debug("MqttClient PubRel packetId:{}", packetId);
        MqttPendingQos2Publish pendingQos2Publish = this.clientSession.getPendingQos2Publish(packetId);
        if (pendingQos2Publish != null) {
            MqttPublishMessage incomingPublish = pendingQos2Publish.getIncomingPublish();
            String topicName = incomingPublish.variableHeader().topicName();
            this.invokeListenerForPublish(context, topicName, incomingPublish);
            pendingQos2Publish.onPubRelReceived();
            this.clientSession.removePendingQos2Publish(packetId);
        }
        // PUBCOMP is sent even when nothing is pending for this packet id.
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.QOS0, false, 0);
        MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(packetId);
        boolean result = Tio.send(context, new MqttMessage(fixedHeader, variableHeader));
        // The packet sent here is PUBCOMP; the previous text said "PubRel send".
        logger.debug("Publish - PubComp send packetId:{} result:{}", packetId, result);
    }

    /**
     * Handles PUBCOMP (QoS 2 completion): completes and removes the matching pending publish.
     * A PUBCOMP without a pending entry is ignored.
     *
     * @param message the PUBCOMP message
     */
    @Override
    public void processPubComp(MqttMessage message) {
        int packetId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
        MqttPendingPublish pendingPublish = this.clientSession.getPendingPublish(packetId);
        if (pendingPublish == null) {
            return;
        }
        if (logger.isInfoEnabled()) {
            String topicName = pendingPublish.getMessage().variableHeader().topicName();
            logger.info("MQTT Topic:{} successfully PubComp", topicName);
        }
        pendingPublish.onPubCompReceived();
        this.clientSession.removePendingPublish(packetId);
    }

    /**
     * Dispatches a received PUBLISH to the global listener (if configured) and to the listener
     * of every session subscription matching the topic. Each callback is submitted to the
     * executor as its own task.
     */
    private void invokeListenerForPublish(ChannelContext context, String topicName, MqttPublishMessage message) {
        byte[] payload = message.payload();
        if (this.globalMessageListener != null) {
            this.executor.submit(() -> {
                try {
                    this.globalMessageListener.onMessage(context, topicName, message, payload);
                }
                catch (Throwable e) {
                    logger.error(e.getMessage(), e);
                }
            });
        }
        List<MqttClientSubscription> subscriptionList = this.clientSession.getMatchedSubscription(topicName);
        if (subscriptionList.isEmpty()) {
            // With a global listener in place the message was still handled, so the missing
            // subscription is only worth a warning when debugging is switched on.
            if (this.globalMessageListener == null || this.mqttClientCreator.isDebug()) {
                logger.warn("Mqtt message to accept topic:{} subscriptionList is empty.", topicName);
            } else {
                logger.debug("Mqtt message to accept topic:{} subscriptionList is empty.", topicName);
            }
            return;
        }
        for (MqttClientSubscription subscription : subscriptionList) {
            IMqttClientMessageListener listener = subscription.getListener();
            this.executor.submit(() -> {
                try {
                    listener.onMessage(context, topicName, message, payload);
                }
                catch (Throwable e) {
                    logger.error(e.getMessage(), e);
                }
            });
        }
    }
}

