/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.redis.connection.lettuce;

import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import org.springframework.data.redis.connection.ReactiveSubscription;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import reactor.core.Disposable;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class LettuceReactiveSubscription
implements ReactiveSubscription {
    private final RedisPubSubReactiveCommands<ByteBuffer, ByteBuffer> commands;
    private final State patternState;
    private final State channelState;

    LettuceReactiveSubscription(RedisPubSubReactiveCommands<ByteBuffer, ByteBuffer> commands, Function<Throwable, Throwable> exceptionTranslator) {
        this.commands = commands;
        this.patternState = new State(exceptionTranslator);
        this.channelState = new State(exceptionTranslator);
    }

    @Override
    public Mono<Void> subscribe(ByteBuffer ... channels) {
        Assert.notNull((Object)channels, "Channels must not be null!");
        Assert.noNullElements((Object[])channels, "Channels must not contain null elements!");
        return this.channelState.subscribe(channels, arg_0 -> this.commands.subscribe(arg_0));
    }

    @Override
    public Mono<Void> pSubscribe(ByteBuffer ... patterns) {
        Assert.notNull((Object)patterns, "Patterns must not be null!");
        Assert.noNullElements((Object[])patterns, "Patterns must not contain null elements!");
        return this.patternState.subscribe(patterns, arg_0 -> this.commands.psubscribe(arg_0));
    }

    @Override
    public Mono<Void> unsubscribe() {
        return this.unsubscribe(this.channelState.getTargets().toArray(new ByteBuffer[this.channelState.getTargets().size()]));
    }

    @Override
    public Mono<Void> unsubscribe(ByteBuffer ... channels) {
        Assert.notNull((Object)channels, "Channels must not be null!");
        Assert.noNullElements((Object[])channels, "Channels must not contain null elements!");
        return ObjectUtils.isEmpty(channels) ? Mono.empty() : this.channelState.unsubscribe(channels, arg_0 -> this.commands.unsubscribe(arg_0));
    }

    @Override
    public Mono<Void> pUnsubscribe() {
        return this.pUnsubscribe(this.patternState.getTargets().toArray(new ByteBuffer[this.patternState.getTargets().size()]));
    }

    @Override
    public Mono<Void> pUnsubscribe(ByteBuffer ... patterns) {
        Assert.notNull((Object)patterns, "Patterns must not be null!");
        Assert.noNullElements((Object[])patterns, "Patterns must not contain null elements!");
        return ObjectUtils.isEmpty(patterns) ? Mono.empty() : this.patternState.unsubscribe(patterns, arg_0 -> this.commands.punsubscribe(arg_0));
    }

    @Override
    public Set<ByteBuffer> getChannels() {
        return this.channelState.getTargets();
    }

    @Override
    public Set<ByteBuffer> getPatterns() {
        return this.patternState.getTargets();
    }

    @Override
    public Flux<ReactiveSubscription.Message<ByteBuffer, ByteBuffer>> receive() {
        Flux channelMessages = this.channelState.receive(() -> this.commands.observeChannels().filter(message -> this.channelState.getTargets().contains(message.getChannel())).map(message -> new ReactiveSubscription.ChannelMessage<Object, Object>(message.getChannel(), message.getMessage())));
        Flux patternMessages = this.patternState.receive(() -> this.commands.observePatterns().filter(message -> this.patternState.getTargets().contains(message.getPattern())).map(message -> new ReactiveSubscription.PatternMessage<Object, Object, Object>(message.getPattern(), message.getChannel(), message.getMessage())));
        return channelMessages.mergeWith(patternMessages);
    }

    @Override
    public Mono<Void> cancel() {
        return this.unsubscribe().then(this.pUnsubscribe()).then(Mono.defer(() -> {
            this.channelState.terminate();
            this.patternState.terminate();
            return Mono.empty();
        }));
    }

    static class State {
        private final Set<ByteBuffer> targets = new ConcurrentSkipListSet<ByteBuffer>();
        private final AtomicLong subscribers = new AtomicLong();
        private final AtomicReference<Flux<?>> flux = new AtomicReference();
        private final Function<Throwable, Throwable> exceptionTranslator;
        @Nullable
        private volatile Disposable disposable;

        State(Function<Throwable, Throwable> exceptionTranslator) {
            this.exceptionTranslator = exceptionTranslator;
        }

        Mono<Void> subscribe(ByteBuffer[] targets, Function<ByteBuffer[], Mono<Void>> subscribeFunction) {
            return subscribeFunction.apply(targets).doOnSuccess(discard -> this.targets.addAll(Arrays.asList(targets))).onErrorMap(this.exceptionTranslator);
        }

        Mono<Void> unsubscribe(ByteBuffer[] targets, Function<ByteBuffer[], Mono<Void>> unsubscribeFunction) {
            return Mono.defer(() -> {
                List<ByteBuffer> targetCollection = Arrays.asList(targets);
                return ((Mono)unsubscribeFunction.apply(targets)).doOnSuccess(discard -> this.targets.removeAll(targetCollection)).onErrorMap(this.exceptionTranslator);
            });
        }

        Set<ByteBuffer> getTargets() {
            return Collections.unmodifiableSet(this.targets);
        }

        <T> Flux<T> receive(Supplier<Flux<T>> connectFunction) {
            Flux<?> fastPath = this.flux.get();
            if (fastPath != null) {
                return fastPath;
            }
            ConnectableFlux connectableFlux = connectFunction.get().onErrorMap(this.exceptionTranslator).publish();
            Flux fluxToUse = connectableFlux.doOnSubscribe(subscription -> {
                if (this.subscribers.incrementAndGet() == 1L) {
                    this.disposable = connectableFlux.connect();
                }
            }).doFinally(signalType -> {
                if (this.subscribers.decrementAndGet() == 0L) {
                    this.flux.compareAndSet((Flux<?>)connectableFlux, (Flux<?>)null);
                    this.terminate();
                }
            });
            if (this.flux.compareAndSet(null, fluxToUse)) {
                return fluxToUse;
            }
            return this.flux.get();
        }

        void terminate() {
            this.flux.set(null);
            Disposable disposable = this.disposable;
            if (disposable != null && !disposable.isDisposed()) {
                disposable.dispose();
            }
        }
    }
}

