/*
 * Decompiled with CFR 0.152.
 */
package me.ahoo.govern.core.listener;

import com.google.common.base.Preconditions;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import me.ahoo.govern.core.listener.ChannelTopic;
import me.ahoo.govern.core.listener.MessageListenable;
import me.ahoo.govern.core.listener.MessageListener;
import me.ahoo.govern.core.listener.PatternTopic;
import me.ahoo.govern.core.listener.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractMessageListenable
implements MessageListenable {
    private static final Logger log = LoggerFactory.getLogger(AbstractMessageListenable.class);
    private final ConcurrentHashMap<Topic, CopyOnWriteArraySet<MessageListener>> topicMapListener = new ConcurrentHashMap();

    protected AbstractMessageListenable() {
    }

    @Override
    public CompletableFuture<Void> addListener(Topic topic, MessageListener messageListener) {
        AtomicReference<CompletableFuture<Object>> resultFuture = new AtomicReference<CompletableFuture<Object>>(CompletableFuture.completedFuture(null));
        this.topicMapListener.compute(topic, (key, val) -> {
            boolean succeeded;
            CopyOnWriteArraySet<MessageListener> messageListeners = val;
            if (Objects.isNull(messageListeners)) {
                messageListeners = new CopyOnWriteArraySet<MessageListener>();
            }
            if (messageListeners.isEmpty()) {
                resultFuture.set(this.subscribe(topic));
            }
            if (!(succeeded = messageListeners.add(messageListener))) {
                if (log.isInfoEnabled()) {
                    log.info("addListener - topic[{}] | messageListener:[{}] existed - Failure.", (Object)topic, (Object)messageListener);
                }
            } else if (log.isInfoEnabled()) {
                log.info("addListener - topic[{}] | messageListener:[{}] - Success.", (Object)topic, (Object)messageListener);
            }
            return messageListeners;
        });
        return resultFuture.get();
    }

    protected CompletableFuture<Void> subscribe(Topic topic) {
        if (topic instanceof ChannelTopic) {
            return this.subscribe((ChannelTopic)topic);
        }
        if (topic instanceof PatternTopic) {
            return this.subscribe((PatternTopic)topic);
        }
        throw new IllegalArgumentException("wrong topic : " + topic.getClass().getName());
    }

    protected abstract CompletableFuture<Void> subscribe(ChannelTopic var1);

    protected abstract CompletableFuture<Void> subscribe(PatternTopic var1);

    @Override
    public CompletableFuture<Void> removeListener(Topic topic, MessageListener messageListener) {
        AtomicReference<CompletableFuture<Object>> resultFuture = new AtomicReference<CompletableFuture<Object>>(CompletableFuture.completedFuture(null));
        CopyOnWriteArraySet messageListeners = this.topicMapListener.compute(topic, (key, val) -> {
            if (Objects.isNull(val)) {
                if (log.isInfoEnabled()) {
                    log.info("removeListener - topic[{}] not existed - Failure.", (Object)topic);
                }
                return null;
            }
            if (!val.remove(messageListener)) {
                if (log.isInfoEnabled()) {
                    log.info("removeListener - topic[{}] | messageListener:[{}] not existed - Failure.", (Object)topic, (Object)messageListener);
                }
                return val;
            }
            if (val.isEmpty()) {
                resultFuture.set(this.unsubscribe(topic));
            }
            if (log.isInfoEnabled()) {
                log.info("removeListener - topic[{}] | messageListener:[{}] - Success.", (Object)topic, (Object)messageListener);
            }
            return val;
        });
        return resultFuture.get();
    }

    private CompletableFuture<Void> unsubscribe(Topic topic) {
        Preconditions.checkNotNull((Object)topic);
        if (topic instanceof ChannelTopic) {
            return this.unsubscribe((ChannelTopic)topic);
        }
        if (topic instanceof PatternTopic) {
            return this.unsubscribe((PatternTopic)topic);
        }
        throw new IllegalArgumentException("wrong topic : " + topic.getClass().getName());
    }

    protected abstract CompletableFuture<Void> unsubscribe(ChannelTopic var1);

    protected abstract CompletableFuture<Void> unsubscribe(PatternTopic var1);

    protected void onMessage(String channel, String message, @Nullable String pattern) {
        Topic topic = Objects.nonNull(pattern) ? PatternTopic.of(pattern) : ChannelTopic.of(channel);
        CopyOnWriteArraySet<MessageListener> messageListeners = this.topicMapListener.get(topic);
        if (Objects.nonNull(messageListeners) && !messageListeners.isEmpty()) {
            messageListeners.forEach(messageListener -> messageListener.onMessage(topic, channel, message));
            if (log.isInfoEnabled()) {
                log.info("onMessage - topic[{}] - Success.", (Object)topic);
            }
        } else if (log.isInfoEnabled()) {
            log.info("onMessage - topic[{}] messageListener not existed - Failure.", (Object)topic);
        }
    }
}

