/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
import org.apache.ignite.internal.processors.cache.CachesRegistry;
import org.apache.ignite.internal.processors.cache.ClientCacheChangeDiscoveryMessage;
import org.apache.ignite.internal.processors.cache.ClientCacheChangeDummyDiscoveryMessage;
import org.apache.ignite.internal.processors.cache.ClientCacheUpdateTimeout;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.ExchangeActions;
import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
import org.apache.ignite.internal.processors.cache.IgniteCacheProxyImpl;
import org.apache.ignite.internal.processors.cache.distributed.dht.ClientCacheDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAssignmentFetchFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CacheGroupAffinityMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridClientPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.GridPartitionStateMap;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.IgniteInClosureX;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;

public class CacheAffinitySharedManager<K, V>
extends GridCacheSharedManagerAdapter<K, V> {
    /**
     * Delay passed to {@link ClientCacheUpdateTimeout} when a client cache change message is scheduled.
     * Read from the {@code IGNITE_CLIENT_CACHE_CHANGE_MESSAGE_TIMEOUT} system property, default {@code 10000}.
     * NOTE(review): the unit is presumably milliseconds - confirm against ClientCacheUpdateTimeout.
     */
    private final long clientCacheMsgTimeout = IgniteSystemProperties.getLong("IGNITE_CLIENT_CACHE_CHANGE_MESSAGE_TIMEOUT", 10000L);

    /** Maps a cluster node to its ID. */
    private static final IgniteClosure<ClusterNode, UUID> NODE_TO_ID = new IgniteClosure<ClusterNode, UUID>(){

        @Override
        public UUID apply(ClusterNode node) {
            return node.id();
        }
    };

    /** Maps a cluster node to its order. */
    private static final IgniteClosure<ClusterNode, Long> NODE_TO_ORDER = new IgniteClosure<ClusterNode, Long>(){

        @Override
        public Long apply(ClusterNode node) {
            return node.order();
        }
    };

    /** Cache group holders keyed by group ID. */
    private ConcurrentMap<Integer, CacheGroupHolder> grpHolders = new ConcurrentHashMap<>();

    /** Topology version last recorded by {@code onDiscoveryEvent}; reset to {@code null} when the local node joins. */
    private AffinityTopologyVersion lastAffVer;

    /** Registry of cache and cache group descriptors; created in {@code start0}. */
    private CachesRegistry cachesRegistry;

    /** Rebalance wait state; the methods visible here access it while holding {@link #mux}. */
    private WaitRebalanceInfo waitInfo;

    /** Monitor used to synchronize access to {@link #waitInfo} and updates of {@link #lastAffVer}. */
    private final Object mux = new Object();

    /** Pending affinity assignment fetch futures; each one is notified by the discovery listener when a node leaves. */
    private final ConcurrentMap<Long, GridDhtAssignmentFetchFuture> pendingAssignmentFetchFuts = new ConcurrentHashMap<>();

    /** Per-thread accumulated client cache change message that has not been sent yet. */
    private final ThreadLocal<ClientCacheChangeDiscoveryMessage> clientCacheChanges = new ThreadLocal<>();
    /**
     * Local discovery listener (registered in {@code start0} for event types 11 and 12) that forwards the
     * event node's ID to every pending affinity assignment fetch future.
     * NOTE(review): 11 and 12 are presumably the node-left / node-failed event codes - confirm against EventType.
     */
    private final GridLocalEventListener discoLsnr = new GridLocalEventListener(){

        @Override
        public void onEvent(Event evt) {
            DiscoveryEvent discoEvt = (DiscoveryEvent)evt;

            assert (discoEvt.type() == 11 || discoEvt.type() == 12);

            ClusterNode evtNode = discoEvt.eventNode();
            Collection<GridDhtAssignmentFetchFuture> pendingFuts = CacheAffinitySharedManager.this.pendingAssignmentFetchFuts.values();

            for (GridDhtAssignmentFetchFuture pendingFut : pendingFuts) {
                pendingFut.onNodeLeft(evtNode.id());
            }
        }
    };

    /**
     * Starts the manager: runs the parent start logic, subscribes {@link #discoLsnr} to event types 11 and 12,
     * and creates the caches registry.
     *
     * @throws IgniteCheckedException if the parent start logic fails.
     */
    @Override
    protected void start0() throws IgniteCheckedException {
        super.start0();

        cctx.kernalContext().event().addLocalEventListener(discoLsnr, 11, 12);

        cachesRegistry = new CachesRegistry(cctx);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Records the topology version of a discovery event in {@link #lastAffVer} when the event qualifies.
     *
     * @param type Discovery event type. NOTE(review): 10/11/12 are presumably node joined/left/failed - confirm.
     * @param customMsg Custom discovery message, if any.
     * @param node Event node.
     * @param topVer Topology version of the event.
     * @param state Cluster state at the time of the event.
     */
    void onDiscoveryEvent(int type, @Nullable DiscoveryCustomMessage customMsg, ClusterNode node, AffinityTopologyVersion topVer, DiscoveryDataClusterState state) {
        // Local node (re)joined: forget the previously recorded version.
        if (type == 10 && node.isLocal()) {
            lastAffVer = null;
        }

        // Outside a stable active state only messages requiring centralized affinity assignment are tracked.
        boolean stableActive = !state.transition() && state.active();

        if (!stableActive && !DiscoveryCustomEvent.requiresCentralizedAffinityAssignment(customMsg)) {
            return;
        }

        boolean srvTopChange = !node.isClient() && (type == 12 || type == 10 || type == 11);

        if (!srvTopChange && !DiscoveryCustomEvent.requiresCentralizedAffinityAssignment(customMsg)) {
            return;
        }

        synchronized (mux) {
            assert (lastAffVer == null || topVer.compareTo(lastAffVer) > 0) : "lastAffVer=" + lastAffVer + ", topVer=" + topVer + ", customMsg=" + customMsg;

            lastAffVer = topVer;
        }
    }

    /**
     * Initializes the caches registry with the descriptors known at local join.
     *
     * @param grpDescs Cache group descriptors keyed by group ID.
     * @param cacheDescs Cache descriptors keyed by cache name.
     * @return Future returned by the registry initialization.
     */
    public IgniteInternalFuture<?> initCachesOnLocalJoin(Map<Integer, CacheGroupDescriptor> grpDescs, Map<String, DynamicCacheDescriptor> cacheDescs) {
        IgniteInternalFuture<?> initFut = cachesRegistry.init(grpDescs, cacheDescs);

        return initFut;
    }

    /**
     * Decides whether an affinity change message requires an exchange.
     * <p>
     * A message that already carries an exchange ID is ignored. Otherwise an exchange is needed when no
     * affinity version has been recorded yet or the recorded version equals the message version; the
     * decision is also stored in the message.
     *
     * @param msg Affinity change message.
     * @return {@code true} if an exchange is needed for the message.
     */
    boolean onCustomEvent(CacheAffinityChangeMessage msg) {
        boolean exchangeNeeded = false;

        if (msg.exchangeId() == null) {
            exchangeNeeded = lastAffVer == null || lastAffVer.equals(msg.topologyVersion());

            msg.exchangeNeeded(exchangeNeeded);
        }

        if (log.isDebugEnabled()) {
            String action = exchangeNeeded ? "Need process" : "Ignore";

            log.debug(action + " affinity change message [lastAffVer=" + lastAffVer + ", msgExchId=" + msg.exchangeId() + ", msgVer=" + msg.topologyVersion() + ']');
        }

        return exchangeNeeded;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Called after stopped groups were removed from the wait state: if the wait state still belongs to
     * {@code topVer} and no groups are left to wait for, builds the affinity change message, clears the
     * wait state and sends the message (outside of the lock).
     *
     * @param topVer Topology version the wait state is expected to belong to.
     */
    private void onCacheGroupStopped(AffinityTopologyVersion topVer) {
        CacheAffinityChangeMessage changeMsg;

        synchronized (mux) {
            WaitRebalanceInfo info = waitInfo;

            if (info == null || !info.topVer.equals(topVer)) {
                return;
            }

            // Some groups are still being rebalanced: nothing to send yet.
            if (!info.waitGrps.isEmpty()) {
                return;
            }

            changeMsg = affinityChangeMessage(info);
            waitInfo = null;
        }

        if (changeMsg == null) {
            return;
        }

        try {
            cctx.discovery().sendCustomEvent(changeMsg);
        }
        catch (IgniteCheckedException e) {
            U.error(log, "Failed to send affinity change message.", e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks whether the given cache group finished rebalancing and, once no group is left in the wait
     * state, sends the affinity change message.
     * <p>
     * Partitions whose awaited node already reports {@code OWNING} are removed from the group's wait map.
     * The scan stops at the first partition that is not yet owned. If no holder exists for the group,
     * the group is treated as rebalanced without inspecting partitions.
     *
     * @param top Partition topology used to query partition states.
     * @param checkGrpId ID of the cache group to check.
     */
    void checkRebalanceState(GridDhtPartitionTopology top, Integer checkGrpId) {
        synchronized (mux) {
            if (waitInfo == null || !waitInfo.topVer.equals(lastAffVer)) {
                return;
            }

            // Element types of WaitRebalanceInfo.waitGrps are declared outside this view; the cast keeps the
            // local strongly typed so the iterator below compiles without raw types.
            @SuppressWarnings("unchecked")
            Map<Integer, UUID> partWait = (Map<Integer, UUID>)waitInfo.waitGrps.get(checkGrpId);

            if (partWait == null) {
                return;
            }

            boolean rebalanced = true;

            if (grpHolders.get(checkGrpId) != null) {
                Iterator<Map.Entry<Integer, UUID>> it = partWait.entrySet().iterator();

                while (it.hasNext()) {
                    Map.Entry<Integer, UUID> e = it.next();

                    Integer part = e.getKey();
                    UUID waitNode = e.getValue();

                    if (top.partitionState(waitNode, part) != GridDhtPartitionState.OWNING) {
                        rebalanced = false;

                        break;
                    }

                    // Partition is owned by the awaited node: no need to wait for it anymore.
                    it.remove();
                }
            }

            if (!rebalanced) {
                return;
            }

            waitInfo.waitGrps.remove(checkGrpId);

            if (!waitInfo.waitGrps.isEmpty()) {
                return;
            }

            CacheAffinityChangeMessage msg = affinityChangeMessage(waitInfo);

            waitInfo = null;

            if (msg == null) {
                return;
            }

            // NOTE(review): the message is sent while holding mux (unlike onCacheGroupStopped); kept as in
            // the original since moving it could change ordering between concurrent callers.
            try {
                cctx.discovery().sendCustomEvent(msg);
            }
            catch (IgniteCheckedException e) {
                U.error(log, "Failed to send affinity change message.", e);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns a snapshot of the group IDs still present in the wait state.
     *
     * @return Copy of the waited group IDs, or an empty set if there is no wait state or it does not belong
     *      to the last recorded affinity version.
     */
    public Set<Integer> waitGroups() {
        synchronized (mux) {
            boolean actual = waitInfo != null && waitInfo.topVer.equals(lastAffVer);

            if (actual) {
                return new HashSet<Integer>(waitInfo.waitGrps.keySet());
            }

            return Collections.emptySet();
        }
    }

    /**
     * Builds an affinity change message from the assignments collected in the given wait state, converting
     * every per-partition node list to a list of node IDs.
     *
     * @param waitInfo Wait state to build the message from.
     * @return Affinity change message, or {@code null} if the wait state holds no assignments.
     */
    @Nullable
    private CacheAffinityChangeMessage affinityChangeMessage(WaitRebalanceInfo waitInfo) {
        if (waitInfo.assignments.isEmpty()) {
            return null;
        }

        HashMap<Integer, Map<Integer, List<UUID>>> assignmentsChange = U.newHashMap(waitInfo.assignments.size());

        for (Map.Entry<?, ?> grpEntry : waitInfo.assignments.entrySet()) {
            Integer grpId = (Integer)grpEntry.getKey();

            // Element types of WaitRebalanceInfo.assignments are declared outside this view; the cast keeps
            // the nested iteration strongly typed instead of falling back to raw types.
            @SuppressWarnings("unchecked")
            Map<Integer, List<ClusterNode>> assignment = (Map<Integer, List<ClusterNode>>)grpEntry.getValue();

            Map<Integer, List<UUID>> assignment0 = U.newHashMap(assignment.size());

            for (Map.Entry<Integer, List<ClusterNode>> partEntry : assignment.entrySet()) {
                List<UUID> nodeIds = CacheAffinitySharedManager.toIds0(partEntry.getValue());

                assignment0.put(partEntry.getKey(), nodeIds);
            }

            assignmentsChange.put(grpId, assignment0);
        }

        return new CacheAffinityChangeMessage(waitInfo.topVer, assignmentsChange, waitInfo.deploymentIds);
    }

    /**
     * Registers the affinity assignment response handler for a newly created cache group, unless a holder
     * for that group already exists.
     *
     * @param grp Created cache group.
     */
    void onCacheGroupCreated(CacheGroupContext grp) {
        if (grpHolders.containsKey(grp.groupId())) {
            return;
        }

        cctx.io().addCacheGroupHandler(grp.groupId(), GridDhtAffinityAssignmentResponse.class, new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>(){

            @Override
            public void apply(UUID sndId, GridDhtAffinityAssignmentResponse resp) {
                CacheAffinitySharedManager.this.processAffinityAssignmentResponse(sndId, resp);
            }
        });
    }

    /**
     * Resolves descriptors of the client caches that still have to be started locally.
     * <p>
     * Caches that already have a local cache context are skipped. If a requested cache is unknown to the
     * registry, the client cache change future is completed with an error and {@code null} is returned.
     *
     * @param reqId Client cache change request ID.
     * @param startReqs Start requests keyed by cache name.
     * @return Descriptors of caches to start (possibly empty), or {@code null} if a requested cache does not exist.
     */
    @Nullable
    private List<DynamicCacheDescriptor> clientCachesToStart(UUID reqId, Map<String, DynamicCacheChangeRequest> startReqs) {
        List<DynamicCacheDescriptor> toStart = new ArrayList<>(startReqs.size());

        for (DynamicCacheChangeRequest req : startReqs.values()) {
            DynamicCacheDescriptor cacheDesc = cachesRegistry.cache(CU.cacheId(req.cacheName()));

            if (cacheDesc == null) {
                cctx.cache().completeClientCacheChangeFuture(reqId, new CacheException("Failed to start client cache (a cache with the given name is not started): " + req.cacheName()));

                return null;
            }

            // Only caches without a local context need to be started.
            if (cctx.cacheContext(cacheDesc.cacheId()) == null) {
                toStart.add(cacheDesc);
            }
        }

        return toStart;
    }

    /**
     * Starts the client caches requested by the given message on the local node.
     * <p>
     * Steps, in order: resolve descriptors that still need a local start; prepare each cache; for groups
     * whose affinity is not initialized yet either reuse the coordinator's group holder ({@code crd}) or
     * fetch the assignment through a {@link GridDhtAssignmentFetchFuture}; finish the exchange on the
     * topologies; initialize cache proxies; complete the client cache change future.
     * <p>
     * On {@link IgniteCheckedException} the caches prepared so far are closed, the future is completed with
     * the error and {@code null} is returned.
     *
     * @param msg Message carrying the start requests and the request ID.
     * @param crd Value computed by the caller as "local node is the oldest alive server node".
     * @param topVer Topology version the caches are started on.
     * @param discoCache Discovery cache for {@code topVer}.
     * @return Map from started cache ID to "near cache configured" flag, or {@code null} if there were no
     *      start requests, nothing had to be started, or the start failed.
     */
    @Nullable
    private Map<Integer, Boolean> processClientCacheStartRequests(ClientCacheChangeDummyDiscoveryMessage msg, boolean crd, AffinityTopologyVersion topVer, DiscoCache discoCache) {
        CacheGroupContext grp;
        ClientCacheDhtTopologyFuture topFut;
        Map<String, DynamicCacheChangeRequest> startReqs = msg.startRequests();
        if (startReqs == null) {
            return null;
        }
        List<DynamicCacheDescriptor> startDescs = this.clientCachesToStart(msg.requestId(), msg.startRequests());
        // Null means an unknown cache was requested (the future was already completed with an error by
        // clientCachesToStart); empty means every requested cache already has a local context.
        if (startDescs == null || startDescs.isEmpty()) {
            this.cctx.cache().completeClientCacheChangeFuture(msg.requestId(), null);
            return null;
        }
        // One fetch future per group whose affinity has to be requested (non-coordinator path).
        HashMap<Integer, GridDhtAssignmentFetchFuture> fetchFuts = U.newHashMap(startDescs.size());
        // Names of caches prepared so far; used to roll back on failure.
        HashSet<String> startedCaches = U.newHashSet(startDescs.size());
        HashMap<Integer, Boolean> startedInfos = U.newHashMap(startDescs.size());
        for (DynamicCacheDescriptor desc : startDescs) {
            try {
                startedCaches.add(desc.cacheName());
                DynamicCacheChangeRequest startReq = startReqs.get(desc.cacheName());
                this.cctx.cache().prepareCacheStart(desc.cacheConfiguration(), desc, startReq.nearCacheConfiguration(), topVer, startReq.disabledAfterStart());
                startedInfos.put(desc.cacheId(), startReq.nearCacheConfiguration() != null);
                CacheGroupContext grp2 = this.cctx.cache().cacheGroup(desc.groupId());
                assert (grp2 != null) : desc.groupId();
                assert (!grp2.affinityNode() || grp2.isLocal()) : grp2.cacheOrGroupName();
                // Local groups and groups with already initialized affinity need no further setup.
                if (grp2.isLocal() || !grp2.affinity().lastVersion().equals(AffinityTopologyVersion.NONE)) continue;
                assert (grp2.localStartVersion().equals(topVer)) : grp2.localStartVersion();
                if (crd) {
                    CacheGroupHolder grpHolder = (CacheGroupHolder)this.grpHolders.get(grp2.groupId());
                    assert (grpHolder != null && grpHolder.affinity().idealAssignment() != null);
                    if (!grpHolder.client()) continue;
                    topFut = new ClientCacheDhtTopologyFuture(topVer);
                    grp2.topology().updateTopologyVersion(topFut, discoCache, this.cctx.coordinators().currentCoordinator(), -1L, false);
                    // Replace the client-type holder with one bound to the started group, reusing its affinity.
                    grpHolder = new CacheGroupHolder1(grp2, grpHolder.affinity());
                    this.grpHolders.put(grp2.groupId(), grpHolder);
                    // Carry over the state of the client topology, if one existed, into the group topology.
                    GridClientPartitionTopology clientTop = this.cctx.exchange().clearClientTopology(grp2.groupId());
                    if (clientTop != null) {
                        grp2.topology().update(grpHolder.affinity().lastVersion(), clientTop.partitionMap(true), clientTop.fullUpdateCounters(), Collections.emptySet(), null, null);
                    }
                    assert (grpHolder.affinity().lastVersion().equals(grp2.affinity().lastVersion()));
                    continue;
                }
                // Several caches may share a group: request the assignment only once per group.
                if (fetchFuts.containsKey(grp2.groupId())) continue;
                GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(this.cctx, grp2.groupId(), topVer, discoCache);
                fetchFut.init(true);
                fetchFuts.put(grp2.groupId(), fetchFut);
            }
            catch (IgniteCheckedException e) {
                this.cctx.cache().closeCaches(startedCaches, false);
                this.cctx.cache().completeClientCacheChangeFuture(msg.requestId(), e);
                return null;
            }
        }
        // Apply the fetched assignments and partition maps to the group topologies.
        for (GridDhtAssignmentFetchFuture fetchFut : fetchFuts.values()) {
            try {
                GridDhtPartitionFullMap partMap;
                grp = this.cctx.cache().cacheGroup(fetchFut.groupId());
                assert (grp != null);
                GridDhtAffinityAssignmentResponse res = this.fetchAffinity(topVer, this.cctx.coordinators().currentCoordinator(), null, discoCache, grp.affinity(), fetchFut);
                if (res != null) {
                    partMap = res.partitionMap();
                    assert (partMap != null) : res;
                    topFut = new ClientCacheDhtTopologyFuture(topVer);
                } else {
                    // No response: use an empty local partition map and a topology future carrying the error.
                    partMap = new GridDhtPartitionFullMap(this.cctx.localNodeId(), this.cctx.localNode().order(), 1L);
                    topFut = new ClientCacheDhtTopologyFuture(topVer, new ClusterTopologyServerNotFoundException("All server nodes left grid."));
                }
                grp.topology().updateTopologyVersion(topFut, discoCache, this.cctx.coordinators().currentCoordinator(), -1L, false);
                grp.topology().update(topVer, partMap, null, Collections.emptySet(), null, null);
                topFut.validate(grp, discoCache.allNodes());
            }
            catch (IgniteCheckedException e) {
                this.cctx.cache().closeCaches(startedCaches, false);
                this.cctx.cache().completeClientCacheChangeFuture(msg.requestId(), e);
                return null;
            }
        }
        // Notify topologies of all non-LOCAL started caches that the exchange is done.
        for (DynamicCacheDescriptor desc : startDescs) {
            if (desc.cacheConfiguration().getCacheMode() == CacheMode.LOCAL) continue;
            grp = this.cctx.cache().cacheGroup(desc.groupId());
            assert (grp != null);
            grp.topology().onExchangeDone(null, grp.affinity().cachedAffinity(topVer), true);
        }
        this.cctx.cache().initCacheProxies(topVer, null);
        startReqs.keySet().forEach(req -> this.cctx.cache().completeProxyInitialize((String)req));
        this.cctx.cache().completeClientCacheChangeFuture(msg.requestId(), null);
        return startedInfos;
    }

    /**
     * Closes the client caches listed in the message and completes the client cache change future.
     * <p>
     * When {@code crd} is set, every non-client group holder whose cache group no longer exists locally is
     * replaced by a holder created through {@code CacheGroupHolder2.create}, reusing the old affinity.
     *
     * @param msg Message carrying the caches to close and the request ID.
     * @param crd Value computed by the caller as "local node is the oldest alive server node".
     * @param topVer Topology version passed to the replacement holders.
     * @return IDs of the closed caches as returned by the cache processor, or {@code null} if the message
     *      contains no close requests.
     */
    private Set<Integer> processCacheCloseRequests(ClientCacheChangeDummyDiscoveryMessage msg, boolean crd, AffinityTopologyVersion topVer) {
        Set<String> toClose = msg.cachesToClose();

        if (toClose == null) {
            return null;
        }

        Set<Integer> closedIds = cctx.cache().closeCaches(toClose, true);

        if (crd) {
            for (CacheGroupHolder hld : grpHolders.values()) {
                boolean keep = hld.client() || cctx.cache().cacheGroup(hld.groupId()) != null;

                if (keep) {
                    continue;
                }

                int grpId = hld.groupId();

                CacheGroupHolder removed = grpHolders.remove(grpId);

                assert (removed != null && !removed.client()) : removed;

                try {
                    grpHolders.put(grpId, CacheGroupHolder2.create(cctx, cachesRegistry.group(grpId), topVer, removed.affinity()));
                }
                catch (IgniteCheckedException e) {
                    U.error(log, "Failed to initialize cache: " + e, e);
                }
            }
        }

        cctx.cache().completeClientCacheChangeFuture(msg.requestId(), null);

        return closedIds;
    }

    /**
     * Processes client cache start and close requests on the ready affinity version and, if anything was
     * started or closed, schedules the client cache change discovery message.
     *
     * @param msg Message carrying the start and close requests.
     */
    void processClientCachesChanges(ClientCacheChangeDummyDiscoveryMessage msg) {
        AffinityTopologyVersion readyVer = cctx.exchange().readyAffinityVersion();
        DiscoCache discoCache = cctx.discovery().discoCache(readyVer);

        // The local node acts as coordinator when it is the oldest alive server node.
        boolean crd = cctx.localNode().equals(discoCache.oldestAliveServerNode());

        Map<Integer, Boolean> started = processClientCacheStartRequests(msg, crd, readyVer, discoCache);
        Set<Integer> closed = processCacheCloseRequests(msg, crd, readyVer);

        if (started == null && closed == null) {
            return;
        }

        scheduleClientChangeMessage(started, closed);
    }

    /**
     * Sends the calling thread's pending client cache change message if it is still bound to the given
     * timeout object.
     * <p>
     * The pending message is cleared first, then filtered against the caches currently known to the
     * registry; it is sent only if it is not empty after filtering.
     *
     * @param timeoutObj Timeout object that triggered the send.
     */
    void sendClientCacheChangesMessage(ClientCacheUpdateTimeout timeoutObj) {
        ClientCacheChangeDiscoveryMessage pending = clientCacheChanges.get();

        // A different (or absent) timeout object means the message was rescheduled or already handled.
        if (pending == null || pending.updateTimeoutObject() != timeoutObj) {
            return;
        }

        assert (!pending.empty()) : pending;

        clientCacheChanges.remove();

        pending.checkCachesExist(cachesRegistry.allCaches().keySet());

        if (pending.empty()) {
            return;
        }

        try {
            cctx.discovery().sendCustomEvent(pending);
        }
        catch (IgniteCheckedException e) {
            U.error(log, "Failed to send discovery event: " + e, e);
        }
    }

    /**
     * Accumulates started/closed client caches in the calling thread's pending message and (re)schedules
     * the timeout object that triggers sending it.
     * <p>
     * If merging leaves the pending message empty, its timeout object is removed and the message dropped.
     *
     * @param startedCaches Started cache IDs mapped to a flag, may be {@code null}.
     * @param closedCaches Closed cache IDs, may be {@code null}.
     */
    private void scheduleClientChangeMessage(Map<Integer, Boolean> startedCaches, Set<Integer> closedCaches) {
        ClientCacheChangeDiscoveryMessage pending = clientCacheChanges.get();

        if (pending != null) {
            pending.merge(startedCaches, closedCaches);

            if (pending.empty()) {
                cctx.time().removeTimeoutObject(pending.updateTimeoutObject());

                clientCacheChanges.remove();

                return;
            }
        }
        else {
            pending = new ClientCacheChangeDiscoveryMessage(startedCaches, closedCaches);

            clientCacheChanges.set(pending);
        }

        // Replace a previously scheduled timeout so the message is sent once, after the latest change.
        if (pending.updateTimeoutObject() != null) {
            cctx.time().removeTimeoutObject(pending.updateTimeoutObject());
        }

        // A non-positive configured timeout falls back to 10000.
        long timeout = clientCacheMsgTimeout > 0L ? clientCacheMsgTimeout : 10000L;

        ClientCacheUpdateTimeout timeoutObj = new ClientCacheUpdateTimeout(cctx, timeout);

        pending.updateTimeoutObject(timeoutObj);

        cctx.time().addTimeoutObject(timeoutObj);
    }

    /**
     * Handles a custom message that does not change affinity: for every cache group that is not being
     * stopped, applies a client-event topology change for the exchange's last event and topology version,
     * then updates the exchange worker heartbeat.
     *
     * @param fut Exchange future.
     * @param crd Coordinator flag, passed through to {@code forAllCacheGroups}.
     * @param exchActions Exchange actions, may be {@code null}.
     */
    public void onCustomMessageNoAffinityChange(GridDhtPartitionsExchangeFuture fut, boolean crd, final @Nullable ExchangeActions exchActions) {
        final ExchangeDiscoveryEvents evts = fut.context().events();

        forAllCacheGroups(crd, new IgniteInClosureX<GridAffinityAssignmentCache>(){

            @Override
            public void applyx(GridAffinityAssignmentCache aff) {
                boolean stopping = exchActions != null && exchActions.cacheGroupStopping(aff.groupId());

                if (!stopping) {
                    aff.clientEventTopologyChange(evts.lastEvent(), evts.topologyVersion());

                    CacheAffinitySharedManager.this.cctx.exchange().exchangerUpdateHeartbeat();
                }
            }
        });
    }

    /**
     * Unregisters the given cache from the caches registry.
     *
     * @param cctx Context of the cache to unregister (shadows the shared context field of the same name).
     */
    public void stopCacheOnReconnect(GridCacheContext cctx) {
        CachesRegistry registry = cachesRegistry;

        registry.unregisterCache(cctx.cacheId());
    }

    /**
     * Unregisters the given cache group from the caches registry.
     *
     * @param grpCtx Context of the cache group to unregister.
     */
    public void stopCacheGroupOnReconnect(CacheGroupContext grpCtx) {
        CachesRegistry registry = cachesRegistry;

        registry.unregisterGroup(grpCtx.groupId());
    }

    /**
     * Applies exchange actions that contain no cache start requests: updates the caches registry, processes
     * the stop requests in force-close mode and force-closes the caches.
     *
     * @param fut Exchange future.
     * @param crd Coordinator flag.
     * @param exchActions Exchange actions; must be non-empty and contain no start requests.
     */
    public void forceCloseCaches(GridDhtPartitionsExchangeFuture fut, boolean crd, ExchangeActions exchActions) {
        assert (exchActions != null && !exchActions.empty() && exchActions.cacheStartRequests().isEmpty()) : exchActions;

        IgniteInternalFuture<?> registryFut = cachesRegistry.update(exchActions);

        assert (registryFut.isDone()) : "There should be no caches to start: " + exchActions;

        processCacheStopRequests(fut, crd, exchActions, true);

        cctx.cache().forceCloseCaches(exchActions);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Handles a dynamic cache change request during exchange.
     * <p>
     * Updates the caches registry, applies the no-affinity-change handling to existing groups, processes
     * start and stop requests, removes stopped groups from the rebalance wait state (notifying
     * {@code onCacheGroupStopped} asynchronously if any was removed) and finally drops no longer existing
     * caches from the calling thread's pending client cache change message.
     *
     * @param fut Exchange future.
     * @param crd Coordinator flag.
     * @param exchActions Exchange actions; must be non-empty.
     * @return Future returned by the caches registry update.
     * @throws IgniteCheckedException If processing of start requests fails.
     */
    public IgniteInternalFuture<?> onCacheChangeRequest(GridDhtPartitionsExchangeFuture fut, boolean crd, ExchangeActions exchActions) throws IgniteCheckedException {
        assert (exchActions != null && !exchActions.empty()) : exchActions;

        long startTs = System.currentTimeMillis();

        IgniteInternalFuture<?> registryFut = cachesRegistry.update(exchActions);

        onCustomMessageNoAffinityChange(fut, crd, exchActions);

        if (log.isInfoEnabled()) {
            log.info("Updating caches registry performed in " + (System.currentTimeMillis() - startTs) + " ms.");
        }

        processCacheStartRequests(fut, crd, exchActions);

        Set<Integer> stoppedGrps = processCacheStopRequests(fut, crd, exchActions, false);

        if (stoppedGrps != null) {
            AffinityTopologyVersion notifyTopVer = null;

            synchronized (mux) {
                if (waitInfo != null) {
                    for (Integer stoppedGrpId : stoppedGrps) {
                        // Only groups that were actually waited for trigger a notification.
                        if (waitInfo.waitGrps.remove(stoppedGrpId) != null) {
                            notifyTopVer = waitInfo.topVer;

                            waitInfo.assignments.remove(stoppedGrpId);
                        }
                    }
                }
            }

            if (notifyTopVer != null) {
                final AffinityTopologyVersion topVer = notifyTopVer;

                cctx.kernalContext().closure().runLocalSafe(new Runnable(){

                    @Override
                    public void run() {
                        CacheAffinitySharedManager.this.onCacheGroupStopped(topVer);
                    }
                });
            }
        }

        ClientCacheChangeDiscoveryMessage pending = clientCacheChanges.get();

        if (pending != null) {
            pending.checkCachesExist(cachesRegistry.allCaches().keySet());

            if (pending.empty()) {
                clientCacheChanges.remove();
            }
        }

        return registryFut;
    }

    /**
     * Processes the cache start requests of an exchange in two passes.
     * <p>
     * Pass 1 decides for every request whether the cache has to be started on the local node and, if so,
     * prepares the start; a failure is logged, the cache is closed again and its start future is completed
     * with the error. Pass 2 initializes affinity once per cache group: through
     * {@code initStartedGroupOnCoordinator} when {@code crd} is set, otherwise through {@code initAffinity}
     * for non-local groups that were started locally on this exchange's initial version.
     *
     * @param fut Exchange future.
     * @param crd Coordinator flag.
     * @param exchActions Exchange actions; must be non-empty.
     * @throws IgniteCheckedException If affinity initialization fails.
     */
    private void processCacheStartRequests(GridDhtPartitionsExchangeFuture fut, boolean crd, ExchangeActions exchActions) throws IgniteCheckedException {
        assert (exchActions != null && !exchActions.empty()) : exchActions;
        ExchangeDiscoveryEvents evts = fut.context().events();
        long time = System.currentTimeMillis();
        for (ExchangeActions.CacheActionData action : exchActions.cacheStartRequests()) {
            boolean startCache;
            DynamicCacheDescriptor cacheDesc = action.descriptor();
            DynamicCacheChangeRequest req = action.request();
            NearCacheConfiguration nearCfg = null;
            // Start unconditionally if the cache is locally configured, or if this node initiated the
            // request and the exchange is not an activation; only then is the near configuration used.
            if (req.locallyConfigured() || this.cctx.localNodeId().equals(req.initiatingNodeId()) && !exchActions.activate()) {
                startCache = true;
                nearCfg = req.nearCacheConfiguration();
            } else {
                assert (this.cctx.cacheContext(cacheDesc.cacheId()) == null) : "Starting cache has not null context: " + cacheDesc.cacheName();
                IgniteCacheProxyImpl<?, ?> cacheProxy = this.cctx.cache().jcacheProxy(req.cacheName(), false);
                if (cacheProxy != null) {
                    // An existing proxy is expected to be in restarting state; the cache is started again.
                    assert (cacheProxy.isRestarting()) : "Cache has non restarting proxy " + cacheProxy;
                    startCache = true;
                } else {
                    // Otherwise start only if the local node passes the group's node filter.
                    startCache = CU.affinityNode(this.cctx.localNode(), cacheDesc.groupDescriptor().config().getNodeFilter());
                }
            }
            try {
                if (!startCache) continue;
                this.cctx.cache().prepareCacheStart(req.startCacheConfiguration(), cacheDesc, nearCfg, evts.topologyVersion(), req.disabledAfterStart());
                // Warn only when the cache was added on this exchange and its group has no affinity nodes.
                if (!fut.cacheAddedOnExchange(cacheDesc.cacheId(), cacheDesc.receivedFrom()) || !fut.events().discoveryCache().cacheGroupAffinityNodes(cacheDesc.groupId()).isEmpty()) continue;
                U.quietAndWarn(this.log, "No server nodes found for cache client: " + req.cacheName());
            }
            catch (IgniteCheckedException e) {
                // Roll back the failed start; processing continues with the remaining requests.
                U.error(this.log, "Failed to initialize cache. Will try to rollback cache start routine. [cacheName=" + req.cacheName() + ']', e);
                this.cctx.cache().closeCaches(Collections.singleton(req.cacheName()), false);
                this.cctx.cache().completeCacheStartFuture(req, false, e);
            }
        }
        if (this.log.isInfoEnabled()) {
            this.log.info("Caches starting performed in " + (System.currentTimeMillis() - time) + " ms.");
        }
        time = System.currentTimeMillis();
        // IDs of groups already handled in the second pass (several caches may share one group).
        HashSet<Integer> gprs = new HashSet<Integer>();
        for (ExchangeActions.CacheActionData action : exchActions.cacheStartRequests()) {
            int grpId = action.descriptor().groupId();
            if (!gprs.add(grpId)) continue;
            if (crd) {
                this.initStartedGroupOnCoordinator(fut, action.descriptor().groupDescriptor());
                continue;
            }
            CacheGroupContext grp = this.cctx.cache().cacheGroup(grpId);
            // Skip groups that are not started locally, are local, or were started on another version.
            if (grp == null || grp.isLocal() || !grp.localStartVersion().equals(fut.initialVersion())) continue;
            assert (grp.affinity().lastVersion().equals(AffinityTopologyVersion.NONE)) : grp.affinity().lastVersion();
            this.initAffinity(this.cachesRegistry.group(grp.groupId()), grp.affinity(), fut);
        }
        if (this.log.isInfoEnabled()) {
            this.log.info("Affinity initialization for started caches performed in " + (System.currentTimeMillis() - time) + " ms.");
        }
    }

    /**
     * Processes the cache stop requests of an exchange: blocks the gateways of the stopping caches, clears
     * client topologies of the stopping groups and, when {@code crd} is set, removes the holders of
     * stopping non-LOCAL groups together with their affinity assignment response handlers.
     *
     * @param fut Exchange future (not used by this method).
     * @param crd Coordinator flag.
     * @param exchActions Exchange actions; must be non-empty.
     * @param forceClose If {@code true}, a missing group holder is tolerated.
     * @return IDs of groups whose holders were removed, or {@code null} if none were removed.
     */
    private Set<Integer> processCacheStopRequests(GridDhtPartitionsExchangeFuture fut, boolean crd, ExchangeActions exchActions, boolean forceClose) {
        assert (exchActions != null && !exchActions.empty()) : exchActions;

        for (ExchangeActions.CacheActionData stopReq : exchActions.cacheStopRequests()) {
            cctx.cache().blockGateway(stopReq.request().cacheName(), true, stopReq.request().restart());
        }

        for (ExchangeActions.CacheGroupActionData grpToStop : exchActions.cacheGroupsToStop()) {
            cctx.exchange().clearClientTopology(grpToStop.descriptor().groupId());
        }

        if (!crd) {
            return null;
        }

        // Created lazily so that null is returned when no holder was removed.
        Set<Integer> stoppedGrps = null;

        for (ExchangeActions.CacheGroupActionData grpToStop : exchActions.cacheGroupsToStop()) {
            if (grpToStop.descriptor().config().getCacheMode() == CacheMode.LOCAL) {
                continue;
            }

            CacheGroupHolder removed = grpHolders.remove(grpToStop.descriptor().groupId());

            assert (removed != null || forceClose) : grpToStop.descriptor();

            if (removed != null) {
                if (stoppedGrps == null) {
                    stoppedGrps = new HashSet<Integer>();
                }

                stoppedGrps.add(removed.groupId());

                cctx.io().removeHandler(true, removed.groupId(), GridDhtAffinityAssignmentResponse.class);
            }
        }

        return stoppedGrps;
    }

    /**
     * Clears the group holder map and unregisters every entry of the caches registry.
     */
    public void clearGroupHoldersAndRegistry() {
        grpHolders.clear();

        cachesRegistry.unregisterAll();
    }

    /**
     * Applies an affinity change message that belongs to the given exchange.
     * <p>
     * For every cache group the new assignment starts from the group's ideal assignment; partitions listed
     * in the message for that group are overridden with the nodes named in the message. The resulting
     * assignment is initialized at the exchange's initial version.
     *
     * @param exchFut Exchange future; its exchange ID must equal the one in {@code msg}.
     * @param crd Coordinator flag, selects which set of cache groups is iterated.
     * @param msg Affinity change message; its assignment change must be non-null.
     */
    public void onExchangeChangeAffinityMessage(GridDhtPartitionsExchangeFuture exchFut, boolean crd, CacheAffinityChangeMessage msg) {
        if (log.isDebugEnabled()) {
            log.debug("Process exchange affinity change message [exchVer=" + exchFut.initialVersion() + ", msg=" + msg + ']');
        }

        assert exchFut.exchangeId().equals(msg.exchangeId()) : msg;

        final AffinityTopologyVersion exchVer = exchFut.initialVersion();
        final Map<Integer, Map<Integer, List<UUID>>> changesByGrp = msg.assignmentChange();

        assert changesByGrp != null;

        final HashMap assignCache = new HashMap();

        forAllCacheGroups(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() {
            @Override
            public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
                List<List<ClusterNode>> ideal = aff.idealAssignment();

                assert ideal != null;

                Map<Integer, List<UUID>> grpChange = changesByGrp.get(aff.groupId());

                // Without a change for this group the ideal assignment is used as is.
                List<List<ClusterNode>> updated = ideal;

                if (grpChange != null) {
                    // Copy first so that the ideal assignment list itself is not modified.
                    updated = new ArrayList<List<ClusterNode>>(ideal);

                    for (Map.Entry<Integer, List<UUID>> partChange : grpChange.entrySet()) {
                        updated.set(partChange.getKey(), toNodes(exchVer, partChange.getValue()));
                    }
                }

                aff.initialize(exchVer, cachedAssignment(aff, updated, assignCache));
            }
        });
    }

    /**
     * Applies an affinity change message that is not bound to an exchange ID (the message must carry a
     * topology version and a {@code null} exchange ID).
     * <p>
     * For each cache group:
     * <ul>
     *     <li>if the registered deployment ID differs from the one in the message, the group only gets a
     *     client-event topology change and processing of that group stops;</li>
     *     <li>if the message has changes for the group, they are applied on top of the group's assignment
     *     at its last version and the result is initialized at the exchange's initial version;</li>
     *     <li>otherwise the group gets a client-event topology change.</li>
     * </ul>
     *
     * @param exchFut Exchange future.
     * @param crd Coordinator flag, selects which set of cache groups is iterated.
     * @param msg Affinity change message with a non-empty assignment change.
     * @throws IgniteCheckedException Declared by the signature.
     */
    public void onChangeAffinityMessage(final GridDhtPartitionsExchangeFuture exchFut, boolean crd, final CacheAffinityChangeMessage msg) throws IgniteCheckedException {
        assert msg.topologyVersion() != null && msg.exchangeId() == null : msg;

        final AffinityTopologyVersion exchVer = exchFut.initialVersion();

        if (log.isDebugEnabled()) {
            log.debug("Process affinity change message [exchVer=" + exchVer + ", msgVer=" + msg.topologyVersion() + ']');
        }

        final Map<Integer, Map<Integer, List<UUID>>> changesByGrp = msg.assignmentChange();

        assert !F.isEmpty(changesByGrp) : msg;

        final Map<Integer, IgniteUuid> msgDeploymentIds = msg.cacheDeploymentIds();
        final HashMap assignCache = new HashMap();

        forAllCacheGroups(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() {
            @Override
            public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
                AffinityTopologyVersion lastVer = aff.lastVersion();

                assert lastVer.topologyVersion() > 0L : lastVer;

                CacheGroupDescriptor grpDesc = cachesRegistry.group(aff.groupId());

                assert grpDesc != null : aff.cacheOrGroupName();

                IgniteUuid locDeploymentId = grpDesc.deploymentId();

                // Deployment ID mismatch: the message does not apply to this group.
                // Note that this path returns before the heartbeat update below.
                if (!locDeploymentId.equals(msgDeploymentIds.get(aff.groupId()))) {
                    aff.clientEventTopologyChange(exchFut.firstEvent(), exchVer);

                    return;
                }

                Map<Integer, List<UUID>> grpChange = changesByGrp.get(aff.groupId());

                if (grpChange == null) {
                    aff.clientEventTopologyChange(exchFut.firstEvent(), exchVer);
                } else {
                    assert !grpChange.isEmpty() : msg;

                    // Work on a copy of the assignment at the last version.
                    List<List<ClusterNode>> updated = new ArrayList<List<ClusterNode>>(aff.assignments(lastVer));

                    for (Map.Entry<Integer, List<UUID>> partChange : grpChange.entrySet()) {
                        Integer part = partChange.getKey();
                        List<ClusterNode> nodes = toNodes(exchVer, partChange.getValue());

                        assert !nodes.equals(updated.get(part)) : "Assignment did not change [cacheGrp=" + aff.cacheOrGroupName() + ", part=" + part + ", cur=" + F.nodeIds(updated.get(part)) + ", new=" + F.nodeIds(nodes) + ", exchVer=" + exchFut.initialVersion() + ", msgVer=" + msg.topologyVersion() + ']';

                        updated.set(part, nodes);
                    }

                    aff.initialize(exchVer, cachedAssignment(aff, updated, assignCache));
                }

                cctx.exchange().exchangerUpdateHeartbeat();
            }
        });
    }

    /**
     * Handles an exchange triggered by a client event.
     * <p>
     * If the node of the first event is the local node, affinity is fetched via
     * {@code fetchAffinityOnJoin}; otherwise every cache group gets a client-event topology change at the
     * exchange's initial version.
     *
     * @param fut Exchange future.
     * @param crd Coordinator flag, selects which set of cache groups is iterated.
     * @throws IgniteCheckedException If fetching affinity on join fails.
     */
    public void onClientEvent(final GridDhtPartitionsExchangeFuture fut, boolean crd) throws IgniteCheckedException {
        if (fut.firstEvent().eventNode().isLocal()) {
            fetchAffinityOnJoin(fut);

            return;
        }

        forAllCacheGroups(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() {
            @Override
            public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
                AffinityTopologyVersion exchVer = fut.initialVersion();

                aff.clientEventTopologyChange(fut.firstEvent(), exchVer);

                cctx.exchange().exchangerUpdateHeartbeat();
            }
        });
    }

    /**
     * Registers an assignment fetch future under its ID. An already registered future with the same ID
     * trips the assertion.
     *
     * @param fut Fetch future to register.
     */
    public void addDhtAssignmentFetchFuture(GridDhtAssignmentFetchFuture fut) {
        GridDhtAssignmentFetchFuture prev = pendingAssignmentFetchFuts.putIfAbsent(fut.id(), fut);

        assert prev == null : "More than one thread is trying to fetch partition assignments [fut=" + fut + ", allFuts=" + pendingAssignmentFetchFuts + ']';
    }

    /**
     * Unregisters an assignment fetch future. The removal is conditional on the map still holding exactly
     * this future under its ID; a failed removal trips the assertion.
     *
     * @param fut Fetch future to unregister.
     */
    public void removeDhtAssignmentFetchFuture(GridDhtAssignmentFetchFuture fut) {
        boolean removed = pendingAssignmentFetchFuts.remove(fut.id(), fut);

        assert removed : "Failed to remove assignment fetch future: " + fut.id();
    }

    /**
     * Routes an affinity assignment response to the pending fetch future with the matching future ID.
     * A response without a matching pending future is ignored.
     *
     * @param nodeId ID of the node the response came from.
     * @param res Affinity assignment response.
     */
    private void processAffinityAssignmentResponse(UUID nodeId, GridDhtAffinityAssignmentResponse res) {
        if (log.isDebugEnabled()) {
            log.debug("Processing affinity assignment response [node=" + nodeId + ", res=" + res + ']');
        }

        GridDhtAssignmentFetchFuture pending = (GridDhtAssignmentFetchFuture)pendingAssignmentFetchFuts.get(res.futureId());

        if (pending == null) {
            return;
        }

        pending.onResponse(nodeId, res);
    }

    /**
     * Invokes the closure for every group descriptor in the caches registry whose cache mode is not
     * {@code LOCAL}.
     *
     * @param c Closure to invoke.
     * @throws IgniteCheckedException If the closure throws it.
     */
    private void forAllRegisteredCacheGroups(IgniteInClosureX<CacheGroupDescriptor> c) throws IgniteCheckedException {
        for (CacheGroupDescriptor grpDesc : cachesRegistry.allGroups().values()) {
            if (grpDesc.config().getCacheMode() != CacheMode.LOCAL) {
                c.applyx(grpDesc);
            }
        }
    }

    /**
     * Invokes {@code c.apply(...)} with the affinity cache of every relevant cache group.
     * <p>
     * With {@code crd} set, the groups are taken from the group holder map; otherwise they are the
     * non-local cache groups reported by the cache processor.
     *
     * @param crd Coordinator flag.
     * @param c Closure to invoke.
     */
    private void forAllCacheGroups(boolean crd, IgniteInClosureX<GridAffinityAssignmentCache> c) {
        if (!crd) {
            for (CacheGroupContext grpCtx : cctx.kernalContext().cache().cacheGroups()) {
                if (!grpCtx.isLocal()) {
                    c.apply(grpCtx.affinity());
                }
            }

            return;
        }

        for (CacheGroupHolder holder : grpHolders.values()) {
            c.apply(holder.affinity());
        }
    }

    /**
     * Makes sure a group holder exists for a started cache group. LOCAL groups are skipped.
     * <p>
     * If no holder is registered yet, one is created (backed by the local group context when the group is
     * started locally, otherwise via {@code CacheGroupHolder2.create}), registered, and its affinity is
     * calculated and initialized at the exchange's initial version. If a holder already exists, reports
     * {@code client()} and the group is now available locally, it is replaced by a
     * {@code CacheGroupHolder1} that reuses the existing affinity.
     *
     * @param fut Exchange future.
     * @param grpDesc Descriptor of the started group; must be non-null with a non-zero group ID.
     * @throws IgniteCheckedException Declared by the signature.
     */
    private void initStartedGroupOnCoordinator(GridDhtPartitionsExchangeFuture fut, CacheGroupDescriptor grpDesc) throws IgniteCheckedException {
        assert grpDesc != null && grpDesc.groupId() != 0 : grpDesc;

        if (grpDesc.config().getCacheMode() == CacheMode.LOCAL) {
            return;
        }

        int grpId = grpDesc.groupId();
        CacheGroupHolder holder = (CacheGroupHolder)grpHolders.get(grpId);
        CacheGroupContext locGrp = cctx.kernalContext().cache().cacheGroup(grpId);

        if (holder != null) {
            // Existing holder: only swap it when it is a client holder and the group exists locally.
            if (holder.client() && locGrp != null) {
                assert holder.affinity().idealAssignment() != null;

                grpHolders.put(grpId, new CacheGroupHolder1(locGrp, holder.affinity()));
            }

            return;
        }

        if (locGrp != null) {
            holder = new CacheGroupHolder1(locGrp, null);
        } else {
            holder = CacheGroupHolder2.create(cctx, grpDesc, fut.initialVersion(), null);
        }

        CacheGroupHolder prev = grpHolders.put(grpId, holder);

        assert prev == null : prev;

        calculateAndInit(fut.events(), holder.affinity(), fut.initialVersion());
    }

    /**
     * Registers the given cache descriptors and, unless the exchange context reports merged exchanges,
     * initializes affinity for every group whose affinity has not been initialized yet (last version is
     * {@code AffinityTopologyVersion.NONE}).
     * <p>
     * With {@code crd} set, registered groups are processed through their group holders and affinity is
     * calculated directly; otherwise the locally started groups are processed through
     * {@code initAffinity}.
     *
     * @param crd Coordinator flag.
     * @param fut Exchange future.
     * @param descs Cache descriptors to add to the registry.
     * @return Future returned by the registry for the added descriptors.
     * @throws IgniteCheckedException If affinity initialization fails.
     */
    public IgniteInternalFuture<?> initStartedCaches(boolean crd, final GridDhtPartitionsExchangeFuture fut, Collection<DynamicCacheDescriptor> descs) throws IgniteCheckedException {
        IgniteInternalFuture<?> regFut = cachesRegistry.addUnregistered(descs);

        if (fut.context().mergeExchanges()) {
            return regFut;
        }

        if (!crd) {
            forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() {
                @Override
                public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
                    if (aff.lastVersion().equals(AffinityTopologyVersion.NONE)) {
                        initAffinity(cachesRegistry.group(aff.groupId()), aff, fut);

                        cctx.exchange().exchangerUpdateHeartbeat();
                    }
                }
            });

            return regFut;
        }

        forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
            @Override
            public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
                CacheGroupHolder holder = groupHolder(fut.initialVersion(), desc);

                if (holder.affinity().lastVersion().equals(AffinityTopologyVersion.NONE)) {
                    calculateAndInit(fut.events(), holder.affinity(), fut.initialVersion());

                    cctx.exchange().exchangerUpdateHeartbeat();
                }
            }
        });

        return regFut;
    }

    /**
     * Initializes affinity of a group at the topology version of the exchange events: calculated locally
     * when {@code canCalculateAffinity} allows it, otherwise obtained through an assignment fetch future.
     *
     * @param desc Group descriptor, must be non-null.
     * @param aff Affinity cache to initialize.
     * @param fut Exchange future.
     * @throws IgniteCheckedException If fetching affinity fails.
     */
    private void initAffinity(CacheGroupDescriptor desc, GridAffinityAssignmentCache aff, GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
        assert desc != null : aff.cacheOrGroupName();

        ExchangeDiscoveryEvents evts = fut.context().events();

        if (canCalculateAffinity(desc, aff, fut)) {
            calculateAndInit(evts, aff, evts.topologyVersion());

            return;
        }

        GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx, desc.groupId(), evts.topologyVersion(), evts.discoveryCache());

        fetchFut.init(false);

        fetchAffinity(evts.topologyVersion(), cctx.coordinators().currentCoordinator(), evts, evts.discoveryCache(), aff, fetchFut);
    }

    /**
     * Tells whether affinity of a group can be calculated locally instead of being fetched.
     * <p>
     * Always {@code true} when the affinity function is not centralized. Otherwise {@code true} if any of
     * the following holds, checked in this order: the group was added on this exchange; the exchange was
     * not initiated by the local node; the group has no affinity nodes; the local node is the only
     * affinity node.
     *
     * @param desc Group descriptor, must be non-null.
     * @param aff Affinity cache of the group.
     * @param fut Exchange future.
     * @return {@code true} if affinity can be calculated locally.
     */
    private boolean canCalculateAffinity(CacheGroupDescriptor desc, GridAffinityAssignmentCache aff, GridDhtPartitionsExchangeFuture fut) {
        assert desc != null : aff.cacheOrGroupName();

        if (!aff.centralizedAffinityFunction()) {
            return true;
        }

        List<ClusterNode> affNodes = fut.events().discoveryCache().cacheGroupAffinityNodes(aff.groupId());

        if (fut.cacheGroupAddedOnExchange(aff.groupId(), desc.receivedFrom())) {
            return true;
        }

        if (!fut.exchangeId().nodeId().equals(cctx.localNodeId())) {
            return true;
        }

        if (affNodes.isEmpty()) {
            return true;
        }

        return affNodes.size() == 1 && affNodes.contains(cctx.localNode());
    }

    /**
     * Looks up the affinity cache of a group through its registered group holder.
     *
     * @param grpId Group ID; a holder must be registered for it.
     * @return Affinity cache of the group.
     */
    public GridAffinityAssignmentCache affinity(Integer grpId) {
        CacheGroupHolder holder = (CacheGroupHolder)grpHolders.get(grpId);

        assert holder != null : debugGroupName(grpId);

        return holder.affinity();
    }

    /**
     * Initializes affinity of all non-local cache groups from a partitions full message.
     * <p>
     * For each group the ideal assignment is calculated for the topology version of the exchange events.
     * If the message carries an ideal-affinity diff for the group, the listed partitions are overridden
     * with the nodes from the diff; otherwise the ideal assignment is used unchanged.
     *
     * @param fut Exchange future.
     * @param msg Partitions full message.
     */
    public void applyAffinityFromFullMessage(final GridDhtPartitionsExchangeFuture fut, final GridDhtPartitionsFullMessage msg) {
        final HashMap nodesByOrder = new HashMap();
        final HashMap assignCache = new HashMap();

        long startMs = System.currentTimeMillis();

        forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() {
            @Override
            public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
                ExchangeDiscoveryEvents evts = fut.context().events();
                Map<Integer, CacheGroupAffinityMessage> diffByGrp = msg.idealAffinityDiff();

                List<List<ClusterNode>> ideal = aff.calculate(evts.topologyVersion(), evts, evts.discoveryCache());

                CacheGroupAffinityMessage grpDiffMsg = null;

                if (diffByGrp != null) {
                    grpDiffMsg = diffByGrp.get(aff.groupId());
                }

                // Without a diff for this group the ideal assignment is applied as is.
                List<List<ClusterNode>> resolved = ideal;

                if (grpDiffMsg != null) {
                    Map<Integer, GridLongList> partDiff = grpDiffMsg.assignmentsDiff();

                    assert !F.isEmpty(partDiff);

                    resolved = new ArrayList<List<ClusterNode>>(ideal);

                    for (Map.Entry<Integer, GridLongList> partEntry : partDiff.entrySet()) {
                        GridLongList nodeOrders = partEntry.getValue();

                        resolved.set(partEntry.getKey(), CacheGroupAffinityMessage.toNodes(nodeOrders, nodesByOrder, evts.discoveryCache()));
                    }
                }

                aff.initialize(evts.topologyVersion(), cachedAssignment(aff, resolved, assignCache));
            }
        });

        if (log.isInfoEnabled()) {
            log.info("Affinity applying from full message performed in " + (System.currentTimeMillis() - startMs) + " ms.");
        }
    }

    /**
     * Initializes affinity of the non-local cache groups when the local node joins, using the affinity
     * delivered in the partitions full message.
     * <p>
     * For a group whose affinity was requested on join, both the assignment and the ideal assignment are
     * taken from the message (the ideal one is calculated locally when the message does not carry it).
     * For a group added on this exchange, affinity is calculated locally. In every case the group topology
     * is then asked to initialize partitions for {@code resTopVer}.
     *
     * @param fut Exchange future.
     * @param msg Partitions full message carrying the affinity for the joined node.
     * @param resTopVer Result topology version; asserted to equal the topology version of the exchange
     *      events when affinity is taken from the message.
     */
    public void onLocalJoin(final GridDhtPartitionsExchangeFuture fut, GridDhtPartitionsFullMessage msg, final AffinityTopologyVersion resTopVer) {
        final Set<Integer> requestedGrps = fut.context().groupsAffinityRequestOnJoin();
        final HashMap nodesByOrder = new HashMap();
        final Map<Integer, CacheGroupAffinityMessage> receivedAff = msg.joinedNodeAffinity();

        assert F.isEmpty(requestedGrps) || !F.isEmpty(receivedAff) && receivedAff.size() >= requestedGrps.size() : "Requested and received affinity are different [requestedCnt=" + (requestedGrps != null ? requestedGrps.size() : "none") + ", receivedCnt=" + (receivedAff != null ? receivedAff.size() : "none") + ", msg=" + msg + "]";

        long startMs = System.currentTimeMillis();

        forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() {
            @Override
            public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
                ExchangeDiscoveryEvents evts = fut.context().events();
                CacheGroupContext grp = cctx.cache().cacheGroup(aff.groupId());

                assert grp != null;

                boolean requested = requestedGrps != null && requestedGrps.contains(aff.groupId());

                if (requested) {
                    assert AffinityTopologyVersion.NONE.equals(aff.lastVersion()) : aff.lastVersion();

                    CacheGroupAffinityMessage grpAffMsg = receivedAff.get(aff.groupId());

                    assert grpAffMsg != null;

                    List<List<ClusterNode>> assignments = grpAffMsg.createAssignments(nodesByOrder, evts.discoveryCache());

                    assert resTopVer.equals(evts.topologyVersion()) : "resTopVer=" + resTopVer + ", evts.topVer=" + evts.topologyVersion();

                    List<List<ClusterNode>> ideal = grpAffMsg.createIdealAssignments(nodesByOrder, evts.discoveryCache());

                    if (ideal == null) {
                        // No ideal assignment in the message: it has to be computable locally.
                        assert !aff.centralizedAffinityFunction() : aff;

                        aff.calculate(evts.topologyVersion(), evts, evts.discoveryCache());
                    } else {
                        aff.idealAssignment(ideal);
                    }

                    aff.initialize(evts.topologyVersion(), assignments);
                } else if (fut.cacheGroupAddedOnExchange(aff.groupId(), grp.receivedFrom())) {
                    calculateAndInit(evts, aff, evts.topologyVersion());
                }

                grp.topology().initPartitionsWhenAffinityReady(resTopVer, fut);
            }
        });

        if (log.isInfoEnabled()) {
            log.info("Affinity initialization on local join performed in " + (System.currentTimeMillis() - startMs) + " ms.");
        }
    }

    /**
     * Recalculates affinity on server join for an exchange that uses the merge protocol (asserted), and
     * stores the resulting wait-rebalance info, or {@code null} when there is none or it is empty.
     *
     * @param fut Exchange future; its events must contain a server join and no server left.
     * @param crd Coordinator flag.
     * @throws IgniteCheckedException If affinity initialization fails.
     */
    public void onServerJoinWithExchangeMergeProtocol(GridDhtPartitionsExchangeFuture fut, boolean crd) throws IgniteCheckedException {
        ExchangeDiscoveryEvents evts = fut.context().events();

        assert fut.context().mergeExchanges();
        assert evts.hasServerJoin() && !evts.hasServerLeft();

        long startMs = System.currentTimeMillis();

        WaitRebalanceInfo computed = initAffinityOnNodeJoin(fut, crd);

        // An empty info is normalized to null before it is published.
        WaitRebalanceInfo info = null;

        if (computed != null && !computed.empty()) {
            info = computed;
        }

        waitInfo = info;

        if (crd && log.isDebugEnabled()) {
            log.debug("Computed new affinity after node join [topVer=" + evts.topologyVersion() + ", waitGrps=" + (info != null ? groupNames(info.waitGrps.keySet()) : null) + ']');
        }

        if (log.isInfoEnabled()) {
            log.info("Affinity recalculation (on server join) performed in " + (System.currentTimeMillis() - startMs) + " ms.");
        }
    }

    /**
     * Recalculates affinity on server left for an exchange that uses the merge protocol (asserted) by
     * delegating to {@code onReassignmentEnforced}, and logs the elapsed time.
     *
     * @param fut Exchange future; its events must contain a server left.
     * @return Affinity diff messages keyed by an {@code Integer} ID, as returned by
     *      {@code onReassignmentEnforced}.
     * @throws IgniteCheckedException If reassignment fails.
     */
    public Map<Integer, CacheGroupAffinityMessage> onServerLeftWithExchangeMergeProtocol(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
        long startMs = System.currentTimeMillis();

        ExchangeDiscoveryEvents evts = fut.context().events();

        assert fut.context().mergeExchanges();
        assert evts.hasServerLeft();

        Map<Integer, CacheGroupAffinityMessage> diffMsgs = onReassignmentEnforced(fut);

        if (log.isInfoEnabled()) {
            log.info("Affinity recalculation (on server left) performed in " + (System.currentTimeMillis() - startMs) + " ms.");
        }

        return diffMsgs;
    }

    /**
     * Recalculates affinity for a custom event that requires centralized affinity assignment (asserted)
     * by delegating to {@code onReassignmentEnforced}, and logs the elapsed time.
     *
     * @param fut Exchange future.
     * @return Affinity diff messages keyed by an {@code Integer} ID, as returned by
     *      {@code onReassignmentEnforced}.
     * @throws IgniteCheckedException If reassignment fails.
     */
    public Map<Integer, CacheGroupAffinityMessage> onCustomEventWithEnforcedAffinityReassignment(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
        assert DiscoveryCustomEvent.requiresCentralizedAffinityAssignment(fut.firstEvent());

        long startMs = System.currentTimeMillis();

        Map<Integer, CacheGroupAffinityMessage> diffMsgs = onReassignmentEnforced(fut);

        if (log.isInfoEnabled()) {
            log.info("Affinity recalculation (custom message) performed in " + (System.currentTimeMillis() - startMs) + " ms.");
        }

        return diffMsgs;
    }

    /**
     * Recalculates affinity of every registered non-LOCAL group for the topology version of the exchange
     * events and builds affinity diff messages from the availability-based assignment.
     * <p>
     * The freshly calculated assignment is initialized right away only for groups whose holder has
     * rebalance disabled or that were added on this exchange.
     *
     * @param fut Exchange future.
     * @return Diff messages created by {@code CacheGroupAffinityMessage.createAffinityDiffMessages}.
     * @throws IgniteCheckedException If affinity calculation fails.
     */
    private Map<Integer, CacheGroupAffinityMessage> onReassignmentEnforced(final GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
        final ExchangeDiscoveryEvents evts = fut.context().events();

        forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
            @Override
            public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
                AffinityTopologyVersion evtsVer = evts.topologyVersion();
                CacheGroupHolder holder = groupHolder(evtsVer, desc);

                List<List<ClusterNode>> calculated = holder.affinity().calculate(evtsVer, evts, evts.discoveryCache());

                boolean initNow = !holder.rebalanceEnabled || fut.cacheGroupAddedOnExchange(desc.groupId(), desc.receivedFrom());

                if (initNow) {
                    holder.affinity().initialize(evtsVer, calculated);
                }
            }
        });

        Map<Integer, Map<Integer, List<Long>>> diff = initAffinityBasedOnPartitionsAvailability(evts.topologyVersion(), fut, NODE_TO_ORDER, true);

        return CacheGroupAffinityMessage.createAffinityDiffMessages(diff);
    }

    /**
     * Handles a server node join.
     * <p>
     * When another node joined, affinity is initialized through {@code initAffinityOnNodeJoin} and the
     * returned wait-rebalance info is stored (or {@code null} if absent or empty). When the local node is
     * the joining one, affinity is either calculated for all registered groups ({@code crd} set) or fetched
     * via {@code fetchAffinityOnJoin}, and the stored wait-rebalance info is reset to {@code null}.
     *
     * @param fut Exchange future; the node of its first event must not be a client.
     * @param crd Coordinator flag.
     * @throws IgniteCheckedException If affinity initialization fails.
     */
    public void onServerJoin(final GridDhtPartitionsExchangeFuture fut, boolean crd) throws IgniteCheckedException {
        assert !fut.firstEvent().eventNode().isClient();

        boolean locJoin = fut.firstEvent().eventNode().isLocal();

        WaitRebalanceInfo computed = null;

        if (!locJoin) {
            computed = initAffinityOnNodeJoin(fut, crd);
        } else if (crd) {
            forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
                @Override
                public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
                    AffinityTopologyVersion exchVer = fut.initialVersion();
                    CacheGroupHolder holder = groupHolder(exchVer, desc);

                    calculateAndInit(fut.events(), holder.affinity(), exchVer);

                    cctx.exchange().exchangerUpdateHeartbeat();
                }
            });
        } else {
            fetchAffinityOnJoin(fut);
        }

        // An empty info is normalized to null before it is published.
        WaitRebalanceInfo info = null;

        if (computed != null && !computed.empty()) {
            info = computed;
        }

        waitInfo = info;

        if (crd && log.isDebugEnabled()) {
            log.debug("Computed new affinity after node join [topVer=" + fut.initialVersion() + ", waitGrps=" + (info != null ? groupNames(info.waitGrps.keySet()) : null) + ']');
        }
    }

    /**
     * Re-initializes affinity through {@code initAffinityOnNodeJoin} and stores the resulting
     * wait-rebalance info, or {@code null} when there is none or it is empty.
     *
     * @param fut Exchange future; the node of its first event must not be a client.
     * @param crd Coordinator flag.
     * @throws IgniteCheckedException If affinity initialization fails.
     */
    public void onBaselineTopologyChanged(GridDhtPartitionsExchangeFuture fut, boolean crd) throws IgniteCheckedException {
        assert !fut.firstEvent().eventNode().isClient();

        WaitRebalanceInfo computed = initAffinityOnNodeJoin(fut, crd);

        WaitRebalanceInfo info = null;

        if (computed != null && !computed.empty()) {
            info = computed;
        }

        waitInfo = info;

        if (crd && log.isDebugEnabled()) {
            // NOTE(review): the text says "after node join" although this method's name refers to a
            // baseline topology change - confirm whether the wording is intentional.
            log.debug("Computed new affinity after node join [topVer=" + fut.initialVersion() + ", waitGrps=" + (info != null ? groupNames(info.waitGrps.keySet()) : null) + ']');
        }
    }

    /**
     * Builds a comma-separated list of group names for the given group IDs. Callers in this file use
     * the result in debug log messages.
     * <p>
     * Names are resolved through {@code debugGroupName}, so an ID without a registered descriptor is
     * rendered as {@code "Unknown group: <id>"} instead of causing a {@code NullPointerException}.
     *
     * @param grpIds Group IDs.
     * @return Names joined with {@code ", "}; an empty string for an empty collection.
     */
    private String groupNames(Collection<Integer> grpIds) {
        StringBuilder names = new StringBuilder();

        for (Integer grpId : grpIds) {
            if (names.length() != 0) {
                names.append(", ");
            }

            // Null-safe lookup: the registry may no longer contain the group.
            names.append(debugGroupName(grpId));
        }

        return names.toString();
    }

    /**
     * Resolves a printable name for a group ID.
     *
     * @param grpId Group ID.
     * @return The descriptor's cache-or-group name if the group is registered, otherwise
     *      {@code "Unknown group: <id>"}.
     */
    private String debugGroupName(int grpId) {
        CacheGroupDescriptor grpDesc = cachesRegistry.group(grpId);

        return grpDesc != null ? grpDesc.cacheOrGroupName() : "Unknown group: " + grpId;
    }

    /**
     * Calculates the assignment for the given topology version and immediately initializes the affinity
     * cache with it.
     *
     * @param evts Exchange discovery events; also supply the discovery cache used for the calculation.
     * @param aff Affinity cache.
     * @param topVer Topology version to calculate and initialize for.
     */
    private void calculateAndInit(ExchangeDiscoveryEvents evts, GridAffinityAssignmentCache aff, AffinityTopologyVersion topVer) {
        aff.initialize(topVer, aff.calculate(topVer, evts, evts.discoveryCache()));
    }

    /**
     * Obtains affinity for the non-local cache groups when the local node joins.
     * <p>
     * Per group, in this order of precedence:
     * <ul>
     *     <li>group added on this exchange: affinity is calculated locally unless the exchange context
     *     reports merged exchanges, in which case nothing is done here;</li>
     *     <li>the context asks to fetch affinity on join: a fetch future is created and initialized now
     *     and consumed after the loop;</li>
     *     <li>there are no server nodes in the discovery cache: affinity is calculated locally;</li>
     *     <li>otherwise the group is recorded in the context as an affinity request on join.</li>
     * </ul>
     *
     * @param fut Exchange future.
     * @throws IgniteCheckedException If fetching affinity fails.
     */
    private void fetchAffinityOnJoin(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
        AffinityTopologyVersion joinVer = fut.initialVersion();
        ArrayList<GridDhtAssignmentFetchFuture> pendingFetches = new ArrayList<GridDhtAssignmentFetchFuture>();

        for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
            if (grp.isLocal()) {
                continue;
            }

            if (fut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom())) {
                if (!fut.context().mergeExchanges()) {
                    calculateAndInit(fut.events(), grp.affinity(), joinVer);
                }
            } else if (fut.context().fetchAffinityOnJoin()) {
                CacheGroupDescriptor grpDesc = cachesRegistry.group(grp.groupId());

                assert grpDesc != null : grp.cacheOrGroupName();

                GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx, grpDesc.groupId(), joinVer, fut.events().discoveryCache());

                fetchFut.init(false);

                pendingFetches.add(fetchFut);
            } else if (fut.events().discoveryCache().serverNodes().isEmpty()) {
                calculateAndInit(fut.events(), grp.affinity(), joinVer);
            } else {
                fut.context().addGroupAffinityRequestOnJoin(grp.groupId());
            }

            cctx.exchange().exchangerUpdateHeartbeat();
        }

        // All fetch futures were started above; their results are consumed only now.
        for (GridDhtAssignmentFetchFuture fetchFut : pendingFetches) {
            int grpId = fetchFut.groupId();

            fetchAffinity(joinVer, cctx.coordinators().currentCoordinator(), fut.events(), fut.events().discoveryCache(), cctx.cache().cacheGroup(grpId).affinity(), fetchFut);

            cctx.exchange().exchangerUpdateHeartbeat();
        }
    }

    /**
     * Waits for the fetch future and initializes the affinity cache from its result.
     * <p>
     * A {@code null} response makes the assignment be calculated locally. Otherwise the assignment is
     * taken from the response; the ideal assignment is taken from the response as well or, if the response
     * has none, calculated locally.
     *
     * @param topVer Topology version to initialize affinity for.
     * @param mvccCrd Coordinator object passed through to {@code initialize}.
     * @param events Discovery events passed to the affinity calculation, may be {@code null}.
     * @param discoCache Discovery cache.
     * @param affCache Affinity cache to initialize, must be non-null.
     * @param fetchFut Fetch future to wait for.
     * @return The response obtained from the fetch future, possibly {@code null}.
     * @throws IgniteCheckedException If waiting for the fetch future fails.
     */
    private GridDhtAffinityAssignmentResponse fetchAffinity(AffinityTopologyVersion topVer, MvccCoordinator mvccCrd, @Nullable ExchangeDiscoveryEvents events, DiscoCache discoCache, GridAffinityAssignmentCache affCache, GridDhtAssignmentFetchFuture fetchFut) throws IgniteCheckedException {
        assert affCache != null;

        GridDhtAffinityAssignmentResponse res = (GridDhtAffinityAssignmentResponse)fetchFut.get();

        List<List<ClusterNode>> assignment;

        if (res != null) {
            List<List<ClusterNode>> ideal = res.idealAffinityAssignment(discoCache);

            if (ideal == null) {
                assert !affCache.centralizedAffinityFunction();

                // Called for its effect on the affinity cache; the returned assignment is not used.
                affCache.calculate(topVer, events, discoCache);
            } else {
                affCache.idealAssignment(ideal);
            }

            assignment = res.affinityAssignment(discoCache);

            assert assignment != null : res;
        } else {
            assignment = affCache.calculate(topVer, events, discoCache);
        }

        affCache.initialize(topVer, assignment, mvccCrd);

        return res;
    }

    /*
     * WARNING (decompiler note) - a try block that caught itself was removed here; behaviour may differ from the original.
     */
    /**
     * Recalculates affinity at the exchange's initial version for an event that requires centralized
     * affinity assignment (a server left, or a custom event flagged as such), then resets the stored
     * wait-rebalance info to {@code null} under the {@code mux} monitor.
     * <p>
     * The calculated assignments are not initialized by this method.
     *
     * @param fut Exchange future.
     * @param crd Coordinator flag; selects whether registered group descriptors or locally started groups
     *      are iterated.
     * @return Always {@code true}.
     * @throws IgniteCheckedException If affinity calculation fails.
     */
    public boolean onCentralizedAffinityChange(final GridDhtPartitionsExchangeFuture fut, boolean crd) throws IgniteCheckedException {
        assert (fut.events().hasServerLeft() && !fut.firstEvent().eventNode().isClient()) || DiscoveryCustomEvent.requiresCentralizedAffinityAssignment(fut.firstEvent()) : fut.firstEvent();

        if (!crd) {
            forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() {
                @Override
                public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
                    aff.calculate(fut.initialVersion(), fut.events(), fut.events().discoveryCache());

                    cctx.exchange().exchangerUpdateHeartbeat();
                }
            });
        } else {
            forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
                @Override
                public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
                    CacheGroupHolder holder = groupHolder(fut.initialVersion(), desc);

                    holder.aff.calculate(fut.initialVersion(), fut.events(), fut.events().discoveryCache());

                    cctx.exchange().exchangerUpdateHeartbeat();
                }
            });
        }

        synchronized (mux) {
            waitInfo = null;
        }

        return true;
    }

    public IgniteInternalFuture<?> initCoordinatorCaches(final GridDhtPartitionsExchangeFuture fut, final boolean newAff) throws IgniteCheckedException {
        final ArrayList futs = new ArrayList();
        final AffinityTopologyVersion topVer = fut.initialVersion();
        this.forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>(){

            @Override
            public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
                GridAffinityAssignmentCache aff;
                CacheGroupHolder grpHolder = (CacheGroupHolder)CacheAffinitySharedManager.this.grpHolders.get(desc.groupId());
                if (grpHolder != null) {
                    return;
                }
                int grpId = desc.groupId();
                CacheGroupContext grp = CacheAffinitySharedManager.this.cctx.cache().cacheGroup(grpId);
                if (grp == null) {
                    CacheAffinitySharedManager.this.cctx.io().addCacheGroupHandler(desc.groupId(), GridDhtAffinityAssignmentResponse.class, (IgniteBiInClosure<UUID, ? extends GridCacheMessage>)new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>(){

                        @Override
                        public void apply(UUID nodeId, GridDhtAffinityAssignmentResponse res) {
                            CacheAffinitySharedManager.this.processAffinityAssignmentResponse(nodeId, res);
                        }
                    });
                    grpHolder = CacheGroupHolder2.create(CacheAffinitySharedManager.this.cctx, desc, topVer, null);
                    aff = grpHolder.affinity();
                    if (newAff) {
                        if (!aff.lastVersion().equals(topVer)) {
                            CacheAffinitySharedManager.this.calculateAndInit(fut.events(), aff, topVer);
                        }
                        grpHolder.topology(fut.context().events().discoveryCache()).beforeExchange(fut, true, false);
                    } else {
                        List<GridDhtPartitionsExchangeFuture> exchFuts = CacheAffinitySharedManager.this.cctx.exchange().exchangeFutures();
                        int idx = exchFuts.indexOf(fut);
                        assert (idx >= 0 && idx < exchFuts.size() - 1) : "Invalid exchange futures state [cur=" + idx + ", total=" + exchFuts.size() + ']';
                        final GridDhtPartitionsExchangeFuture prev = exchFuts.get(idx + 1);
                        assert (prev.isDone() && prev.topologyVersion().compareTo(topVer) < 0) : prev;
                        if (CacheAffinitySharedManager.this.log.isDebugEnabled()) {
                            CacheAffinitySharedManager.this.log.debug("Need initialize affinity on coordinator [cacheGrp=" + desc.cacheOrGroupName() + "prevAff=" + prev.topologyVersion() + ']');
                        }
                        GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(CacheAffinitySharedManager.this.cctx, desc.groupId(), prev.topologyVersion(), prev.events().discoveryCache());
                        fetchFut.init(false);
                        final GridFutureAdapter affFut = new GridFutureAdapter();
                        fetchFut.listen(new IgniteInClosureX<IgniteInternalFuture<GridDhtAffinityAssignmentResponse>>(){

                            @Override
                            public void applyx(IgniteInternalFuture<GridDhtAffinityAssignmentResponse> fetchFut) throws IgniteCheckedException {
                                CacheAffinitySharedManager.this.fetchAffinity(prev.topologyVersion(), null, prev.events(), prev.events().discoveryCache(), aff, (GridDhtAssignmentFetchFuture)fetchFut);
                                aff.calculate(topVer, fut.events(), fut.events().discoveryCache());
                                affFut.onDone(topVer);
                                CacheAffinitySharedManager.this.cctx.exchange().exchangerUpdateHeartbeat();
                            }
                        });
                        futs.add(affFut);
                    }
                } else {
                    grpHolder = new CacheGroupHolder1(grp, null);
                    if (newAff) {
                        aff = grpHolder.affinity();
                        if (!aff.lastVersion().equals(topVer)) {
                            CacheAffinitySharedManager.this.calculateAndInit(fut.events(), aff, topVer);
                        }
                        grpHolder.topology(fut.context().events().discoveryCache()).beforeExchange(fut, true, false);
                    }
                }
                CacheGroupHolder old = CacheAffinitySharedManager.this.grpHolders.put(grpHolder.groupId(), grpHolder);
                assert (old == null) : old;
                CacheAffinitySharedManager.this.cctx.exchange().exchangerUpdateHeartbeat();
            }
        });
        if (!futs.isEmpty()) {
            GridCompoundFuture affFut = new GridCompoundFuture();
            for (IgniteInternalFuture f : futs) {
                affFut.add(f);
            }
            affFut.markInitialized();
            return affFut;
        }
        return null;
    }

    /**
     * Looks up the holder registered for the descriptor's group ID and, when none exists yet,
     * creates one and registers it in {@code grpHolders}.
     *
     * @param topVer Topology version passed to {@code CacheGroupHolder2.create} when no local group context exists.
     * @param desc Cache group descriptor.
     * @return Already registered or newly created holder.
     * @throws IgniteCheckedException If holder creation fails.
     */
    private CacheGroupHolder groupHolder(AffinityTopologyVersion topVer, CacheGroupDescriptor desc) throws IgniteCheckedException {
        CacheGroupHolder registered = (CacheGroupHolder)this.grpHolders.get(desc.groupId());
        if (registered != null) {
            return registered;
        }
        CacheGroupHolder created;
        CacheGroupContext locGrp = this.cctx.cache().cacheGroup(desc.groupId());
        if (locGrp != null) {
            // A local group context is available: wrap it directly.
            created = new CacheGroupHolder1(locGrp, null);
        } else {
            // No local group context: register a handler for affinity assignment responses first,
            // then build a holder from the descriptor alone.
            IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse> resHnd = new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>(){
                @Override
                public void apply(UUID nodeId, GridDhtAffinityAssignmentResponse res) {
                    CacheAffinitySharedManager.this.processAffinityAssignmentResponse(nodeId, res);
                }
            };
            this.cctx.io().addCacheGroupHandler(desc.groupId(), GridDhtAffinityAssignmentResponse.class, (IgniteBiInClosure<UUID, ? extends GridCacheMessage>)resHnd);
            created = CacheGroupHolder2.create(this.cctx, desc, topVer, null);
        }
        CacheGroupHolder prev = this.grpHolders.put(desc.groupId(), created);
        assert (prev == null) : prev;
        return created;
    }

    /**
     * Initializes affinity of cache groups for an exchange future, in one of two modes.
     * <p>
     * When {@code crd} is {@code false}, only the cache groups returned by {@code cctx.cache().cacheGroups()}
     * that are not local are processed, and {@code null} is returned.
     * <p>
     * When {@code crd} is {@code true}, every registered cache group descriptor is processed through its
     * holder, partitions that need a delayed primary switch are collected into a {@link WaitRebalanceInfo},
     * and for groups for which {@code evts.nodeJoined(desc.receivedFrom())} is {@code true} the holder's
     * topology is updated with a full map built from the freshly computed affinity.
     *
     * @param fut Exchange future.
     * @param crd Selects the second mode described above (presumably "local node is coordinator" - confirm with callers).
     * @return Collected rebalance wait info, or {@code null} when {@code crd} is {@code false}.
     * @throws IgniteCheckedException If processing of a cache group fails.
     */
    @Nullable
    private WaitRebalanceInfo initAffinityOnNodeJoin(final GridDhtPartitionsExchangeFuture fut, boolean crd) throws IgniteCheckedException {
        final ExchangeDiscoveryEvents evts = fut.context().events();
        // Shared between groups so that equal assignments can reuse one list instance (see cachedAssignment).
        final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<Object, List<List<ClusterNode>>>();
        if (!crd) {
            for (CacheGroupContext grp : this.cctx.cache().cacheGroups()) {
                if (grp.isLocal()) {
                    continue;
                }
                boolean latePrimary = grp.rebalanceEnabled();
                this.initAffinityOnNodeJoin(evts, evts.nodeJoined(grp.receivedFrom()), grp.affinity(), null, latePrimary, affCache);
                this.cctx.exchange().exchangerUpdateHeartbeat();
            }
            return null;
        }
        final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(evts.lastServerEventVersion());
        this.forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>(){
            @Override
            public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
                CacheGroupHolder cache = CacheAffinitySharedManager.this.groupHolder(evts.topologyVersion(), desc);
                boolean latePrimary = cache.rebalanceEnabled;
                boolean grpAdded = evts.nodeJoined(desc.receivedFrom());
                GridAffinityAssignmentCache grpAff = cache.affinity();
                CacheAffinitySharedManager.this.initAffinityOnNodeJoin(evts, grpAdded, grpAff, waitRebalanceInfo, latePrimary, affCache);
                if (grpAdded) {
                    AffinityAssignment aff = grpAff.cachedAffinity(grpAff.lastVersion());
                    assert (evts.topologyVersion().equals(aff.topologyVersion())) : "Unexpected version [grp=" + grpAff.cacheOrGroupName() + ", evts=" + evts.topologyVersion() + ", aff=" + grpAff.lastVersion() + ']';
                    // Publish a full map derived from affinity to the holder's topology, one node map at a time.
                    Map<UUID, GridDhtPartitionMap> fullMap = CacheAffinitySharedManager.this.affinityFullMap(aff);
                    for (GridDhtPartitionMap nodeMap : fullMap.values()) {
                        cache.topology(fut.context().events().discoveryCache()).update(fut.exchangeId(), nodeMap, true);
                    }
                }
                CacheAffinitySharedManager.this.cctx.exchange().exchangerUpdateHeartbeat();
            }
        });
        return waitRebalanceInfo;
    }

    /**
     * Builds per-node partition maps from an affinity assignment: every node ID returned by
     * {@code aff.getIds(p)} gets partition {@code p} recorded in state {@code OWNING}.
     *
     * @param aff Affinity assignment to convert.
     * @return Partition maps keyed by node ID.
     */
    private Map<UUID, GridDhtPartitionMap> affinityFullMap(AffinityAssignment aff) {
        HashMap<UUID, GridDhtPartitionMap> byNode = new HashMap<UUID, GridDhtPartitionMap>();
        for (int part = 0; part < aff.assignment().size(); ++part) {
            for (UUID id : aff.getIds(part)) {
                GridDhtPartitionMap nodeParts = byNode.get(id);
                if (nodeParts == null) {
                    // First partition seen for this node: create its map lazily.
                    nodeParts = new GridDhtPartitionMap(id, 1L, aff.topologyVersion(), new GridPartitionStateMap(), false);
                    byNode.put(id, nodeParts);
                }
                nodeParts.put(part, GridDhtPartitionState.OWNING);
            }
        }
        return byNode;
    }

    /**
     * Initializes one affinity cache for the topology version of the given exchange events.
     * <p>
     * When {@code addedOnExchnage} is set, affinity is only calculated and initialized if its last version
     * differs from the events' version. Otherwise the ideal assignment is recalculated and, when
     * {@code latePrimary} is set, every partition whose primary would change keeps the current primary
     * first (see {@link #latePrimaryAssignment}) before the affinity is initialized.
     *
     * @param evts Exchange discovery events.
     * @param addedOnExchnage Flag selecting the short "calculate only" path.
     * @param aff Affinity cache to initialize.
     * @param rebalanceInfo Collector for delayed partitions, may be {@code null}.
     * @param latePrimary Whether the current primary should be kept where it differs from the ideal one.
     * @param affCache Cache of assignments used to share equal assignment lists.
     */
    private void initAffinityOnNodeJoin(ExchangeDiscoveryEvents evts, boolean addedOnExchnage, GridAffinityAssignmentCache aff, WaitRebalanceInfo rebalanceInfo, boolean latePrimary, Map<Object, List<List<ClusterNode>>> affCache) {
        if (addedOnExchnage) {
            if (!aff.lastVersion().equals(evts.topologyVersion())) {
                this.calculateAndInit(evts, aff, evts.topologyVersion());
            }
            return;
        }
        AffinityTopologyVersion affTopVer = aff.lastVersion();
        assert (affTopVer.topologyVersion() > 0L) : "Affinity is not initialized [grp=" + aff.cacheOrGroupName() + ", topVer=" + affTopVer + ", node=" + this.cctx.localNodeId() + ']';
        List<List<ClusterNode>> current = aff.assignments(affTopVer);
        assert (aff.idealAssignment() != null) : "Previous assignment is not available.";
        List<List<ClusterNode>> ideal = aff.calculate(evts.topologyVersion(), evts, evts.discoveryCache());
        // Stays null until at least one partition needs an assignment that differs from the ideal one.
        List<List<ClusterNode>> adjusted = null;
        if (latePrimary) {
            for (int part = 0; part < ideal.size(); ++part) {
                List<ClusterNode> idealNodes = ideal.get(part);
                List<ClusterNode> curNodes = current.get(part);
                ClusterNode curPrimary = curNodes.isEmpty() ? null : curNodes.get(0);
                ClusterNode idealPrimary = idealNodes.isEmpty() ? null : idealNodes.get(0);
                boolean primaryChanges = curPrimary != null && idealPrimary != null && !curPrimary.equals(idealPrimary);
                if (!primaryChanges) {
                    continue;
                }
                assert (this.cctx.discovery().node(evts.topologyVersion(), curPrimary.id()) != null) : curPrimary;
                List<ClusterNode> lateNodes = this.latePrimaryAssignment(aff, part, curPrimary, idealNodes, rebalanceInfo);
                if (adjusted == null) {
                    adjusted = new ArrayList<List<ClusterNode>>(ideal);
                }
                adjusted.set(part, lateNodes);
            }
        }
        List<List<ClusterNode>> effective = adjusted != null ? adjusted : ideal;
        aff.initialize(evts.topologyVersion(), this.cachedAssignment(aff, effective, affCache));
    }

    /**
     * Returns the assignment already stored in {@code affCache} under the affinity's similar-affinity key
     * when it equals {@code assign}; otherwise stores {@code assign} under that key and returns it.
     *
     * @param aff Affinity cache providing the lookup key.
     * @param assign Assignment to look up or store.
     * @param affCache Assignments stored so far.
     * @return Stored equal assignment, or {@code assign} itself.
     */
    private List<List<ClusterNode>> cachedAssignment(GridAffinityAssignmentCache aff, List<List<ClusterNode>> assign, Map<Object, List<List<ClusterNode>>> affCache) {
        List<List<ClusterNode>> stored = affCache.get(aff.similarAffinityKey());
        if (stored == null || !stored.equals(assign)) {
            // Nothing stored yet or a different assignment: the new one replaces the entry.
            affCache.put(aff.similarAffinityKey(), assign);
            return assign;
        }
        return stored;
    }

    /**
     * Builds a partition assignment that keeps {@code curPrimary} in the first position, followed by the
     * nodes of {@code newNodes} in their order (skipping {@code curPrimary} if it is among them).
     * When {@code rebalance} is not {@code null}, the partition is registered there together with the ID of
     * the first node of {@code newNodes} and the {@code newNodes} list.
     *
     * @param aff Affinity cache supplying the group ID.
     * @param part Partition number.
     * @param curPrimary Node to keep first; must not be {@code null} and must differ from the first new node.
     * @param newNodes New nodes for the partition; must not be empty.
     * @param rebalance Collector to register the partition in, may be {@code null}.
     * @return Assignment with {@code curPrimary} first.
     */
    private List<ClusterNode> latePrimaryAssignment(GridAffinityAssignmentCache aff, int part, ClusterNode curPrimary, List<ClusterNode> newNodes, WaitRebalanceInfo rebalance) {
        assert (curPrimary != null);
        assert (!F.isEmpty(newNodes));
        assert (!curPrimary.equals(newNodes.get(0)));
        ArrayList<ClusterNode> late = new ArrayList<ClusterNode>(newNodes.size() + 1);
        late.add(curPrimary);
        for (ClusterNode candidate : newNodes) {
            if (!candidate.equals(curPrimary)) {
                late.add(candidate);
            }
        }
        if (rebalance != null) {
            rebalance.add(aff.groupId(), part, newNodes.get(0).id(), newNodes);
        }
        return late;
    }

    /**
     * Runs {@code initCoordinatorCaches(fut, false)} and then computes assignments via
     * {@code initAffinityBasedOnPartitionsAvailability} for the future's initial version.
     * <p>
     * If the initialization future is absent or already done, the computation runs in the calling thread and
     * a finished future is returned. Otherwise it runs in a listener of the initialization future and
     * the returned future is completed with its result or with the {@link IgniteCheckedException} it threw.
     *
     * @param fut Exchange future; exchanges must not be merged (asserted).
     * @return Future with the map produced by {@code initAffinityBasedOnPartitionsAvailability}.
     * @throws IgniteCheckedException If initialization or the synchronous computation fails.
     */
    public IgniteInternalFuture<Map<Integer, Map<Integer, List<UUID>>>> initAffinityOnNodeLeft(final GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
        assert (!fut.context().mergeExchanges());
        IgniteInternalFuture<?> crdInitFut = this.initCoordinatorCaches(fut, false);
        if (crdInitFut == null || crdInitFut.isDone()) {
            return new GridFinishedFuture<Map<Integer, Map<Integer, List<UUID>>>>(this.initAffinityBasedOnPartitionsAvailability(fut.initialVersion(), fut, NODE_TO_ID, false));
        }
        final GridFutureAdapter<Map<Integer, Map<Integer, List<UUID>>>> resFut = new GridFutureAdapter<Map<Integer, Map<Integer, List<UUID>>>>();
        crdInitFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>(){
            @Override
            public void apply(IgniteInternalFuture<?> completed) {
                try {
                    resFut.onDone(CacheAffinitySharedManager.this.initAffinityBasedOnPartitionsAvailability(fut.initialVersion(), fut, NODE_TO_ID, false));
                }
                catch (IgniteCheckedException err) {
                    resFut.onDone(err);
                }
            }
        });
        return resFut;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Walks all registered cache groups and, per partition, decides whether the ideal assignment can be used
     * as is or whether a different ("late") assignment is needed for now.
     * <p>
     * Groups with rebalancing disabled are skipped, as are groups added on this exchange unless centralized
     * assignment is enforced by the first exchange event. For each remaining partition:
     * <ul>
     *     <li>the current primary is the first node of the current assignment, replaced by the first owner
     *     reported by the topology when the owners list is non-empty and does not contain it;</li>
     *     <li>if there is a current primary but the ideal assignment has no nodes, the alive nodes of the
     *     current assignment are kept;</li>
     *     <li>if the primary would change and the ideal primary's partition state is not {@code OWNING},
     *     a late assignment is built around the current primary when it is alive, otherwise around the first
     *     alive {@code OWNING} backup of the current assignment, otherwise around the first alive owner.</li>
     * </ul>
     * Changed partitions are converted with {@code c} and collected per group. Finally {@code waitInfo} is
     * replaced under {@code mux} with the collected rebalance info, or {@code null} when nothing was collected.
     *
     * @param topVer Topology version used for holder lookup, assertions and (optionally) affinity initialization.
     * @param fut Exchange future.
     * @param c Converter applied to every node of a changed partition assignment.
     * @param initAff When {@code true}, each processed group's affinity is initialized for {@code topVer} with
     *      the ideal assignment patched by the late assignments.
     * @param <T> Element type produced by {@code c}.
     * @return Changed partition assignments: group ID -> partition -> converted nodes. Groups without changes are absent.
     * @throws IgniteCheckedException If processing of a cache group fails.
     */
    private <T> Map<Integer, Map<Integer, List<T>>> initAffinityBasedOnPartitionsAvailability(final AffinityTopologyVersion topVer, final GridDhtPartitionsExchangeFuture fut, final IgniteClosure<ClusterNode, T> c, final boolean initAff) throws IgniteCheckedException {
        final boolean enforcedCentralizedAssignment = DiscoveryCustomEvent.requiresCentralizedAffinityAssignment(fut.firstEvent());
        final WaitRebalanceInfo waitRebalanceInfo = enforcedCentralizedAssignment ? new WaitRebalanceInfo(fut.exchangeId().topologyVersion()) : new WaitRebalanceInfo(fut.context().events().lastServerEventVersion());
        final List<ClusterNode> aliveNodes = fut.context().events().discoveryCache().serverNodes();
        final Map<Integer, Map<Integer, List<T>>> assignment = new HashMap<Integer, Map<Integer, List<T>>>();
        this.forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>(){
            @Override
            public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
                CacheGroupHolder grpHolder = CacheAffinitySharedManager.this.groupHolder(topVer, desc);
                if (!grpHolder.rebalanceEnabled || (fut.cacheGroupAddedOnExchange(desc.groupId(), desc.receivedFrom()) && !enforcedCentralizedAssignment)) {
                    return;
                }
                AffinityTopologyVersion affTopVer = grpHolder.affinity().lastVersion();
                assert ((affTopVer.topologyVersion() > 0L && !affTopVer.equals(topVer)) || enforcedCentralizedAssignment) : "Invalid affinity version [last=" + affTopVer + ", futVer=" + topVer + ", grp=" + desc.cacheOrGroupName() + ']';
                List<List<ClusterNode>> curAssignment = grpHolder.affinity().assignments(affTopVer);
                List<List<ClusterNode>> newAssignment = grpHolder.affinity().idealAssignment();
                assert (newAssignment != null);
                // Mutable copy of the ideal assignment, needed only when affinity is initialized here.
                List<List<ClusterNode>> newAssignment0 = initAff ? new ArrayList<List<ClusterNode>>(newAssignment) : null;
                GridDhtPartitionTopology top = grpHolder.topology(fut.context().events().discoveryCache());
                // Created lazily: groups without changed partitions contribute nothing to the result.
                Map<Integer, List<T>> cacheAssignment = null;
                for (int p = 0; p < newAssignment.size(); ++p) {
                    List<ClusterNode> newNodes = newAssignment.get(p);
                    List<ClusterNode> curNodes = curAssignment.get(p);
                    assert (aliveNodes.containsAll(newNodes)) : "Invalid new assignment [grp=" + grpHolder.affinity().cacheOrGroupName() + ", nodes=" + newNodes + ", topVer=" + fut.context().events().discoveryCache().version() + ", evts=" + fut.context().events().events() + "]";
                    ClusterNode curPrimary = !curNodes.isEmpty() ? curNodes.get(0) : null;
                    ClusterNode newPrimary = !newNodes.isEmpty() ? newNodes.get(0) : null;
                    List<ClusterNode> newNodes0 = null;
                    assert (newPrimary == null || aliveNodes.contains(newPrimary)) : "Invalid new primary [grp=" + desc.cacheOrGroupName() + ", node=" + newPrimary + ", topVer=" + topVer + ']';
                    List<ClusterNode> owners = top.owners(p);
                    // Prefer an actual owner as current primary when the assigned one is not among the owners.
                    if (!owners.isEmpty() && !owners.contains(curPrimary)) {
                        curPrimary = owners.get(0);
                    }
                    if (curPrimary != null && newPrimary == null) {
                        // Ideal assignment is empty for this partition: keep whatever current nodes are still alive.
                        newNodes0 = new ArrayList<ClusterNode>(curNodes.size());
                        for (ClusterNode node : curNodes) {
                            if (!aliveNodes.contains(node)) {
                                continue;
                            }
                            newNodes0.add(node);
                        }
                    } else if (curPrimary != null && !curPrimary.equals(newPrimary)) {
                        // newPrimary is non-null here: the null case was handled by the previous branch.
                        GridDhtPartitionState state = top.partitionState(newPrimary.id(), p);
                        if (aliveNodes.contains(curPrimary)) {
                            if (state != GridDhtPartitionState.OWNING) {
                                newNodes0 = CacheAffinitySharedManager.this.latePrimaryAssignment(grpHolder.affinity(), p, curPrimary, newNodes, waitRebalanceInfo);
                            }
                        } else if (state != GridDhtPartitionState.OWNING) {
                            // Current primary is gone: first try an alive OWNING backup of the current assignment...
                            for (int i = 1; i < curNodes.size(); ++i) {
                                ClusterNode curNode = curNodes.get(i);
                                if (top.partitionState(curNode.id(), p) != GridDhtPartitionState.OWNING || !aliveNodes.contains(curNode)) {
                                    continue;
                                }
                                newNodes0 = CacheAffinitySharedManager.this.latePrimaryAssignment(grpHolder.affinity(), p, curNode, newNodes, waitRebalanceInfo);
                                break;
                            }
                            // ...then fall back to any alive owner reported by the topology.
                            if (newNodes0 == null) {
                                for (ClusterNode owner : owners) {
                                    if (!aliveNodes.contains(owner)) {
                                        continue;
                                    }
                                    newNodes0 = CacheAffinitySharedManager.this.latePrimaryAssignment(grpHolder.affinity(), p, owner, newNodes, waitRebalanceInfo);
                                    break;
                                }
                            }
                        }
                    }
                    if (newNodes0 == null) {
                        continue;
                    }
                    // Report the list that is actually checked (newNodes0), not the ideal one.
                    assert (aliveNodes.containsAll(newNodes0)) : "Invalid late assignment [grp=" + grpHolder.affinity().cacheOrGroupName() + ", nodes=" + newNodes0 + ", topVer=" + fut.context().events().discoveryCache().version() + ", evts=" + fut.context().events().events() + "]";
                    if (newAssignment0 != null) {
                        newAssignment0.set(p, newNodes0);
                    }
                    if (cacheAssignment == null) {
                        cacheAssignment = new HashMap<Integer, List<T>>();
                    }
                    List<T> converted = new ArrayList<T>(newNodes0.size());
                    for (int i = 0; i < newNodes0.size(); ++i) {
                        converted.add(c.apply(newNodes0.get(i)));
                    }
                    cacheAssignment.put(p, converted);
                }
                if (cacheAssignment != null) {
                    assignment.put(grpHolder.groupId(), cacheAssignment);
                }
                if (initAff) {
                    grpHolder.affinity().initialize(topVer, newAssignment0);
                }
            }
        });
        synchronized (this.mux) {
            WaitRebalanceInfo info = !waitRebalanceInfo.empty() ? waitRebalanceInfo : null;
            this.waitInfo = info;
            if (this.log.isDebugEnabled()) {
                this.log.debug("Computed new affinity after node left [topVer=" + topVer + ", waitGrps=" + (info != null ? this.groupNames(info.waitGrps.keySet()) : null) + ']');
            }
        }
        return assignment;
    }

    /**
     * @return All cache group descriptors as reported by the caches registry.
     */
    public Map<Integer, CacheGroupDescriptor> cacheGroups() {
        return cachesRegistry.allGroups();
    }

    /**
     * @return All cache descriptors as reported by the caches registry.
     */
    public Map<Integer, DynamicCacheDescriptor> caches() {
        return cachesRegistry.allCaches();
    }

    /**
     * @param grpId Cache group ID.
     * @return Affinity cache of the holder registered for the group, or {@code null} if no holder is registered.
     */
    @Nullable
    public GridAffinityAssignmentCache groupAffinity(int grpId) {
        CacheGroupHolder holder = (CacheGroupHolder)this.grpHolders.get(grpId);
        if (holder == null) {
            return null;
        }
        return holder.affinity();
    }

    /**
     * Logs every pending assignment fetch future at warning level; logs nothing when there are none.
     */
    public void dumpDebugInfo() {
        if (this.pendingAssignmentFetchFuts.isEmpty()) {
            return;
        }
        U.warn(this.log, "Pending assignment fetch futures:");
        for (GridDhtAssignmentFetchFuture pending : this.pendingAssignmentFetchFuts.values()) {
            U.warn(this.log, ">>> " + pending);
        }
    }

    /**
     * @param nodes Nodes to convert.
     * @return IDs of the given nodes, in the same order.
     */
    private static List<UUID> toIds0(List<ClusterNode> nodes) {
        ArrayList<UUID> ids = new ArrayList<UUID>(nodes.size());
        for (ClusterNode node : nodes) {
            ids.add(node.id());
        }
        return ids;
    }

    /**
     * Resolves node IDs to nodes through discovery for the given topology version.
     * Every ID is expected to resolve (asserted).
     *
     * @param topVer Topology version used for the lookup.
     * @param ids Node IDs.
     * @return Resolved nodes, in the order of {@code ids}.
     */
    private List<ClusterNode> toNodes(AffinityTopologyVersion topVer, List<UUID> ids) {
        ArrayList<ClusterNode> resolved = new ArrayList<ClusterNode>(ids.size());
        for (UUID id : ids) {
            ClusterNode node = this.cctx.discovery().node(topVer, id);
            assert (node != null) : "Failed to get node [id=" + id + ", topVer=" + topVer + ", locNode=" + this.cctx.localNode() + ", allNodes=" + this.cctx.discovery().nodes(topVer) + ']';
            resolved.add(node);
        }
        return resolved;
    }

    /**
     * Accumulates, per cache group, the partitions registered through {@link #add}: the node ID recorded
     * for each partition, the assignment passed along with it, and the group's deployment ID.
     * All maps are created lazily on the first {@link #add} call.
     */
    class WaitRebalanceInfo {
        /** Topology version this info was created for. */
        private final AffinityTopologyVersion topVer;

        /** Group ID -> partition -> node ID recorded for the partition; {@code null} until the first add. */
        private Map<Integer, Map<Integer, UUID>> waitGrps;

        /** Group ID -> partition -> assignment recorded for the partition; {@code null} until the first add. */
        private Map<Integer, Map<Integer, List<ClusterNode>>> assignments;

        /** Group ID -> deployment ID taken from the caches registry; {@code null} until the first add. */
        private Map<Integer, IgniteUuid> deploymentIds;

        /**
         * @param topVer Topology version.
         */
        WaitRebalanceInfo(AffinityTopologyVersion topVer) {
            this.topVer = topVer;
        }

        /**
         * @return {@code true} if nothing has been added yet.
         */
        boolean empty() {
            if (this.waitGrps == null) {
                return true;
            }
            assert (!this.waitGrps.isEmpty());
            assert (this.waitGrps.size() == this.assignments.size());
            return false;
        }

        /**
         * Records a partition of a group.
         *
         * @param grpId Cache group ID.
         * @param part Partition number.
         * @param waitNode Node ID recorded for the partition.
         * @param assignment Assignment recorded for the partition; must not be empty.
         */
        void add(Integer grpId, Integer part, UUID waitNode, List<ClusterNode> assignment) {
            assert (!F.isEmpty(assignment)) : assignment;
            if (this.waitGrps == null) {
                this.waitGrps = new HashMap<Integer, Map<Integer, UUID>>();
                this.assignments = new HashMap<Integer, Map<Integer, List<ClusterNode>>>();
                this.deploymentIds = new HashMap<Integer, IgniteUuid>();
            }
            Map<Integer, UUID> grpWaitParts = this.waitGrps.get(grpId);
            if (grpWaitParts == null) {
                // First partition of this group: also remember the group's deployment ID.
                grpWaitParts = new HashMap<Integer, UUID>();
                this.waitGrps.put(grpId, grpWaitParts);
                this.deploymentIds.put(grpId, CacheAffinitySharedManager.this.cachesRegistry.group(grpId).deploymentId());
            }
            grpWaitParts.put(part, waitNode);
            Map<Integer, List<ClusterNode>> grpAssignment = this.assignments.get(grpId);
            if (grpAssignment == null) {
                grpAssignment = new HashMap<Integer, List<ClusterNode>>();
                this.assignments.put(grpId, grpAssignment);
            }
            grpAssignment.put(part, assignment);
        }

        /** {@inheritDoc} */
        @Override
        public String toString() {
            Object grps = this.waitGrps == null ? null : this.waitGrps.keySet();
            return "WaitRebalanceInfo [topVer=" + this.topVer + ", grps=" + grps + ']';
        }
    }

    private static class CacheGroupHolder2
    extends CacheGroupHolder {
        private final GridCacheSharedContext cctx;

        static CacheGroupHolder2 create(GridCacheSharedContext cctx, CacheGroupDescriptor grpDesc, AffinityTopologyVersion topVer, @Nullable GridAffinityAssignmentCache initAff) throws IgniteCheckedException {
            assert (grpDesc != null);
            assert (!cctx.kernalContext().clientNode());
            CacheConfiguration<?, ?> ccfg = grpDesc.config();
            assert (ccfg != null) : grpDesc;
            assert (ccfg.getCacheMode() != CacheMode.LOCAL) : ccfg.getName();
            assert (!cctx.discovery().cacheGroupAffinityNodes(grpDesc.groupId(), topVer).contains(cctx.localNode())) : grpDesc.cacheOrGroupName();
            AffinityFunction affFunc = cctx.cache().clone(ccfg.getAffinity());
            cctx.kernalContext().resource().injectGeneric(affFunc);
            cctx.kernalContext().resource().injectCacheName(affFunc, ccfg.getName());
            U.startLifecycleAware(F.asList(affFunc));
            GridAffinityAssignmentCache aff = new GridAffinityAssignmentCache(cctx.kernalContext(), grpDesc.cacheOrGroupName(), grpDesc.groupId(), affFunc, ccfg.getNodeFilter(), ccfg.getBackups(), ccfg.getCacheMode() == CacheMode.LOCAL, grpDesc.persistenceEnabled());
            return new CacheGroupHolder2(ccfg.getRebalanceMode() != CacheRebalanceMode.NONE, cctx, aff, initAff);
        }

        CacheGroupHolder2(boolean rebalanceEnabled, GridCacheSharedContext cctx, GridAffinityAssignmentCache aff, @Nullable GridAffinityAssignmentCache initAff) {
            super(rebalanceEnabled, aff, initAff);
            this.cctx = cctx;
        }

        @Override
        public boolean client() {
            return true;
        }

        @Override
        public GridDhtPartitionTopology topology(DiscoCache discoCache) {
            return this.cctx.exchange().clientTopology(this.groupId(), discoCache);
        }
    }

    /**
     * Holder backed by a local cache group context: affinity, rebalance flag and topology are all taken
     * from that context. {@link #client()} always returns {@code false}.
     */
    private class CacheGroupHolder1
    extends CacheGroupHolder {
        /** Backing cache group context. */
        private final CacheGroupContext grp;

        /**
         * @param grp Cache group context. Must not be {@code null}: it is dereferenced unconditionally
         *      when the superclass constructor arguments are evaluated. Must not be a local group (asserted).
         * @param initAff Affinity to initialize the group's affinity from, may be {@code null}.
         */
        CacheGroupHolder1(CacheGroupContext grp, @Nullable GridAffinityAssignmentCache initAff) {
            super(grp.rebalanceEnabled(), grp.affinity(), initAff);
            assert (!grp.isLocal()) : grp;
            this.grp = grp;
        }

        /** {@inheritDoc} */
        @Override
        public boolean client() {
            return false;
        }

        /**
         * Returns the topology of the backing group context; {@code discoCache} is not used.
         */
        @Override
        public GridDhtPartitionTopology topology(DiscoCache discoCache) {
            return this.grp.topology();
        }
    }

    /**
     * Base holder pairing a group's affinity cache with its rebalance-enabled flag and giving access to
     * the group's partition topology.
     */
    static abstract class CacheGroupHolder {
        /** Affinity cache of the group. */
        private final GridAffinityAssignmentCache aff;

        /** Rebalance-enabled flag supplied at construction. */
        private final boolean rebalanceEnabled;

        /**
         * @param rebalanceEnabled Rebalance-enabled flag.
         * @param aff Affinity cache of the group.
         * @param initAff When not {@code null}, {@code aff} is initialized from it via {@code aff.init(initAff)}.
         */
        CacheGroupHolder(boolean rebalanceEnabled, GridAffinityAssignmentCache aff, @Nullable GridAffinityAssignmentCache initAff) {
            this.rebalanceEnabled = rebalanceEnabled;
            this.aff = aff;
            if (initAff != null) {
                aff.init(initAff);
            }
        }

        /**
         * @return Value defined by the subclass ({@code true} in {@code CacheGroupHolder2}, {@code false} in {@code CacheGroupHolder1}).
         */
        abstract boolean client();

        /**
         * @return Group ID reported by the affinity cache.
         */
        int groupId() {
            return aff.groupId();
        }

        /**
         * @return Partition count reported by the affinity cache.
         */
        int partitions() {
            return aff.partitions();
        }

        /**
         * @param discoCache Discovery cache.
         * @return Partition topology of the group.
         */
        abstract GridDhtPartitionTopology topology(DiscoCache discoCache);

        /**
         * @return Affinity cache of the group.
         */
        GridAffinityAssignmentCache affinity() {
            return aff;
        }
    }
}

