/*
 * Decompiled with CFR 0.152.
 */
package cool.taomu.mqtt.broker.factory;

import cool.taomu.mqtt.broker.entity.MessageEntity;
import cool.taomu.mqtt.broker.factory.IProcess;
import cool.taomu.mqtt.broker.impl.PublishObservable;
import cool.taomu.mqtt.broker.impl.Retain;
import cool.taomu.mqtt.broker.impl.RetainObservable;
import cool.taomu.mqtt.broker.utils.MqttUtils;
import cool.taomu.mqtt.broker.utils.impl.DataStorage;
import cool.taomu.mqtt.broker.utils.inter.IObservable;
import cool.taomu.mqtt.broker.utils.inter.IObserver;
import cool.taomu.storage.inter.IStorage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.util.ReferenceCountUtil;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import org.eclipse.xtext.xbase.lib.CollectionLiterals;
import org.eclipse.xtext.xbase.lib.Exceptions;
import org.eclipse.xtext.xbase.lib.IterableExtensions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PublishRequest
implements IProcess {
    private static final Logger LOG = LoggerFactory.getLogger(PublishRequest.class);
    private IStorage cache = new DataStorage();
    private static final IObservable<IObserver> retainObservable = RetainObservable.getInstance();
    private static final IObservable<IObserver> observable = PublishObservable.getInstance();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void request(ChannelHandlerContext ctx, MqttMessage mqttMessage) {
        block14: {
            try {
                if (!(mqttMessage instanceof MqttPublishMessage)) {
                    return;
                }
                MqttPublishMessage publishMessage = (MqttPublishMessage)mqttMessage;
                MessageEntity message = new MessageEntity();
                message.setSenderId(MqttUtils.getClientId(ctx.channel()));
                String _senderId = message.getSenderId();
                String _plus = "\u6267\u884c\u4e86MQTT Publish \u547d\u4ee4 : " + _senderId;
                LOG.info(_plus);
                MqttQoS qos = publishMessage.fixedHeader().qosLevel();
                message.setQos(qos.ordinal());
                message.setTopic(publishMessage.variableHeader().topicName());
                message.setPayload(((MqttPublishMessage)mqttMessage).payload());
                message.setType(mqttMessage.fixedHeader().messageType().value());
                message.setDup(publishMessage.fixedHeader().isDup());
                message.setRetain(publishMessage.fixedHeader().isRetain());
                message.setMsgId(publishMessage.variableHeader().packetId());
                message.setSenderChannel(ctx.channel());
                if (qos != null) {
                    switch (qos) {
                        case EXACTLY_ONCE: 
                        case AT_MOST_ONCE: {
                            LOG.info(String.format("Qos0 and Qos2 message,clientId=%s", message.getSenderId()));
                            this.retainMessage(message);
                            this.publishMessage(message);
                            break;
                        }
                        case AT_LEAST_ONCE: {
                            LOG.info(String.format("Qos1 message,clientId=%s", message.getSenderId()));
                            this.retainMessage(message);
                            this.publishMessage(message);
                            MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
                            MqttMessageIdVariableHeader varHeader = MqttMessageIdVariableHeader.from((int)message.getMsgId());
                            MqttPubAckMessage _mqttPubAckMessage = new MqttPubAckMessage(header, varHeader);
                            ctx.writeAndFlush((Object)_mqttPubAckMessage);
                            break;
                        }
                        default: {
                            LOG.info(String.format("Wrong mqtt message,clientId=%s", message.getSenderId()));
                            break;
                        }
                    }
                } else {
                    LOG.info(String.format("Wrong mqtt message,clientId=%s", message.getSenderId()));
                }
            }
            catch (Throwable _t) {
                if (_t instanceof Exception) {
                    Exception ex = (Exception)_t;
                    LOG.debug("\u6267\u884c\u4e86MQTT Publish \u547d\u4ee4\u51fa\u9519\u4e86 : ", (Throwable)ex);
                    break block14;
                }
                throw Exceptions.sneakyThrow((Throwable)_t);
            }
            finally {
                ReferenceCountUtil.release((Object)mqttMessage.payload());
            }
        }
    }

    public void retainMessage(MessageEntity message) {
        byte[] _payload = message.getPayload();
        String _string = new String(_payload);
        LOG.debug("clientId \u4e3a {} \u662f\u5426\u5b58\u5728 Retain \u6570\u636e {}, \u63a5\u53d7\u5230\u7684\u6570\u636e\u4e3a {} ", new Object[]{message.getSenderId(), message.isRetain(), _string});
        this.cache.put("mqtt-message", message.getSenderId(), (Serializable)message);
        boolean _isRetain = message.isRetain();
        if (_isRetain) {
            int qos = message.getQos();
            byte[] payload = message.getPayload();
            if (qos == MqttQoS.AT_MOST_ONCE.ordinal() || payload == null || payload.length == 0) {
                LOG.info("\u6e05\u7a7a clientId \u4e3a {} \u7684Retain\u6570\u636e", (Object)message.getSenderId());
                String _senderId = message.getSenderId();
                String _topic = message.getTopic();
                retainObservable.unregister(IterableExtensions.join(Collections.unmodifiableList(CollectionLiterals.newArrayList((Object[])new String[]{_senderId, _topic})), (CharSequence)"-"));
            } else {
                LOG.info("\u4fdd\u5b58 clientId \u4e3a {} \u7684Retain\u6570\u636e", (Object)message.getSenderId());
                String _senderId_1 = message.getSenderId();
                String _topic_1 = message.getTopic();
                String _join = IterableExtensions.join(Collections.unmodifiableList(CollectionLiterals.newArrayList((Object[])new String[]{_senderId_1, _topic_1})), (CharSequence)"-");
                Retain _retain = new Retain(message);
                retainObservable.register(_join, _retain);
            }
        }
    }

    public void publishMessage(MessageEntity message) {
        LOG.debug("\u63a8\u9001\u6d88\u606f");
        observable.publish(message, new Object[0]);
    }
}

