/*
 * Decompiled with CFR 0.152.
 */
package cool.doudou.mqtt.assistant.core.handler;

import cool.doudou.mqtt.assistant.core.ConcurrentMapFactory;
import cool.doudou.mqtt.assistant.core.callback.MqttSubscribeCallback;
import cool.doudou.mqtt.assistant.core.entity.SubscribeMethod;
import cool.doudou.mqtt.assistant.core.properties.MqttProperties;
import java.lang.reflect.Method;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;

public class MqttMessageHandler {
    private static final Logger log = LoggerFactory.getLogger(MqttMessageHandler.class);
    private MqttProperties mqttProperties;

    @Bean
    public MessageProducer mqttInbound(MqttPahoClientFactory mqttPahoClientFactory, MessageChannel mqttInboundChannel, MessageChannel mqttErrorChannel) {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(this.mqttProperties.getClientId() + "_in", mqttPahoClientFactory, this.mqttProperties.getTopics());
        adapter.setCompletionTimeout(5000L);
        DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
        defaultPahoMessageConverter.setPayloadAsBytes(true);
        adapter.setConverter((MqttMessageConverter)defaultPahoMessageConverter);
        adapter.setOutputChannel(mqttInboundChannel);
        adapter.setErrorChannel(mqttErrorChannel);
        adapter.setQos(new int[]{this.mqttProperties.getQos()});
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel="mqttInboundChannel")
    public MessageHandler handler() {
        return message -> {
            String topic = null;
            MessageHeaders headers = message.getHeaders();
            if (!headers.isEmpty()) {
                topic = String.valueOf(headers.get((Object)"mqtt_receivedTopic"));
            }
            if (topic == null) {
                log.error("handler: topic is null");
            } else {
                byte[] payload = (byte[])message.getPayload();
                MqttSubscribeCallback callback = ConcurrentMapFactory.getCallback(topic);
                if (callback != null) {
                    callback.messageArrived(topic, payload);
                    return;
                }
                SubscribeMethod subscribeMethod = ConcurrentMapFactory.getMethod(topic);
                if (subscribeMethod != null) {
                    Object bean = subscribeMethod.getBean();
                    Method method = subscribeMethod.getMethod();
                    try {
                        method.setAccessible(true);
                        method.invoke(bean, new Object[]{payload});
                    }
                    catch (Exception e) {
                        log.error("bean[{}].method[{}] invoke exception: ", new Object[]{bean, method.getName(), e});
                    }
                    return;
                }
                log.warn("topic[{}]: No handler or method found", (Object)topic);
            }
        };
    }

    @Bean
    @ServiceActivator(inputChannel="mqttErrorChannel")
    public MessageHandler errorHandler() {
        return message -> log.error("errorHandler: {}", (Object)message);
    }

    @Bean
    @ServiceActivator(inputChannel="mqttOutboundChannel")
    public MessageHandler mqttOutbound(MqttPahoClientFactory mqttPahoClientFactory) {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(this.mqttProperties.getClientId() + "_out", mqttPahoClientFactory);
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(this.mqttProperties.getTopics()[0]);
        return messageHandler;
    }

    public MqttMessageHandler(MqttProperties mqttProperties) {
        this.mqttProperties = mqttProperties;
    }
}

