/*
 * Decompiled with CFR 0.152.
 */
package cool.taomu.framework.service.mqtt.broker.impl;

import cool.taomu.framework.inter.IObservable;
import cool.taomu.framework.inter.IObserver;
import cool.taomu.framework.inter.cache.ISetCache;
import cool.taomu.framework.service.mqtt.broker.entity.MessageEntity;
import cool.taomu.framework.service.mqtt.broker.entity.MqttChannelEntity;
import cool.taomu.framework.service.mqtt.broker.entity.MqttDataEntity;
import cool.taomu.framework.service.utils.CommonUtils;
import cool.taomu.framework.spi.annotation.Spi;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.io.Serializable;
import java.util.Collections;
import java.util.Set;
import org.eclipse.xtext.xbase.lib.CollectionLiterals;
import org.eclipse.xtext.xbase.lib.IterableExtensions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubCompRequest
implements IObserver {
    private final Logger LOG = LoggerFactory.getLogger(PubCompRequest.class);
    @Spi(value="set_cache", singleton=true)
    private ISetCache<String, Serializable> cache;

    public Object request(ChannelHandlerContext ctx, MqttMessage mqttMessage) {
        String clientId = CommonUtils.getClientId(ctx.channel());
        this.LOG.info("\u6267\u884c\u4e86MQTT PubComp \u547d\u4ee4 : " + clientId);
        int msgId = CommonUtils.getMessageId(mqttMessage);
        String _qos2Message = CommonUtils.qos2Message(clientId);
        Set qos2 = this.cache.sget((Object)IterableExtensions.join(Collections.unmodifiableList(CollectionLiterals.newArrayList((Object[])new Object[]{_qos2Message, msgId})), (CharSequence)"#"));
        for (Serializable q : qos2) {
            MessageEntity msg = (MessageEntity)q;
            MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0);
            MqttMessageIdVariableHeader _from = MqttMessageIdVariableHeader.from((int)msgId);
            MqttMessage _mqttMessage = new MqttMessage(header, (Object)_from);
            msg.getSenderChannel().writeAndFlush((Object)_mqttMessage);
            String _qos2Message_1 = CommonUtils.qos2Message(clientId);
            this.cache.sremove((Object)IterableExtensions.join(Collections.unmodifiableList(CollectionLiterals.newArrayList((Object[])new Object[]{_qos2Message_1, msgId})), (CharSequence)"#"), (Object)msg);
        }
        return null;
    }

    public void publish(IObservable<?> o, Object arg) {
        boolean _equals;
        if (arg instanceof MqttDataEntity && (_equals = ((MqttDataEntity)arg).getDataType().equals((Object)MqttDataEntity.Type.PUBCOMP))) {
            Object _data = ((MqttDataEntity)arg).getData();
            MqttChannelEntity mce = (MqttChannelEntity)_data;
            this.request(mce.getCtx(), mce.getMessage());
        }
    }
}

