/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.server.resp.pubsub;

import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.RedisPubSubListener;
import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.assertj.core.api.AbstractStringAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.infinispan.Cache;
import org.infinispan.server.resp.SingleNodeRespBaseTest;
import org.infinispan.test.TestingUtil;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
 * Functional tests for the publish/subscribe commands exposed by the RESP endpoint.
 *
 * <p>Each test opens a dedicated pub/sub connection, attaches a listener that serialises every
 * notification into a string of the form {@code <event>-<channel>-<payload-or-count>} and pushes it
 * onto a hand-off queue, then asserts on the strings drained from that queue.
 */
@Test(groups = {"functional"}, testName = "dist.server.resp.PublishSubscribeTest")
public class PublishSubscribeTest extends SingleNodeRespBaseTest {

    /** Maximum time, in seconds, to wait for a single notification to reach the hand-off queue. */
    private static final long POLL_TIMEOUT_SECONDS = 10L;

    /**
     * Supplies {@code true} and {@code false} to parameterised tests.
     *
     * @return one row per boolean value
     */
    @DataProvider(name = "booleans")
    protected Object[][] booleans() {
        return new Object[][]{{true}, {false}};
    }

    /**
     * Verifies that {@code PUBSUB CHANNELS} tracks channels as they are subscribed to and
     * unsubscribed from.
     */
    public void testPubSubChannels() throws Exception {
        RedisCommands<String, String> redis = redisConnection.sync();
        Assertions.assertThat(redis.pubsubChannels()).isEmpty();

        RedisPubSubCommands<String, String> connection = createPubSubConnection();
        BlockingQueue<String> handOffQueue = addPubSubListener(connection);

        connection.subscribe("channel-1", "channel-2");
        assertSubscription(handOffQueue, "channel-1", "channel-2");
        Assertions.assertThat(redis.pubsubChannels())
                .hasSize(2)
                .containsExactlyInAnyOrder("channel-1", "channel-2");

        connection.unsubscribe("channel-1");
        Assertions.assertThat(redis.pubsubChannels())
                .hasSize(1)
                .containsExactly("channel-2");

        connection.unsubscribe("channel-2");
        Assertions.assertThat(redis.pubsubChannels()).isEmpty();
    }

    /**
     * Verifies that {@code PUBSUB CHANNELS <pattern>} only returns the channels matching the glob
     * pattern.
     */
    public void testPubSubChannelsFiltering() throws Exception {
        RedisCommands<String, String> redis = redisConnection.sync();
        Assertions.assertThat(redis.pubsubChannels()).isEmpty();

        RedisPubSubCommands<String, String> connection = createPubSubConnection();
        BlockingQueue<String> handOffQueue = addPubSubListener(connection);

        connection.subscribe("tx-channel", "kv-channel-1", "kv-channel-2");
        assertSubscription(handOffQueue, "tx-channel", "kv-channel-1", "kv-channel-2");

        Assertions.assertThat(redis.pubsubChannels("*"))
                .hasSize(3)
                .containsExactlyInAnyOrder("tx-channel", "kv-channel-1", "kv-channel-2");
        Assertions.assertThat(redis.pubsubChannels("tx-*"))
                .hasSize(1)
                .containsExactlyInAnyOrder("tx-channel");
        Assertions.assertThat(redis.pubsubChannels("kv-*"))
                .hasSize(2)
                .containsExactlyInAnyOrder("kv-channel-1", "kv-channel-2");
        Assertions.assertThat(redis.pubsubChannels("*-channel*"))
                .hasSize(3)
                .containsExactlyInAnyOrder("tx-channel", "kv-channel-1", "kv-channel-2");

        // Leave no subscription behind for the following tests.
        connection.unsubscribe("tx-channel", "kv-channel-1", "kv-channel-2");
    }

    /**
     * Verifies that a channel shared by two clients is listed once and stays listed until the last
     * client unsubscribes.
     */
    public void testPubSubMultipleClientsSameChannel() throws Exception {
        RedisCommands<String, String> redis = redisConnection.sync();
        Assertions.assertThat(redis.pubsubChannels()).isEmpty();

        RedisPubSubCommands<String, String> client1 = createPubSubConnection();
        BlockingQueue<String> queue1 = addPubSubListener(client1);
        RedisPubSubCommands<String, String> client2 = createPubSubConnection();
        BlockingQueue<String> queue2 = addPubSubListener(client2);

        client1.subscribe("default-channel");
        client2.subscribe("default-channel");
        assertSubscription(queue1, "default-channel");
        assertSubscription(queue2, "default-channel");
        Assertions.assertThat(redis.pubsubChannels())
                .hasSize(1)
                .containsExactly("default-channel");

        // The second client is still subscribed, so the channel must remain visible.
        client1.unsubscribe("default-channel");
        assertUnsubscribe(queue1, "default-channel");
        Assertions.assertThat(redis.pubsubChannels())
                .hasSize(1)
                .containsExactly("default-channel");

        client2.unsubscribe("default-channel");
        assertUnsubscribe(queue2, "default-channel");
        Assertions.assertThat(redis.pubsubChannels()).isEmpty();
    }

    /**
     * Verifies that the set of active channels is observed identically through two command handles.
     */
    public void testDifferentConnectionsCounting() throws Exception {
        // NOTE(review): both handles are obtained from the same redisConnection; confirm whether a
        // second physical connection was intended here.
        RedisCommands<String, String> redis1 = redisConnection.sync();
        RedisCommands<String, String> redis2 = redisConnection.sync();
        Assertions.assertThat(redis1.pubsubChannels()).isEmpty();
        Assertions.assertThat(redis2.pubsubChannels()).isEmpty();

        RedisPubSubCommands<String, String> connection = createPubSubConnection();
        BlockingQueue<String> queue = addPubSubListener(connection);

        connection.subscribe("global-channel");
        assertSubscription(queue, "global-channel");
        Assertions.assertThat(redis1.pubsubChannels())
                .hasSize(1)
                .containsExactly("global-channel");
        Assertions.assertThat(redis2.pubsubChannels())
                .hasSize(1)
                .containsExactly("global-channel");

        connection.unsubscribe("global-channel");
        assertUnsubscribe(queue, "global-channel");
        Assertions.assertThat(redis1.pubsubChannels()).isEmpty();
        Assertions.assertThat(redis2.pubsubChannels()).isEmpty();
    }

    /**
     * Verifies that cache listeners registered by a subscription are removed again, either when the
     * connection is closed or when the client unsubscribes from every channel.
     *
     * @param quit {@code true} to close the connection, {@code false} to issue an argument-less
     *             {@code UNSUBSCRIBE}
     * @throws InterruptedException if interrupted while waiting for a notification
     */
    @Test(dataProvider = "booleans")
    public void testPubSubUnsubscribe(boolean quit) throws InterruptedException {
        int listenersBefore = TestingUtil.getListeners(cache).size();

        RedisPubSubCommands<String, String> connection = createPubSubConnection();
        BlockingQueue<String> handOffQueue = addPubSubListener(connection);

        connection.subscribe("channel2", "test");
        assertSubscription(handOffQueue, "channel2", "test");
        // The assertion expects one additional cache listener per subscribed channel.
        Assertions.assertThat(TestingUtil.getListeners(cache)).hasSize(listenersBefore + 2);

        if (quit) {
            connection.getStatefulConnection().close();
            // Wait until the listener count is back at its initial value.
            eventually(() -> TestingUtil.getListeners(cache).size() == listenersBefore);
            Assertions.assertThat(TestingUtil.getListeners(cache)).hasSize(listenersBefore);
            Assertions.assertThat(handOffQueue).isEmpty();
            return;
        }

        // No arguments: unsubscribe from every channel.
        connection.unsubscribe();
        for (int i = 0; i < 2; ++i) {
            String value = handOffQueue.poll(POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            Assertions.assertThat(value)
                    .withFailMessage("Didn't receive any notifications")
                    .isNotNull();
            if (!isExpectedUnsubscribeNotification(value)) {
                Assertions.fail("Notification doesn't match expected, was: " + value);
            }
        }
        Assertions.assertThat(TestingUtil.getListeners(cache)).hasSize(listenersBefore);
        // The connection must remain usable for regular commands after unsubscribing.
        Assertions.assertThat(connection.ping()).isEqualTo("PONG");
    }

    /**
     * Verifies message delivery to a subscribed channel and the notifications produced by
     * subsequent subscribe and unsubscribe calls, including one for a channel that was never
     * subscribed.
     *
     * @throws InterruptedException if interrupted while waiting for a notification
     */
    @Test
    public void testPubSub() throws InterruptedException {
        RedisPubSubCommands<String, String> connection = createPubSubConnection();
        BlockingQueue<String> handOffQueue = addPubSubListener(connection);

        connection.subscribe("channel2", "test");
        assertSubscription(handOffQueue, "channel2", "test");

        RedisCommands<String, String> redis = redisConnection.sync();
        redis.publish("channel2", "boomshakayaka");
        String value = handOffQueue.poll(POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        Assertions.assertThat(value).isEqualTo("message-channel2-boomshakayaka");

        connection.subscribe("channel");
        value = handOffQueue.poll(POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        Assertions.assertThat(value).isEqualTo("subscribed-channel-3");

        connection.unsubscribe("channel2");
        connection.unsubscribe("doesn't-exist");
        connection.unsubscribe("channel", "test");

        // Expected counts are 2, 1, 0, 0: the counter is decremented per notification and clamped
        // at zero.
        int subscriptions = 3;
        for (String channel : new String[]{"channel2", "doesn't-exist", "channel", "test"}) {
            value = handOffQueue.poll(POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            Assertions.assertThat(value)
                    .isEqualTo("unsubscribed-" + channel + "-" + Math.max(0, --subscriptions));
        }
    }

    /**
     * Verifies that plain channel subscriptions are not counted by {@code PUBSUB NUMPAT}.
     */
    @Test
    public void testCountOnlyPatterns() throws Exception {
        RedisCommands<String, String> redis = redisConnection.sync();
        Assertions.assertThat(redis.pubsubNumpat()).isZero();

        RedisPubSubCommands<String, String> connection = createPubSubConnection();
        BlockingQueue<String> handOffQueue = addPubSubListener(connection);

        connection.subscribe("channel2", "test");
        assertSubscription(handOffQueue, "channel2", "test");

        Assertions.assertThat(redis.pubsubChannels()).containsExactlyInAnyOrder("channel2", "test");
        Assertions.assertThat(redis.pubsubNumpat()).isZero();

        connection.unsubscribe("channel2", "test");
    }

    /**
     * Opens a new pub/sub connection on the test client.
     *
     * @return the synchronous command interface of the new connection
     */
    protected RedisPubSubCommands<String, String> createPubSubConnection() {
        return client.connectPubSub().sync();
    }

    /**
     * Tells whether a notification is one of the two expected after unsubscribing from both
     * {@code channel2} and {@code test}: it must name one of those channels and report a remaining
     * subscription count ending in {@code 0} or {@code 1}.
     *
     * @param notification the serialised notification taken from the hand-off queue; not null
     * @return {@code true} if both the channel prefix and the count suffix are acceptable
     */
    private static boolean isExpectedUnsubscribeNotification(String notification) {
        boolean knownChannel = notification.startsWith("unsubscribed-channel2-")
                || notification.startsWith("unsubscribed-test-");
        boolean expectedCount = notification.endsWith("0") || notification.endsWith("1");
        return knownChannel && expectedCount;
    }

    /**
     * Asserts that the queue yields one {@code subscribed} notification per channel, in the given
     * order, with subscription counts starting at 1 and increasing by one per channel.
     *
     * @param queue    the hand-off queue fed by the pub/sub listener
     * @param channels the channels expected to be confirmed, in order
     * @throws InterruptedException if interrupted while waiting for a notification
     */
    private void assertSubscription(BlockingQueue<String> queue, String... channels) throws InterruptedException {
        int expectedCount = 1;
        for (String channel : channels) {
            String value = queue.poll(POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            Assertions.assertThat(value).isEqualTo(String.format("subscribed-%s-%d", channel, expectedCount++));
        }
    }

    /**
     * Asserts that the queue yields one {@code unsubscribed} notification per channel, in the given
     * order, with counts going from {@code channels.length - 1} down to 0.
     *
     * @param queue    the hand-off queue fed by the pub/sub listener
     * @param channels the channels expected to be released, in order
     * @throws InterruptedException if interrupted while waiting for a notification
     */
    private void assertUnsubscribe(BlockingQueue<String> queue, String... channels) throws InterruptedException {
        int remaining = channels.length;
        for (String channel : channels) {
            String value = queue.poll(POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            Assertions.assertThat(value).isEqualTo(String.format("unsubscribed-%s-%d", channel, --remaining));
        }
    }

    /**
     * Registers a listener on the connection that records every message, subscribe and unsubscribe
     * notification as {@code <event>-<channel>-<payload-or-count>} in a fresh queue.
     *
     * @param connection the pub/sub connection to observe
     * @return the queue that receives the serialised notifications
     */
    private BlockingQueue<String> addPubSubListener(RedisPubSubCommands<String, String> connection) {
        BlockingQueue<String> handOffQueue = new LinkedBlockingQueue<>();
        connection.getStatefulConnection().addListener(new RedisPubSubAdapter<String, String>() {
            @Override
            public void message(String channel, String message) {
                log.tracef("Received message on channel %s of %s", channel, message);
                handOffQueue.add("message-" + channel + "-" + message);
            }

            @Override
            public void subscribed(String channel, long count) {
                log.tracef("Subscribed to %s with %s", channel, count);
                handOffQueue.add("subscribed-" + channel + "-" + count);
            }

            @Override
            public void unsubscribed(String channel, long count) {
                log.tracef("Unsubscribed to %s with %s", channel, count);
                handOffQueue.add("unsubscribed-" + channel + "-" + count);
            }
        });
        return handOffQueue;
    }
}

