/*
 * Decompiled with CFR 0.152.
 */
package cool.taomu.framework.service.mqtt.broker.impl.request;

import com.google.gson.Gson;
import cool.taomu.framework.cache.KeyValueCache;
import cool.taomu.framework.configure.ConfigureManage;
import cool.taomu.framework.configure.entity.ClusterEntity;
import cool.taomu.framework.configure.entity.ConfigureEntity;
import cool.taomu.framework.service.mqtt.broker.entity.MessageEntity;
import cool.taomu.framework.service.mqtt.broker.entity.PubAckEntity;
import cool.taomu.framework.service.mqtt.broker.impl.PublishObservable;
import cool.taomu.framework.service.mqtt.broker.impl.response.PubAckResponse;
import cool.taomu.framework.service.mqtt.broker.inter.IPublishObserver;
import cool.taomu.framework.service.mqtt.broker.inter.IRequest;
import cool.taomu.framework.service.rpc.Gateway;
import cool.taomu.framework.service.rpc.Response;
import cool.taomu.framework.service.rpc.TRpcClient;
import cool.taomu.framework.service.utils.CommonUtils;
import cool.taomu.framework.utils.spi.Alias;
import cool.taomu.framework.utils.spi.ServiceLoader;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.util.ReferenceCountUtil;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import org.apache.thrift.protocol.TProtocol;
import org.eclipse.xtext.xbase.lib.CollectionLiterals;
import org.eclipse.xtext.xbase.lib.Exceptions;
import org.eclipse.xtext.xbase.lib.IterableExtensions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Alias(value="PUBLISH")
public class PublishRequest
implements IRequest {
    private final Logger LOG = LoggerFactory.getLogger(PublishRequest.class);
    private KeyValueCache cache = KeyValueCache.getInstance();
    private final ConfigureEntity config = ConfigureManage.loadConfig();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public List<MqttMessage> request(ChannelHandlerContext ctx, MqttMessage mqttMessage) {
        block13: {
            try {
                MqttPublishMessage publishMessage = (MqttPublishMessage)mqttMessage;
                MessageEntity message = new MessageEntity();
                message.setSenderId(CommonUtils.getClientId(ctx.channel()));
                String _senderId = message.getSenderId();
                String _plus = "\u6267\u884c\u4e86MQTT Publish \u547d\u4ee4 : " + _senderId;
                this.LOG.info(_plus);
                message.setQos(publishMessage.fixedHeader().qosLevel());
                message.setTopic(publishMessage.variableHeader().topicName());
                message.setPayload(((MqttPublishMessage)mqttMessage).payload());
                message.setType(mqttMessage.fixedHeader().messageType().value());
                message.setDup(publishMessage.fixedHeader().isDup());
                message.setRetain(publishMessage.fixedHeader().isRetain());
                message.setMsgId(publishMessage.variableHeader().packetId());
                message.setSenderChannel(ctx.channel());
                MqttQoS _qos = message.getQos();
                if (_qos != null) {
                    switch (_qos) {
                        case EXACTLY_ONCE: 
                        case AT_MOST_ONCE: {
                            this.LOG.info(String.format("Qos0 message,clientId=%s", message.getSenderId()));
                            this.retainMessage(message);
                            break;
                        }
                        case AT_LEAST_ONCE: {
                            this.LOG.info(String.format("Qos1 message,clientId=%s", message.getSenderId()));
                            this.retainMessage(message);
                            int messageId = message.getMsgId();
                            PubAckEntity entity = new PubAckEntity(messageId);
                            MqttMessage _response = new PubAckResponse().response(entity);
                            List<MqttMessage> list = Collections.unmodifiableList(CollectionLiterals.newArrayList((Object[])new MqttMessage[]{_response}));
                            return list;
                        }
                        default: {
                            this.LOG.info(String.format("Wrong mqtt message,clientId=%s", message.getSenderId()));
                            break;
                        }
                    }
                } else {
                    this.LOG.info(String.format("Wrong mqtt message,clientId=%s", message.getSenderId()));
                }
            }
            catch (Throwable _t) {
                if (_t instanceof Exception) {
                    Exception ex = (Exception)_t;
                    this.LOG.debug("\u6267\u884c\u4e86MQTT Publish \u547d\u4ee4\u51fa\u9519\u4e86 : ", (Throwable)ex);
                    break block13;
                }
                throw Exceptions.sneakyThrow((Throwable)_t);
            }
            finally {
                ReferenceCountUtil.release((Object)mqttMessage.payload());
            }
        }
        return null;
    }

    protected synchronized void retainMessage(final MessageEntity message) {
        byte[] _payload = message.getPayload();
        String _string = new String(_payload);
        this.LOG.debug("clientId \u4e3a {} \u662f\u5426\u5b58\u5728 Retain \u6570\u636e {}, \u63a5\u53d7\u5230\u7684\u6570\u636e\u4e3a {} ", new Object[]{message.getSenderId(), message.isRetain(), _string});
        this.cache.store(message.getSenderId(), (Serializable)message);
        boolean _isRetain = message.isRetain();
        if (_isRetain) {
            MqttQoS qos = message.getQos();
            byte[] payload = message.getPayload();
            if (qos == MqttQoS.AT_MOST_ONCE || payload == null || payload.length == 0) {
                this.LOG.info("\u6e05\u7a7a clientId \u4e3a {} \u7684Retain\u6570\u636e", (Object)message.getSenderId());
                this.cache.remove(CommonUtils.retain(message.getSenderId()));
            } else {
                this.LOG.info("\u4fdd\u5b58 clientId \u4e3a {} \u7684Retain\u6570\u636e", (Object)message.getSenderId());
                this.cache.store(CommonUtils.retain(message.getSenderId()), (Serializable)message);
                Runnable _function = () -> {
                    Consumer<ClusterEntity> _function_1 = addr -> {
                        Runnable _function_2 = () -> {
                            String _xifexpression = null;
                            boolean _equals = addr.getHostname().equals("");
                            boolean _not = !_equals;
                            _xifexpression = _not ? addr.getHostname() : "localhost";
                            TRpcClient.client(_xifexpression, addr.getPort(), 600, new TRpcClient((ClusterEntity)addr){
                                final /* synthetic */ ClusterEntity val$addr;
                                {
                                    this.val$addr = clusterEntity;
                                }

                                @Override
                                public <T> void call(TProtocol protocol) {
                                    try {
                                        ServiceLoader client = ServiceLoader.load(Gateway.Iface.class, (Object[])new Object[]{protocol});
                                        Response resp = ((Gateway.Iface)client.get("Client")).refresh(this.val$addr.getHostname());
                                        PublishRequest.this.LOG.info(resp.toString());
                                    }
                                    catch (Throwable _e) {
                                        throw Exceptions.sneakyThrow((Throwable)_e);
                                    }
                                }
                            });
                        };
                        CommonUtils.exec(_function_2);
                    };
                    IterableExtensions.filterNull((Iterable)this.config.getMqtt().getCluster()).forEach(_function_1);
                };
                CommonUtils.exec(_function);
            }
        }
        PublishObservable.getInstance().start(message.getSenderId(), IPublishObserver.Type.MESSAGE);
        Runnable _function_1 = () -> {
            Consumer<ClusterEntity> _function_2 = addr -> {
                Runnable _function_3 = () -> {
                    final ConfigureEntity config = ConfigureManage.loadConfig();
                    Consumer<ClusterEntity> _function_4 = cluster -> {
                        String _xifexpression = null;
                        boolean _equals = cluster.getHostname().equals("");
                        boolean _not = !_equals;
                        _xifexpression = _not ? cluster.getHostname() : "localhost";
                        TRpcClient.client(_xifexpression, cluster.getPort(), 600, new TRpcClient(){

                            @Override
                            public <T> void call(TProtocol protocol) {
                                try {
                                    ServiceLoader client = ServiceLoader.load(Gateway.Iface.class, (Object[])new Object[]{protocol});
                                    Response resp = ((Gateway.Iface)client.get("Client")).publish(config.getMqtt().getHostname(), new Gson().toJson((Object)message));
                                    PublishRequest.this.LOG.info(resp.toString());
                                }
                                catch (Throwable _e) {
                                    throw Exceptions.sneakyThrow((Throwable)_e);
                                }
                            }
                        });
                    };
                    config.getMqtt().getCluster().forEach(_function_4);
                };
                CommonUtils.exec(_function_3);
            };
            IterableExtensions.filterNull((Iterable)this.config.getMqtt().getCluster()).forEach(_function_2);
        };
        CommonUtils.exec(_function_1);
    }
}

