/*
 * Decompiled with CFR 0.152.
 */
package io.lettuce.core.cluster;

import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.CommandListenerWriter;
import io.lettuce.core.ConnectionBuilder;
import io.lettuce.core.ConnectionFuture;
import io.lettuce.core.ConnectionState;
import io.lettuce.core.ReadFrom;
import io.lettuce.core.RedisChannelHandler;
import io.lettuce.core.RedisChannelWriter;
import io.lettuce.core.RedisConnectionException;
import io.lettuce.core.RedisException;
import io.lettuce.core.RedisURI;
import io.lettuce.core.SslConnectionBuilder;
import io.lettuce.core.StatefulRedisConnectionImpl;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterDistributionChannelWriter;
import io.lettuce.core.cluster.ClusterNodeEndpoint;
import io.lettuce.core.cluster.ClusterPubSubConnectionProvider;
import io.lettuce.core.cluster.ClusterPushHandler;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshScheduler;
import io.lettuce.core.cluster.PartitionsConsensus;
import io.lettuce.core.cluster.PooledClusterConnectionProvider;
import io.lettuce.core.cluster.PubSubClusterEndpoint;
import io.lettuce.core.cluster.ReconnectEventListener;
import io.lettuce.core.cluster.RedisClusterURIUtil;
import io.lettuce.core.cluster.RoundRobinSocketAddressSupplier;
import io.lettuce.core.cluster.StatefulRedisClusterConnectionImpl;
import io.lettuce.core.cluster.StatefulRedisClusterPubSubConnectionImpl;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent;
import io.lettuce.core.cluster.event.TopologyRefreshEvent;
import io.lettuce.core.cluster.models.partitions.Partitions;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.cluster.topology.ClusterTopologyRefresh;
import io.lettuce.core.cluster.topology.NodeConnectionFactory;
import io.lettuce.core.cluster.topology.TopologyComparators;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.event.jfr.EventRecorder;
import io.lettuce.core.internal.Exceptions;
import io.lettuce.core.internal.Futures;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.internal.LettuceLists;
import io.lettuce.core.protocol.CommandExpiryWriter;
import io.lettuce.core.protocol.CommandHandler;
import io.lettuce.core.protocol.DefaultEndpoint;
import io.lettuce.core.protocol.PushHandler;
import io.lettuce.core.pubsub.PubSubCommandHandler;
import io.lettuce.core.pubsub.PubSubEndpoint;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnectionImpl;
import io.lettuce.core.resource.ClientResources;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.Closeable;
import java.net.SocketAddress;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import reactor.core.publisher.Mono;

public class RedisClusterClient
extends AbstractRedisClient {
    // Class-wide logger.
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(RedisClusterClient.class);
    // Topology refresh strategy; assigned once in each constructor via createTopologyRefresh().
    private final ClusterTopologyRefresh refresh;
    // Scheduler wired to this client's options, partitions accessor and refreshPartitionsAsync().
    private final ClusterTopologyRefreshScheduler topologyRefreshScheduler = new ClusterTopologyRefreshScheduler(this::getClusterClientOptions, this::getPartitions, this::refreshPartitionsAsync, this.getResources());
    // Seed URIs given at construction time (an empty list for the no-arg constructor).
    private final Iterable<RedisURI> initialUris;
    // Current topology view; null until initializePartitions() or setPartitions() assigns it.
    private volatile Partitions partitions;

    /**
     * Creates a client with no seed URIs, passing {@code null} to the superclass constructor.
     */
    protected RedisClusterClient() {
        super(null);
        initialUris = Collections.emptyList();
        refresh = createTopologyRefresh();
    }

    /**
     * Creates a client from the given resources and seed URIs.
     * <p>
     * The seed URIs are validated (non-empty, consistent SSL/StartTLS/VerifyPeer settings) and
     * copied into an unmodifiable list. The default timeout is taken from the first seed URI and
     * the options are reset to {@link ClusterClientOptions#create()}.
     *
     * @param clientResources resources handed to the superclass
     * @param redisURIs seed URIs, must not be {@code null} or empty
     */
    protected RedisClusterClient(ClientResources clientResources, Iterable<RedisURI> redisURIs) {
        super(clientResources);
        assertNotEmpty(redisURIs);
        assertSameOptions(redisURIs);
        initialUris = Collections.unmodifiableList(LettuceLists.newList(redisURIs));
        refresh = createTopologyRefresh();
        setDefaultTimeout(getFirstUri().getTimeout());
        setOptions(ClusterClientOptions.create());
    }

    /**
     * Verifies that every seed URI uses the same SSL, StartTLS and VerifyPeer settings as the
     * first one.
     *
     * @param redisURIs seed URIs to compare
     * @throws IllegalArgumentException if a URI deviates from the first URI's settings
     */
    private static void assertSameOptions(Iterable<RedisURI> redisURIs) {
        Iterator<RedisURI> uris = redisURIs.iterator();
        if (!uris.hasNext()) {
            return;
        }
        // The first URI defines the expected settings for all remaining ones.
        RedisURI reference = uris.next();
        boolean expectedSsl = reference.isSsl();
        boolean expectedStartTls = reference.isStartTls();
        boolean expectedVerifyPeer = reference.isVerifyPeer();
        while (uris.hasNext()) {
            RedisURI candidate = uris.next();
            if (candidate.isSsl() != expectedSsl) {
                throw new IllegalArgumentException("RedisURI " + candidate + " SSL is not consistent with the other seed URI SSL settings");
            }
            if (candidate.isStartTls() != expectedStartTls) {
                throw new IllegalArgumentException("RedisURI " + candidate + " StartTLS is not consistent with the other seed URI StartTLS settings");
            }
            if (candidate.isVerifyPeer() != expectedVerifyPeer) {
                throw new IllegalArgumentException("RedisURI " + candidate + " VerifyPeer is not consistent with the other seed URI VerifyPeer settings");
            }
        }
    }

    /**
     * Creates a cluster client from a single seed URI.
     *
     * @param redisURI seed URI, must not be {@code null}
     * @return a new client
     */
    public static RedisClusterClient create(RedisURI redisURI) {
        assertNotNull(redisURI);
        return create(Collections.singleton(redisURI));
    }

    /**
     * Creates a cluster client from several seed URIs; {@code null} is passed as client resources.
     *
     * @param redisURIs seed URIs, must not be {@code null} or empty and must share SSL settings
     * @return a new client
     */
    public static RedisClusterClient create(Iterable<RedisURI> redisURIs) {
        assertNotEmpty(redisURIs);
        assertSameOptions(redisURIs);
        return new RedisClusterClient(null, redisURIs);
    }

    /**
     * Creates a cluster client from a URI string.
     *
     * @param uri URI string, must not be empty
     * @return a new client
     */
    public static RedisClusterClient create(String uri) {
        LettuceAssert.notEmpty((CharSequence) uri, "URI must not be empty");
        URI parsed = URI.create(uri);
        return create(RedisClusterURIUtil.toRedisURIs(parsed));
    }

    /**
     * Creates a cluster client from shared resources and a single seed URI.
     *
     * @param clientResources client resources, must not be {@code null}
     * @param redisURI seed URI, must not be {@code null}
     * @return a new client
     */
    public static RedisClusterClient create(ClientResources clientResources, RedisURI redisURI) {
        assertNotNull(clientResources);
        assertNotNull(redisURI);
        return create(clientResources, Collections.singleton(redisURI));
    }

    /**
     * Creates a cluster client from shared resources and a URI string.
     *
     * @param clientResources client resources, must not be {@code null}
     * @param uri URI string, must not be empty
     * @return a new client
     */
    public static RedisClusterClient create(ClientResources clientResources, String uri) {
        assertNotNull(clientResources);
        LettuceAssert.notEmpty((CharSequence) uri, "URI must not be empty");
        URI parsed = URI.create(uri);
        return create(clientResources, RedisClusterURIUtil.toRedisURIs(parsed));
    }

    /**
     * Creates a cluster client from shared resources and several seed URIs.
     *
     * @param clientResources client resources, must not be {@code null}
     * @param redisURIs seed URIs, must not be {@code null} or empty and must share SSL settings
     * @return a new client
     */
    public static RedisClusterClient create(ClientResources clientResources, Iterable<RedisURI> redisURIs) {
        assertNotNull(clientResources);
        assertNotEmpty(redisURIs);
        assertSameOptions(redisURIs);
        return new RedisClusterClient(clientResources, redisURIs);
    }

    /**
     * Sets the cluster client options by delegating to the superclass.
     *
     * @param clientOptions the options to apply
     */
    public void setOptions(ClusterClientOptions clientOptions) {
        super.setOptions(clientOptions);
    }

    /**
     * Returns the current partitions, loading them synchronously on first access.
     *
     * @return the partitions
     * @throws RedisException wrapping the cause if the initial topology cannot be obtained
     */
    public Partitions getPartitions() {
        if (partitions != null) {
            return partitions;
        }
        // Blocks until the initial topology has been loaded and stored in the field.
        get(initializePartitions(), e -> new RedisException("Cannot obtain initial Redis Cluster topology", e));
        return partitions;
    }

    /**
     * Returns the URIs to query when refreshing the topology.
     * <p>
     * The initial seed URIs are used when dynamic refresh sources are disabled or no partitions
     * are known yet; otherwise the URIs of the currently known nodes, sorted by URI, are used.
     *
     * @return the seed URIs for the next topology refresh
     */
    protected Iterable<RedisURI> getTopologyRefreshSource() {
        boolean useInitialSeedNodes = !useDynamicRefreshSources();
        // Read the volatile field once so the null check and the iteration see the same object,
        // even if setPartitions(...) is called concurrently.
        Partitions current = partitions;
        if (useInitialSeedNodes || current == null || current.isEmpty()) {
            return initialUris;
        }
        ArrayList<RedisURI> uris = new ArrayList<RedisURI>();
        for (RedisClusterNode node : TopologyComparators.sortByUri(current)) {
            uris.add(node.getUri());
        }
        return uris;
    }

    /**
     * Connects to the cluster using the codec returned by {@link #newStringStringCodec()}.
     *
     * @return a cluster connection
     */
    public StatefulRedisClusterConnection<String, String> connect() {
        RedisCodec<String, String> codec = newStringStringCodec();
        return connect(codec);
    }

    /**
     * Connects to the cluster with the given codec, loading the initial partitions first if
     * they are not yet known.
     *
     * @param codec codec used for keys and values
     * @param <K> key type
     * @param <V> value type
     * @return a cluster connection
     */
    public <K, V> StatefulRedisClusterConnection<K, V> connect(RedisCodec<K, V> codec) {
        assertInitialPartitions();
        CompletableFuture<StatefulRedisClusterConnection<K, V>> pending = connectClusterAsync(codec);
        return getConnection(pending);
    }

    /**
     * Connects to the cluster asynchronously; failures are translated through
     * {@link #transformAsyncConnectionException}.
     *
     * @param codec codec used for keys and values
     * @param <K> key type
     * @param <V> value type
     * @return a future completing with the cluster connection
     */
    public <K, V> CompletableFuture<StatefulRedisClusterConnection<K, V>> connectAsync(RedisCodec<K, V> codec) {
        CompletableFuture<StatefulRedisClusterConnection<K, V>> pending = connectClusterAsync(codec);
        return transformAsyncConnectionException(pending, getInitialUris());
    }

    /**
     * Opens a cluster Pub/Sub connection using the codec returned by
     * {@link #newStringStringCodec()}.
     *
     * @return a cluster Pub/Sub connection
     */
    public StatefulRedisClusterPubSubConnection<String, String> connectPubSub() {
        RedisCodec<String, String> codec = newStringStringCodec();
        return connectPubSub(codec);
    }

    /**
     * Opens a cluster Pub/Sub connection with the given codec, loading the initial partitions
     * first if they are not yet known.
     *
     * @param codec codec used for keys and values
     * @param <K> key type
     * @param <V> value type
     * @return a cluster Pub/Sub connection
     */
    public <K, V> StatefulRedisClusterPubSubConnection<K, V> connectPubSub(RedisCodec<K, V> codec) {
        assertInitialPartitions();
        CompletableFuture<StatefulRedisClusterPubSubConnection<K, V>> pending = connectClusterPubSubAsync(codec);
        return getConnection(pending);
    }

    /**
     * Opens a cluster Pub/Sub connection asynchronously; failures are translated through
     * {@link #transformAsyncConnectionException}.
     *
     * @param codec codec used for keys and values
     * @param <K> key type
     * @param <V> value type
     * @return a future completing with the Pub/Sub connection
     */
    public <K, V> CompletableFuture<StatefulRedisClusterPubSubConnection<K, V>> connectPubSubAsync(RedisCodec<K, V> codec) {
        CompletableFuture<StatefulRedisClusterPubSubConnection<K, V>> pending = connectClusterPubSubAsync(codec);
        return transformAsyncConnectionException(pending, getInitialUris());
    }

    /**
     * Connects to a single node at the given address with the default String codec. The
     * address' string form is used as node id and no cluster writer is attached.
     *
     * @param socketAddress address of the node
     * @return a connection to that node
     */
    StatefulRedisConnection<String, String> connectToNode(SocketAddress socketAddress) {
        RedisCodec<String, String> codec = newStringStringCodec();
        String nodeId = socketAddress.toString();
        return connectToNode(codec, nodeId, null, Mono.just(socketAddress));
    }

    /**
     * Synchronous counterpart of {@link #connectToNodeAsync}: starts the connect and resolves
     * the resulting future through the inherited {@code getConnection} helper.
     *
     * @param codec codec used for keys and values
     * @param nodeId identifier of the node
     * @param clusterWriter cluster-level writer, may be {@code null}
     * @param socketAddressSupplier supplies the address to connect to
     * @param <K> key type
     * @param <V> value type
     * @return a connection to the node
     */
    <K, V> StatefulRedisConnection<K, V> connectToNode(RedisCodec<K, V> codec, String nodeId, RedisChannelWriter clusterWriter, Mono<SocketAddress> socketAddressSupplier) {
        ConnectionFuture<StatefulRedisConnection<K, V>> pending = connectToNodeAsync(codec, nodeId, clusterWriter, socketAddressSupplier);
        return getConnection(pending);
    }

    /**
     * Asynchronously connects to a single cluster node.
     * <p>
     * A {@link ClusterNodeEndpoint} is created and optionally wrapped with command-expiry and
     * command-listener writers. If the connect attempt fails, the connection object is closed
     * asynchronously.
     *
     * @param codec codec used for keys and values, must not be {@code null}
     * @param nodeId identifier of the node (not used by this method's body)
     * @param clusterWriter cluster-level writer handed to the node endpoint, may be {@code null}
     * @param socketAddressSupplier supplies the address to connect to, must not be {@code null}
     * @param <K> key type
     * @param <V> value type
     * @return a future completing with the node connection
     */
    <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectToNodeAsync(RedisCodec<K, V> codec, String nodeId, RedisChannelWriter clusterWriter, Mono<SocketAddress> socketAddressSupplier) {
        assertNotNull(codec);
        assertNotEmpty(initialUris);
        LettuceAssert.notNull(socketAddressSupplier, "SocketAddressSupplier must not be null");

        ClusterNodeEndpoint endpoint = new ClusterNodeEndpoint(getClusterClientOptions(), getResources(), clusterWriter);
        RedisChannelWriter writer = endpoint;
        if (CommandExpiryWriter.isSupported(getClusterClientOptions())) {
            writer = new CommandExpiryWriter(writer, getClusterClientOptions(), getResources());
        }
        if (CommandListenerWriter.isSupported(getCommandListeners())) {
            writer = new CommandListenerWriter(writer, getCommandListeners());
        }

        // Parameterized types instead of raw ones so the compiler checks the future's payload.
        StatefulRedisConnectionImpl<K, V> connection = newStatefulRedisConnection(writer, endpoint, codec, getDefaultTimeout());
        ConnectionFuture<StatefulRedisConnection<K, V>> connectionFuture = connectStatefulAsync(connection, endpoint, getFirstUri(), socketAddressSupplier, () -> new CommandHandler(getClusterClientOptions(), getResources(), endpoint));
        return connectionFuture.whenComplete((conn, throwable) -> {
            if (throwable != null) {
                // Release the half-initialized connection when the connect attempt failed.
                connection.closeAsync();
            }
        });
    }

    /**
     * Factory hook creating the node-level connection object.
     *
     * @param channelWriter writer the connection sends commands through
     * @param pushHandler handler for push messages
     * @param codec codec used for keys and values
     * @param timeout default command timeout
     * @param <K> key type
     * @param <V> value type
     * @return a new {@link StatefulRedisConnectionImpl}
     */
    protected <K, V> StatefulRedisConnectionImpl<K, V> newStatefulRedisConnection(RedisChannelWriter channelWriter, PushHandler pushHandler, RedisCodec<K, V> codec, Duration timeout) {
        return new StatefulRedisConnectionImpl<>(channelWriter, pushHandler, codec, timeout);
    }

    /**
     * Asynchronously opens a Pub/Sub connection to a single cluster node.
     * <p>
     * A {@link PubSubEndpoint} is created and optionally wrapped with command-expiry and
     * command-listener writers. If the connect attempt fails, the connection object is closed
     * asynchronously.
     *
     * @param codec codec used for keys and values, must not be {@code null}
     * @param nodeId identifier of the node, used for debug logging
     * @param socketAddressSupplier supplies the address to connect to, must not be {@code null}
     * @param <K> key type
     * @param <V> value type
     * @return a future completing with the Pub/Sub connection
     */
    <K, V> ConnectionFuture<StatefulRedisPubSubConnection<K, V>> connectPubSubToNodeAsync(RedisCodec<K, V> codec, String nodeId, Mono<SocketAddress> socketAddressSupplier) {
        assertNotNull(codec);
        assertNotEmpty(initialUris);
        LettuceAssert.notNull(socketAddressSupplier, "SocketAddressSupplier must not be null");
        // Parameterized logging avoids building the message when debug is disabled.
        logger.debug("connectPubSubToNode({})", nodeId);

        PubSubEndpoint<K, V> endpoint = new PubSubEndpoint<K, V>(getClusterClientOptions(), getResources());
        RedisChannelWriter writer = endpoint;
        if (CommandExpiryWriter.isSupported(getClusterClientOptions())) {
            writer = new CommandExpiryWriter(writer, getClusterClientOptions(), getResources());
        }
        if (CommandListenerWriter.isSupported(getCommandListeners())) {
            writer = new CommandListenerWriter(writer, getCommandListeners());
        }

        // Parameterized types instead of raw ones so the compiler checks the future's payload.
        StatefulRedisPubSubConnectionImpl<K, V> connection = new StatefulRedisPubSubConnectionImpl<K, V>(endpoint, writer, codec, getDefaultTimeout());
        ConnectionFuture<StatefulRedisPubSubConnection<K, V>> connectionFuture = connectStatefulAsync(connection, endpoint, getFirstUri(), socketAddressSupplier, () -> new PubSubCommandHandler<K, V>(getClusterClientOptions(), getResources(), codec, endpoint));
        return connectionFuture.whenComplete((conn, throwable) -> {
            if (throwable != null) {
                // Release the half-initialized connection when the connect attempt failed.
                connection.closeAsync();
            }
        });
    }

    /**
     * Asynchronously creates a cluster connection.
     * <p>
     * Fails immediately if the partitions have not been initialized. Otherwise the writer chain
     * (endpoint, optional expiry/listener writers, cluster distribution writer) and the pooled
     * connection provider are wired up, and a connect is attempted against addresses from the
     * socket address supplier. On failure the connect is retried, up to
     * {@link #getConnectionAttempts()} attempts in total.
     *
     * @param codec codec used for keys and values
     * @param <K> key type
     * @param <V> value type
     * @return a future completing with the cluster connection
     */
    private <K, V> CompletableFuture<StatefulRedisClusterConnection<K, V>> connectClusterAsync(RedisCodec<K, V> codec) {
        if (partitions == null) {
            return Futures.failed(new IllegalStateException("Partitions not initialized. Initialize via RedisClusterClient.getPartitions()."));
        }
        topologyRefreshScheduler.activateTopologyRefreshIfNeeded();
        // Parameterized logging avoids building the message when debug is disabled.
        logger.debug("connectCluster({})", initialUris);

        DefaultEndpoint endpoint = new DefaultEndpoint(getClusterClientOptions(), getResources());
        RedisChannelWriter writer = endpoint;
        if (CommandExpiryWriter.isSupported(getClusterClientOptions())) {
            writer = new CommandExpiryWriter(writer, getClusterClientOptions(), getResources());
        }
        if (CommandListenerWriter.isSupported(getCommandListeners())) {
            writer = new CommandListenerWriter(writer, getCommandListeners());
        }

        ClusterDistributionChannelWriter clusterWriter = new ClusterDistributionChannelWriter(writer, getClusterClientOptions(), topologyRefreshScheduler);
        PooledClusterConnectionProvider<K, V> pooledClusterConnectionProvider = new PooledClusterConnectionProvider<K, V>(this, clusterWriter, codec, topologyRefreshScheduler);
        clusterWriter.setClusterConnectionProvider(pooledClusterConnectionProvider);

        StatefulRedisClusterConnectionImpl<K, V> connection = newStatefulRedisClusterConnection(clusterWriter, pooledClusterConnectionProvider, codec, getDefaultTimeout());
        connection.setReadFrom(ReadFrom.UPSTREAM);
        connection.setPartitions(partitions);

        Supplier<CommandHandler> commandHandlerSupplier = () -> new CommandHandler(getClusterClientOptions(), getResources(), endpoint);
        Mono<SocketAddress> socketAddressSupplier = getSocketAddressSupplier(connection::getPartitions, TopologyComparators::sortByClientCount);

        Mono<StatefulRedisClusterConnectionImpl<K, V>> connectionMono = Mono.defer(() -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier));
        // Chain one retry per additional attempt; each retry re-subscribes to the address supplier.
        for (int i = 1; i < getConnectionAttempts(); ++i) {
            connectionMono = connectionMono.onErrorResume(t -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier));
        }

        // The explicit cast widens the implementation type to the public connection interface;
        // an identity mapping would not give the future the declared return type.
        return connectionMono
                .doOnNext(c -> connection.registerCloseables(closeableResources, clusterWriter, pooledClusterConnectionProvider))
                .map(it -> (StatefulRedisClusterConnection<K, V>) it)
                .toFuture();
    }

    /**
     * Factory hook creating the cluster-level connection object.
     *
     * @param channelWriter writer the connection sends commands through
     * @param pushHandler handler for cluster push messages
     * @param codec codec used for keys and values
     * @param timeout default command timeout
     * @param <V> value type
     * @param <K> key type
     * @return a new {@link StatefulRedisClusterConnectionImpl}
     */
    protected <V, K> StatefulRedisClusterConnectionImpl<K, V> newStatefulRedisClusterConnection(RedisChannelWriter channelWriter, ClusterPushHandler pushHandler, RedisCodec<K, V> codec, Duration timeout) {
        return new StatefulRedisClusterConnectionImpl<>(channelWriter, pushHandler, codec, timeout);
    }

    /**
     * Performs one connect attempt for a cluster connection and exposes it as a {@link Mono}.
     * A failed attempt is logged at warn level with the error's message.
     *
     * @param socketAddressSupplier supplies the address to connect to
     * @param endpoint endpoint backing the connection
     * @param connection the cluster connection being established
     * @param commandHandlerSupplier creates the command handler for the channel
     * @param <T> type emitted by the returned {@link Mono}
     * @param <K> key type
     * @param <V> value type
     * @return a {@link Mono} emitting the connection once connected
     */
    private <T, K, V> Mono<T> connect(Mono<SocketAddress> socketAddressSupplier, DefaultEndpoint endpoint, StatefulRedisClusterConnectionImpl<K, V> connection, Supplier<CommandHandler> commandHandlerSupplier) {
        // Pass the connection with its real type so overload bounds are satisfied, and keep the
        // future parameterized so the error callback receives a Throwable.
        ConnectionFuture<T> future = connectStatefulAsync(connection, endpoint, getFirstUri(), socketAddressSupplier, commandHandlerSupplier);
        return Mono.fromCompletionStage(future).doOnError(t -> logger.warn(t.getMessage()));
    }

    /**
     * Performs one connect attempt for a (non-cluster-typed) stateful connection and exposes it
     * as a {@link Mono}. A failed attempt is logged at warn level with the error's message.
     *
     * @param socketAddressSupplier supplies the address to connect to
     * @param endpoint endpoint backing the connection
     * @param connection the connection being established
     * @param commandHandlerSupplier creates the command handler for the channel
     * @param <T> type emitted by the returned {@link Mono}
     * @param <K> key type
     * @param <V> value type
     * @return a {@link Mono} emitting the connection once connected
     */
    private <T, K, V> Mono<T> connect(Mono<SocketAddress> socketAddressSupplier, DefaultEndpoint endpoint, StatefulRedisConnectionImpl<K, V> connection, Supplier<CommandHandler> commandHandlerSupplier) {
        // Keep the future parameterized so the error callback receives a Throwable.
        ConnectionFuture<T> future = connectStatefulAsync(connection, endpoint, getFirstUri(), socketAddressSupplier, commandHandlerSupplier);
        return Mono.fromCompletionStage(future).doOnError(t -> logger.warn(t.getMessage()));
    }

    /**
     * Asynchronously creates a cluster Pub/Sub connection.
     * <p>
     * Fails immediately if the partitions have not been initialized. Otherwise the writer chain
     * (Pub/Sub endpoint, optional expiry/listener writers, cluster distribution writer) and the
     * Pub/Sub connection provider are wired up, and a connect is attempted against addresses
     * from the socket address supplier. On failure the connect is retried, up to
     * {@link #getConnectionAttempts()} attempts in total.
     *
     * @param codec codec used for keys and values
     * @param <K> key type
     * @param <V> value type
     * @return a future completing with the cluster Pub/Sub connection
     */
    private <K, V> CompletableFuture<StatefulRedisClusterPubSubConnection<K, V>> connectClusterPubSubAsync(RedisCodec<K, V> codec) {
        if (partitions == null) {
            return Futures.failed(new IllegalStateException("Partitions not initialized. Initialize via RedisClusterClient.getPartitions()."));
        }
        topologyRefreshScheduler.activateTopologyRefreshIfNeeded();
        // Parameterized logging avoids building the message when debug is disabled.
        logger.debug("connectClusterPubSub({})", initialUris);

        PubSubClusterEndpoint<K, V> endpoint = new PubSubClusterEndpoint<K, V>(getClusterClientOptions(), getResources());
        RedisChannelWriter writer = endpoint;
        if (CommandExpiryWriter.isSupported(getClusterClientOptions())) {
            writer = new CommandExpiryWriter(writer, getClusterClientOptions(), getResources());
        }
        if (CommandListenerWriter.isSupported(getCommandListeners())) {
            writer = new CommandListenerWriter(writer, getCommandListeners());
        }

        ClusterDistributionChannelWriter clusterWriter = new ClusterDistributionChannelWriter(writer, getClusterClientOptions(), topologyRefreshScheduler);
        ClusterPubSubConnectionProvider<K, V> pooledClusterConnectionProvider = new ClusterPubSubConnectionProvider<K, V>(this, clusterWriter, codec, endpoint.getUpstreamListener(), topologyRefreshScheduler);
        StatefulRedisClusterPubSubConnectionImpl<K, V> connection = new StatefulRedisClusterPubSubConnectionImpl<K, V>(endpoint, pooledClusterConnectionProvider, clusterWriter, codec, getDefaultTimeout());
        clusterWriter.setClusterConnectionProvider(pooledClusterConnectionProvider);
        connection.setPartitions(partitions);

        Supplier<CommandHandler> commandHandlerSupplier = () -> new PubSubCommandHandler<K, V>(getClusterClientOptions(), getResources(), codec, endpoint);
        Mono<SocketAddress> socketAddressSupplier = getSocketAddressSupplier(connection::getPartitions, TopologyComparators::sortByClientCount);

        Mono<StatefulRedisClusterPubSubConnectionImpl<K, V>> connectionMono = Mono.defer(() -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier));
        // Chain one retry per additional attempt; each retry re-subscribes to the address supplier.
        for (int i = 1; i < getConnectionAttempts(); ++i) {
            connectionMono = connectionMono.onErrorResume(t -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier));
        }

        // The explicit cast widens the implementation type to the public connection interface;
        // an identity mapping would not give the future the declared return type.
        return connectionMono
                .doOnNext(c -> connection.registerCloseables(closeableResources, clusterWriter, pooledClusterConnectionProvider))
                .map(it -> (StatefulRedisClusterPubSubConnection<K, V>) it)
                .toFuture();
    }

    /**
     * Returns how many connect attempts to make: the number of known partitions, but at least one.
     *
     * @return the number of attempts
     */
    private int getConnectionAttempts() {
        int knownNodes = partitions.size();
        return Math.max(1, knownNodes);
    }

    /**
     * Builds the connection builder for a cluster connection, initializes the channel and
     * completes with the given connection once the channel is ready.
     *
     * @param connection the cluster connection being established
     * @param endpoint endpoint backing the connection
     * @param connectionSettings URI whose settings are applied to the connection
     * @param socketAddressSupplier supplies the address to connect to
     * @param commandHandlerSupplier creates the command handler for the channel
     * @param <K> key type
     * @param <V> value type
     * @param <T> concrete connection type
     * @param <S> type the caller wants the future to expose
     * @return a future completing with {@code connection}
     */
    @SuppressWarnings("unchecked")
    private <K, V, T extends StatefulRedisClusterConnectionImpl<K, V>, S> ConnectionFuture<S> connectStatefulAsync(T connection, DefaultEndpoint endpoint, RedisURI connectionSettings, Mono<SocketAddress> socketAddressSupplier, Supplier<CommandHandler> commandHandlerSupplier) {
        ConnectionBuilder connectionBuilder = createConnectionBuilder(connection, connection.getConnectionState(), endpoint, connectionSettings, socketAddressSupplier, commandHandlerSupplier);
        ConnectionFuture<?> future = initializeChannelAsync(connectionBuilder);
        // The channel handler result is discarded; callers receive the connection itself. The
        // cast is unchecked because S is chosen by the caller.
        return future.thenApply(channelHandler -> (S) connection);
    }

    /**
     * Builds the connection builder for a stateful connection, initializes the channel and
     * completes with the given connection once the channel is ready.
     *
     * @param connection the connection being established
     * @param endpoint endpoint backing the connection
     * @param connectionSettings URI whose settings are applied to the connection
     * @param socketAddressSupplier supplies the address to connect to
     * @param commandHandlerSupplier creates the command handler for the channel
     * @param <K> key type
     * @param <V> value type
     * @param <T> concrete connection type
     * @param <S> type the caller wants the future to expose
     * @return a future completing with {@code connection}
     */
    @SuppressWarnings("unchecked")
    private <K, V, T extends StatefulRedisConnectionImpl<K, V>, S> ConnectionFuture<S> connectStatefulAsync(T connection, DefaultEndpoint endpoint, RedisURI connectionSettings, Mono<SocketAddress> socketAddressSupplier, Supplier<CommandHandler> commandHandlerSupplier) {
        ConnectionBuilder connectionBuilder = createConnectionBuilder(connection, connection.getConnectionState(), endpoint, connectionSettings, socketAddressSupplier, commandHandlerSupplier);
        ConnectionFuture<?> future = initializeChannelAsync(connectionBuilder);
        // The channel handler result is discarded; callers receive the connection itself. The
        // cast is unchecked because S is chosen by the caller.
        return future.thenApply(channelHandler -> (S) connection);
    }

    /**
     * Creates and configures the {@link ConnectionBuilder} for a connection.
     * <p>
     * An SSL builder is used when the settings URI has SSL enabled. The URI is applied to the
     * connection state before the handshake initializer is created from that state.
     *
     * @param connection connection the channel belongs to
     * @param state connection state that receives the URI settings
     * @param endpoint endpoint backing the connection
     * @param connectionSettings URI providing SSL and connection settings
     * @param socketAddressSupplier supplies the address to connect to
     * @param commandHandlerSupplier creates the command handler for the channel
     * @param <K> key type
     * @param <V> value type
     * @return the configured builder
     */
    private <K, V> ConnectionBuilder createConnectionBuilder(RedisChannelHandler<K, V> connection, ConnectionState state, DefaultEndpoint endpoint, RedisURI connectionSettings, Mono<SocketAddress> socketAddressSupplier, Supplier<CommandHandler> commandHandlerSupplier) {
        ConnectionBuilder builder;
        if (!connectionSettings.isSsl()) {
            builder = ConnectionBuilder.connectionBuilder();
        } else {
            SslConnectionBuilder sslBuilder = SslConnectionBuilder.sslConnectionBuilder();
            sslBuilder.ssl(connectionSettings);
            builder = sslBuilder;
        }

        // The state must carry the URI settings before the handshake is derived from it.
        state.apply(connectionSettings);
        builder.connectionInitializer(createHandshake(state));
        builder.reconnectionListener(new ReconnectEventListener(topologyRefreshScheduler));
        builder.clientOptions(getClusterClientOptions());
        builder.connection(connection);
        builder.clientResources(getResources());
        builder.endpoint(endpoint);
        builder.commandHandler(commandHandlerSupplier);
        connectionBuilder(socketAddressSupplier, builder, connectionSettings);
        return builder;
    }

    /**
     * Reloads the cluster topology.
     *
     * @deprecated use {@link #refreshPartitions()}, which this method delegates to
     */
    @Deprecated
    public void reloadPartitions() {
        refreshPartitions();
    }

    /**
     * Refreshes the cluster topology and waits for the refresh to finish.
     *
     * @throws RedisException wrapping the cause if the topology cannot be reloaded
     */
    public void refreshPartitions() {
        CompletableFuture<Void> refreshFuture = refreshPartitionsAsync().toCompletableFuture();
        get(refreshFuture, e -> new RedisException("Cannot reload Redis Cluster topology", e));
    }

    /**
     * Refreshes the cluster topology asynchronously.
     * <p>
     * If no partitions are known yet they are initialized; otherwise the topology is reloaded,
     * a {@link ClusterTopologyChangedEvent} is published when it changed, and the existing
     * partitions object is updated in place and pushed to all open connections. A
     * {@link TopologyRefreshEvent} is recorded when the refresh completes, successfully or not.
     *
     * @return a stage completing when the refresh has finished
     */
    public CompletionStage<Void> refreshPartitionsAsync() {
        ArrayList<RedisURI> sources = new ArrayList<RedisURI>();
        getTopologyRefreshSource().forEach(sources::add);
        EventRecorder.RecordableEvent event = EventRecorder.getInstance().start(new TopologyRefreshEvent(sources));

        if (partitions == null) {
            return initializePartitions()
                    .thenAccept(Partitions::updateCache)
                    .whenComplete((unused, throwable) -> event.record());
        }

        return loadPartitionsAsync().thenAccept(loaded -> {
            if (TopologyComparators.isChanged(getPartitions(), loaded)) {
                logger.debug("Using a new cluster topology");
                ArrayList<RedisClusterNode> before = new ArrayList<RedisClusterNode>(getPartitions());
                ArrayList<RedisClusterNode> after = new ArrayList<RedisClusterNode>(loaded);
                getResources().eventBus().publish(new ClusterTopologyChangedEvent(before, after));
            }
            // Update the shared partitions instance in place, then propagate it to connections.
            partitions.reload(loaded.getPartitions());
            updatePartitionsInConnections();
        }).whenComplete((unused, throwable) -> event.record());
    }

    /**
     * Hands the current partitions to every registered cluster connection and cluster Pub/Sub
     * connection.
     */
    protected void updatePartitionsInConnections() {
        forEachClusterConnection(connection -> connection.setPartitions(partitions));
        forEachClusterPubSubConnection(connection -> connection.setPartitions(partitions));
    }

    /**
     * Loads the topology asynchronously and stores the result as this client's partitions.
     *
     * @return a future completing with the stored partitions
     */
    protected CompletableFuture<Partitions> initializePartitions() {
        return loadPartitionsAsync().thenApply(loaded -> {
            partitions = loaded;
            return partitions;
        });
    }

    /**
     * Ensures partitions are available, loading them synchronously if they are still unset.
     *
     * @throws RedisConnectionException wrapping the cause if the topology cannot be loaded
     */
    private void assertInitialPartitions() {
        if (partitions != null) {
            return;
        }
        get(initializePartitions(), e -> new RedisConnectionException("Unable to establish a connection to Redis Cluster", e));
    }

    /**
     * Loads the topology and waits for the result; Redis exceptions are rethrown unchanged.
     *
     * @return the loaded partitions
     */
    protected Partitions loadPartitions() {
        CompletableFuture<Partitions> pending = loadPartitionsAsync();
        return get(pending, Function.identity());
    }

    /**
     * Waits for the future and returns its value, translating failures into unchecked exceptions.
     * <p>
     * A {@link RedisException} cause is passed through {@code mapper} before being thrown; any
     * other failure is converted with {@code Exceptions.bubble}.
     *
     * @param future the future to wait for
     * @param mapper maps a {@link RedisException} cause to the exception that is thrown
     * @param <T> result type
     * @return the future's value
     */
    private static <T> T get(CompletableFuture<T> future, Function<RedisException, RedisException> mapper) {
        try {
            return future.get();
        }
        catch (ExecutionException e) {
            if (e.getCause() instanceof RedisException) {
                throw mapper.apply((RedisException)e.getCause());
            }
            throw Exceptions.bubble(e);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw Exceptions.bubble(e);
        }
        catch (Exception e) {
            throw Exceptions.bubble(e);
        }
    }

    /**
     * Loads the cluster topology asynchronously.
     * <p>
     * The topology is first fetched from {@link #getTopologyRefreshSource()}. If that fails
     * while dynamic refresh sources are in use and the source was not the initial seed list,
     * a second fetch against the initial URIs is made; if that also fails, the first error is
     * attached to the second as suppressed. When a non-default node filter is configured, nodes
     * rejected by the filter are removed from the result.
     *
     * @return a future completing with the loaded (and possibly filtered) partitions
     */
    protected CompletableFuture<Partitions> loadPartitionsAsync() {
        Iterable<RedisURI> topologyRefreshSource = this.getTopologyRefreshSource();
        CompletableFuture<Partitions> future = new CompletableFuture<Partitions>();
        this.fetchPartitions(topologyRefreshSource).whenComplete((nodes, throwable) -> {
            if (throwable == null) {
                future.complete((Partitions)nodes);
                return;
            }
            // Identity comparison: only fall back when the failed source was not the seed list itself.
            if (this.useDynamicRefreshSources() && topologyRefreshSource != this.initialUris) {
                this.fetchPartitions(this.initialUris).whenComplete((nextNodes, nextThrowable) -> {
                    if (nextThrowable != null) {
                        // Report the fallback failure and keep the original failure as suppressed.
                        Throwable exception = Exceptions.unwrap(nextThrowable);
                        exception.addSuppressed(Exceptions.unwrap(throwable));
                        future.completeExceptionally(exception);
                    } else {
                        future.complete((Partitions)nextNodes);
                    }
                });
            } else {
                future.completeExceptionally(Exceptions.unwrap(throwable));
            }
        });
        Predicate<RedisClusterNode> nodeFilter = this.getClusterClientOptions().getNodeFilter();
        // Skip the filtering stage entirely when the default filter instance is configured.
        if (nodeFilter != ClusterClientOptions.DEFAULT_NODE_FILTER) {
            return future.thenApply(partitions -> {
                // Collect first, then remove, so the collection is not modified while iterating.
                ArrayList<RedisClusterNode> toRemove = new ArrayList<RedisClusterNode>();
                for (RedisClusterNode partition : partitions) {
                    if (nodeFilter.test(partition)) continue;
                    toRemove.add(partition);
                }
                partitions.removeAll(toRemove);
                return partitions;
            });
        }
        return future;
    }

    /**
     * Loads topology views from the given sources and selects the partitions to use.
     * <p>
     * The chosen partitions are determined by {@link #determinePartitions}. If the chosen
     * partitions object is the view of one of the queried URIs, that URI's connection settings
     * are applied to every node URI.
     *
     * @param topologyRefreshSource URIs to query for topology views
     * @return a stage completing with the selected partitions
     */
    private CompletionStage<Partitions> fetchPartitions(Iterable<RedisURI> topologyRefreshSource) {
        CompletionStage<Map<RedisURI, Partitions>> topology = refresh.loadViews(topologyRefreshSource, getClusterClientOptions().getSocketOptions().getConnectTimeout(), useDynamicRefreshSources());
        return topology.thenApply(views -> {
            if (views.isEmpty()) {
                throw new RedisException(String.format("Cannot retrieve initial cluster partitions from initial URIs %s", topologyRefreshSource));
            }
            Partitions selected = determinePartitions(partitions, views);
            RedisURI viewedBy = getViewedBy(views, selected);
            for (RedisClusterNode node : selected) {
                if (viewedBy != null) {
                    RedisClusterURIUtil.applyUriConnectionSettings(viewedBy, node.getUri());
                }
            }
            topologyRefreshScheduler.activateTopologyRefreshIfNeeded();
            return selected;
        });
    }

    /**
     * Chooses the partitions to use from a set of topology views.
     * <p>
     * Without current partitions {@link PartitionsConsensus#HEALTHY_MAJORITY} is applied,
     * otherwise {@link PartitionsConsensus#KNOWN_MAJORITY}.
     *
     * @param current the currently known partitions, may be {@code null}
     * @param topologyViews topology views keyed by the URI that reported them
     * @return the selected partitions
     */
    protected Partitions determinePartitions(Partitions current, Map<RedisURI, Partitions> topologyViews) {
        PartitionsConsensus consensus = current == null ? PartitionsConsensus.HEALTHY_MAJORITY : PartitionsConsensus.KNOWN_MAJORITY;
        return consensus.getPartitions(current, topologyViews);
    }

    /**
     * Replaces this client's partitions with the given instance.
     *
     * @param partitions the partitions to use from now on
     */
    public void setPartitions(Partitions partitions) {
        this.partitions = partitions;
    }

    /**
     * Shuts down the topology refresh scheduler and then delegates to the superclass shutdown.
     *
     * @param quietPeriod quiet period passed to the superclass
     * @param timeout timeout passed to the superclass
     * @param timeUnit unit of {@code quietPeriod} and {@code timeout}
     * @return the future returned by the superclass shutdown
     */
    @Override
    public CompletableFuture<Void> shutdownAsync(long quietPeriod, long timeout, TimeUnit timeUnit) {
        topologyRefreshScheduler.shutdown();
        return super.shutdownAsync(quietPeriod, timeout, timeUnit);
    }

    /**
     * Returns the first seed URI.
     *
     * @return the first of the initial URIs
     */
    protected RedisURI getFirstUri() {
        assertNotEmpty(initialUris);
        return initialUris.iterator().next();
    }

    /**
     * Creates a deferred supplier of socket addresses to connect to.
     * <p>
     * On each subscription: if this client's partitions are empty the first seed URI is
     * resolved; otherwise the next address from a {@link RoundRobinSocketAddressSupplier} built
     * from the given suppliers is used.
     *
     * @param partitionsSupplier supplies the partitions for the round-robin supplier
     * @param sortFunction orders the nodes for the round-robin supplier, must not be {@code null}
     * @return a {@link Mono} emitting the address for the next connect attempt
     */
    protected Mono<SocketAddress> getSocketAddressSupplier(Supplier<Partitions> partitionsSupplier, Function<Partitions, Collection<RedisClusterNode>> sortFunction) {
        LettuceAssert.notNull(sortFunction, "Sort function must not be null");
        RoundRobinSocketAddressSupplier roundRobin = new RoundRobinSocketAddressSupplier(partitionsSupplier, sortFunction, getResources());
        return Mono.defer(() -> {
            if (!partitions.isEmpty()) {
                return Mono.fromCallable(roundRobin::get);
            }
            // No known nodes: fall back to resolving the first seed URI.
            return Mono.fromCallable(() -> {
                SocketAddress resolved = getResources().socketAddressResolver().resolve(getFirstUri());
                logger.debug("Resolved SocketAddress {} using {}", resolved, getFirstUri());
                return resolved;
            });
        });
    }

    /**
     * Returns the seed URIs this client was created with.
     *
     * @return the initial URIs
     */
    protected Iterable<RedisURI> getInitialUris() {
        return initialUris;
    }

    /**
     * Applies the function to every registered closeable that is a
     * {@link StatefulRedisClusterConnectionImpl}.
     *
     * @param function action to run per cluster connection
     */
    protected void forEachClusterConnection(Consumer<StatefulRedisClusterConnectionImpl<?, ?>> function) {
        forEachCloseable(StatefulRedisClusterConnectionImpl.class::isInstance, function);
    }

    /**
     * Applies the function to every registered closeable that is a
     * {@link StatefulRedisClusterPubSubConnectionImpl}.
     *
     * @param function action to run per cluster Pub/Sub connection
     */
    protected void forEachClusterPubSubConnection(Consumer<StatefulRedisClusterPubSubConnectionImpl<?, ?>> function) {
        forEachCloseable(StatefulRedisClusterPubSubConnectionImpl.class::isInstance, function);
    }

    /**
     * Applies the function to every registered closeable resource accepted by the selector.
     *
     * @param selector decides which closeables are passed to {@code function}; it must only
     *        accept instances of {@code T}, because the element is cast without a runtime check
     * @param function action to run per selected closeable
     * @param <T> type of the selected closeables
     */
    @SuppressWarnings("unchecked")
    protected <T extends Closeable> void forEachCloseable(Predicate<? super Closeable> selector, Consumer<T> function) {
        for (Closeable closeable : closeableResources) {
            if (selector.test(closeable)) {
                // A Closeable cannot be passed to Consumer<T> without a cast; the selector is
                // what guarantees the element is a T.
                function.accept((T) closeable);
            }
        }
    }

    /**
     * Creates the topology refresh strategy, backed by a node connection factory that connects
     * through this client.
     *
     * @return a new {@link ClusterTopologyRefresh}
     */
    protected ClusterTopologyRefresh createTopologyRefresh() {
        NodeConnectionFactory nodeConnectionFactory = new NodeConnectionFactoryImpl();
        return ClusterTopologyRefresh.create(nodeConnectionFactory, getResources());
    }

    /**
     * Reports whether the configured topology refresh options use dynamic refresh sources.
     *
     * @return the value of the {@code useDynamicRefreshSources} option
     */
    protected boolean useDynamicRefreshSources() {
        return getClusterClientOptions().getTopologyRefreshOptions().useDynamicRefreshSources();
    }

    /**
     * Returns the codec used by the no-argument connect methods.
     *
     * @return {@link StringCodec#UTF8}
     */
    protected RedisCodec<String, String> newStringStringCodec() {
        return StringCodec.UTF8;
    }

    /**
     * Finds the URI whose topology view is the very same object as the given partitions.
     *
     * @param map topology views keyed by the URI that reported them
     * @param partitions the partitions instance to look for (compared by identity)
     * @return the reporting URI, or {@code null} if no view is that instance
     */
    private static RedisURI getViewedBy(Map<RedisURI, Partitions> map, Partitions partitions) {
        for (Map.Entry<RedisURI, Partitions> view : map.entrySet()) {
            if (view.getValue() == partitions) {
                return view.getKey();
            }
        }
        return null;
    }

    /**
     * Returns the client options cast to {@link ClusterClientOptions}.
     *
     * @return the cluster client options
     */
    ClusterClientOptions getClusterClientOptions() {
        return (ClusterClientOptions) getOptions();
    }

    /**
     * Wraps a connect future so that a failure is reported as a
     * {@link RedisConnectionException} created for the given target URIs.
     *
     * @param future the connect stage to wrap
     * @param target URIs whose string form is used in the exception
     * @param <T> result type
     * @return a future with the same value, or failed with the translated exception
     */
    protected static <T> CompletableFuture<T> transformAsyncConnectionException(CompletionStage<T> future, Iterable<RedisURI> target) {
        return ConnectionFuture.from(null, future.toCompletableFuture()).thenCompose((v, e) -> {
            if (e == null) {
                return CompletableFuture.completedFuture(v);
            }
            return Futures.failed(RedisConnectionException.create(target.toString(), e));
        }).toCompletableFuture();
    }

    /**
     * Asserts that the codec is not {@code null}.
     *
     * @param codec the codec to check
     */
    private static <K, V> void assertNotNull(RedisCodec<K, V> codec) {
        LettuceAssert.notNull(codec, "RedisCodec must not be null");
    }

    /**
     * Asserts that the URIs are neither {@code null} nor empty.
     *
     * @param redisURIs the URIs to check
     */
    private static void assertNotEmpty(Iterable<RedisURI> redisURIs) {
        LettuceAssert.notNull(redisURIs, "RedisURIs must not be null");
        boolean hasAtLeastOne = redisURIs.iterator().hasNext();
        LettuceAssert.isTrue(hasAtLeastOne, "RedisURIs must not be empty");
    }

    /**
     * Asserts that the URI is not {@code null}.
     *
     * @param redisURI the URI to check
     * @return the same URI
     */
    private static RedisURI assertNotNull(RedisURI redisURI) {
        LettuceAssert.notNull(redisURI, "RedisURI must not be null");
        return redisURI;
    }

    /**
     * Asserts that the client resources are not {@code null}.
     *
     * @param clientResources the resources to check
     */
    private static void assertNotNull(ClientResources clientResources) {
        LettuceAssert.notNull(clientResources, "ClientResources must not be null");
    }

    /**
     * {@link NodeConnectionFactory} that opens node connections through the enclosing client.
     * The address' string form is used as node id and no cluster writer is attached.
     */
    private class NodeConnectionFactoryImpl
    implements NodeConnectionFactory {
        private NodeConnectionFactoryImpl() {
        }

        @Override
        public <K, V> StatefulRedisConnection<K, V> connectToNode(RedisCodec<K, V> codec, SocketAddress socketAddress) {
            String nodeId = socketAddress.toString();
            Mono<SocketAddress> address = Mono.just(socketAddress);
            return RedisClusterClient.this.connectToNode(codec, nodeId, null, address);
        }

        @Override
        public <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectToNodeAsync(RedisCodec<K, V> codec, SocketAddress socketAddress) {
            String nodeId = socketAddress.toString();
            Mono<SocketAddress> address = Mono.just(socketAddress);
            return RedisClusterClient.this.connectToNodeAsync(codec, nodeId, null, address);
        }
    }
}

