/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.client.hotrod.impl.transport.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.EventExecutor;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.ThreadSafe;
import org.infinispan.client.hotrod.CacheTopologyInfo;
import org.infinispan.client.hotrod.FailoverRequestBalancingStrategy;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.configuration.Configuration;
import org.infinispan.client.hotrod.configuration.ServerConfiguration;
import org.infinispan.client.hotrod.event.impl.ClientListenerNotifier;
import org.infinispan.client.hotrod.impl.ConfigurationProperties;
import org.infinispan.client.hotrod.impl.MarshallerRegistry;
import org.infinispan.client.hotrod.impl.TopologyInfo;
import org.infinispan.client.hotrod.impl.Util;
import org.infinispan.client.hotrod.impl.consistenthash.ConsistentHash;
import org.infinispan.client.hotrod.impl.consistenthash.ConsistentHashFactory;
import org.infinispan.client.hotrod.impl.operations.OperationsFactory;
import org.infinispan.client.hotrod.impl.protocol.Codec;
import org.infinispan.client.hotrod.impl.transport.netty.ChannelInitializer;
import org.infinispan.client.hotrod.impl.transport.netty.ChannelOperation;
import org.infinispan.client.hotrod.impl.transport.netty.ChannelPool;
import org.infinispan.client.hotrod.impl.transport.netty.ChannelRecord;
import org.infinispan.client.hotrod.impl.transport.netty.SecurityActions;
import org.infinispan.client.hotrod.impl.transport.netty.TransportHelper;
import org.infinispan.client.hotrod.logging.Log;
import org.infinispan.client.hotrod.logging.LogFactory;
import org.infinispan.commons.marshall.Marshaller;
import org.infinispan.commons.marshall.WrappedByteArray;
import org.infinispan.commons.util.ProcessorInfo;

@ThreadSafe
public class ChannelFactory {
    public static final String DEFAULT_CLUSTER_NAME = "___DEFAULT-CLUSTER___";
    private static final Log log = LogFactory.getLog(ChannelFactory.class, Log.class);
    private static final boolean trace = log.isTraceEnabled();
    private static final CompletableFuture<ClusterSwitchStatus> NOT_SWITCHED_FUTURE = CompletableFuture.completedFuture(ClusterSwitchStatus.NOT_SWITCHED);
    private static final CompletableFuture<ClusterSwitchStatus> IN_PROGRESS_FUTURE = CompletableFuture.completedFuture(ClusterSwitchStatus.IN_PROGRESS);
    private static final CompletableFuture<ClusterSwitchStatus> SWITCHED_FUTURE = CompletableFuture.completedFuture(ClusterSwitchStatus.SWITCHED);
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final ConcurrentMap<SocketAddress, ChannelPool> channelPoolMap = new ConcurrentHashMap<SocketAddress, ChannelPool>();
    private final Function<SocketAddress, ChannelPool> newPool = this::newPool;
    private EventLoopGroup eventLoopGroup;
    private ExecutorService executorService;
    private Map<WrappedByteArray, FailoverRequestBalancingStrategy> balancers;
    private OperationsFactory operationsFactory;
    private Configuration configuration;
    private Collection<SocketAddress> initialServers;
    private int maxRetries;
    private Marshaller marshaller;
    private Collection<Consumer<Set<SocketAddress>>> failedServerNotifier;
    @GuardedBy(value="lock")
    private volatile TopologyInfo topologyInfo;
    private volatile String currentClusterName;
    private List<ClusterInfo> clusters = new ArrayList<ClusterInfo>();
    private final AtomicInteger topologyAge = new AtomicInteger(0);
    private MarshallerRegistry marshallerRegistry;
    private LongAdder totalRetries = new LongAdder();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void start(Codec codec, Configuration configuration, AtomicInteger defaultCacheTopologyId, Marshaller marshaller, ExecutorService executorService, ClientListenerNotifier listenerNotifier, Collection<Consumer<Set<SocketAddress>>> failedServerNotifier, MarshallerRegistry marshallerRegistry) {
        this.marshallerRegistry = marshallerRegistry;
        this.lock.writeLock().lock();
        try {
            this.marshaller = marshaller;
            this.configuration = configuration;
            this.executorService = executorService;
            this.failedServerNotifier = failedServerNotifier;
            int asyncThreads = this.maxAsyncThreads(executorService, configuration);
            int eventLoopThreads = SecurityActions.getIntProperty("io.netty.eventLoopThreads", ProcessorInfo.availableProcessors() * 2);
            int maxExecutors = Math.min(asyncThreads, eventLoopThreads);
            this.eventLoopGroup = TransportHelper.createEventLoopGroup(maxExecutors, executorService);
            ArrayList<InetSocketAddress> servers = new ArrayList<InetSocketAddress>();
            this.initialServers = new ArrayList<SocketAddress>();
            for (ServerConfiguration server : configuration.servers()) {
                servers.add(InetSocketAddress.createUnresolved(server.host(), server.port()));
            }
            this.initialServers.addAll(servers);
            if (!configuration.clusters().isEmpty()) {
                configuration.clusters().forEach(cluster -> {
                    Collection clusterAddresses = cluster.getCluster().stream().map(server -> InetSocketAddress.createUnresolved(server.host(), server.port())).collect(Collectors.toList());
                    ClusterInfo clusterInfo = new ClusterInfo(cluster.getClusterName(), clusterAddresses);
                    log.debugf("Add secondary cluster: %s", clusterInfo);
                    this.clusters.add(clusterInfo);
                });
                this.clusters.add(new ClusterInfo(DEFAULT_CLUSTER_NAME, this.initialServers));
            }
            this.currentClusterName = DEFAULT_CLUSTER_NAME;
            this.topologyInfo = new TopologyInfo(defaultCacheTopologyId, Collections.unmodifiableCollection(servers), configuration);
            this.operationsFactory = new OperationsFactory(this, codec, listenerNotifier, configuration);
            this.maxRetries = configuration.maxRetries();
            if (log.isDebugEnabled()) {
                log.debugf("Statically configured servers: %s", servers);
                log.debugf("Tcp no delay = %b; client socket timeout = %d ms; connect timeout = %d ms", configuration.tcpNoDelay(), configuration.socketTimeout(), configuration.connectionTimeout());
            }
            this.balancers = new HashMap<WrappedByteArray, FailoverRequestBalancingStrategy>();
            WrappedByteArray defaultCacheName = new WrappedByteArray(RemoteCacheManager.cacheNameBytes());
            this.balancers.put(defaultCacheName, this.createBalancer(defaultCacheName));
        }
        finally {
            this.lock.writeLock().unlock();
        }
        this.pingServersIgnoreException();
    }

    private int maxAsyncThreads(ExecutorService executorService, Configuration configuration) {
        if (executorService instanceof ThreadPoolExecutor) {
            return ((ThreadPoolExecutor)executorService).getMaximumPoolSize();
        }
        return new ConfigurationProperties((Properties)configuration.asyncExecutorFactory().properties()).getDefaultExecutorFactoryPoolSize();
    }

    public MarshallerRegistry getMarshallerRegistry() {
        return this.marshallerRegistry;
    }

    private ChannelPool newPool(SocketAddress address) {
        log.debugf("Creating new channel pool for %s", address);
        Bootstrap bootstrap = (Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)new Bootstrap().group(this.eventLoopGroup)).channel(TransportHelper.socketChannel())).remoteAddress(address).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (Object)this.configuration.connectionTimeout())).option(ChannelOption.SO_KEEPALIVE, (Object)this.configuration.tcpKeepAlive())).option(ChannelOption.TCP_NODELAY, (Object)this.configuration.tcpNoDelay())).option(ChannelOption.SO_RCVBUF, (Object)1024576);
        int maxConnections = this.configuration.connectionPool().maxActive();
        if (maxConnections < 0) {
            maxConnections = Integer.MAX_VALUE;
        }
        ChannelInitializer channelInitializer = new ChannelInitializer(bootstrap, address, this.operationsFactory, this.configuration, this);
        bootstrap.handler((ChannelHandler)channelInitializer);
        ChannelPool pool = new ChannelPool((EventExecutor)bootstrap.config().group().next(), address, channelInitializer, this.configuration.connectionPool().exhaustedAction(), this.configuration.connectionPool().maxWait(), maxConnections, this.configuration.connectionPool().maxPendingRequests());
        channelInitializer.setChannelPool(pool);
        return pool;
    }

    private FailoverRequestBalancingStrategy createBalancer(WrappedByteArray cacheName) {
        FailoverRequestBalancingStrategy balancer = this.configuration.balancingStrategyFactory().get();
        balancer.setServers(this.topologyInfo.getServers(cacheName));
        return balancer;
    }

    private void pingServersIgnoreException() {
        Collection<SocketAddress> servers = this.topologyInfo.getServers();
        for (SocketAddress addr : servers) {
            try {
                Util.await(this.fetchChannelAndInvoke(addr, this.operationsFactory.newPingOperation(true)));
            }
            catch (Exception e) {
                if (!trace) continue;
                log.tracef(e, "Ignoring exception pinging configured servers %s to establish a connection", servers);
            }
        }
    }

    public void destroy() {
        try {
            this.channelPoolMap.values().forEach(ChannelPool::close);
            this.eventLoopGroup.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS).get();
            this.executorService.shutdownNow();
        }
        catch (Exception e) {
            log.warn("Exception while shutting down the connection pool.", e);
        }
    }

    public CacheTopologyInfo getCacheTopologyInfo(byte[] cacheName) {
        this.lock.readLock().lock();
        try {
            CacheTopologyInfo cacheTopologyInfo = this.topologyInfo.getCacheTopologyInfo(cacheName);
            return cacheTopologyInfo;
        }
        finally {
            this.lock.readLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void updateHashFunction(Map<SocketAddress, Set<Integer>> servers2Hash, int numKeyOwners, short hashFunctionVersion, int hashSpace, byte[] cacheName, AtomicInteger topologyId) {
        this.lock.writeLock().lock();
        try {
            this.topologyInfo.updateTopology(servers2Hash, numKeyOwners, hashFunctionVersion, hashSpace, cacheName, topologyId);
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void updateHashFunction(SocketAddress[][] segmentOwners, int numSegments, short hashFunctionVersion, byte[] cacheName, AtomicInteger topologyId) {
        this.lock.writeLock().lock();
        try {
            this.topologyInfo.updateTopology(segmentOwners, numSegments, hashFunctionVersion, cacheName, topologyId);
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    public <T extends ChannelOperation> T fetchChannelAndInvoke(Set<SocketAddress> failedServers, byte[] cacheName, T operation) {
        return this.fetchChannelAndInvoke(this.getNextServer(failedServers, cacheName), operation);
    }

    public <T extends ChannelOperation> T fetchChannelAndInvoke(SocketAddress server, T operation) {
        ChannelPool pool = this.channelPoolMap.computeIfAbsent(server, this.newPool);
        pool.acquire(operation);
        return operation;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private SocketAddress getNextServer(Set<SocketAddress> failedServers, byte[] cacheName) {
        SocketAddress server;
        this.lock.writeLock().lock();
        try {
            FailoverRequestBalancingStrategy balancer = this.getOrCreateIfAbsentBalancer(cacheName);
            server = balancer.nextServer(failedServers);
        }
        finally {
            this.lock.writeLock().unlock();
        }
        if (trace) {
            log.tracef("[%s] Using the balancer for determining the server: %s", new String(cacheName), server);
        }
        return server;
    }

    @GuardedBy(value="lock")
    private FailoverRequestBalancingStrategy getOrCreateIfAbsentBalancer(byte[] cacheName) {
        return this.balancers.computeIfAbsent(new WrappedByteArray(cacheName), this::createBalancer);
    }

    public SocketAddress getSocketAddress(Object key, byte[] cacheName) {
        return this.topologyInfo.getHashAwareServer(key, cacheName).orElse(null);
    }

    public <T extends ChannelOperation> T fetchChannelAndInvoke(Object key, Set<SocketAddress> failedServers, byte[] cacheName, T operation) {
        return this.fetchChannelAndInvoke(this.topologyInfo.getHashAwareServer(key, cacheName), failedServers, cacheName, operation);
    }

    public <T extends ChannelOperation> T fetchChannelAndInvokeForSegments(Set<Integer> segments, Set<SocketAddress> failedServers, byte[] cacheName, T operation) {
        return this.fetchChannelAndInvoke(this.topologyInfo.getHashAwareServer(segments, cacheName), failedServers, cacheName, operation);
    }

    private <T extends ChannelOperation> T fetchChannelAndInvoke(Optional<SocketAddress> hashAwareServer, Set<SocketAddress> failedServers, byte[] cacheName, T operation) {
        if (failedServers != null) {
            hashAwareServer = hashAwareServer.filter(server -> !failedServers.contains(server));
        }
        SocketAddress server2 = hashAwareServer.orElseGet(() -> this.getNextServer(failedServers, cacheName));
        return this.fetchChannelAndInvoke(server2, operation);
    }

    public void releaseChannel(Channel channel) {
        if (trace) {
            log.tracef("Releasing channel %s", channel);
        }
        ChannelRecord record = ChannelRecord.of(channel);
        record.getChannelPool().release(channel, record);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void updateServers(Collection<SocketAddress> newServers, byte[] cacheName, boolean quiet) {
        this.lock.writeLock().lock();
        try {
            Collection<SocketAddress> servers = this.updateTopologyInfo(cacheName, newServers, quiet);
            if (!servers.isEmpty()) {
                FailoverRequestBalancingStrategy balancer = this.getOrCreateIfAbsentBalancer(cacheName);
                balancer.setServers(servers);
            }
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void updateServers(Collection<SocketAddress> newServers) {
        this.lock.writeLock().lock();
        try {
            Collection<SocketAddress> servers = this.updateTopologyInfo(org.infinispan.commons.util.Util.EMPTY_BYTE_ARRAY, newServers, true);
            if (!servers.isEmpty()) {
                for (FailoverRequestBalancingStrategy balancer : this.balancers.values()) {
                    balancer.setServers(servers);
                }
            }
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    @GuardedBy(value="lock")
    private Collection<SocketAddress> updateTopologyInfo(byte[] cacheName, Collection<SocketAddress> newServers, boolean quiet) {
        Collection<SocketAddress> servers = this.topologyInfo.getServers(new WrappedByteArray(cacheName));
        HashSet<SocketAddress> addedServers = new HashSet<SocketAddress>(newServers);
        addedServers.removeAll(servers);
        HashSet<SocketAddress> failedServers = new HashSet<SocketAddress>(servers);
        failedServers.removeAll(newServers);
        if (trace) {
            String cacheNameString = cacheName == null ? "<default>" : new String(cacheName);
            log.tracef("[%s] Current list: %s", cacheNameString, servers);
            log.tracef("[%s] New list: %s", cacheNameString, newServers);
            log.tracef("[%s] Added servers: %s", cacheNameString, addedServers);
            log.tracef("[%s] Removed servers: %s", cacheNameString, failedServers);
        }
        if (failedServers.isEmpty() && addedServers.isEmpty()) {
            log.debug("Same list of servers, not changing the pool");
            return Collections.emptyList();
        }
        for (SocketAddress socketAddress : addedServers) {
            log.newServerAdded(socketAddress);
            this.fetchChannelAndInvoke(socketAddress, new ReleaseChannelOperation(quiet));
        }
        for (SocketAddress socketAddress : failedServers) {
            log.removingServer(socketAddress);
            ChannelPool pool = (ChannelPool)this.channelPoolMap.remove(socketAddress);
            if (pool == null) continue;
            pool.close();
        }
        servers = Collections.unmodifiableList(new ArrayList<SocketAddress>(newServers));
        this.topologyInfo.updateServers(cacheName, servers);
        if (!failedServers.isEmpty()) {
            for (Consumer consumer : this.failedServerNotifier) {
                consumer.accept(failedServers);
            }
        }
        return servers;
    }

    public Collection<SocketAddress> getServers() {
        this.lock.readLock().lock();
        try {
            Collection<SocketAddress> collection = this.topologyInfo.getServers();
            return collection;
        }
        finally {
            this.lock.readLock().unlock();
        }
    }

    public ConsistentHash getConsistentHash(byte[] cacheName) {
        this.lock.readLock().lock();
        try {
            ConsistentHash consistentHash = this.topologyInfo.getConsistentHash(cacheName);
            return consistentHash;
        }
        finally {
            this.lock.readLock().unlock();
        }
    }

    public ConsistentHashFactory getConsistentHashFactory() {
        return this.topologyInfo.getConsistentHashFactory();
    }

    public boolean isTcpNoDelay() {
        return this.configuration.tcpNoDelay();
    }

    public boolean isTcpKeepAlive() {
        return this.configuration.tcpKeepAlive();
    }

    public int getMaxRetries() {
        return this.maxRetries;
    }

    public void reset(byte[] cacheName) {
        this.updateServers(this.initialServers, cacheName, true);
        this.topologyInfo.setTopologyId(cacheName, -1);
    }

    public AtomicInteger createTopologyId(byte[] cacheName) {
        return this.topologyInfo.createTopologyId(cacheName, -1);
    }

    public int getTopologyId(byte[] cacheName) {
        return this.topologyInfo.getTopologyId(cacheName);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public CompletableFuture<ClusterSwitchStatus> trySwitchCluster(String failedClusterName, byte[] cacheName) {
        this.lock.writeLock().lock();
        try {
            if (trace) {
                log.tracef("Trying to switch cluster away from '%s'", failedClusterName);
            }
            if (this.clusters.isEmpty()) {
                log.debugf("No alternative clusters configured, so can't switch cluster", new Object[0]);
                CompletableFuture<ClusterSwitchStatus> completableFuture = NOT_SWITCHED_FUTURE;
                return completableFuture;
            }
            String currentClusterName = this.currentClusterName;
            if (!this.isSwitchedClusterNotAvailable(failedClusterName, currentClusterName)) {
                log.debugf("Cluster already switched from failed cluster `%s` to `%s`, try again", failedClusterName, currentClusterName);
                CompletableFuture<ClusterSwitchStatus> completableFuture = IN_PROGRESS_FUTURE;
                return completableFuture;
            }
            if (!this.topologyInfo.isTopologyValid(cacheName)) {
                CompletableFuture<ClusterSwitchStatus> completableFuture = IN_PROGRESS_FUTURE;
                return completableFuture;
            }
            if (trace) {
                log.tracef("Switching clusters, failed cluster is '%s' and current cluster name is '%s'", failedClusterName, currentClusterName);
            }
            ArrayList<ClusterInfo> candidateClusters = new ArrayList<ClusterInfo>();
            for (ClusterInfo clusterInfo : this.clusters) {
                String clusterName = clusterInfo.clusterName;
                if (clusterName.equals(failedClusterName)) continue;
                candidateClusters.add(clusterInfo);
            }
            Iterator<ClusterInfo> clusterIterator = candidateClusters.iterator();
            if (!clusterIterator.hasNext()) {
                log.debug("No clusters to switch to.");
                CompletableFuture<ClusterSwitchStatus> completableFuture = NOT_SWITCHED_FUTURE;
                return completableFuture;
            }
            ClusterInfo clusterInfo = (ClusterInfo)clusterIterator.next();
            CompletionStage completionStage = this.checkServersAlive(clusterInfo.clusterAddresses).thenCompose((Function)new ClusterSwitcher(clusterIterator, cacheName, clusterInfo));
            return completionStage;
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    private CompletableFuture<Boolean> checkServersAlive(Collection<SocketAddress> servers) {
        AtomicInteger remainingResponses = new AtomicInteger(servers.size());
        CompletableFuture<Boolean> allFuture = new CompletableFuture<Boolean>();
        for (SocketAddress server : servers) {
            this.fetchChannelAndInvoke(server, this.operationsFactory.newPingOperation(true)).whenComplete((result, throwable) -> {
                if (throwable != null) {
                    if (trace) {
                        log.tracef((Throwable)throwable, "Error checking whether this server is alive: %s", server);
                    }
                    if (remainingResponses.decrementAndGet() == 0) {
                        allFuture.complete(false);
                    }
                } else {
                    log.tracef("Ping to server %s succeeded", server);
                    allFuture.complete(true);
                }
            });
        }
        return allFuture;
    }

    private boolean isSwitchedClusterNotAvailable(String failedClusterName, String currentClusterName) {
        return currentClusterName.equals(failedClusterName);
    }

    public Marshaller getMarshaller() {
        return this.marshaller;
    }

    public boolean switchToCluster(String clusterName) {
        if (this.clusters.isEmpty()) {
            log.debugf("No alternative clusters configured, so can't switch cluster", new Object[0]);
            return false;
        }
        Collection<SocketAddress> addresses = this.findClusterInfo(clusterName);
        if (!addresses.isEmpty()) {
            this.updateServers(addresses);
            log.debugf("Switching to %s, servers: %s, setting topology.", clusterName, addresses);
            this.topologyInfo.setAllTopologyIds(-2);
            if (log.isInfoEnabled()) {
                if (!clusterName.equals(DEFAULT_CLUSTER_NAME)) {
                    log.manuallySwitchedToCluster(clusterName);
                } else {
                    log.manuallySwitchedBackToMainCluster();
                }
            }
            return true;
        }
        return false;
    }

    public String getCurrentClusterName() {
        return this.currentClusterName;
    }

    public int getTopologyAge() {
        return this.topologyAge.get();
    }

    private Collection<SocketAddress> findClusterInfo(String clusterName) {
        for (ClusterInfo cluster : this.clusters) {
            if (!cluster.clusterName.equals(clusterName)) continue;
            return cluster.clusterAddresses;
        }
        return Collections.emptyList();
    }

    public FailoverRequestBalancingStrategy getBalancer(byte[] cacheName) {
        this.lock.readLock().lock();
        try {
            FailoverRequestBalancingStrategy failoverRequestBalancingStrategy = this.balancers.get(new WrappedByteArray(cacheName));
            return failoverRequestBalancingStrategy;
        }
        finally {
            this.lock.readLock().unlock();
        }
    }

    public int socketTimeout() {
        return this.configuration.socketTimeout();
    }

    public int getNumActive(SocketAddress address) {
        ChannelPool pool = (ChannelPool)this.channelPoolMap.get(address);
        return pool == null ? 0 : pool.getActive();
    }

    public int getNumIdle(SocketAddress address) {
        ChannelPool pool = (ChannelPool)this.channelPoolMap.get(address);
        return pool == null ? 0 : pool.getIdle();
    }

    public int getNumActive() {
        return this.channelPoolMap.values().stream().mapToInt(ChannelPool::getActive).sum();
    }

    public int getNumIdle() {
        return this.channelPoolMap.values().stream().mapToInt(ChannelPool::getIdle).sum();
    }

    public Configuration getConfiguration() {
        return this.configuration;
    }

    public long getRetries() {
        return this.totalRetries.longValue();
    }

    public void incrementRetryCount() {
        this.totalRetries.increment();
    }

    private class ReleaseChannelOperation
    implements ChannelOperation {
        private final boolean quiet;

        private ReleaseChannelOperation(boolean quiet) {
            this.quiet = quiet;
        }

        @Override
        public void invoke(Channel channel) {
            ChannelFactory.this.releaseChannel(channel);
        }

        @Override
        public void cancel(SocketAddress address, Throwable cause) {
            if (!this.quiet) {
                log.failedAddingNewServer(address, cause);
            }
        }
    }

    private class ClusterSwitcher
    implements Function<Boolean, CompletionStage<ClusterSwitchStatus>> {
        private final Iterator<ClusterInfo> clusterIterator;
        private final byte[] cacheName;
        private ClusterInfo cluster;

        ClusterSwitcher(Iterator<ClusterInfo> clusterIterator, byte[] cacheName, ClusterInfo cluster) {
            this.clusterIterator = clusterIterator;
            this.cacheName = cacheName;
            this.cluster = cluster;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public CompletionStage<ClusterSwitchStatus> apply(Boolean alive) {
            if (!alive.booleanValue()) {
                if (!this.clusterIterator.hasNext()) {
                    log.debugf("All cluster addresses viewed and none worked: %s", ChannelFactory.this.clusters);
                    return NOT_SWITCHED_FUTURE;
                }
                this.cluster = this.clusterIterator.next();
                return ChannelFactory.this.checkServersAlive(this.cluster.clusterAddresses).thenCompose((Function)this);
            }
            ChannelFactory.this.topologyAge.incrementAndGet();
            ChannelFactory.this.lock.writeLock().lock();
            try {
                Collection servers = ChannelFactory.this.updateTopologyInfo(this.cacheName, this.cluster.clusterAddresses, true);
                if (!servers.isEmpty()) {
                    FailoverRequestBalancingStrategy balancer = ChannelFactory.this.getOrCreateIfAbsentBalancer(this.cacheName);
                    balancer.setServers(servers);
                }
            }
            finally {
                ChannelFactory.this.lock.writeLock().unlock();
            }
            ChannelFactory.this.topologyInfo.setTopologyId(this.cacheName, -2);
            ChannelFactory.this.currentClusterName = this.cluster.clusterName;
            if (log.isInfoEnabled()) {
                if (!this.cluster.clusterName.equals(ChannelFactory.DEFAULT_CLUSTER_NAME)) {
                    log.switchedToCluster(this.cluster.clusterName);
                } else {
                    log.switchedBackToMainCluster();
                }
            }
            return SWITCHED_FUTURE;
        }
    }

    private static final class ClusterInfo {
        final Collection<SocketAddress> clusterAddresses;
        final String clusterName;

        private ClusterInfo(String clusterName, Collection<SocketAddress> clusterAddresses) {
            this.clusterAddresses = clusterAddresses;
            this.clusterName = clusterName;
        }

        public String toString() {
            return "ClusterInfo{name='" + this.clusterName + '\'' + ", addresses=" + this.clusterAddresses + '}';
        }
    }

    public static enum ClusterSwitchStatus {
        NOT_SWITCHED,
        SWITCHED,
        IN_PROGRESS;

    }
}

