/*
 * Decompiled with CFR 0.152.
 */
package cool.taomu.framework.service.mqtt.broker.impl;

import com.google.common.collect.ArrayListMultimap;
import cool.taomu.framework.cache.KeyValueCache;
import cool.taomu.framework.service.mqtt.broker.entity.MessageEntity;
import cool.taomu.framework.service.mqtt.broker.inter.IPublishObserver;
import cool.taomu.framework.service.utils.CommonUtils;
import java.io.Serializable;
import java.util.List;
import java.util.Set;
import java.util.Vector;
import java.util.function.BiConsumer;
import org.apache.commons.lang3.SerializationUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Singleton registry of {@link IPublishObserver}s keyed by a string, and the
 * entry point that pushes cached messages to every registered observer.
 *
 * <p>Thread-safety: the observer registry is a plain {@link ArrayListMultimap},
 * which is not safe for concurrent use. Every read and write of it in this class
 * is therefore performed while holding the monitor of the {@code observers}
 * object itself, so registration, removal and dispatch never interleave.
 */
public final class PublishObservable {
    private static final Logger LOG = LoggerFactory.getLogger(PublishObservable.class);
    /** Registered observers; guarded by its own monitor. */
    private static final ArrayListMultimap<String, IPublishObserver> observers = ArrayListMultimap.create();
    private static final PublishObservable instance = new PublishObservable();
    private final KeyValueCache cache = KeyValueCache.getInstance();

    private PublishObservable() {
    }

    /**
     * Returns the single shared instance.
     *
     * @return the eagerly created singleton
     */
    public static PublishObservable getInstance() {
        // The field is static final and initialised at class-load time, so no locking is needed.
        return instance;
    }

    /**
     * Adds an observer under the given key. Several observers may share one key.
     *
     * @param key registry key the observer is stored under
     * @param arg observer to add
     */
    public void register(String key, IPublishObserver arg) {
        LOG.info("register :{}", key);
        synchronized (observers) {
            observers.put(key, arg);
        }
    }

    /**
     * Removes every observer stored under the given key and logs the number of
     * observers that remain registered.
     *
     * @param key registry key whose observers are removed
     */
    public void unregister(String key) {
        LOG.info("unregister :{}", key);
        int remaining;
        synchronized (observers) {
            observers.removeAll(key);
            remaining = observers.size();
        }
        LOG.info("unregister :{}", remaining);
    }

    /**
     * Collects the messages selected by {@code type} from the cache and hands
     * them to every registered observer.
     *
     * <ul>
     *   <li>{@code RETAIN}: the set stored under {@code CommonUtils.retain("*")}
     *       is delivered, if present.</li>
     *   <li>{@code MESSAGE}: the single entry stored under {@code id} is
     *       delivered; a missing entry is skipped rather than delivered as
     *       {@code null}.</li>
     * </ul>
     *
     * <p>Each observer receives its own deep copy of the message list (made via
     * serialization), so observers cannot affect each other through shared
     * message objects. The notification itself is submitted through
     * {@code CommonUtils.exec} (presumably asynchronous execution; confirm
     * against that helper).
     *
     * @param id   cache key of the message to deliver when {@code type} is {@code MESSAGE}
     * @param type kind of delivery requested; must not be {@code null}
     */
    public void start(String id, IPublishObserver.Type type) {
        synchronized (observers) {
            LOG.info("observers number : {}", observers.size());
            // Vector is kept so the runtime type of the cloned list seen by observers is unchanged.
            Vector<MessageEntity> messages = new Vector<MessageEntity>();

            if (type.equals(IPublishObserver.Type.RETAIN)) {
                @SuppressWarnings("unchecked") // the cache is untyped; the retain key is expected to hold a set of messages
                Set<MessageEntity> retained = (Set<MessageEntity>) this.cache.get(CommonUtils.retain("*"));
                if (retained != null) {
                    LOG.info("rateins \u6d88\u606f\u6570 : {}", retained.size());
                    messages.addAll(retained);
                }
            }

            if (type.equals(IPublishObserver.Type.MESSAGE)) {
                Serializable cached = this.cache.get(id);
                if (cached != null) {
                    messages.add((MessageEntity) cached);
                } else {
                    // Never hand a null element to observers; they get an empty list instead.
                    LOG.warn("no cached message for id : {}", id);
                }
            }

            observers.forEach((key, observer) -> CommonUtils.exec(() -> {
                // Clone inside the task so every observer works on an independent copy.
                List<MessageEntity> snapshot = SerializationUtils.clone(messages);
                observer.update(snapshot, type);
            }));
        }
    }
}

