/*
 * Decompiled with CFR 0.152.
 */
package cool.taomu.framework.service.mqtt.broker.impl;

import cool.taomu.framework.inter.IObservable;
import cool.taomu.framework.inter.IObserver;
import cool.taomu.framework.inter.cache.ICache;
import cool.taomu.framework.service.mqtt.broker.Retain;
import cool.taomu.framework.service.mqtt.broker.entity.MessageEntity;
import cool.taomu.framework.service.mqtt.broker.entity.MqttChannelEntity;
import cool.taomu.framework.service.mqtt.broker.entity.MqttDataEntity;
import cool.taomu.framework.service.utils.CommonUtils;
import cool.taomu.framework.spi.annotation.Spi;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.util.ReferenceCountUtil;
import java.io.Serializable;
import java.util.Collections;
import org.eclipse.xtext.xbase.lib.CollectionLiterals;
import org.eclipse.xtext.xbase.lib.Exceptions;
import org.eclipse.xtext.xbase.lib.IterableExtensions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PublishRequest
implements IObserver {
    private final Logger LOG = LoggerFactory.getLogger(PublishRequest.class);
    @Spi(value="kvCache", singleton=true)
    private ICache<String, Serializable> cache;
    @Spi(value="mqtt_pub_observable", singleton=true)
    private IObservable<IObserver> observable;
    @Spi(value="mqtt_retain_observable", singleton=true)
    private IObservable<Retain> retainObservable;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Object request(ChannelHandlerContext ctx, MqttMessage mqttMessage) {
        block14: {
            try {
                if (!(mqttMessage instanceof MqttPublishMessage)) {
                    Object var3_3 = null;
                    return var3_3;
                }
                MqttPublishMessage publishMessage = (MqttPublishMessage)mqttMessage;
                MessageEntity message = new MessageEntity();
                message.setSenderId(CommonUtils.getClientId(ctx.channel()));
                String _senderId = message.getSenderId();
                String _plus = "\u6267\u884c\u4e86MQTT Publish \u547d\u4ee4 : " + _senderId;
                this.LOG.info(_plus);
                MqttQoS qos = publishMessage.fixedHeader().qosLevel();
                message.setQos(qos.ordinal());
                message.setTopic(publishMessage.variableHeader().topicName());
                message.setPayload(((MqttPublishMessage)mqttMessage).payload());
                message.setType(mqttMessage.fixedHeader().messageType().value());
                message.setDup(publishMessage.fixedHeader().isDup());
                message.setRetain(publishMessage.fixedHeader().isRetain());
                message.setMsgId(publishMessage.variableHeader().packetId());
                message.setSenderChannel(ctx.channel());
                if (qos != null) {
                    switch (qos) {
                        case EXACTLY_ONCE: 
                        case AT_MOST_ONCE: {
                            this.LOG.info(String.format("Qos0 message,clientId=%s", message.getSenderId()));
                            this.retainMessage(message);
                            break;
                        }
                        case AT_LEAST_ONCE: {
                            this.LOG.info(String.format("Qos1 message,clientId=%s", message.getSenderId()));
                            this.retainMessage(message);
                            MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
                            MqttMessageIdVariableHeader varHeader = MqttMessageIdVariableHeader.from((int)message.getMsgId());
                            MqttPubAckMessage _mqttPubAckMessage = new MqttPubAckMessage(header, varHeader);
                            ctx.writeAndFlush((Object)_mqttPubAckMessage);
                            break;
                        }
                        default: {
                            this.LOG.info(String.format("Wrong mqtt message,clientId=%s", message.getSenderId()));
                            break;
                        }
                    }
                } else {
                    this.LOG.info(String.format("Wrong mqtt message,clientId=%s", message.getSenderId()));
                }
            }
            catch (Throwable _t) {
                if (_t instanceof Exception) {
                    Exception ex = (Exception)_t;
                    this.LOG.debug("\u6267\u884c\u4e86MQTT Publish \u547d\u4ee4\u51fa\u9519\u4e86 : ", (Throwable)ex);
                    break block14;
                }
                throw Exceptions.sneakyThrow((Throwable)_t);
            }
            finally {
                ReferenceCountUtil.release((Object)mqttMessage.payload());
            }
        }
        return null;
    }

    protected synchronized void retainMessage(MessageEntity message) {
        byte[] _payload = message.getPayload();
        String _string = new String(_payload);
        this.LOG.debug("clientId \u4e3a {} \u662f\u5426\u5b58\u5728 Retain \u6570\u636e {}, \u63a5\u53d7\u5230\u7684\u6570\u636e\u4e3a {} ", new Object[]{message.getSenderId(), message.isRetain(), _string});
        this.cache.put((Object)message.getSenderId(), (Object)message);
        boolean _isRetain = message.isRetain();
        if (_isRetain) {
            int qos = message.getQos();
            byte[] payload = message.getPayload();
            if (qos == MqttQoS.AT_MOST_ONCE.ordinal() || payload == null || payload.length == 0) {
                this.LOG.info("\u6e05\u7a7a clientId \u4e3a {} \u7684Retain\u6570\u636e", (Object)message.getSenderId());
                String _senderId = message.getSenderId();
                String _topic = message.getTopic();
                this.retainObservable.unregister((Object)IterableExtensions.join(Collections.unmodifiableList(CollectionLiterals.newArrayList((Object[])new String[]{_senderId, _topic})), (CharSequence)"-"));
            } else {
                this.LOG.info("\u4fdd\u5b58 clientId \u4e3a {} \u7684Retain\u6570\u636e", (Object)message.getSenderId());
                String _senderId_1 = message.getSenderId();
                String _topic_1 = message.getTopic();
                String _join = IterableExtensions.join(Collections.unmodifiableList(CollectionLiterals.newArrayList((Object[])new String[]{_senderId_1, _topic_1})), (CharSequence)"-");
                Retain _retain = new Retain(message);
                this.retainObservable.register((Object)_join, (Object)_retain);
            }
        }
        this.observable.publish((Object)message, new Object[0]);
    }

    public void publish(IObservable<?> o, Object arg) {
        boolean _equals;
        if (arg instanceof MqttDataEntity && (_equals = ((MqttDataEntity)arg).getDataType().equals((Object)MqttDataEntity.Type.PUBLISH))) {
            Object _data = ((MqttDataEntity)arg).getData();
            MqttChannelEntity mce = (MqttChannelEntity)_data;
            this.request(mce.getCtx(), mce.getMessage());
        }
    }
}

