/*
 * Decompiled with CFR 0.152.
 */
package cool.taomu.framework.service.mqtt.broker.impl;

import cool.taomu.framework.inter.IObservable;
import cool.taomu.framework.inter.IObserver;
import cool.taomu.framework.service.mqtt.broker.entity.MqttChannelEntity;
import cool.taomu.framework.service.mqtt.broker.entity.MqttDataEntity;
import cool.taomu.framework.service.utils.CommonUtils;
import cool.taomu.framework.spi.annotation.Spi;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import org.eclipse.xtext.xbase.lib.CollectionLiterals;
import org.eclipse.xtext.xbase.lib.IterableExtensions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UnSubscribeRequest
implements IObserver {
    private final Logger LOG = LoggerFactory.getLogger(UnSubscribeRequest.class);
    @Spi(value="mqtt_pub_observable", singleton=true)
    private IObservable<IObserver> observable;

    public void publish(IObservable<?> o, Object arg) {
        boolean _equals;
        if (arg instanceof MqttDataEntity && (_equals = ((MqttDataEntity)arg).getDataType().equals((Object)MqttDataEntity.Type.UNSUBSCRIBE))) {
            Object _data = ((MqttDataEntity)arg).getData();
            MqttChannelEntity mce = (MqttChannelEntity)_data;
            this.request(mce.getCtx(), mce.getMessage());
        }
    }

    public ChannelFuture request(ChannelHandlerContext ctx, MqttMessage mqttMessage) {
        ChannelFuture _xblockexpression = null;
        MqttUnsubscribeMessage message = (MqttUnsubscribeMessage)mqttMessage;
        int messageId = message.variableHeader().messageId();
        List topics = message.payload().topics();
        String clientId = CommonUtils.getClientId(ctx.channel());
        Consumer<String> _function = it -> {
            this.LOG.info("unsubscribe clientId {} topics {}", (Object)clientId, it);
            this.observable.unregister((Object)IterableExtensions.join(Collections.unmodifiableList(CollectionLiterals.newArrayList((Object[])new String[]{clientId, it})), (CharSequence)"#"));
        };
        topics.forEach(_function);
        MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
        MqttMessageIdVariableHeader varHeader = MqttMessageIdVariableHeader.from((int)messageId);
        MqttUnsubAckMessage _mqttUnsubAckMessage = new MqttUnsubAckMessage(header, varHeader);
        _xblockexpression = ctx.writeAndFlush((Object)_mqttUnsubAckMessage);
        return _xblockexpression;
    }
}

