/*
 * Decompiled with CFR 0.152.
 */
package me.ahoo.govern.core.listener;

import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.cluster.pubsub.RedisClusterPubSubListener;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.cluster.pubsub.api.async.RedisClusterPubSubAsyncCommands;
import java.util.concurrent.CompletableFuture;
import me.ahoo.govern.core.listener.AbstractMessageListenable;
import me.ahoo.govern.core.listener.ChannelTopic;
import me.ahoo.govern.core.listener.PatternTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RedisClusterMessageListenable
extends AbstractMessageListenable {
    private static final Logger log = LoggerFactory.getLogger(RedisClusterMessageListenable.class);
    private final RedisClusterPubSubListenerAdapter listenerAdapter;
    private final StatefulRedisClusterPubSubConnection<String, String> pubSubConnection;
    private final RedisClusterPubSubAsyncCommands<String, String> pubSubCommands;

    public RedisClusterMessageListenable(StatefulRedisClusterPubSubConnection<String, String> pubSubConnection) {
        this.pubSubConnection = pubSubConnection;
        this.pubSubCommands = pubSubConnection.async();
        this.listenerAdapter = new RedisClusterPubSubListenerAdapter();
        this.pubSubConnection.addListener((RedisClusterPubSubListener)this.listenerAdapter);
    }

    @Override
    protected CompletableFuture<Void> subscribe(ChannelTopic channelTopic) {
        return this.pubSubCommands.subscribe((Object[])new String[]{channelTopic.getTopic()}).toCompletableFuture();
    }

    @Override
    protected CompletableFuture<Void> subscribe(PatternTopic patternTopic) {
        return this.pubSubCommands.psubscribe((Object[])new String[]{patternTopic.getTopic()}).toCompletableFuture();
    }

    @Override
    protected CompletableFuture<Void> unsubscribe(ChannelTopic topic) {
        return this.pubSubCommands.unsubscribe((Object[])new String[]{topic.getTopic()}).toCompletableFuture();
    }

    @Override
    protected CompletableFuture<Void> unsubscribe(PatternTopic topic) {
        return this.pubSubCommands.punsubscribe((Object[])new String[]{topic.getTopic()}).toCompletableFuture();
    }

    @Override
    public void close() throws Exception {
        this.pubSubConnection.close();
    }

    private class RedisClusterPubSubListenerAdapter
    implements RedisClusterPubSubListener<String, String> {
        private RedisClusterPubSubListenerAdapter() {
        }

        public void message(RedisClusterNode node, String channel, String message) {
            if (log.isDebugEnabled()) {
                log.debug("Message received from a channel subscription - RedisNode[{}] | channel[{}] | message[{}].", new Object[]{node.getUri(), channel, message});
            }
            RedisClusterMessageListenable.this.onMessage(channel, message, null);
        }

        public void message(RedisClusterNode node, String pattern, String channel, String message) {
            if (log.isDebugEnabled()) {
                log.debug("Message received from a pattern subscription - RedisNode[{}]  | pattern[{}] | channel[{}] | message[{}].", new Object[]{node.getUri(), pattern, channel, message});
            }
            RedisClusterMessageListenable.this.onMessage(channel, message, pattern);
        }

        public void subscribed(RedisClusterNode node, String channel, long count) {
            if (log.isInfoEnabled()) {
                log.debug("Subscribed to a channel - RedisNode[{}]  | channel[{}] | {}.", new Object[]{node.getUri(), channel, count});
            }
        }

        public void psubscribed(RedisClusterNode node, String pattern, long count) {
            if (log.isInfoEnabled()) {
                log.info("PSubscribed to a pattern - RedisNode[{}]  | pattern[{}] | {}.", new Object[]{node.getUri(), pattern, count});
            }
        }

        public void unsubscribed(RedisClusterNode node, String channel, long count) {
            if (log.isInfoEnabled()) {
                log.info("Unsubscribed from a channel - RedisNode[{}] | channel[{}] | {}.", new Object[]{node.getUri(), channel, count});
            }
        }

        public void punsubscribed(RedisClusterNode node, String pattern, long count) {
            if (log.isInfoEnabled()) {
                log.info("PUnsubscribed from a pattern - RedisNode[{}] | pattern[{}] | {}.", new Object[]{node.getUri(), pattern, count});
            }
        }
    }
}

