/*
 * Decompiled with CFR 0.152.
 */
package cool.taomu.framework.service.mqtt.broker.impl;

import cool.taomu.framework.cache.KeyValueCache;
import cool.taomu.framework.service.mqtt.broker.entity.ClientSessionEntity;
import cool.taomu.framework.service.mqtt.broker.entity.MessageEntity;
import cool.taomu.framework.service.mqtt.broker.entity.PublishEntity;
import cool.taomu.framework.service.mqtt.broker.entity.TopicEntity;
import cool.taomu.framework.service.mqtt.broker.inter.IPublishObserver;
import cool.taomu.framework.service.mqtt.broker.inter.IResponse;
import cool.taomu.framework.service.utils.CommonUtils;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.oro.text.perl.Perl5Util;
import org.eclipse.xtend.lib.annotations.Accessors;
import org.eclipse.xtext.xbase.lib.Exceptions;
import org.eclipse.xtext.xbase.lib.IterableExtensions;
import org.eclipse.xtext.xbase.lib.Pure;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Accessors
public class Publish
implements IPublishObserver {
    private final Logger LOG = LoggerFactory.getLogger(Publish.class);
    private KeyValueCache cache = KeyValueCache.getInstance();
    private TopicEntity topic;
    private IResponse<PublishEntity> reponse;
    private static AtomicInteger count = new AtomicInteger(0);
    private int number = 0;

    public Publish(TopicEntity topic, IResponse<PublishEntity> reponse) {
        count.incrementAndGet();
        this.number = count.intValue();
        this.topic = topic;
        this.reponse = reponse;
    }

    @Override
    public void update(List<MessageEntity> messages, IPublishObserver.Type type) {
        this.publishMessage(messages, type);
    }

    private void publishMessage(List<MessageEntity> messages, IPublishObserver.Type type) {
        try {
            Consumer<MessageEntity> _function = msg -> {
                Perl5Util p5 = new Perl5Util();
                String subTopicName = this.topic.getName().replace("/+", "/[a-zA-Z]?[a-zA-Z0-9]+").replace("/#", "/[a-zA-Z]?([a-zA-Z0-9/]*)").replace("/", "\\/");
                this.LOG.info("\u8ba2\u9605\u8005id : {},  Topic : {}", (Object)this.topic.getClientId(), (Object)subTopicName);
                this.LOG.info("\u53d1\u9001\u8005id : {},  Topic : {}", (Object)msg.getSenderId(), (Object)msg.getTopic());
                boolean _match = p5.match("/" + subTopicName + "/", msg.getTopic());
                if (_match) {
                    this.LOG.info("\u5339\u914d\u5230\u8ba2\u9605 \uff1a {}", (Object)subTopicName);
                    int qos = msg.getQos().value();
                    int minQos = CommonUtils.getQos(qos, this.topic.getQos());
                    if (minQos == 2) {
                        MessageEntity cloneMsg = (MessageEntity)SerializationUtils.clone((Serializable)msg);
                        cloneMsg.setSenderChannel(msg.getSenderChannel());
                        this.cache.store(CommonUtils.qos2Message(this.topic.getClientId()), (Serializable)cloneMsg);
                    }
                    Serializable _get = this.cache.get(CommonUtils.session(this.topic.getClientId()));
                    ClientSessionEntity clientSession = (ClientSessionEntity)_get;
                    Assert.assertNotNull((Object)clientSession);
                    Assert.assertNotNull((Object)this.topic);
                    Assert.assertNotNull((Object)this.topic.getName());
                    Assert.assertNotNull((Object)msg);
                    byte[] _payload = msg.getPayload();
                    String _string = new String(_payload);
                    this.LOG.info("\u8ba2\u9605\u8005id : {},  Topic : {}, \u53d1\u9001 \uff1a {}", new Object[]{this.topic.getClientId(), subTopicName, _string});
                    MqttQoS _valueOf = MqttQoS.valueOf((int)minQos);
                    String _name = this.topic.getName();
                    int _generateMessageId = clientSession.generateMessageId();
                    byte[] _payload_1 = msg.getPayload();
                    PublishEntity entity = new PublishEntity(_valueOf, _name, _generateMessageId, _payload_1, false);
                    clientSession.getCtx().writeAndFlush((Object)this.reponse.response(entity));
                }
            };
            IterableExtensions.filterNull(messages).forEach(_function);
        }
        catch (Throwable _t) {
            if (_t instanceof Exception) {
                Exception ex = (Exception)_t;
                this.LOG.debug("publishMessage \u65b9\u6cd5\u51fa\u73b0\u9519\u8bef : ", (Throwable)ex);
            }
            throw Exceptions.sneakyThrow((Throwable)_t);
        }
    }

    @Pure
    public Logger getLOG() {
        return this.LOG;
    }

    @Pure
    public KeyValueCache getCache() {
        return this.cache;
    }

    public void setCache(KeyValueCache cache) {
        this.cache = cache;
    }

    @Pure
    public TopicEntity getTopic() {
        return this.topic;
    }

    public void setTopic(TopicEntity topic) {
        this.topic = topic;
    }

    @Pure
    public IResponse<PublishEntity> getReponse() {
        return this.reponse;
    }

    public void setReponse(IResponse<PublishEntity> reponse) {
        this.reponse = reponse;
    }

    @Pure
    public int getNumber() {
        return this.number;
    }

    public void setNumber(int number) {
        this.number = number;
    }
}

