/*
 * Decompiled with CFR 0.152.
 */
package cool.taomu.mqtt.broker.factory;

import cool.taomu.mqtt.broker.entity.MessageEntity;
import cool.taomu.mqtt.broker.factory.IProcess;
import cool.taomu.mqtt.broker.utils.MqttUtils;
import cool.taomu.mqtt.broker.utils.impl.DataStorage;
import cool.taomu.storage.inter.IStorage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashSet;
import org.eclipse.xtext.xbase.lib.CollectionLiterals;
import org.eclipse.xtext.xbase.lib.IterableExtensions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link IProcess} implementation invoked for MQTT PUBREC packets.
 *
 * <p>For the client that sent the PUBREC, the handler looks up the entries stored in the
 * {@code "mqtt-qos2-message"} bucket under the key {@code clientId#messageId}. For every
 * stored {@link MessageEntity} it writes a PUBREC carrying the same message id to the
 * entity's sender channel, removes the looked-up key, re-points the entity at the current
 * channel/client, and stores the entity again under {@code previousSenderId#messageId}.
 */
public class PubRecRequest
implements IProcess {
    private static final Logger LOG = LoggerFactory.getLogger(PubRecRequest.class);
    /** Name of the storage bucket that holds in-flight QoS 2 entries. */
    private static final String QOS2_BUCKET = "mqtt-qos2-message";
    private IStorage cache = new DataStorage();

    /**
     * Processes a PUBREC packet received on {@code ctx}.
     *
     * <p>If nothing is stored for the {@code clientId#messageId} key (or the stored value is
     * not a {@link HashSet}), the packet is logged and ignored instead of failing with a
     * {@link NullPointerException} / {@link ClassCastException}.
     *
     * @param ctx         channel context of the client that sent the PUBREC
     * @param mqttMessage the received message; its variable header is cast to
     *                    {@link MqttMessageIdVariableHeader} to read the message id
     */
    @Override
    public void request(ChannelHandlerContext ctx, MqttMessage mqttMessage) {
        String clientId = MqttUtils.getClientId(ctx.channel());
        LOG.info("\u6267\u884c\u4e86MQTT PubRec \u547d\u4ee4 : {}", clientId);
        int msgId = ((MqttMessageIdVariableHeader)mqttMessage.variableHeader()).messageId();
        String receiverKey = PubRecRequest.cacheKey(clientId, msgId);
        Serializable stored = this.cache.get(QOS2_BUCKET, receiverKey);
        // instanceof is false for null as well, so this covers both a cache miss and an
        // unexpected value type.
        if (!(stored instanceof HashSet)) {
            LOG.warn("No pending QoS 2 message set found for key {}; ignoring PUBREC", receiverKey);
            return;
        }
        for (Object entry : (HashSet<?>)stored) {
            MessageEntity msg = (MessageEntity)entry;
            MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0);
            MqttMessage pubRec = new MqttMessage(header, (Object)MqttMessageIdVariableHeader.from((int)msgId));
            // Forward the PUBREC to whichever channel is currently recorded as the sender.
            msg.getSenderChannel().writeAndFlush((Object)pubRec);
            // Keep remove-before-put ordering: both keys coincide when senderId equals clientId.
            this.cache.remove(QOS2_BUCKET, receiverKey);
            // Swap roles: the current client becomes the recorded sender, and the entity is
            // re-filed under the previous sender's id.
            msg.setSenderChannel(ctx.channel());
            String senderId = msg.getSenderId();
            msg.setSenderId(clientId);
            this.cache.put(QOS2_BUCKET, PubRecRequest.cacheKey(senderId, msgId), (Serializable)msg);
        }
    }

    /**
     * Builds the storage key for an id / message-id pair by joining the two with {@code '#'}.
     *
     * @param id        client or sender id
     * @param messageId MQTT packet identifier
     * @return the joined key
     */
    private static String cacheKey(String id, int messageId) {
        return IterableExtensions.join(Collections.unmodifiableList(CollectionLiterals.newArrayList((Object[])new Object[]{id, messageId})), (CharSequence)"#");
    }
}

