/*
 * Decompiled with CFR 0.152.
 */
package org.noear.redisx.plus;

import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import org.noear.redisx.RedisClient;
import redis.clients.jedis.JedisPubSub;

/**
 * Publish/subscribe facade over a {@link RedisClient}.
 *
 * <p>The {@code subscribe}/{@code psubscribe} methods run the subscription on the calling
 * thread; the {@code *Future} variants run it on a freshly started thread and hand back a
 * {@link CompletableFuture} that can be used to observe the end of the subscription or to
 * cancel it.
 */
public class RedisBus {
    private final RedisClient client;

    /**
     * Creates a bus that performs every operation through the given client.
     *
     * @param client the Redis client used to open sessions
     */
    public RedisBus(RedisClient client) {
        this.client = client;
    }

    /**
     * Subscribes to the given topics and forwards each message to {@code subscriber}.
     * Runs on the calling thread.
     *
     * @param subscriber callback receiving {@code (channel, message)}
     * @param topics     channel names to subscribe to
     */
    public void subscribe(final BiConsumer<String, String> subscriber, String ... topics) {
        this.subscribe(RedisBus.channelAdapter(subscriber), topics);
    }

    /**
     * Subscribes the given Jedis listener to the given topics. Runs on the calling thread.
     *
     * @param subscriber Jedis pub/sub listener
     * @param topics     channel names to subscribe to
     */
    public void subscribe(JedisPubSub subscriber, String ... topics) {
        this.client.open(s -> s.subscribe(subscriber, topics));
    }

    /**
     * Subscribes to the given topics on a new thread and forwards each message to
     * {@code subscriber}.
     *
     * @param subscriber callback receiving {@code (channel, message)}
     * @param topics     channel names to subscribe to
     * @return a future as described by {@link #subscribeFuture(JedisPubSub, String...)}
     */
    public CompletableFuture<Thread> subscribeFuture(final BiConsumer<String, String> subscriber, String ... topics) {
        return this.subscribeFuture(RedisBus.channelAdapter(subscriber), topics);
    }

    /**
     * Subscribes the given Jedis listener to the given topics on a new thread.
     *
     * <p>The returned future completes with the worker thread once
     * {@link #subscribe(JedisPubSub, String...)} returns, or completes exceptionally with
     * whatever it threw. Cancelling the future interrupts the worker thread and, if the
     * listener is currently subscribed, asks it to unsubscribe from all channels so the
     * subscription actually ends.
     *
     * @param subscriber Jedis pub/sub listener
     * @param topics     channel names to subscribe to
     * @return future tracking the background subscription
     */
    public CompletableFuture<Thread> subscribeFuture(JedisPubSub subscriber, String ... topics) {
        return RedisBus.runInBackground(
                () -> this.subscribe(subscriber, topics),
                () -> {
                    if (subscriber.isSubscribed()) {
                        subscriber.unsubscribe();
                    }
                });
    }

    /**
     * Subscribes to the given patterns and forwards each matching message to
     * {@code subscriber}. Runs on the calling thread. The matched pattern itself is not
     * passed to the callback.
     *
     * @param subscriber callback receiving {@code (channel, message)}
     * @param patterns   channel patterns to subscribe to
     */
    public void psubscribe(final BiConsumer<String, String> subscriber, String ... patterns) {
        this.psubscribe(RedisBus.patternAdapter(subscriber), patterns);
    }

    /**
     * Subscribes the given Jedis listener to the given patterns. Runs on the calling thread.
     *
     * @param subscriber Jedis pub/sub listener
     * @param patterns   channel patterns to subscribe to
     */
    public void psubscribe(JedisPubSub subscriber, String ... patterns) {
        this.client.open(s -> s.jedis().psubscribe(subscriber, patterns));
    }

    /**
     * Subscribes to the given patterns on a new thread and forwards each matching message
     * to {@code subscriber}.
     *
     * @param subscriber callback receiving {@code (channel, message)}
     * @param patterns   channel patterns to subscribe to
     * @return a future as described by {@link #psubscribeFuture(JedisPubSub, String...)}
     */
    public CompletableFuture<Thread> psubscribeFuture(final BiConsumer<String, String> subscriber, String ... patterns) {
        return this.psubscribeFuture(RedisBus.patternAdapter(subscriber), patterns);
    }

    /**
     * Subscribes the given Jedis listener to the given patterns on a new thread.
     *
     * <p>The returned future completes with the worker thread once
     * {@link #psubscribe(JedisPubSub, String...)} returns, or completes exceptionally with
     * whatever it threw. Cancelling the future interrupts the worker thread and, if the
     * listener is currently subscribed, asks it to unsubscribe from all patterns so the
     * subscription actually ends.
     *
     * @param subscriber Jedis pub/sub listener
     * @param patterns   channel patterns to subscribe to
     * @return future tracking the background subscription
     */
    public CompletableFuture<Thread> psubscribeFuture(JedisPubSub subscriber, String ... patterns) {
        return RedisBus.runInBackground(
                () -> this.psubscribe(subscriber, patterns),
                () -> {
                    if (subscriber.isSubscribed()) {
                        subscriber.punsubscribe();
                    }
                });
    }

    /**
     * Publishes a message to a topic.
     *
     * @param topic   channel name
     * @param message payload to publish
     */
    public void publish(String topic, String message) {
        this.client.open(s -> s.publish(topic, message));
    }

    /** Wraps a {@code (channel, message)} callback as a listener for channel messages. */
    private static JedisPubSub channelAdapter(final BiConsumer<String, String> subscriber) {
        return new JedisPubSub(){

            @Override
            public void onMessage(String channel, String message) {
                subscriber.accept(channel, message);
            }
        };
    }

    /** Wraps a {@code (channel, message)} callback as a listener for pattern messages. */
    private static JedisPubSub patternAdapter(final BiConsumer<String, String> subscriber) {
        return new JedisPubSub(){

            @Override
            public void onPMessage(String pattern, String channel, String message) {
                subscriber.accept(channel, message);
            }
        };
    }

    /**
     * Runs {@code blockingCall} on a new thread and exposes its outcome as a future.
     *
     * @param blockingCall the subscription call to run on the worker thread
     * @param onCancel     best-effort action that ends the subscription when the future is
     *                     cancelled
     * @return future completed with the worker thread on normal return, or exceptionally
     *         with the throwable raised by {@code blockingCall}
     */
    private static CompletableFuture<Thread> runInBackground(Runnable blockingCall, Runnable onCancel) {
        CompletableFuture<Thread> future = new CompletableFuture<>();
        Thread worker = new Thread(() -> {
            try {
                blockingCall.run();
                // No-op if the future was already cancelled.
                future.complete(Thread.currentThread());
            }
            catch (Throwable e) {
                future.completeExceptionally(e);
            }
        });
        // Registered before start() so a cancel can never race past an unregistered hook.
        future.whenComplete((result, error) -> {
            if (error instanceof CancellationException) {
                // Interrupt first so it always happens even if onCancel throws.
                worker.interrupt();
                // An interrupt alone does not wake a thread blocked in a socket read, so
                // also ask the listener to unsubscribe. Anything thrown here lands in the
                // discarded dependent stage and does not reach the caller of cancel().
                onCancel.run();
            }
        });
        worker.start();
        return future;
    }
}

