/*
 * Decompiled with CFR 0.152.
 */
package cool.oooo.mqtt.callback;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import cool.oooo.mqtt.mqtt.MqttBatchListener;
import cool.oooo.mqtt.mqtt.MqttListener;
import cool.oooo.mqtt.mqtt.MqttSubscription;
import cool.oooo.mqtt.mqtt.MqttSubscriptionOnMethod;
import cool.oooo.mqtt.mqtt.RegularMqttListener;
import cool.oooo.mqtt.mqtt.RegularMqttSubscription;
import cool.oooo.mqtt.mqtt.RegularMqttSubscriptionOnMethod;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Resource;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqttCallBackImpl
implements MqttCallback {
    private static final Logger log = LoggerFactory.getLogger(MqttCallBackImpl.class);
    public static final String KEY_VALUE_SEPARATOR = ":";
    @Resource
    private Map<String, MqttListener> consumers;
    @Resource
    private Map<String, MqttBatchListener> batchConsumers;
    @Resource
    private Map<String, RegularMqttListener> regularConsumers;

    public void connectionLost(Throwable throwable) {
        log.error("Mqtt \u8fde\u63a5\u65ad\u5f00", throwable);
    }

    public void messageArrived(String topic, MqttMessage mqttMessage) {
        try {
            this.consumers.values().forEach(consumer -> {
                MqttSubscription mqttSubscription = consumer.getClass().getAnnotation(MqttSubscription.class);
                if (mqttSubscription != null && Objects.equals(topic, mqttSubscription.topic()) && this.containsExpression(mqttMessage, mqttSubscription.expression())) {
                    consumer.consume(mqttMessage);
                }
            });
            this.regularConsumers.values().forEach(consumer -> {
                RegularMqttSubscription regularMqttSubscription = consumer.getClass().getAnnotation(RegularMqttSubscription.class);
                if (regularMqttSubscription != null && this.regularTopicMatch(regularMqttSubscription.topic(), topic) && this.containsExpression(mqttMessage, regularMqttSubscription.expression())) {
                    consumer.consume(topic, mqttMessage);
                }
            });
            this.batchConsumers.values().forEach(consumer -> {
                Method[] methods;
                for (Method method : methods = consumer.getClass().getMethods()) {
                    MqttSubscriptionOnMethod mqttSubscriptionOnMethod = method.getAnnotation(MqttSubscriptionOnMethod.class);
                    if (mqttSubscriptionOnMethod != null && Objects.equals(topic, mqttSubscriptionOnMethod.topic()) && this.containsExpression(mqttMessage, mqttSubscriptionOnMethod.expression())) {
                        this.consumeMsg(mqttMessage, (MqttBatchListener)consumer, method);
                        continue;
                    }
                    RegularMqttSubscriptionOnMethod regularMqttSubscriptionOnMethod = method.getAnnotation(RegularMqttSubscriptionOnMethod.class);
                    if (regularMqttSubscriptionOnMethod == null || !this.regularTopicMatch(regularMqttSubscriptionOnMethod.topic(), topic) || !this.containsExpression(mqttMessage, regularMqttSubscriptionOnMethod.expression())) continue;
                    this.consumeMsg(topic, mqttMessage, (MqttBatchListener)consumer, method);
                }
            });
        }
        catch (Exception e) {
            log.error("mqtt\u6d88\u606f\u56de\u8c03\u51fa\u9519", (Throwable)e);
        }
    }

    private void consumeMsg(String topic, MqttMessage mqttMessage, MqttBatchListener consumer, Method method) {
        try {
            method.invoke((Object)consumer, topic, mqttMessage);
        }
        catch (Exception e) {
            log.error("MqttCallBackImpl consumeMsg error methodName={}", (Object)method.getName(), (Object)e);
        }
    }

    private boolean regularTopicMatch(String regularTopic, String topic) {
        String[] topicStr;
        if (regularTopic == null || topic == null) {
            return false;
        }
        String[] regularTopicStr = regularTopic.split("/");
        if (regularTopicStr.length > (topicStr = topic.split("/")).length) {
            return false;
        }
        for (int i = 0; i < regularTopicStr.length; ++i) {
            if (Objects.equals(regularTopicStr[i], "+") || Objects.equals(regularTopicStr[i], topicStr[i])) continue;
            return false;
        }
        return true;
    }

    private boolean containsExpression(MqttMessage mqttMessage, String expression) {
        if (mqttMessage == null) {
            return false;
        }
        String content = new String(mqttMessage.getPayload());
        if ("".equals(expression)) {
            return true;
        }
        if (expression.contains(KEY_VALUE_SEPARATOR)) {
            String[] kv = expression.split(KEY_VALUE_SEPARATOR);
            JSONObject jsonObject = JSON.parseObject((String)content);
            if (Objects.equals(jsonObject.get((Object)kv[0]), kv[1])) {
                return true;
            }
        }
        return content.contains(expression);
    }

    private void consumeMsg(MqttMessage mqttMessage, MqttBatchListener consumer, Method method) {
        try {
            method.invoke((Object)consumer, mqttMessage);
        }
        catch (Exception e) {
            log.error("MqttCallBackImpl consumeMsg error methodName={}", (Object)method.getName(), (Object)e);
        }
    }

    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        String[] topics = iMqttDeliveryToken.getTopics();
        if (topics == null) {
            return;
        }
        for (String topic : topics) {
            log.info("topics: " + topic + " clientId: " + iMqttDeliveryToken.getClient().getClientId() + " \u6d88\u606f\u53d1\u5e03\u6210\u529f");
        }
    }
}

