/*
 * Decompiled with CFR 0.152.
 */
package cool.taomu.box.netty.mqtt.service;

import com.google.common.base.Optional;
import com.google.inject.Inject;
import cool.taomu.box.crypto.Base64;
import cool.taomu.box.netty.mqtt.entity.ClientSessionEntity;
import cool.taomu.box.netty.mqtt.entity.MessageEntity;
import cool.taomu.box.netty.mqtt.entity.PublishEntity;
import cool.taomu.box.netty.mqtt.entity.TopicEntity;
import cool.taomu.box.netty.mqtt.extend.MqttUtils;
import cool.taomu.box.netty.mqtt.inter.ICache;
import cool.taomu.box.netty.mqtt.service.MqttSubscribeService;
import cool.taomu.box.netty.mqtt.utils.ISerializationUtils;
import cool.taomu.box.task.dataentity.DataStructureEntity;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.function.Consumer;
import javax.inject.Named;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.oro.text.perl.Perl5Util;
import org.eclipse.xtext.xbase.lib.Exceptions;
import org.eclipse.xtext.xbase.lib.IterableExtensions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MqttPublishService {
    private static final Logger LOG = LoggerFactory.getLogger(MqttPublishService.class);
    @Inject
    @Named(value="ClientSessionEntity")
    private ICache<Optional<ClientSessionEntity>> cache;
    @Inject
    private ISerializationUtils isu;
    @Inject
    private MqttSubscribeService subscribeService;

    public byte[] toBytes(MessageEntity s) {
        return this.isu.serialize(s);
    }

    public void message(MessageEntity senderMessage) {
        LOG.debug("\u63a8\u9001\u6d88\u606f");
        ArrayList<TopicEntity> subscribes = this.subscribeService.get();
        try {
            byte[] _payload = senderMessage.getPayload();
            byte[] msg = new Base64(_payload).decode();
            Object _deserialize = this.isu.deserialize(msg);
            DataStructureEntity messageEntity = (DataStructureEntity)_deserialize;
            Consumer<TopicEntity> _function = it -> {
                boolean _equals = messageEntity.getClientId().equals("ALL");
                if (_equals) {
                    this.publishMessage((TopicEntity)it, senderMessage);
                } else {
                    boolean _equals_1 = it.getClientId().equals(messageEntity.getClientId());
                    if (_equals_1) {
                        this.publishMessage((TopicEntity)it, senderMessage);
                    }
                }
            };
            IterableExtensions.filterNull(subscribes).forEach(_function);
        }
        catch (Throwable _t) {
            if (_t instanceof Exception) {
                Exception ex = (Exception)_t;
                LOG.info("\u5e8f\u5217\u5316\u9519\u8bef:{}", (Object)ex.getMessage());
            }
            throw Exceptions.sneakyThrow((Throwable)_t);
        }
    }

    public void publishMessage(TopicEntity topic, MessageEntity senderMessage) {
        try {
            Perl5Util p5 = new Perl5Util();
            String subTopicName = topic.getName().replace("/+", "/[a-zA-Z]?[a-zA-Z0-9]+").replace("/#", "/[a-zA-Z]?([a-zA-Z0-9/]*)").replace("/", "\\/");
            boolean _match = p5.match("/" + subTopicName + "/", senderMessage.getTopic());
            if (_match) {
                ClientSessionEntity clientSession;
                LOG.info("\u53d1\u9001\u8005id : {},  Topic : {}", (Object)senderMessage.getSenderId(), (Object)senderMessage.getTopic());
                int minQos = MqttUtils.getQos(senderMessage.getQos(), topic.getQos());
                if (minQos == 2) {
                    MessageEntity cloneMsg = (MessageEntity)SerializationUtils.clone((Serializable)senderMessage);
                    cloneMsg.setSenderChannel(senderMessage.getSenderChannel());
                }
                LOG.info("clientSession is not null {}", (Object)((clientSession = (ClientSessionEntity)this.cache.get(topic.getClientId()).get()) != null ? 1 : 0));
                int _generateMessageId = clientSession.generateMessageId();
                Integer msgId = _generateMessageId;
                LOG.info("\u8ba2\u9605\u8005id : {},  Topic : {}, \u53d1\u9001\u5185\u5bb9\u957f\u5ea6\uff1a {} messageId : {}", new Object[]{topic.getClientId(), topic.getName(), senderMessage.getPayload().length, msgId});
                MqttQoS _valueOf = MqttQoS.valueOf((int)minQos);
                String _name = topic.getName();
                byte[] _payload = senderMessage.getPayload();
                PublishEntity entity = new PublishEntity(_valueOf, _name, msgId, _payload, false);
                clientSession.getCtx().writeAndFlush((Object)this.response(entity));
            }
        }
        catch (Throwable _t) {
            if (_t instanceof Exception) {
                Exception ex = (Exception)_t;
                LOG.debug("publishMessage \u65b9\u6cd5\u51fa\u73b0\u9519\u8bef : ", (Throwable)ex);
            }
            throw Exceptions.sneakyThrow((Throwable)_t);
        }
    }

    private MqttPublishMessage response(PublishEntity entity) {
        boolean _tripleEquals;
        LOG.debug("------<>{}", (Object)entity);
        Boolean _dup = entity.getDup();
        MqttQoS _qos = entity.getQos();
        MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, _dup.booleanValue(), _qos, false, 0);
        String _topicName = entity.getTopicName();
        Integer _messageId = entity.getMessageId();
        MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(_topicName, _messageId.intValue());
        ByteBuf heapBuf = null;
        byte[] _payload = entity.getPayload();
        boolean bl = _tripleEquals = _payload == null;
        if (_tripleEquals) {
            heapBuf = Unpooled.EMPTY_BUFFER;
        } else {
            try {
                heapBuf = Unpooled.wrappedBuffer((byte[])entity.getPayload());
            }
            catch (Throwable _t) {
                if (_t instanceof IllegalArgumentException) {
                    IllegalArgumentException e = (IllegalArgumentException)_t;
                    e.printStackTrace();
                }
                throw Exceptions.sneakyThrow((Throwable)_t);
            }
        }
        return new MqttPublishMessage(header, varHeader, heapBuf);
    }
}

