/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.redis.listener;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.ReactiveSubscription;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.Topic;
import org.springframework.data.redis.serializer.RedisElementReader;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

public class ReactiveRedisMessageListenerContainer
implements DisposableBean {
    private final RedisSerializationContext.SerializationPair<String> stringSerializationPair = RedisSerializationContext.SerializationPair.fromSerializer(RedisSerializer.string());
    private final Map<ReactiveSubscription, Subscribers> subscriptions = new ConcurrentHashMap<ReactiveSubscription, Subscribers>();
    @Nullable
    private volatile ReactiveRedisConnection connection;

    public ReactiveRedisMessageListenerContainer(ReactiveRedisConnectionFactory connectionFactory) {
        Assert.notNull((Object)connectionFactory, "ReactiveRedisConnectionFactory must not be null!");
        this.connection = connectionFactory.getReactiveConnection();
    }

    @Override
    public void destroy() {
        this.destroyLater().block();
    }

    public Mono<Void> destroyLater() {
        return Mono.defer(this::doDestroy);
    }

    private Mono<Void> doDestroy() {
        if (this.connection == null) {
            return Mono.empty();
        }
        ReactiveRedisConnection connection = this.connection;
        Flux terminationSignals = null;
        while (!this.subscriptions.isEmpty()) {
            HashMap<ReactiveSubscription, Subscribers> local = new HashMap<ReactiveSubscription, Subscribers>(this.subscriptions);
            List monos = local.keySet().stream().peek(this.subscriptions::remove).map(ReactiveSubscription::cancel).collect(Collectors.toList());
            if (terminationSignals == null) {
                terminationSignals = Flux.concat(monos);
                continue;
            }
            terminationSignals = terminationSignals.mergeWith((Publisher)Flux.concat(monos));
        }
        this.connection = null;
        return terminationSignals != null ? terminationSignals.then(connection.closeLater()) : connection.closeLater();
    }

    public Collection<ReactiveSubscription> getActiveSubscriptions() {
        return this.subscriptions.entrySet().stream().filter(entry -> ((Subscribers)entry.getValue()).hasRegistration()).map(entry -> (ReactiveSubscription)entry.getKey()).collect(Collectors.toList());
    }

    public Flux<ReactiveSubscription.Message<String, String>> receive(ChannelTopic ... channelTopics) {
        Assert.notNull((Object)channelTopics, "ChannelTopics must not be null!");
        Assert.noNullElements((Object[])channelTopics, "ChannelTopics must not contain null elements!");
        return this.receive(Arrays.asList(channelTopics), this.stringSerializationPair, this.stringSerializationPair);
    }

    public Flux<ReactiveSubscription.PatternMessage<String, String, String>> receive(PatternTopic ... patternTopics) {
        Assert.notNull((Object)patternTopics, "PatternTopic must not be null!");
        Assert.noNullElements((Object[])patternTopics, "PatternTopic must not contain null elements!");
        return this.receive(Arrays.asList(patternTopics), this.stringSerializationPair, this.stringSerializationPair).map(m -> (ReactiveSubscription.PatternMessage)m);
    }

    public <C, B> Flux<ReactiveSubscription.Message<C, B>> receive(Iterable<? extends Topic> topics, RedisSerializationContext.SerializationPair<C> channelSerializer, RedisSerializationContext.SerializationPair<B> messageSerializer) {
        Assert.notNull(topics, "Topics must not be null!");
        this.verifyConnection();
        Object[] patterns = this.getTargets(topics, PatternTopic.class);
        Object[] channels = this.getTargets(topics, ChannelTopic.class);
        if (ObjectUtils.isEmpty(patterns) && ObjectUtils.isEmpty(channels)) {
            throw new InvalidDataAccessApiUsageException("No channels or patterns to subscribe to.");
        }
        return this.doReceive(channelSerializer, messageSerializer, this.connection.pubSubCommands().createSubscription(), (ByteBuffer[])patterns, (ByteBuffer[])channels);
    }

    private <C, B> Flux<ReactiveSubscription.Message<C, B>> doReceive(RedisSerializationContext.SerializationPair<C> channelSerializer, RedisSerializationContext.SerializationPair<B> messageSerializer, Mono<ReactiveSubscription> subscription, ByteBuffer[] patterns, ByteBuffer[] channels) {
        Flux messageStream = subscription.flatMapMany(it -> {
            Mono<Void> subscribe = ReactiveRedisMessageListenerContainer.subscribe(patterns, channels, it);
            MonoProcessor terminalProcessor = MonoProcessor.create();
            return it.receive().mergeWith((Publisher)subscribe.then(Mono.defer(() -> {
                this.getSubscribers((ReactiveSubscription)it).registered();
                return Mono.empty();
            }))).doOnCancel(() -> {
                Subscribers subscribers = this.getSubscribers((ReactiveSubscription)it);
                if (subscribers.unregister()) {
                    this.subscriptions.remove(it);
                    it.unsubscribe().subscribe(v -> terminalProcessor.onComplete(), arg_0 -> ((MonoProcessor)terminalProcessor).onError(arg_0));
                }
            }).mergeWith((Publisher)terminalProcessor);
        });
        return messageStream.map(message -> this.readMessage(channelSerializer.getReader(), messageSerializer.getReader(), (ReactiveSubscription.Message<ByteBuffer, ByteBuffer>)message));
    }

    private static Mono<Void> subscribe(ByteBuffer[] patterns, ByteBuffer[] channels, ReactiveSubscription it) {
        Assert.isTrue(!ObjectUtils.isEmpty(channels) || !ObjectUtils.isEmpty(patterns), "Must provide either channels or patterns!");
        Object subscribe = null;
        if (!ObjectUtils.isEmpty(patterns)) {
            subscribe = it.pSubscribe(patterns);
        }
        if (!ObjectUtils.isEmpty(channels)) {
            Mono<Void> channelsSubscribe = it.subscribe(channels);
            subscribe = subscribe == null ? channelsSubscribe : subscribe.and(channelsSubscribe);
        }
        return subscribe;
    }

    private boolean isActive() {
        return this.connection != null;
    }

    private void verifyConnection() {
        if (!this.isActive()) {
            throw new IllegalStateException("ReactiveRedisMessageListenerContainer is already disposed!");
        }
    }

    private Subscribers getSubscribers(ReactiveSubscription it) {
        return this.subscriptions.computeIfAbsent(it, key -> new Subscribers());
    }

    private ByteBuffer[] getTargets(Iterable<? extends Topic> topics, Class<?> classFilter) {
        return (ByteBuffer[])StreamSupport.stream(topics.spliterator(), false).filter(classFilter::isInstance).map(Topic::getTopic).map(this.stringSerializationPair::write).toArray(ByteBuffer[]::new);
    }

    private <C, B> ReactiveSubscription.Message<C, B> readMessage(RedisElementReader<C> channelSerializer, RedisElementReader<B> messageSerializer, ReactiveSubscription.Message<ByteBuffer, ByteBuffer> message) {
        if (message instanceof ReactiveSubscription.PatternMessage) {
            ReactiveSubscription.PatternMessage patternMessage = (ReactiveSubscription.PatternMessage)message;
            String pattern = ReactiveRedisMessageListenerContainer.read(this.stringSerializationPair.getReader(), (ByteBuffer)patternMessage.getPattern());
            C channel = ReactiveRedisMessageListenerContainer.read(channelSerializer, (ByteBuffer)patternMessage.getChannel());
            B body2 = ReactiveRedisMessageListenerContainer.read(messageSerializer, (ByteBuffer)patternMessage.getMessage());
            return new ReactiveSubscription.PatternMessage<String, C, B>(pattern, channel, body2);
        }
        C channel = ReactiveRedisMessageListenerContainer.read(channelSerializer, message.getChannel());
        B body3 = ReactiveRedisMessageListenerContainer.read(messageSerializer, message.getMessage());
        return new ReactiveSubscription.ChannelMessage<C, B>(channel, body3);
    }

    private static <C> C read(RedisElementReader<C> reader, ByteBuffer buffer) {
        try {
            buffer.mark();
            C c = reader.read(buffer);
            return c;
        }
        finally {
            buffer.reset();
        }
    }

    static class Subscribers {
        private static final AtomicLongFieldUpdater<Subscribers> SUBSCRIBERS = AtomicLongFieldUpdater.newUpdater(Subscribers.class, "subscribers");
        private volatile long subscribers;

        Subscribers() {
        }

        void registered() {
            SUBSCRIBERS.incrementAndGet(this);
        }

        boolean hasRegistration() {
            return SUBSCRIBERS.get(this) > 0L;
        }

        boolean unregister() {
            long value = SUBSCRIBERS.get(this);
            if (value <= 0L) {
                return false;
            }
            return SUBSCRIBERS.compareAndSet(this, value, value - 1L) && value == 1L;
        }
    }
}

