/*
 * Decompiled with CFR 0.152.
 */
package org.idevlab.rjc.message;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.idevlab.rjc.Client;
import org.idevlab.rjc.RedisException;
import org.idevlab.rjc.ds.DataSource;
import org.idevlab.rjc.message.MessageListener;
import org.idevlab.rjc.message.PMessageListener;
import org.idevlab.rjc.message.RedisSubscriber;
import org.idevlab.rjc.message.SubscribeListener;
import org.idevlab.rjc.protocol.Protocol;
import org.idevlab.rjc.util.SafeEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link RedisSubscriber} implementation that talks to a single Redis node.
 *
 * <p>A {@link Client} is created lazily from the configured {@link DataSource}; a background
 * {@code ResponseThread} reads the replies arriving on that client and dispatches them to the
 * registered {@link MessageListener}, {@link PMessageListener} and {@link SubscribeListener}
 * instances. Listener callbacks are invoked on that background thread.
 *
 * <p>NOTE(review): when the client is replaced after a disconnect, the subscriptions held in the
 * listener maps are not re-issued on the new connection.
 */
public class RedisNodeSubscriber
implements RedisSubscriber {
    private static final Logger LOG = LoggerFactory.getLogger(RedisNodeSubscriber.class);
    /** Current client; guarded by {@code this}. */
    private Client client;
    private DataSource dataSource;
    /** Thread reading replies from {@link #client}; guarded by {@code this}. */
    private Thread responseThread;
    /** Channel name to listener. */
    private final Map<String, MessageListener> msgListenerMap = Collections.synchronizedMap(new HashMap<String, MessageListener>());
    /** Pattern to listener. */
    private final Map<String, PMessageListener> pmsgListenerMap = Collections.synchronizedMap(new HashMap<String, PMessageListener>());
    /** Listeners notified about (p)subscribe / (p)unsubscribe confirmations. */
    private final Set<SubscribeListener> subListenerSet = Collections.synchronizedSet(new HashSet<SubscribeListener>());

    /** Creates a subscriber without a data source; one must be set before subscribing. */
    public RedisNodeSubscriber() {
    }

    /**
     * Creates a subscriber that obtains its connection from the given data source.
     *
     * @param dataSource source of the connection used by the client
     */
    public RedisNodeSubscriber(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    /**
     * Sets the data source used the next time a client has to be created.
     *
     * @param dataSource source of the connection used by the client
     */
    public void setDataSource(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    /**
     * Subscribes to a channel and routes its messages to {@code listener}.
     *
     * <p>The listener is registered before the command is sent so that a message dispatched by
     * the response thread right after the subscription cannot be dropped for lack of a listener.
     * If sending fails, the previous registration (if any) is restored and the failure rethrown.
     *
     * @param channel  channel name
     * @param listener receiver of the messages published on {@code channel}
     */
    public void subscribe(String channel, MessageListener listener) {
        MessageListener previous = this.msgListenerMap.put(channel, listener);
        try {
            this.getClient().subscribe(channel);
        }
        catch (RuntimeException e) {
            RedisNodeSubscriber.rollback(this.msgListenerMap, channel, listener, previous);
            throw e;
        }
    }

    /**
     * Subscribes to a pattern and routes matching messages to {@code listener}.
     *
     * <p>Registration order and failure handling mirror {@link #subscribe(String, MessageListener)}.
     *
     * @param pattern  channel pattern
     * @param listener receiver of the messages matching {@code pattern}
     */
    public void psubscribe(String pattern, PMessageListener listener) {
        PMessageListener previous = this.pmsgListenerMap.put(pattern, listener);
        try {
            this.getClient().psubscribe(pattern);
        }
        catch (RuntimeException e) {
            RedisNodeSubscriber.rollback(this.pmsgListenerMap, pattern, listener, previous);
            throw e;
        }
    }

    /**
     * Sends an unsubscribe command for the given channels and drops their listeners.
     *
     * <p>NOTE(review): with no channels the listener map is left untouched; if the client treats
     * that call as "unsubscribe from everything", stale listeners remain registered - confirm.
     *
     * @param channels channel names to unsubscribe from
     */
    public void unsubscribe(String ... channels) {
        this.getClient().unsubscribe(channels);
        for (String channel : channels) {
            this.msgListenerMap.remove(channel);
        }
    }

    /**
     * Sends a pattern-unsubscribe command for the given patterns and drops their listeners.
     *
     * @param patterns patterns to unsubscribe from
     */
    public void punsubscribe(String ... patterns) {
        this.getClient().punsubscribe(patterns);
        for (String pattern : patterns) {
            this.pmsgListenerMap.remove(pattern);
        }
    }

    /**
     * Registers a listener for subscription confirmations.
     *
     * @param listener listener to add
     */
    public void addListener(SubscribeListener listener) {
        this.subListenerSet.add(listener);
    }

    /**
     * Removes a previously registered subscription listener.
     *
     * @param listener listener to remove
     */
    public void removeListener(SubscribeListener listener) {
        this.subListenerSet.remove(listener);
    }

    /**
     * Returns a connected client, creating one (and the thread reading from it) when needed.
     *
     * @return the client commands should be sent through
     * @throws IllegalStateException if a client must be created but no data source is set
     */
    private synchronized Client getClient() {
        boolean clientReplaced = false;
        if (this.client == null || !this.client.isConnected()) {
            if (this.dataSource == null) {
                throw new IllegalStateException("DataSource is not set");
            }
            this.client = new Client(this.dataSource.getConnection());
            this.client.setTimeoutInfinite();
            clientReplaced = true;
        }
        Thread reader = this.responseThread;
        boolean readerUnusable = reader == null || reader.getState() == Thread.State.TERMINATED || reader.isInterrupted();
        // A reader is bound to the client it was created with, so a replaced client always needs
        // a fresh reader - otherwise nobody would consume the replies of the new connection.
        if (clientReplaced || readerUnusable) {
            if (reader != null && reader.getState() != Thread.State.TERMINATED) {
                reader.interrupt();
            }
            this.responseThread = new ResponseThread(this.client);
            this.responseThread.start();
        }
        return this.client;
    }

    /**
     * Drops all listeners, closes the client and interrupts the response thread.
     */
    public synchronized void close() {
        this.msgListenerMap.clear();
        this.pmsgListenerMap.clear();
        this.subListenerSet.clear();
        if (this.client != null) {
            this.client.close();
            this.client = null;
        }
        if (this.responseThread != null && this.responseThread.getState() != Thread.State.TERMINATED) {
            this.responseThread.interrupt();
        }
        this.responseThread = null;
    }

    /**
     * Returns a snapshot of the subscription listeners so callbacks can run without holding the
     * set's lock (a listener may add or remove listeners from inside its callback).
     */
    private SubscribeListener[] snapshotSubscribeListeners() {
        return this.subListenerSet.toArray(new SubscribeListener[0]);
    }

    /**
     * Undoes a registration made ahead of a failed (p)subscribe command, unless another caller
     * has replaced the entry in the meantime.
     */
    private static <L> void rollback(Map<String, L> listeners, String key, L attempted, L previous) {
        synchronized (listeners) {
            if (listeners.get(key) != attempted) {
                return;
            }
            if (previous == null) {
                listeners.remove(key);
            } else {
                listeners.put(key, previous);
            }
        }
    }

    /**
     * Reads replies from one client and dispatches them to the enclosing subscriber's listeners.
     */
    private class ResponseThread
    extends Thread {
        /** Client this thread reads from for its whole lifetime. */
        private final Client boundClient;

        private ResponseThread(Client client) {
            this.boundClient = client;
        }

        /**
         * Reads and dispatches replies until the read fails or the thread is interrupted.
         *
         * @throws RedisException if a reply is empty or does not start with a known keyword
         */
        @Override
        public void run() {
            do {
                List<Object> reply;
                try {
                    reply = this.boundClient.getObjectMultiBulkReply();
                }
                catch (RedisException e) {
                    // Also the path taken when the reply cannot be read after the client is closed.
                    LOG.debug("Pub/sub response thread stopping: reply could not be read", e);
                    return;
                }
                if (reply == null || reply.isEmpty()) {
                    throw new RedisException("Empty pub/sub reply");
                }
                Object firstObj = reply.get(0);
                if (!(firstObj instanceof byte[])) {
                    throw new RedisException("Unknown message type: " + firstObj);
                }
                Protocol.Keyword keyword = Protocol.Keyword.find((byte[])firstObj);
                if (keyword == null) {
                    throw new RedisException("Unknown pub/sub message: " + this.byteToStr((byte[])firstObj));
                }
                this.dispatch(keyword, reply);
            } while (!this.isInterrupted());
        }

        /** Routes one decoded reply to the listeners matching its keyword. */
        private void dispatch(Protocol.Keyword keyword, List<Object> reply) {
            switch (keyword) {
                case MESSAGE: {
                    String channel = this.byteToStr((byte[])reply.get(1));
                    String message = this.byteToStr((byte[])reply.get(2));
                    MessageListener listener = RedisNodeSubscriber.this.msgListenerMap.get(channel);
                    if (listener != null) {
                        try {
                            listener.onMessage(channel, message);
                        }
                        catch (RuntimeException e) {
                            // A faulty listener must not terminate delivery for everyone else.
                            LOG.error("Message listener failed for channel " + channel, e);
                        }
                    }
                    break;
                }
                case PMESSAGE: {
                    String pattern = this.byteToStr((byte[])reply.get(1));
                    String channel = this.byteToStr((byte[])reply.get(2));
                    String message = this.byteToStr((byte[])reply.get(3));
                    PMessageListener listener = RedisNodeSubscriber.this.pmsgListenerMap.get(pattern);
                    if (listener != null) {
                        try {
                            listener.onMessage(pattern, channel, message);
                        }
                        catch (RuntimeException e) {
                            LOG.error("Pattern message listener failed for pattern " + pattern, e);
                        }
                    }
                    break;
                }
                case SUBSCRIBE:
                case UNSUBSCRIBE:
                case PSUBSCRIBE:
                case PUNSUBSCRIBE: {
                    String name = this.byteToStr((byte[])reply.get(1));
                    Long subscribedChannels = (Long)reply.get(2);
                    this.notifySubscribeListeners(keyword, name, subscribedChannels);
                    break;
                }
                default: {
                    LOG.warn("Unknown message: {}", (Object)keyword.toString());
                }
            }
        }

        /** Delivers a (p)subscribe / (p)unsubscribe confirmation to every subscription listener. */
        private void notifySubscribeListeners(Protocol.Keyword event, String name, Long subscribedChannels) {
            for (SubscribeListener listener : RedisNodeSubscriber.this.snapshotSubscribeListeners()) {
                try {
                    switch (event) {
                        case SUBSCRIBE: {
                            listener.onSubscribe(name, subscribedChannels);
                            break;
                        }
                        case UNSUBSCRIBE: {
                            listener.onUnsubscribe(name, subscribedChannels);
                            break;
                        }
                        case PSUBSCRIBE: {
                            listener.onPSubscribe(name, subscribedChannels);
                            break;
                        }
                        case PUNSUBSCRIBE: {
                            listener.onPUnsubscribe(name, subscribedChannels);
                            break;
                        }
                        default: {
                            break;
                        }
                    }
                }
                catch (RuntimeException e) {
                    LOG.error("Subscribe listener failed handling " + event + " for " + name, e);
                }
            }
        }

        /** Decodes bytes with {@link SafeEncoder}, passing {@code null} through unchanged. */
        private String byteToStr(byte[] bytes) {
            return bytes == null ? null : SafeEncoder.encode(bytes);
        }
    }
}

