/*
 * Decompiled with CFR 0.152.
 */
package cool.taomu.mqtt.broker.impl;

import cool.taomu.mqtt.broker.entity.ClientSessionEntity;
import cool.taomu.mqtt.broker.entity.MessageEntity;
import cool.taomu.mqtt.broker.entity.PublishEntity;
import cool.taomu.mqtt.broker.entity.TopicEntity;
import cool.taomu.mqtt.broker.utils.MqttUtils;
import cool.taomu.mqtt.broker.utils.impl.DataStorage;
import cool.taomu.mqtt.broker.utils.inter.IObservable;
import cool.taomu.mqtt.broker.utils.inter.IObserver;
import cool.taomu.storage.inter.IStorage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.oro.text.perl.Perl5Util;
import org.eclipse.xtend.lib.annotations.Accessors;
import org.eclipse.xtext.xbase.lib.Exceptions;
import org.eclipse.xtext.xbase.lib.Pure;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Accessors
public class Retain
implements IObserver {
    private final Logger LOG = LoggerFactory.getLogger(Retain.class);
    private IStorage cache = new DataStorage();
    private MessageEntity msg;
    private static AtomicInteger count = new AtomicInteger(0);
    private int number = 0;

    public Retain(MessageEntity msg) {
        this.msg = msg;
    }

    public Retain() {
        count.incrementAndGet();
        this.number = count.intValue();
    }

    private void publishMessage(TopicEntity topic) {
        try {
            Perl5Util p5 = new Perl5Util();
            String subTopicName = topic.getName().replace("/+", "/[a-zA-Z]?[a-zA-Z0-9]+").replace("/#", "/[a-zA-Z]?([a-zA-Z0-9/]*)").replace("/", "\\/");
            boolean _match = p5.match("/" + subTopicName + "/", this.msg.getTopic());
            if (_match) {
                this.LOG.info("\u53d1\u9001\u8005id : {},  Topic : {}", (Object)this.msg.getSenderId(), (Object)this.msg.getTopic());
                int minQos = MqttUtils.getQos(this.msg.getQos(), topic.getQos());
                if (minQos == 2) {
                    MessageEntity cloneMsg = (MessageEntity)SerializationUtils.clone((Serializable)this.msg);
                    cloneMsg.setSenderChannel(this.msg.getSenderChannel());
                    this.cache.put("mqtt-qos2-message", topic.getClientId(), (Serializable)cloneMsg);
                }
                this.LOG.info("cache is null:{} client id:{}", (Object)(this.cache == null ? 1 : 0), (Object)topic.getClientId());
                Serializable _get = this.cache.get("mqtt-session", topic.getClientId());
                ClientSessionEntity clientSession = (ClientSessionEntity)_get;
                this.LOG.info("\u8ba2\u9605\u8005id : {},  Topic : {}, \u53d1\u9001\u5185\u5bb9\u957f\u5ea6\uff1a {}", (Object)topic.getClientId(), (Object)this.msg.getPayload().length);
                MqttQoS _valueOf = MqttQoS.valueOf((int)minQos);
                String _name = topic.getName();
                int _generateMessageId = clientSession.generateMessageId();
                byte[] _payload = this.msg.getPayload();
                PublishEntity entity = new PublishEntity(_valueOf, _name, _generateMessageId, _payload, false);
                clientSession.getCtx().writeAndFlush((Object)this.response(entity));
            }
        }
        catch (Throwable _t) {
            if (_t instanceof Exception) {
                Exception ex = (Exception)_t;
                this.LOG.debug("publishMessage \u65b9\u6cd5\u51fa\u73b0\u9519\u8bef : ", (Throwable)ex);
            }
            throw Exceptions.sneakyThrow((Throwable)_t);
        }
    }

    private MqttPublishMessage response(PublishEntity entity) {
        boolean _tripleEquals;
        Boolean _dup = entity.getDup();
        MqttQoS _qos = entity.getQos();
        MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, _dup.booleanValue(), _qos, false, 0);
        String _topicName = entity.getTopicName();
        Integer _messageId = entity.getMessageId();
        MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(_topicName, _messageId.intValue());
        ByteBuf heapBuf = null;
        byte[] _payload = entity.getPayload();
        boolean bl = _tripleEquals = _payload == null;
        if (_tripleEquals) {
            heapBuf = Unpooled.EMPTY_BUFFER;
        } else {
            try {
                heapBuf = Unpooled.wrappedBuffer((byte[])entity.getPayload());
            }
            catch (Throwable _t) {
                if (_t instanceof IllegalArgumentException) {
                    IllegalArgumentException e = (IllegalArgumentException)_t;
                    e.printStackTrace();
                }
                throw Exceptions.sneakyThrow((Throwable)_t);
            }
        }
        return new MqttPublishMessage(header, varHeader, heapBuf);
    }

    @Override
    public void publish(IObservable<?> o, Object arg) {
        if (arg instanceof TopicEntity) {
            this.publishMessage((TopicEntity)arg);
        }
    }

    @Pure
    public Logger getLOG() {
        return this.LOG;
    }

    @Pure
    public IStorage getCache() {
        return this.cache;
    }

    public void setCache(IStorage cache) {
        this.cache = cache;
    }

    @Pure
    public MessageEntity getMsg() {
        return this.msg;
    }

    public void setMsg(MessageEntity msg) {
        this.msg = msg;
    }

    @Pure
    public int getNumber() {
        return this.number;
    }

    public void setNumber(int number) {
        this.number = number;
    }
}

