/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractStickyAssignor
extends AbstractPartitionAssignor {
    /** Logger shared by all instances of this assignor. */
    private static final Logger log = LoggerFactory.getLogger(AbstractStickyAssignor.class);
    /** Sentinel generation value (-1); {@link #maxGeneration} starts at this value. */
    public static final int DEFAULT_GENERATION = -1;
    /**
     * Highest member generation observed so far. It is raised inside
     * {@code allSubscriptionsEqual} whenever a subscription reports a larger generation.
     * NOTE(review): nothing in the visible part of this file resets it between
     * {@code assign} calls - confirm that this carry-over is intended.
     */
    public int maxGeneration = -1;
    /** Re-created at the start of every {@code generalAssign} call. */
    private PartitionMovements partitionMovements;
    /**
     * Partitions that were handed to a different consumer during the constrained
     * assignment, mapped to their new owner. {@code assign} replaces it with a fresh
     * empty map on the constrained path and sets it to {@code null} on the general path,
     * so readers must be prepared for {@code null}.
     */
    protected Map<TopicPartition, String> partitionsTransferringOwnership = new HashMap<TopicPartition, String>();

    /**
     * Extracts the member data (the {@code partitions} and {@code generation} fields read
     * by this class) from a consumer's subscription.
     *
     * @param var1 the subscription to decode
     * @return the member data carried by the subscription
     */
    protected abstract MemberData memberData(ConsumerPartitionAssignor.Subscription var1);

    /**
     * Computes the partition assignment for the group.
     *
     * <p>When every consumer subscribes to the same topic set the optimized constrained
     * algorithm is used; otherwise the general algorithm is used. As a side effect,
     * {@code partitionsTransferringOwnership} is reset to an empty map on the constrained
     * path and to {@code null} on the general path.
     *
     * @param partitionsPerTopic number of partitions for each topic
     * @param subscriptions      subscription of every consumer, keyed by consumer id
     * @return the partitions assigned to each consumer
     */
    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, Map<String, ConsumerPartitionAssignor.Subscription> subscriptions) {
        Map<String, List<TopicPartition>> ownedByConsumer = new HashMap<>();
        Set<TopicPartition> multiplyClaimed = new HashSet<>();
        boolean identicalSubscriptions = allSubscriptionsEqual(partitionsPerTopic.keySet(), subscriptions, ownedByConsumer, multiplyClaimed);

        if (!identicalSubscriptions) {
            log.debug("Detected that not all consumers were subscribed to same set of topics, falling back to the general case assignment algorithm");
            // The general path does not track ownership transfers.
            partitionsTransferringOwnership = null;
            return generalAssign(partitionsPerTopic, subscriptions, ownedByConsumer);
        }

        log.debug("Detected that all consumers were subscribed to same set of topics, invoking the optimized assignment algorithm");
        partitionsTransferringOwnership = new HashMap<>();
        return constrainedAssign(partitionsPerTopic, ownedByConsumer, multiplyClaimed);
    }

    /**
     * Checks whether all consumers subscribe to the same topic set and, in the same pass,
     * collects the partitions each consumer validly owned.
     *
     * <p>Side effects: {@code consumerToOwnedPartitions} receives one (possibly empty) list
     * per consumer, {@code partitionsWithMultiplePreviousOwners} receives every partition
     * claimed by more than one consumer of the highest generation, and
     * {@code this.maxGeneration} is raised to the highest generation encountered.
     *
     * @param allTopics                            topics known to the assignment; owned partitions of other topics are ignored
     * @param subscriptions                        subscription of every consumer, keyed by consumer id
     * @param consumerToOwnedPartitions            output: owned partitions per consumer
     * @param partitionsWithMultiplePreviousOwners output: partitions claimed by several consumers
     * @return {@code true} if no subscription differed from the reference topic set
     */
    private boolean allSubscriptionsEqual(Set<String> allTopics, Map<String, ConsumerPartitionAssignor.Subscription> subscriptions, Map<String, List<TopicPartition>> consumerToOwnedPartitions, Set<TopicPartition> partitionsWithMultiplePreviousOwners) {
        HashSet<String> membersOfCurrentHighestGeneration = new HashSet<String>();
        boolean isAllSubscriptionsEqual = true;
        HashSet<String> subscribedTopics = new HashSet<String>();
        HashMap<TopicPartition, String> allPreviousPartitionsToOwner = new HashMap<TopicPartition, String>();
        for (Map.Entry<String, ConsumerPartitionAssignor.Subscription> subscriptionEntry : subscriptions.entrySet()) {
            String consumer = subscriptionEntry.getKey();
            ConsumerPartitionAssignor.Subscription subscription = subscriptionEntry.getValue();
            // The reference topic set is seeded while it is still empty; later
            // subscriptions are compared against it by size and content.
            // NOTE(review): a subscription with no topics seen before the reference set is
            // seeded is not flagged as different - confirm this is acceptable.
            if (subscribedTopics.isEmpty()) {
                subscribedTopics.addAll(subscription.topics());
            } else if (isAllSubscriptionsEqual && (subscription.topics().size() != subscribedTopics.size() || !subscribedTopics.containsAll(subscription.topics()))) {
                isAllSubscriptionsEqual = false;
            }
            MemberData memberData = this.memberData(subscription);
            ArrayList<TopicPartition> ownedPartitions = new ArrayList<TopicPartition>();
            // Every consumer gets an entry, even if none of its claims end up being honoured.
            consumerToOwnedPartitions.put(consumer, ownedPartitions);
            // Claims are honoured only if the generation is present and >= maxGeneration,
            // or the generation is absent while maxGeneration is still -1; otherwise skip.
            if ((!memberData.generation.isPresent() || memberData.generation.get() < this.maxGeneration) && (memberData.generation.isPresent() || this.maxGeneration != -1)) continue;
            if (memberData.generation.isPresent() && memberData.generation.get() > this.maxGeneration) {
                // A newer generation invalidates everything collected for the older one.
                allPreviousPartitionsToOwner.clear();
                partitionsWithMultiplePreviousOwners.clear();
                for (String droppedOutConsumer : membersOfCurrentHighestGeneration) {
                    consumerToOwnedPartitions.get(droppedOutConsumer).clear();
                }
                membersOfCurrentHighestGeneration.clear();
                this.maxGeneration = memberData.generation.get();
            }
            membersOfCurrentHighestGeneration.add(consumer);
            for (TopicPartition tp : memberData.partitions) {
                // Ignore owned partitions whose topic is not part of this assignment.
                if (!allTopics.contains(tp.topic())) continue;
                if (!allPreviousPartitionsToOwner.containsKey(tp)) {
                    allPreviousPartitionsToOwner.put(tp, consumer);
                    ownedPartitions.add(tp);
                    continue;
                }
                // Conflict: the partition is taken away from the earlier claimant and is
                // not granted to the current one either.
                String otherConsumer = (String)allPreviousPartitionsToOwner.get(tp);
                log.error("Found multiple consumers {} and {} claiming the same TopicPartition {} in the same generation {}, this will be invalidated and removed from their previous assignment.", consumer, otherConsumer, tp, this.maxGeneration);
                consumerToOwnedPartitions.get(otherConsumer).remove(tp);
                partitionsWithMultiplePreviousOwners.add(tp);
            }
        }
        return isAllSubscriptionsEqual;
    }

    /**
     * Assignment algorithm used when all consumers subscribe to the same topics.
     *
     * <p>Phase one lets every consumer keep as many previously owned partitions as its
     * quota allows and records the surplus as revoked. Phase two hands the remaining
     * partitions to the consumers that are still below quota. Partitions that were revoked
     * or claimed by several consumers and end up with a consumer in phase two are recorded
     * in {@code partitionsTransferringOwnership}.
     *
     * <p>NOTE(review): with an empty {@code consumerToOwnedPartitions} the integer modulo
     * by {@code numberOfConsumers} below divides by zero - confirm callers never pass an
     * empty group.
     *
     * @param partitionsPerTopic                   number of partitions for each topic
     * @param consumerToOwnedPartitions            previously owned partitions per consumer (lists are modified)
     * @param partitionsWithMultiplePreviousOwners partitions that had more than one claimant
     * @return the new assignment, keyed by consumer id
     * @throws IllegalStateException if partitions remain but no consumer can take them, or
     *                               if consumers stay under quota after all partitions are handed out
     */
    private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer> partitionsPerTopic, Map<String, List<TopicPartition>> consumerToOwnedPartitions, Set<TopicPartition> partitionsWithMultiplePreviousOwners) {
        if (log.isDebugEnabled()) {
            log.debug("Performing constrained assign with partitionsPerTopic: {}, consumerToOwnedPartitions: {}.", (Object)partitionsPerTopic, (Object)consumerToOwnedPartitions);
        }
        HashSet<TopicPartition> allRevokedPartitions = new HashSet<TopicPartition>();
        LinkedList<String> unfilledMembersWithUnderMinQuotaPartitions = new LinkedList<String>();
        LinkedList<String> unfilledMembersWithExactlyMinQuotaPartitions = new LinkedList<String>();
        int numberOfConsumers = consumerToOwnedPartitions.size();
        int totalPartitionsCount = partitionsPerTopic.values().stream().reduce(0, Integer::sum);
        // Every consumer ends up with minQuota or maxQuota partitions; exactly
        // (total % consumers) of them are expected to hold more than minQuota.
        int minQuota = (int)Math.floor((double)totalPartitionsCount / (double)numberOfConsumers);
        int maxQuota = (int)Math.ceil((double)totalPartitionsCount / (double)numberOfConsumers);
        int expectedNumMembersWithOverMinQuotaPartitions = totalPartitionsCount % numberOfConsumers;
        int currentNumMembersWithOverMinQuotaPartitions = 0;
        HashMap<String, List<TopicPartition>> assignment = new HashMap<String, List<TopicPartition>>(consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList(maxQuota))));
        ArrayList<TopicPartition> assignedPartitions = new ArrayList<TopicPartition>();
        // Phase one: retain previously owned partitions up to each consumer's quota.
        for (Map.Entry<String, List<TopicPartition>> consumerEntry : consumerToOwnedPartitions.entrySet()) {
            String consumer = consumerEntry.getKey();
            List<TopicPartition> ownedPartitions = consumerEntry.getValue();
            List consumerAssignment = (List)assignment.get(consumer);
            // Defensive clean-up: a multiply-claimed partition must not be retained by anyone.
            for (TopicPartition doublyClaimedPartition : partitionsWithMultiplePreviousOwners) {
                if (!ownedPartitions.contains(doublyClaimedPartition)) continue;
                log.error("Found partition {} still claimed as owned by consumer {}, despite being claimed by multiple consumers already in the same generation. Removing it from the ownedPartitions", (Object)doublyClaimedPartition, (Object)consumer);
                ownedPartitions.remove(doublyClaimedPartition);
            }
            if (ownedPartitions.size() < minQuota) {
                // Under quota: keep everything and mark the consumer as needing more.
                if (ownedPartitions.size() > 0) {
                    consumerAssignment.addAll(ownedPartitions);
                    assignedPartitions.addAll(ownedPartitions);
                }
                unfilledMembersWithUnderMinQuotaPartitions.add(consumer);
                continue;
            }
            if (ownedPartitions.size() >= maxQuota && currentNumMembersWithOverMinQuotaPartitions < expectedNumMembersWithOverMinQuotaPartitions) {
                // This consumer takes one of the over-minQuota slots: keep the first
                // maxQuota partitions and revoke the rest.
                if (++currentNumMembersWithOverMinQuotaPartitions == expectedNumMembersWithOverMinQuotaPartitions) {
                    // All over-minQuota slots are taken, so consumers sitting at exactly
                    // minQuota are no longer candidates for an extra partition.
                    unfilledMembersWithExactlyMinQuotaPartitions.clear();
                }
                List<TopicPartition> maxQuotaPartitions = ownedPartitions.subList(0, maxQuota);
                consumerAssignment.addAll(maxQuotaPartitions);
                assignedPartitions.addAll(maxQuotaPartitions);
                allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota, ownedPartitions.size()));
                continue;
            }
            // Otherwise keep the first minQuota partitions and revoke the surplus.
            List<TopicPartition> minQuotaPartitions = ownedPartitions.subList(0, minQuota);
            consumerAssignment.addAll(minQuotaPartitions);
            assignedPartitions.addAll(minQuotaPartitions);
            allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, ownedPartitions.size()));
            // While over-minQuota slots remain, this consumer may still receive one more.
            if (currentNumMembersWithOverMinQuotaPartitions >= expectedNumMembersWithOverMinQuotaPartitions) continue;
            unfilledMembersWithExactlyMinQuotaPartitions.add(consumer);
        }
        List<TopicPartition> unassignedPartitions = this.getUnassignedPartitions(totalPartitionsCount, partitionsPerTopic, assignedPartitions);
        if (log.isDebugEnabled()) {
            log.debug("After reassigning previously owned partitions, unfilled members: {}, unassigned partitions: {}, current assignment: {}", unfilledMembersWithUnderMinQuotaPartitions, unassignedPartitions, assignment);
        }
        // Sorting makes the hand-out order independent of map iteration order.
        Collections.sort(unfilledMembersWithUnderMinQuotaPartitions);
        Collections.sort(unfilledMembersWithExactlyMinQuotaPartitions);
        // Phase two: hand out the remaining partitions, cycling over the under-quota
        // consumers first and then topping up consumers that sit at exactly minQuota.
        Iterator unfilledConsumerIter = unfilledMembersWithUnderMinQuotaPartitions.iterator();
        for (TopicPartition unassignedPartition : unassignedPartitions) {
            int currentAssignedCount;
            String consumer;
            if (unfilledConsumerIter.hasNext()) {
                consumer = (String)unfilledConsumerIter.next();
            } else {
                if (unfilledMembersWithUnderMinQuotaPartitions.isEmpty() && unfilledMembersWithExactlyMinQuotaPartitions.isEmpty()) {
                    int currentPartitionIndex = unassignedPartitions.indexOf(unassignedPartition);
                    log.error("No more unfilled consumers to be assigned. The remaining unassigned partitions are: {}", (Object)unassignedPartitions.subList(currentPartitionIndex, unassignedPartitions.size()));
                    throw new IllegalStateException("No more unfilled consumers to be assigned.");
                }
                if (unfilledMembersWithUnderMinQuotaPartitions.isEmpty()) {
                    consumer = (String)unfilledMembersWithExactlyMinQuotaPartitions.poll();
                } else {
                    // Reached the end of the under-quota list: start another round.
                    unfilledConsumerIter = unfilledMembersWithUnderMinQuotaPartitions.iterator();
                    consumer = (String)unfilledConsumerIter.next();
                }
            }
            List consumerAssignment = (List)assignment.get(consumer);
            consumerAssignment.add(unassignedPartition);
            // Record partitions that previously had an owner and now go to this consumer.
            if (allRevokedPartitions.contains(unassignedPartition) || partitionsWithMultiplePreviousOwners.contains(unassignedPartition)) {
                this.partitionsTransferringOwnership.put(unassignedPartition, consumer);
            }
            if ((currentAssignedCount = consumerAssignment.size()) == minQuota) {
                // Consumer reached minQuota: drop it from the under-quota list and make it
                // a candidate for one extra partition.
                unfilledConsumerIter.remove();
                unfilledMembersWithExactlyMinQuotaPartitions.add(consumer);
                continue;
            }
            // Only logs when the last over-minQuota slot was just filled and this was not
            // the final partition to hand out.
            if (currentAssignedCount != maxQuota || ++currentNumMembersWithOverMinQuotaPartitions != expectedNumMembersWithOverMinQuotaPartitions || unassignedPartitions.indexOf(unassignedPartition) == unassignedPartitions.size() - 1) continue;
            log.error("Filled the last member up to maxQuota but still had partitions remaining to assign, will continue but this indicates a bug in the assignment.");
        }
        // Sanity checks: anything still listed as under quota must actually hold minQuota.
        if (!unfilledMembersWithUnderMinQuotaPartitions.isEmpty()) {
            if (currentNumMembersWithOverMinQuotaPartitions != expectedNumMembersWithOverMinQuotaPartitions) {
                log.error("Current number of members with more than the minQuota partitions: {}, is less than the expected number of members with more than the minQuota partitions: {}, and no more partitions to be assigned to the remaining unfilled consumers: {}", currentNumMembersWithOverMinQuotaPartitions, expectedNumMembersWithOverMinQuotaPartitions, unfilledMembersWithUnderMinQuotaPartitions);
                throw new IllegalStateException("We haven't reached the expected number of members with more than the minQuota partitions, but no more partitions to be assigned");
            }
            for (String unfilledMember : unfilledMembersWithUnderMinQuotaPartitions) {
                int assignedPartitionsCount = ((List)assignment.get(unfilledMember)).size();
                if (assignedPartitionsCount != minQuota) {
                    log.error("Consumer: [{}] should have {} partitions, but got {} partitions, and no more partitions to be assigned. The remaining unfilled consumers are: {}", unfilledMember, minQuota, assignedPartitionsCount, unfilledMembersWithUnderMinQuotaPartitions);
                    throw new IllegalStateException(String.format("Consumer: [%s] doesn't reach minQuota partitions, and no more partitions to be assigned", unfilledMember));
                }
                log.trace("skip over this unfilled member: [{}] because we've reached the expected number of members with more than the minQuota partitions, and this member already have minQuota partitions", (Object)unfilledMember);
            }
        }
        log.info("Final assignment of partitions to consumers: \n{}", (Object)assignment);
        return assignment;
    }

    /**
     * Expands the given topics into the full list of their partitions.
     *
     * @param partitionsPerTopic   number of partitions for each topic
     * @param sortedAllTopics      topics in the order in which their partitions should appear
     * @param totalPartitionsCount initial capacity of the returned list
     * @return a new mutable list with partitions 0..count-1 of every topic, topic by topic
     */
    private List<TopicPartition> getAllTopicPartitions(Map<String, Integer> partitionsPerTopic, List<String> sortedAllTopics, int totalPartitionsCount) {
        List<TopicPartition> expanded = new ArrayList<>(totalPartitionsCount);
        sortedAllTopics.forEach(topicName -> {
            int numPartitions = partitionsPerTopic.get(topicName);
            int partitionId = 0;
            while (partitionId < numPartitions) {
                expanded.add(new TopicPartition(topicName, partitionId));
                partitionId++;
            }
        });
        return expanded;
    }

    /**
     * General assignment algorithm, used when consumers do not all subscribe to the same
     * topic set.
     *
     * <p>Builds the topic-to-consumer and consumer-to-topic indexes, prunes stale entries
     * from {@code currentAssignment} (departed consumers, unknown topics, topics a consumer
     * no longer subscribes to), determines the unassigned partitions and delegates to
     * {@code balance}. {@code currentAssignment} is modified in place and returned.
     *
     * @param partitionsPerTopic number of partitions for each topic
     * @param subscriptions      subscription of every consumer, keyed by consumer id
     * @param currentAssignment  previously owned partitions per consumer; updated in place
     * @return {@code currentAssignment}, now holding the new assignment
     */
    private Map<String, List<TopicPartition>> generalAssign(Map<String, Integer> partitionsPerTopic, Map<String, ConsumerPartitionAssignor.Subscription> subscriptions, Map<String, List<TopicPartition>> currentAssignment) {
        if (log.isDebugEnabled()) {
            log.debug("performing general assign. partitionsPerTopic: {}, subscriptions: {}, currentAssignment: {}", partitionsPerTopic, subscriptions, currentAssignment);
        }
        Map<TopicPartition, ConsumerGenerationPair> prevAssignment = new HashMap<>();
        this.partitionMovements = new PartitionMovements();
        this.prepopulateCurrentAssignments(subscriptions, prevAssignment);

        // Index which consumers may take each topic and which topics each consumer may take.
        Map<String, List<String>> topic2AllPotentialConsumers = new HashMap<>(partitionsPerTopic.keySet().size());
        Map<String, List<String>> consumer2AllPotentialTopics = new HashMap<>(subscriptions.keySet().size());
        for (String topicName : partitionsPerTopic.keySet()) {
            topic2AllPotentialConsumers.put(topicName, new ArrayList<>());
        }
        for (Map.Entry<String, ConsumerPartitionAssignor.Subscription> entry : subscriptions.entrySet()) {
            String consumerId = entry.getKey();
            List<String> subscribedTopics = new ArrayList<>(entry.getValue().topics().size());
            consumer2AllPotentialTopics.put(consumerId, subscribedTopics);
            // Only topics with known partition counts take part in the assignment.
            entry.getValue().topics().stream().filter(topic -> partitionsPerTopic.get(topic) != null).forEach(topic -> {
                subscribedTopics.add(topic);
                topic2AllPotentialConsumers.get(topic).add(consumerId);
            });
            // Make sure every subscribed consumer has an (initially empty) assignment entry.
            if (!currentAssignment.containsKey(consumerId)) {
                currentAssignment.put(consumerId, new ArrayList<>());
            }
        }

        // Reverse index: current owner of each partition.
        Map<TopicPartition, String> currentPartitionConsumer = new HashMap<>();
        for (Map.Entry<String, List<TopicPartition>> entry : currentAssignment.entrySet()) {
            for (TopicPartition topicPartition : entry.getValue()) {
                currentPartitionConsumer.put(topicPartition, entry.getKey());
            }
        }

        int totalPartitionsCount = partitionsPerTopic.values().stream().reduce(0, Integer::sum);
        List<String> sortedAllTopics = new ArrayList<>(topic2AllPotentialConsumers.keySet());
        Collections.sort(sortedAllTopics, new TopicComparator(topic2AllPotentialConsumers));
        List<TopicPartition> sortedAllPartitions = this.getAllTopicPartitions(partitionsPerTopic, sortedAllTopics, totalPartitionsCount);

        // Prune currentAssignment and collect the partitions that stay assigned.
        List<TopicPartition> assignedPartitions = new ArrayList<>();
        boolean revocationRequired = false;
        Iterator<Map.Entry<String, List<TopicPartition>>> it = currentAssignment.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, List<TopicPartition>> entry = it.next();
            ConsumerPartitionAssignor.Subscription consumerSubscription = subscriptions.get(entry.getKey());
            if (consumerSubscription == null) {
                // The consumer is no longer part of the group: forget it and its partitions.
                for (TopicPartition topicPartition : entry.getValue()) {
                    currentPartitionConsumer.remove(topicPartition);
                }
                it.remove();
                continue;
            }
            Iterator<TopicPartition> partitionIter = entry.getValue().iterator();
            while (partitionIter.hasNext()) {
                TopicPartition partition = partitionIter.next();
                if (!topic2AllPotentialConsumers.containsKey(partition.topic())) {
                    // The topic is not part of this assignment any more.
                    partitionIter.remove();
                    currentPartitionConsumer.remove(partition);
                    continue;
                }
                if (!consumerSubscription.topics().contains(partition.topic())) {
                    // The consumer dropped this topic; the partition has to move elsewhere.
                    partitionIter.remove();
                    revocationRequired = true;
                    continue;
                }
                assignedPartitions.add(partition);
            }
        }

        List<TopicPartition> unassignedPartitions = this.getUnassignedPartitions(sortedAllPartitions, assignedPartitions, topic2AllPotentialConsumers);
        if (log.isDebugEnabled()) {
            log.debug("unassigned Partitions: {}", unassignedPartitions);
        }
        TreeSet<String> sortedCurrentSubscriptions = new TreeSet<>(new SubscriptionComparator(currentAssignment));
        sortedCurrentSubscriptions.addAll(currentAssignment.keySet());
        this.balance(currentAssignment, prevAssignment, sortedAllPartitions, unassignedPartitions, sortedCurrentSubscriptions, consumer2AllPotentialTopics, topic2AllPotentialConsumers, currentPartitionConsumer, revocationRequired, partitionsPerTopic, totalPartitionsCount);
        log.info("Final assignment of partitions to consumers: \n{}", currentAssignment);
        return currentAssignment;
    }

    /**
     * Returns the partitions of {@code sortedAllPartitions} that do not appear in
     * {@code sortedAssignedPartitions}, using a single merge-style pass.
     *
     * <p>The assigned list is sorted in place with a {@code PartitionComparator}. When it
     * is empty, {@code sortedAllPartitions} itself is returned rather than a copy.
     *
     * @param sortedAllPartitions         every partition, in sorted order
     * @param sortedAssignedPartitions    partitions that already have an owner; sorted in place
     * @param topic2AllPotentialConsumers consumers eligible for each topic, used by the comparator
     * @return the partitions that still need an owner
     */
    private List<TopicPartition> getUnassignedPartitions(List<TopicPartition> sortedAllPartitions, List<TopicPartition> sortedAssignedPartitions, Map<String, List<String>> topic2AllPotentialConsumers) {
        if (sortedAssignedPartitions.isEmpty()) {
            return sortedAllPartitions;
        }
        List<TopicPartition> remaining = new ArrayList<>();
        sortedAssignedPartitions.sort(new PartitionComparator(topic2AllPotentialConsumers));

        Iterator<TopicPartition> assignedCursor = sortedAssignedPartitions.iterator();
        TopicPartition expected = assignedCursor.next();
        boolean assignedExhausted = false;
        for (TopicPartition candidate : sortedAllPartitions) {
            if (!assignedExhausted && expected.equals(candidate)) {
                // The candidate is already assigned: advance to the next assigned partition.
                if (assignedCursor.hasNext()) {
                    expected = assignedCursor.next();
                } else {
                    assignedExhausted = true;
                }
            } else {
                remaining.add(candidate);
            }
        }
        return remaining;
    }

    /**
     * Returns every partition of every topic that is not contained in
     * {@code sortedAssignedPartitions}, ordered by topic name and then partition number.
     *
     * <p>The assigned list is sorted in place by topic and partition, then walked in
     * lock-step with the generated sequence of all partitions.
     *
     * @param totalPartitionsCount     total number of partitions across all topics
     * @param partitionsPerTopic       number of partitions for each topic
     * @param sortedAssignedPartitions partitions that already have an owner; sorted in place
     * @return a new list with the partitions that still need an owner
     */
    private List<TopicPartition> getUnassignedPartitions(int totalPartitionsCount, Map<String, Integer> partitionsPerTopic, List<TopicPartition> sortedAssignedPartitions) {
        List<String> topicsInOrder = new ArrayList<>(partitionsPerTopic.keySet());
        Collections.sort(topicsInOrder);
        if (sortedAssignedPartitions.isEmpty()) {
            return getAllTopicPartitions(partitionsPerTopic, topicsInOrder, totalPartitionsCount);
        }

        List<TopicPartition> remaining = new ArrayList<>(totalPartitionsCount - sortedAssignedPartitions.size());
        sortedAssignedPartitions.sort(Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));

        Iterator<TopicPartition> assignedCursor = sortedAssignedPartitions.iterator();
        TopicPartition expected = assignedCursor.next();
        boolean assignedExhausted = false;
        for (String topic : topicsInOrder) {
            int numPartitions = partitionsPerTopic.get(topic);
            for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
                boolean alreadyAssigned = !assignedExhausted && expected.topic().equals(topic) && expected.partition() == partitionId;
                if (!alreadyAssigned) {
                    remaining.add(new TopicPartition(topic, partitionId));
                } else if (assignedCursor.hasNext()) {
                    expected = assignedCursor.next();
                } else {
                    assignedExhausted = true;
                }
            }
        }
        return remaining;
    }

    /**
     * Records {@code consumer} as the previous owner of each given partition, unless the
     * partition is already recorded with an equal or newer generation.
     *
     * @param prevAssignment previous owner per partition; updated in place
     * @param partitions     partitions the consumer reported as owned
     * @param consumer       id of the consumer that reported them
     * @param generation     generation in which the consumer owned them
     */
    private void updatePrevAssignment(Map<TopicPartition, ConsumerGenerationPair> prevAssignment, List<TopicPartition> partitions, String consumer, int generation) {
        for (TopicPartition partition : partitions) {
            boolean alreadyRecorded = prevAssignment.containsKey(partition);
            if (alreadyRecorded && generation <= prevAssignment.get(partition).generation) {
                // The existing record is at least as recent; keep it.
                continue;
            }
            prevAssignment.put(partition, new ConsumerGenerationPair(consumer, generation));
        }
    }

    /**
     * Fills {@code prevAssignment} from the member data of consumers that are not part of
     * the highest generation.
     *
     * <p>A consumer whose generation is present and below {@code maxGeneration} contributes
     * with that generation. A consumer without a generation contributes with generation -1,
     * but only once {@code maxGeneration} is above -1. All other consumers are skipped.
     * Each subscription's user data buffer, when present, is rewound before decoding.
     *
     * @param subscriptions  subscription of every consumer, keyed by consumer id
     * @param prevAssignment previous owner per partition; updated in place
     */
    private void prepopulateCurrentAssignments(Map<String, ConsumerPartitionAssignor.Subscription> subscriptions, Map<TopicPartition, ConsumerGenerationPair> prevAssignment) {
        for (Map.Entry<String, ConsumerPartitionAssignor.Subscription> member : subscriptions.entrySet()) {
            String consumer = member.getKey();
            ConsumerPartitionAssignor.Subscription subscription = member.getValue();
            if (subscription.userData() != null) {
                subscription.userData().rewind();
            }
            MemberData memberData = this.memberData(subscription);
            if (memberData.generation.isPresent()) {
                if (memberData.generation.get() < this.maxGeneration) {
                    this.updatePrevAssignment(prevAssignment, memberData.partitions, consumer, memberData.generation.get());
                }
            } else if (this.maxGeneration > -1) {
                this.updatePrevAssignment(prevAssignment, memberData.partitions, consumer, -1);
            }
        }
    }

    /**
     * Decides whether the current assignment is balanced.
     *
     * <p>It is balanced when the smallest and largest assignment sizes differ by at most
     * one, or when no consumer that is below its maximum possible size could take a
     * partition from a consumer holding more partitions than itself.
     *
     * @param currentAssignment          partitions currently held by each consumer
     * @param sortedCurrentSubscriptions consumers, ordered by the set's comparator
     * @param allSubscriptions           topics each consumer may be assigned
     * @param partitionsPerTopic         number of partitions for each topic
     * @param totalPartitionCount        total number of partitions across all topics
     * @return {@code true} if no balance-improving move was found
     */
    private boolean isBalanced(Map<String, List<TopicPartition>> currentAssignment, TreeSet<String> sortedCurrentSubscriptions, Map<String, List<String>> allSubscriptions, Map<String, Integer> partitionsPerTopic, int totalPartitionCount) {
        int smallest = currentAssignment.get(sortedCurrentSubscriptions.first()).size();
        int largest = currentAssignment.get(sortedCurrentSubscriptions.last()).size();
        if (smallest >= largest - 1) {
            // Sizes differ by at most one: nothing can be improved.
            return true;
        }

        // Build the partition -> owner index, reporting partitions that appear twice.
        Map<TopicPartition, String> ownerByPartition = new HashMap<>();
        for (Map.Entry<String, List<TopicPartition>> assigned : currentAssignment.entrySet()) {
            String owner = assigned.getKey();
            for (TopicPartition owned : assigned.getValue()) {
                if (ownerByPartition.containsKey(owned)) {
                    log.error("{} is assigned to more than one consumer.", owned);
                }
                ownerByPartition.put(owned, owner);
            }
        }

        for (String consumer : sortedCurrentSubscriptions) {
            List<TopicPartition> consumerPartitions = currentAssignment.get(consumer);
            int consumerPartitionCount = consumerPartitions.size();
            List<String> subscribedTopics = allSubscriptions.get(consumer);
            int maxAssignmentSize = getMaxAssignmentSize(totalPartitionCount, subscribedTopics, partitionsPerTopic);
            if (consumerPartitionCount == maxAssignmentSize) {
                // This consumer already holds everything it could possibly get.
                continue;
            }
            for (String topic : subscribedTopics) {
                int partitionCount = partitionsPerTopic.get(topic);
                for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
                    TopicPartition candidate = new TopicPartition(topic, partitionId);
                    if (consumerPartitions.contains(candidate)) {
                        continue;
                    }
                    String otherConsumer = ownerByPartition.get(candidate);
                    int otherConsumerPartitionCount = currentAssignment.get(otherConsumer).size();
                    if (consumerPartitionCount < otherConsumerPartitionCount) {
                        log.debug("{} can be moved from consumer {} to consumer {} for a more balanced assignment.", candidate, otherConsumer, consumer);
                        return false;
                    }
                }
            }
        }
        return true;
    }

    /**
     * Returns the largest number of partitions a consumer with the given subscription
     * could hold.
     *
     * @param totalPartitionCount total number of partitions across all topics
     * @param allSubscribedTopics topics the consumer may be assigned
     * @param partitionsPerTopic  number of partitions for each topic
     * @return {@code totalPartitionCount} when the consumer subscribes to as many topics as
     *         exist, otherwise the sum of the partition counts of its topics
     */
    private int getMaxAssignmentSize(int totalPartitionCount, List<String> allSubscribedTopics, Map<String, Integer> partitionsPerTopic) {
        if (allSubscribedTopics.size() == partitionsPerTopic.size()) {
            // Shortcut: same number of topics, so the precomputed total is used.
            return totalPartitionCount;
        }
        int subscribedPartitionCount = 0;
        for (String topic : allSubscribedTopics) {
            subscribedPartitionCount += partitionsPerTopic.get(topic);
        }
        return subscribedPartitionCount;
    }

    /**
     * Computes a balance score for an assignment: the sum, over every unordered pair of
     * consumers, of the absolute difference between their assignment sizes. A lower score
     * means a more even distribution; 0 means all consumers hold the same number of
     * partitions.
     *
     * @param assignment partitions held by each consumer
     * @return the sum of pairwise size differences
     */
    private int getBalanceScore(Map<String, List<TopicPartition>> assignment) {
        // Snapshot the sizes once so the pairwise pass works on plain ints.
        int[] sizes = new int[assignment.size()];
        int next = 0;
        for (List<TopicPartition> partitions : assignment.values()) {
            sizes[next++] = partitions.size();
        }

        int score = 0;
        for (int i = 0; i < sizes.length; i++) {
            // Compare only with later entries so each pair is counted exactly once.
            for (int j = i + 1; j < sizes.length; j++) {
                score += Math.abs(sizes[i] - sizes[j]);
            }
        }
        return score;
    }

    /**
     * Gives {@code partition} to the first consumer, in the order of
     * {@code sortedCurrentSubscriptions}, that may be assigned the partition's topic.
     * Does nothing when no such consumer exists.
     *
     * @param partition                  the partition to hand out
     * @param sortedCurrentSubscriptions consumers, ordered by the set's comparator
     * @param currentAssignment          partitions held by each consumer; updated in place
     * @param consumer2AllPotentialTopics topics each consumer may be assigned
     * @param currentPartitionConsumer   owner of each partition; updated in place
     */
    private void assignPartition(TopicPartition partition, TreeSet<String> sortedCurrentSubscriptions, Map<String, List<TopicPartition>> currentAssignment, Map<String, List<String>> consumer2AllPotentialTopics, Map<TopicPartition, String> currentPartitionConsumer) {
        String recipient = null;
        for (String candidate : sortedCurrentSubscriptions) {
            if (consumer2AllPotentialTopics.get(candidate).contains(partition.topic())) {
                recipient = candidate;
                break;
            }
        }
        if (recipient == null) {
            return;
        }
        // Take the consumer out of the sorted set before its assignment changes and put it
        // back afterwards, so the set re-positions it.
        sortedCurrentSubscriptions.remove(recipient);
        currentAssignment.get(recipient).add(partition);
        currentPartitionConsumer.put(partition, recipient);
        sortedCurrentSubscriptions.add(recipient);
    }

    /**
     * Tells whether partitions of {@code topic} can move between consumers, which requires
     * at least two consumers eligible for the topic.
     *
     * @param topic                       the topic to check
     * @param topic2AllPotentialConsumers consumers eligible for each topic
     * @return {@code true} if two or more consumers may be assigned the topic
     */
    private boolean canParticipateInReassignment(String topic, Map<String, List<String>> topic2AllPotentialConsumers) {
        List<String> eligibleConsumers = topic2AllPotentialConsumers.get(topic);
        return eligibleConsumers.size() > 1;
    }

    /**
     * Tells whether {@code consumer} can gain or give up partitions during rebalancing.
     *
     * <p>That is the case when it holds fewer partitions than it could at most, or when at
     * least one of its partitions belongs to a topic with two or more eligible consumers.
     * An error is logged if it holds more partitions than the computed maximum.
     *
     * @param consumer                    the consumer to check
     * @param currentAssignment           partitions held by each consumer
     * @param consumer2AllPotentialTopics topics each consumer may be assigned
     * @param topic2AllPotentialConsumers consumers eligible for each topic
     * @param partitionsPerTopic          number of partitions for each topic
     * @param totalPartitionCount         total number of partitions across all topics
     * @return {@code true} if the consumer can take part in reassignment
     */
    private boolean canParticipateInReassignment(String consumer, Map<String, List<TopicPartition>> currentAssignment, Map<String, List<String>> consumer2AllPotentialTopics, Map<String, List<String>> topic2AllPotentialConsumers, Map<String, Integer> partitionsPerTopic, int totalPartitionCount) {
        List<TopicPartition> owned = currentAssignment.get(consumer);
        int ownedCount = owned.size();
        List<String> subscribedTopics = consumer2AllPotentialTopics.get(consumer);
        int ceiling = this.getMaxAssignmentSize(totalPartitionCount, subscribedTopics, partitionsPerTopic);

        if (ownedCount > ceiling) {
            log.error("The consumer {} is assigned more partitions than the maximum possible.", consumer);
        }
        if (ownedCount < ceiling) {
            // There is still room to receive partitions.
            return true;
        }
        // Otherwise it can only take part if one of its partitions can move elsewhere.
        return owned.stream().anyMatch(partition -> this.canParticipateInReassignment(partition.topic(), topic2AllPotentialConsumers));
    }

    /**
     * Balances {@code currentAssignment} in place.
     *
     * <p>Steps: hand every unassigned partition with at least one eligible consumer to a
     * consumer; exclude partitions and consumers that cannot take part in reassignment;
     * run the reassignment passes; roll back to the pre-balance state when the balance
     * score did not improve; finally restore the excluded consumers.
     *
     * @param currentAssignment           partitions held by each consumer; updated in place
     * @param prevAssignment              previous owner of each partition
     * @param sortedPartitions            all partitions in sorted order; fixed partitions are removed
     * @param unassignedPartitions        partitions without an owner; fixed partitions are removed
     * @param sortedCurrentSubscriptions  consumers, ordered by the set's comparator
     * @param consumer2AllPotentialTopics topics each consumer may be assigned
     * @param topic2AllPotentialConsumers consumers eligible for each topic
     * @param currentPartitionConsumer    owner of each partition; updated in place
     * @param revocationRequired          when {@code true}, the pass over the unassigned partitions is skipped
     * @param partitionsPerTopic          number of partitions for each topic
     * @param totalPartitionCount         total number of partitions across all topics
     */
    private void balance(Map<String, List<TopicPartition>> currentAssignment, Map<TopicPartition, ConsumerGenerationPair> prevAssignment, List<TopicPartition> sortedPartitions, List<TopicPartition> unassignedPartitions, TreeSet<String> sortedCurrentSubscriptions, Map<String, List<String>> consumer2AllPotentialTopics, Map<String, List<String>> topic2AllPotentialConsumers, Map<TopicPartition, String> currentPartitionConsumer, boolean revocationRequired, Map<String, Integer> partitionsPerTopic, int totalPartitionCount) {
        // If even the last consumer in the sorted set holds nothing, this is a fresh assignment.
        boolean initializing = currentAssignment.get(sortedCurrentSubscriptions.last()).isEmpty();

        // Hand out every unassigned partition that at least one consumer can take.
        for (TopicPartition orphan : unassignedPartitions) {
            if (!topic2AllPotentialConsumers.get(orphan.topic()).isEmpty()) {
                this.assignPartition(orphan, sortedCurrentSubscriptions, currentAssignment, consumer2AllPotentialTopics, currentPartitionConsumer);
            }
        }

        // Partitions of topics with fewer than two eligible consumers can never move.
        Set<TopicPartition> fixedPartitions = new HashSet<>();
        for (String topic : topic2AllPotentialConsumers.keySet()) {
            if (this.canParticipateInReassignment(topic, topic2AllPotentialConsumers)) {
                continue;
            }
            int partitionCount = partitionsPerTopic.get(topic);
            for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
                fixedPartitions.add(new TopicPartition(topic, partitionId));
            }
        }
        sortedPartitions.removeAll(fixedPartitions);
        unassignedPartitions.removeAll(fixedPartitions);

        // Temporarily set aside consumers whose assignment cannot change.
        Map<String, List<TopicPartition>> fixedAssignments = new HashMap<>();
        for (String consumer : consumer2AllPotentialTopics.keySet()) {
            boolean movable = this.canParticipateInReassignment(consumer, currentAssignment, consumer2AllPotentialTopics, topic2AllPotentialConsumers, partitionsPerTopic, totalPartitionCount);
            if (!movable) {
                sortedCurrentSubscriptions.remove(consumer);
                fixedAssignments.put(consumer, currentAssignment.remove(consumer));
            }
        }

        // Snapshot the state so an unhelpful rebalance can be undone.
        Map<String, List<TopicPartition>> preBalanceAssignment = this.deepCopy(currentAssignment);
        Map<TopicPartition, String> preBalancePartitionConsumers = new HashMap<>(currentPartitionConsumer);

        if (!revocationRequired) {
            this.performReassignments(unassignedPartitions, currentAssignment, prevAssignment, sortedCurrentSubscriptions, consumer2AllPotentialTopics, topic2AllPotentialConsumers, currentPartitionConsumer, partitionsPerTopic, totalPartitionCount);
        }
        boolean reassignmentPerformed = this.performReassignments(sortedPartitions, currentAssignment, prevAssignment, sortedCurrentSubscriptions, consumer2AllPotentialTopics, topic2AllPotentialConsumers, currentPartitionConsumer, partitionsPerTopic, totalPartitionCount);

        // Keep the old assignment if moving partitions did not lower the balance score.
        if (!initializing && reassignmentPerformed && this.getBalanceScore(currentAssignment) >= this.getBalanceScore(preBalanceAssignment)) {
            this.deepCopy(preBalanceAssignment, currentAssignment);
            currentPartitionConsumer.clear();
            currentPartitionConsumer.putAll(preBalancePartitionConsumers);
        }

        // Put the set-aside consumers back.
        for (Map.Entry<String, List<TopicPartition>> fixed : fixedAssignments.entrySet()) {
            String consumer = fixed.getKey();
            currentAssignment.put(consumer, fixed.getValue());
            sortedCurrentSubscriptions.add(consumer);
        }
        fixedAssignments.clear();
    }

    /**
     * Repeatedly sweeps over {@code reassignablePartitions}, moving partitions between consumers,
     * until a full sweep makes no move.
     *
     * <p>Each sweep stops early as soon as {@code isBalanced} reports {@code true}. For every visited
     * partition the previous owner (if recorded in {@code prevAssignment}) is preferred when the
     * current owner holds more than one partition above it; otherwise the partition is moved when any
     * potential consumer of its topic holds more than one partition fewer than the current owner.
     *
     * <p>Unexpected states (a topic with at most one potential consumer, or a partition without a
     * current owner) are only logged; processing of that partition continues afterwards.
     *
     * @return {@code true} if at least one partition was reassigned
     */
    private boolean performReassignments(List<TopicPartition> reassignablePartitions, Map<String, List<TopicPartition>> currentAssignment, Map<TopicPartition, ConsumerGenerationPair> prevAssignment, TreeSet<String> sortedCurrentSubscriptions, Map<String, List<String>> consumer2AllPotentialTopics, Map<String, List<String>> topic2AllPotentialConsumers, Map<TopicPartition, String> currentPartitionConsumer, Map<String, Integer> partitionsPerTopic, int totalPartitionCount) {
        boolean movedAtLeastOnce = false;
        boolean movedThisSweep;
        do {
            movedThisSweep = false;
            Iterator<TopicPartition> candidates = reassignablePartitions.iterator();
            while (candidates.hasNext() && !this.isBalanced(currentAssignment, sortedCurrentSubscriptions, consumer2AllPotentialTopics, partitionsPerTopic, totalPartitionCount)) {
                TopicPartition partition = candidates.next();

                if (topic2AllPotentialConsumers.get(partition.topic()).size() <= 1) {
                    log.error("Expected more than one potential consumer for partition '{}'", partition);
                }

                String consumer = currentPartitionConsumer.get(partition);
                if (consumer == null) {
                    log.error("Expected partition '{}' to be assigned to a consumer", partition);
                }

                if (prevAssignment.containsKey(partition) && currentAssignment.get(consumer).size() > currentAssignment.get(prevAssignment.get(partition).consumer).size() + 1) {
                    // Prefer handing the partition back to its previous owner.
                    this.reassignPartition(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, prevAssignment.get(partition).consumer);
                    movedAtLeastOnce = true;
                    movedThisSweep = true;
                } else {
                    // Otherwise move it if some potential consumer is lagging by more than one partition;
                    // the target is chosen by the map-based reassignPartition overload.
                    for (String otherConsumer : topic2AllPotentialConsumers.get(partition.topic())) {
                        if (currentAssignment.get(consumer).size() > currentAssignment.get(otherConsumer).size() + 1) {
                            this.reassignPartition(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, consumer2AllPotentialTopics);
                            movedAtLeastOnce = true;
                            movedThisSweep = true;
                            break;
                        }
                    }
                }
            }
        } while (movedThisSweep);
        return movedAtLeastOnce;
    }

    /**
     * Moves {@code partition} to the first consumer, in the iteration order of
     * {@code sortedCurrentSubscriptions}, whose potential topics include the partition's topic.
     *
     * <p>The existence of such a consumer is only checked with an {@code assert}.
     */
    private void reassignPartition(TopicPartition partition, Map<String, List<TopicPartition>> currentAssignment, TreeSet<String> sortedCurrentSubscriptions, Map<TopicPartition, String> currentPartitionConsumer, Map<String, List<String>> consumer2AllPotentialTopics) {
        String newConsumer = sortedCurrentSubscriptions.stream()
                .filter(candidate -> consumer2AllPotentialTopics.get(candidate).contains(partition.topic()))
                .findFirst()
                .orElse(null);
        assert (newConsumer != null);
        this.reassignPartition(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, newConsumer);
    }

    /**
     * Moves a partition to {@code newConsumer}.
     *
     * <p>The partition that actually moves is whatever
     * {@code partitionMovements.getTheActualPartitionToBeMoved} returns for the requested partition,
     * its current owner and the new consumer; it may differ from {@code partition}.
     */
    private void reassignPartition(TopicPartition partition, Map<String, List<TopicPartition>> currentAssignment, TreeSet<String> sortedCurrentSubscriptions, Map<TopicPartition, String> currentPartitionConsumer, String newConsumer) {
        String currentOwner = currentPartitionConsumer.get(partition);
        TopicPartition actualPartition = this.partitionMovements.getTheActualPartitionToBeMoved(partition, currentOwner, newConsumer);
        this.processPartitionMovement(actualPartition, newConsumer, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer);
    }

    /**
     * Transfers {@code partition} from its current owner to {@code newConsumer} and records the move.
     *
     * <p>Both consumers are taken out of {@code sortedCurrentSubscriptions} before their assignment
     * lists change and are re-inserted afterwards.
     * NOTE(review): presumably because the set's ordering depends on assignment sizes - confirm
     * against the comparator used to build the set.
     */
    private void processPartitionMovement(TopicPartition partition, String newConsumer, Map<String, List<TopicPartition>> currentAssignment, TreeSet<String> sortedCurrentSubscriptions, Map<TopicPartition, String> currentPartitionConsumer) {
        String previousOwner = currentPartitionConsumer.get(partition);

        // Detach both members while their assignment sizes are still unchanged.
        sortedCurrentSubscriptions.remove(previousOwner);
        sortedCurrentSubscriptions.remove(newConsumer);

        this.partitionMovements.movePartition(partition, previousOwner, newConsumer);

        List<TopicPartition> previousOwnerPartitions = currentAssignment.get(previousOwner);
        List<TopicPartition> newOwnerPartitions = currentAssignment.get(newConsumer);
        previousOwnerPartitions.remove(partition);
        newOwnerPartitions.add(partition);
        currentPartitionConsumer.put(partition, newConsumer);

        // Re-attach now that both lists reflect the move.
        sortedCurrentSubscriptions.add(newConsumer);
        sortedCurrentSubscriptions.add(previousOwner);
    }

    /**
     * Reports whether the partition movements recorded so far are considered sticky.
     *
     * @return the result of {@code PartitionMovements.isSticky()} on the current movement tracker
     */
    public boolean isSticky() {
        boolean sticky = partitionMovements.isSticky();
        return sticky;
    }

    /**
     * Replaces the contents of {@code dest} with a copy of {@code source}.
     *
     * <p>{@code dest} is cleared first; each value list is copied into a new {@link ArrayList}, while
     * the keys and the {@code TopicPartition} elements themselves are shared with {@code source}.
     *
     * @param source assignment to copy from
     * @param dest   map that receives the copy; any previous content is discarded
     */
    private void deepCopy(Map<String, List<TopicPartition>> source, Map<String, List<TopicPartition>> dest) {
        dest.clear();
        source.forEach((member, partitions) -> dest.put(member, new ArrayList<>(partitions)));
    }

    /**
     * Creates a new {@link HashMap} holding a copy of {@code assignment} in which every value list
     * is an independent list instance.
     *
     * @param assignment assignment to copy
     * @return the newly created copy
     */
    private Map<String, List<TopicPartition>> deepCopy(Map<String, List<TopicPartition>> assignment) {
        Map<String, List<TopicPartition>> snapshot = new HashMap<>();
        deepCopy(assignment, snapshot);
        return snapshot;
    }

    /**
     * Immutable (source member, destination member) pair describing the direction of a partition
     * movement. Instances are value objects: two pairs are equal when both member ids are equal.
     */
    private static class ConsumerPair {
        private final String srcMemberId;
        private final String dstMemberId;

        /**
         * @param srcMemberId member the partition moved away from
         * @param dstMemberId member the partition moved to
         */
        ConsumerPair(String srcMemberId, String dstMemberId) {
            this.srcMemberId = srcMemberId;
            this.dstMemberId = dstMemberId;
        }

        /** Renders the pair as {@code src->dst}. */
        @Override
        public String toString() {
            return this.srcMemberId + "->" + this.dstMemberId;
        }

        /** Hash over both member ids; a {@code null} id contributes 0. */
        @Override
        public int hashCode() {
            int result = 1;
            result = 31 * result + (this.srcMemberId == null ? 0 : this.srcMemberId.hashCode());
            result = 31 * result + (this.dstMemberId == null ? 0 : this.dstMemberId.hashCode());
            return result;
        }

        /**
         * Two pairs are equal when source and destination ids match. The comparison is null-safe so
         * that it stays consistent with {@link #hashCode()}, which also accepts {@code null} ids.
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof ConsumerPair)) {
                return false;
            }
            ConsumerPair otherPair = (ConsumerPair) obj;
            // Fully qualified to avoid touching the file's import block.
            return java.util.Objects.equals(this.srcMemberId, otherPair.srcMemberId)
                    && java.util.Objects.equals(this.dstMemberId, otherPair.dstMemberId);
        }

        /**
         * @param pairs set to search
         * @return {@code true} if {@code pairs} contains a pair equal to this one
         */
        private boolean in(Set<ConsumerPair> pairs) {
            // equals/hashCode are consistent, so the set's own lookup replaces a linear scan.
            return pairs.contains(this);
        }
    }

    /**
     * Bookkeeping of partition movements between consumers.
     *
     * <p>Two views are kept in sync: the net movement (original source to latest destination) per
     * partition, and, per topic, the set of partitions that moved along each (source, destination)
     * pair. The per-topic view is used to pick a substitute partition that undoes an earlier
     * opposite movement and to check stickiness.
     */
    private static class PartitionMovements {
        private Map<String, Map<ConsumerPair, Set<TopicPartition>>> partitionMovementsByTopic = new HashMap<String, Map<ConsumerPair, Set<TopicPartition>>>();
        private Map<TopicPartition, ConsumerPair> partitionMovements = new HashMap<TopicPartition, ConsumerPair>();

        private PartitionMovements() {
        }

        /**
         * Forgets the recorded movement of {@code partition} in both views, dropping per-pair and
         * per-topic containers that become empty.
         *
         * @return the pair that had been recorded for the partition
         */
        private ConsumerPair removeMovementRecordOfPartition(TopicPartition partition) {
            ConsumerPair recorded = this.partitionMovements.remove(partition);
            String topic = partition.topic();
            Map<ConsumerPair, Set<TopicPartition>> movesByPair = this.partitionMovementsByTopic.get(topic);
            Set<TopicPartition> movedAlongPair = movesByPair.get(recorded);
            movedAlongPair.remove(partition);
            if (movedAlongPair.isEmpty()) {
                movesByPair.remove(recorded);
            }
            if (movesByPair.isEmpty()) {
                this.partitionMovementsByTopic.remove(topic);
            }
            return recorded;
        }

        /** Records in both views that {@code partition} moved along {@code pair}. */
        private void addPartitionMovementRecord(TopicPartition partition, ConsumerPair pair) {
            this.partitionMovements.put(partition, pair);
            this.partitionMovementsByTopic
                    .computeIfAbsent(partition.topic(), topic -> new HashMap<>())
                    .computeIfAbsent(pair, key -> new HashSet<>())
                    .add(partition);
        }

        /**
         * Registers a movement of {@code partition} from {@code oldConsumer} to {@code newConsumer}.
         *
         * <p>If the partition already has a recorded movement, the two are collapsed into one net
         * movement from the original source; when that net movement returns the partition to its
         * original source, no record is kept.
         */
        private void movePartition(TopicPartition partition, String oldConsumer, String newConsumer) {
            if (!this.partitionMovements.containsKey(partition)) {
                this.addPartitionMovementRecord(partition, new ConsumerPair(oldConsumer, newConsumer));
                return;
            }
            ConsumerPair earlierMove = this.removeMovementRecordOfPartition(partition);
            assert (earlierMove.dstMemberId.equals(oldConsumer));
            if (!earlierMove.srcMemberId.equals(newConsumer)) {
                this.addPartitionMovementRecord(partition, new ConsumerPair(earlierMove.srcMemberId, newConsumer));
            }
        }

        /**
         * Chooses which partition should really move when {@code partition} is requested to go from
         * {@code oldConsumer} to {@code newConsumer}.
         *
         * <p>If some partition of the same topic was previously moved in the opposite direction
         * (from {@code newConsumer} to the effective source), one of those partitions is returned so
         * the earlier movement is undone; otherwise {@code partition} itself is returned. The
         * effective source is the original source of {@code partition} when it already has a
         * recorded movement.
         */
        private TopicPartition getTheActualPartitionToBeMoved(TopicPartition partition, String oldConsumer, String newConsumer) {
            Map<ConsumerPair, Set<TopicPartition>> movesByPair = this.partitionMovementsByTopic.get(partition.topic());
            if (movesByPair == null) {
                return partition;
            }
            String effectiveSource = oldConsumer;
            ConsumerPair recorded = this.partitionMovements.get(partition);
            if (recorded != null) {
                assert (oldConsumer.equals(recorded.dstMemberId));
                effectiveSource = recorded.srcMemberId;
            }
            Set<TopicPartition> oppositeMoves = movesByPair.get(new ConsumerPair(newConsumer, effectiveSource));
            if (oppositeMoves == null) {
                return partition;
            }
            return oppositeMoves.iterator().next();
        }

        /**
         * Looks for a chain of pairs leading from {@code src} to {@code dst}, appending the visited
         * member ids to {@code currentPath}.
         *
         * <p>Only the first pair (in the set's iteration order) whose source is {@code src} is
         * followed; other outgoing pairs of {@code src} are not explored.
         *
         * @return {@code true} if a chain was found; {@code false} otherwise, including when
         *         {@code src} equals {@code dst} or {@code pairs} is empty
         */
        private boolean isLinked(String src, String dst, Set<ConsumerPair> pairs, List<String> currentPath) {
            if (src.equals(dst) || pairs.isEmpty()) {
                return false;
            }
            if (new ConsumerPair(src, dst).in(pairs)) {
                currentPath.add(src);
                currentPath.add(dst);
                return true;
            }
            for (ConsumerPair edge : pairs) {
                if (edge.srcMemberId.equals(src)) {
                    Set<ConsumerPair> remaining = new HashSet<ConsumerPair>(pairs);
                    remaining.remove(edge);
                    currentPath.add(edge.srcMemberId);
                    return this.isLinked(edge.dstMemberId, dst, remaining, currentPath);
                }
            }
            return false;
        }

        /**
         * Checks whether {@code cycle} is a rotation of a cycle already contained in {@code cycles}.
         *
         * <p>{@code cycle} is expected to repeat its first element at the end; the rotation test
         * searches each known cycle of the same size inside the cycle concatenated with itself
         * (minus the repeated element).
         */
        private boolean in(List<String> cycle, Set<List<String>> cycles) {
            List<String> doubled = new ArrayList<String>(cycle);
            doubled.remove(doubled.size() - 1);
            doubled.addAll(cycle);
            return cycles.stream().anyMatch(known -> known.size() == cycle.size() && Collections.indexOfSubList(doubled, known) != -1);
        }

        /**
         * Detects cycles among the given movement pairs, logging each distinct cycle found.
         *
         * @return {@code true} only if a cycle whose recorded path has exactly three entries exists,
         *         i.e. two members that moved partitions to each other
         */
        private boolean hasCycles(Set<ConsumerPair> pairs) {
            Set<List<String>> cycles = new HashSet<List<String>>();
            for (ConsumerPair start : pairs) {
                Set<ConsumerPair> others = new HashSet<ConsumerPair>(pairs);
                others.remove(start);
                List<String> path = new ArrayList<String>(Collections.singleton(start.srcMemberId));
                if (this.isLinked(start.dstMemberId, start.srcMemberId, others, path) && !this.in(path, cycles)) {
                    cycles.add(new ArrayList<String>(path));
                    log.error("A cycle of length {} was found: {}", path.size() - 1, path.toString());
                }
            }
            for (List<String> foundCycle : cycles) {
                if (foundCycle.size() == 3) {
                    return true;
                }
            }
            return false;
        }

        /**
         * @return {@code false} as soon as one topic's movement pairs contain a cycle reported by
         *         {@link #hasCycles(Set)} (the violation is logged); {@code true} otherwise
         */
        private boolean isSticky() {
            for (Map.Entry<String, Map<ConsumerPair, Set<TopicPartition>>> perTopic : this.partitionMovementsByTopic.entrySet()) {
                Map<ConsumerPair, Set<TopicPartition>> movesByPair = perTopic.getValue();
                if (this.hasCycles(movesByPair.keySet())) {
                    log.error("Stickiness is violated for topic {}\nPartition movements for this topic occurred among the following consumer pairs:\n{}", perTopic.getKey(), movesByPair.toString());
                    return false;
                }
            }
            return true;
        }
    }

    /**
     * Orders consumer ids by the number of partitions mapped to them in the backing map
     * (ascending), breaking ties by the natural order of the ids.
     *
     * <p>The comparator reads the backing map on every comparison, so the ordering reflects the
     * map's state at the time of the call. Both ids must be present in the map.
     */
    private static class SubscriptionComparator
    implements Comparator<String>,
    Serializable {
        private static final long serialVersionUID = 1L;
        private Map<String, List<TopicPartition>> map;

        /**
         * @param map consumer id to assigned partitions; held by reference, not copied
         */
        SubscriptionComparator(Map<String, List<TopicPartition>> map) {
            this.map = map;
        }

        @Override
        public int compare(String o1, String o2) {
            int sizeDifference = map.get(o1).size() - map.get(o2).size();
            return sizeDifference != 0 ? sizeDifference : o1.compareTo(o2);
        }
    }

    /**
     * Orders partitions by the size of the list mapped to their topic in the backing map
     * (ascending), then by topic name, then by partition number.
     *
     * <p>Both topics must be present in the backing map.
     */
    private static class PartitionComparator
    implements Comparator<TopicPartition>,
    Serializable {
        private static final long serialVersionUID = 1L;
        private Map<String, List<String>> map;

        /**
         * @param map topic name to a list of strings whose size drives the ordering; held by reference
         */
        PartitionComparator(Map<String, List<String>> map) {
            this.map = map;
        }

        @Override
        public int compare(TopicPartition o1, TopicPartition o2) {
            int bySize = map.get(o1.topic()).size() - map.get(o2.topic()).size();
            if (bySize != 0) {
                return bySize;
            }
            int byTopic = o1.topic().compareTo(o2.topic());
            if (byTopic != 0) {
                return byTopic;
            }
            return o1.partition() - o2.partition();
        }
    }

    /**
     * Orders topic names by the size of the list mapped to them in the backing map (ascending),
     * breaking ties by the natural order of the names.
     *
     * <p>Both topics must be present in the backing map.
     */
    private static class TopicComparator
    implements Comparator<String>,
    Serializable {
        private static final long serialVersionUID = 1L;
        private Map<String, List<String>> map;

        /**
         * @param map topic name to a list of strings whose size drives the ordering; held by reference
         */
        TopicComparator(Map<String, List<String>> map) {
            this.map = map;
        }

        @Override
        public int compare(String o1, String o2) {
            int sizeDifference = map.get(o1).size() - map.get(o2).size();
            return sizeDifference != 0 ? sizeDifference : o1.compareTo(o2);
        }
    }

    /**
     * Simple holder pairing a list of partitions with an optional generation.
     *
     * <p>Both values are stored exactly as passed in; the list is not copied.
     */
    public static final class MemberData {
        /** Partitions carried by this member data, as supplied to the constructor. */
        public final List<TopicPartition> partitions;
        /** Generation carried by this member data; may be empty. */
        public final Optional<Integer> generation;

        /**
         * @param partitions partitions to carry
         * @param generation generation to carry, or an empty optional
         */
        public MemberData(List<TopicPartition> partitions, Optional<Integer> generation) {
            this.generation = generation;
            this.partitions = partitions;
        }
    }

    /**
     * Immutable holder pairing a consumer id with a generation number.
     */
    static final class ConsumerGenerationPair {
        /** Consumer id as supplied to the constructor. */
        final String consumer;
        /** Generation number as supplied to the constructor. */
        final int generation;

        /**
         * @param consumer   consumer id
         * @param generation generation number
         */
        ConsumerGenerationPair(String consumer, int generation) {
            this.generation = generation;
            this.consumer = consumer;
        }
    }
}

