/*
 * Decompiled with CFR 0.152.
 */
package cool.taomu.framework.service.mqtt.broker.impl;

import cool.taomu.framework.inter.IBeanContainer;
import cool.taomu.framework.inter.IObservable;
import cool.taomu.framework.inter.IObserver;
import cool.taomu.framework.service.mqtt.broker.Publish;
import cool.taomu.framework.service.mqtt.broker.Retain;
import cool.taomu.framework.service.mqtt.broker.entity.MqttChannelEntity;
import cool.taomu.framework.service.mqtt.broker.entity.MqttDataEntity;
import cool.taomu.framework.service.mqtt.broker.entity.TopicEntity;
import cool.taomu.framework.service.utils.CommonUtils;
import cool.taomu.framework.spi.annotation.Spi;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.oro.text.perl.Perl5Util;
import org.eclipse.xtext.xbase.lib.CollectionLiterals;
import org.eclipse.xtext.xbase.lib.Exceptions;
import org.eclipse.xtext.xbase.lib.IterableExtensions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Observer that reacts to MQTT SUBSCRIBE packets.
 *
 * <p>For every topic subscription in the packet a {@link TopicEntity} is built; topics whose
 * name matches {@link #TOPIC_FILTER_PATTERN} get a {@link Publish} observer registered on the
 * publish observable and are announced on the retain observable. Afterwards a SUBACK carrying
 * one QoS value per topic is written back to the channel.
 *
 * <p>The three {@code @Spi} fields are never assigned in this class; presumably the framework
 * injects them before use (NOTE(review): confirm against the SPI loader).
 */
public class SubscribeRequest
implements IObserver {
    private static final Logger LOG = LoggerFactory.getLogger(SubscribeRequest.class);

    /** Pause, in milliseconds, before the client id is read a second time in {@link #request}. */
    private static final long CLIENT_ID_RETRY_DELAY_MS = 100L;

    /**
     * Perl5-style match expression applied to each requested topic name: one or more
     * alphanumerics, followed by either any run of {@code /}, alphanumerics and {@code _},
     * or a trailing {@code /+}, or a trailing {@code /#}. The empty alternative ({@code ||})
     * is redundant with the first alternative and is kept only to leave the pattern untouched.
     */
    private static final String TOPIC_FILTER_PATTERN = "/^[A-Za-z0-9]+([\\/A-Za-z0-9_]*|\\/\\+||\\/\\#)$/";

    @Spi(value="mqtt_pub_observable", singleton=true)
    private IObservable<IObserver> observable;
    @Spi(value="TmBeanContainer", singleton=true)
    private IBeanContainer beanContainer;
    @Spi(value="mqtt_retain_observable", singleton=true)
    private IObservable<Retain> retainObservable;

    /**
     * Processes a SUBSCRIBE packet received on {@code ctx}.
     *
     * <p>Messages that are not {@link MqttSubscribeMessage} instances are ignored. Any failure
     * is rethrown unchanged (checked exceptions included, via {@code Exceptions.sneakyThrow}).
     *
     * @param ctx         channel context the packet arrived on; the SUBACK is written to it
     * @param mqttMessage the decoded MQTT message
     * @return always {@code null}
     */
    public Object request(ChannelHandlerContext ctx, MqttMessage mqttMessage) {
        if (!(mqttMessage instanceof MqttSubscribeMessage)) {
            return null;
        }
        try {
            String clientId = CommonUtils.getClientId(ctx.channel());
            if (clientId == null) {
                // NOTE(review): presumably the client id is attached to the channel by the
                // CONNECT handling and may not be visible yet - confirm. One retry only.
                try {
                    Thread.sleep(CLIENT_ID_RETRY_DELAY_MS);
                } catch (InterruptedException interrupted) {
                    // Keep the interrupt visible to callers further up the stack.
                    Thread.currentThread().interrupt();
                    throw interrupted;
                }
                clientId = CommonUtils.getClientId(ctx.channel());
            }
            LOG.info("\u6267\u884c\u4e86MQTT Subscribe \u547d\u4ee4 : {}", clientId);
            this.acknowledge(ctx, (MqttSubscribeMessage) mqttMessage);
            return null;
        } catch (Throwable failure) {
            // Propagate as-is without wrapping, even when the throwable is checked.
            throw Exceptions.sneakyThrow(failure);
        }
    }

    /**
     * Registers the topics of {@code subscribeMessage} and answers with a SUBACK.
     * Nothing is written when no topic was returned by {@link #registerTopics}.
     * Exceptions are logged at debug level and then rethrown.
     */
    private void acknowledge(ChannelHandlerContext ctx, MqttSubscribeMessage subscribeMessage) {
        try {
            int messageId = subscribeMessage.variableHeader().messageId();
            List<TopicEntity> topics = this.registerTopics(ctx, subscribeMessage.payload().topicSubscriptions());
            if (topics == null || topics.isEmpty()) {
                LOG.info("Valid all subscribe topic failure,messageId:{}", messageId);
                return;
            }
            MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
            MqttMessageIdVariableHeader varHeader = MqttMessageIdVariableHeader.from(messageId);
            MqttSubAckPayload payload = new MqttSubAckPayload(this.getTopicQos(topics));
            ctx.writeAndFlush(new MqttSubAckMessage(header, varHeader, payload));
        } catch (Exception ex) {
            LOG.debug("subscribe requst exception:", ex);
            throw Exceptions.sneakyThrow(ex);
        }
    }

    /**
     * Collects the QoS value of each topic, preserving order.
     *
     * @param topics topics to read; must not be {@code null}
     * @return a new list with one QoS entry per topic
     */
    public List<Integer> getTopicQos(List<TopicEntity> topics) {
        ArrayList<Integer> qosValues = new ArrayList<Integer>(topics.size());
        for (TopicEntity topic : topics) {
            qosValues.add(topic.getQos());
        }
        return qosValues;
    }

    /**
     * Builds a {@link TopicEntity} for every requested subscription and wires up the ones
     * whose name matches {@link #TOPIC_FILTER_PATTERN}.
     *
     * <p>For a matching topic a {@link Publish} bean is obtained from the bean container,
     * registered on the publish observable under the key {@code clientId#topicName}, and the
     * topic is published on the retain observable. Runs while holding both this instance's
     * monitor and the monitor of {@code ctx}.
     *
     * <p>NOTE(review): topics that fail the pattern are not registered but are still added to
     * the returned list, so they end up in the SUBACK with their requested QoS - confirm
     * whether a failure return code was intended.
     *
     * @param ctx    channel context of the subscribing client
     * @param topics subscriptions taken from the SUBSCRIBE payload
     * @return a new list containing one entity per requested subscription, in request order
     */
    public synchronized ArrayList<TopicEntity> registerTopics(ChannelHandlerContext ctx, List<MqttTopicSubscription> topics) {
        synchronized (ctx) {
            String clientId = CommonUtils.getClientId(ctx.channel());
            ArrayList<TopicEntity> topicList = new ArrayList<TopicEntity>();
            // One matcher for the whole call instead of a fresh one per subscription.
            Perl5Util matcher = new Perl5Util();
            for (MqttTopicSubscription subscription : topics) {
                TopicEntity topic = new TopicEntity(subscription.topicName(), subscription.qualityOfService().value());
                topic.setClientId(clientId);
                LOG.info("\u8ba2\u9605Topic {} \u7684\u7528\u6237{}", topic.getName(), clientId);
                if (matcher.match(TOPIC_FILTER_PATTERN, topic.getName())) {
                    IObserver publisher = (IObserver) this.beanContainer.getBean(Publish.class.getName(), new Object[]{topic});
                    String key = IterableExtensions.join(Collections.unmodifiableList(CollectionLiterals.newArrayList(new String[]{clientId, topic.getName()})), "#");
                    this.observable.register(key, publisher);
                    this.retainObservable.publish(topic, new Object[0]);
                }
                topicList.add(topic);
            }
            return topicList;
        }
    }

    /**
     * Observer callback: forwards {@code SUBSCRIBE}-typed {@link MqttDataEntity} events to
     * {@link #request}; every other argument is ignored.
     *
     * @param o   the notifying observable (unused)
     * @param arg the event payload
     */
    public void publish(IObservable<?> o, Object arg) {
        if (!(arg instanceof MqttDataEntity)) {
            return;
        }
        MqttDataEntity event = (MqttDataEntity) arg;
        if (!event.getDataType().equals(MqttDataEntity.Type.SUBSCRIBE)) {
            return;
        }
        MqttChannelEntity channelEntity = (MqttChannelEntity) event.getData();
        this.request(channelEntity.getCtx(), channelEntity.getMessage());
    }
}

