/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.supervisor.workload.selective;

import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.consumers.supervisor.workload.SubscriptionAssignment;
import pl.allegro.tech.hermes.consumers.supervisor.workload.SubscriptionAssignmentView;
import pl.allegro.tech.hermes.consumers.supervisor.workload.WorkBalancer;
import pl.allegro.tech.hermes.consumers.supervisor.workload.WorkBalancingResult;
import pl.allegro.tech.hermes.consumers.supervisor.workload.WorkloadConstraints;
import pl.allegro.tech.hermes.consumers.supervisor.workload.selective.AvailableWork;

public class SelectiveWorkBalancer
implements WorkBalancer {
    private static final Logger logger = LoggerFactory.getLogger(SelectiveWorkBalancer.class);

    @Override
    public WorkBalancingResult balance(List<SubscriptionName> subscriptions, List<String> activeConsumerNodes, SubscriptionAssignmentView currentState, WorkloadConstraints constraints) {
        List<SubscriptionName> removedSubscriptions = this.findRemovedSubscriptions(currentState, subscriptions);
        List<String> inactiveConsumers = this.findInactiveConsumers(currentState, activeConsumerNodes);
        List<SubscriptionName> newSubscriptions = this.findNewSubscriptions(currentState, subscriptions);
        List<String> newConsumers = this.findNewConsumers(currentState, activeConsumerNodes);
        SubscriptionAssignmentView balancedState = this.balance(currentState, removedSubscriptions, inactiveConsumers, newSubscriptions, newConsumers, constraints);
        this.log(subscriptions, activeConsumerNodes, currentState, balancedState);
        return new WorkBalancingResult(balancedState, this.countMissingResources(subscriptions, balancedState, constraints));
    }

    private SubscriptionAssignmentView balance(SubscriptionAssignmentView currentState, List<SubscriptionName> removedSubscriptions, List<String> inactiveConsumers, List<SubscriptionName> newSubscriptions, List<String> newConsumers, WorkloadConstraints constraints) {
        return currentState.transform((state, transformer) -> {
            removedSubscriptions.forEach(transformer::removeSubscription);
            inactiveConsumers.forEach(transformer::removeConsumerNode);
            newSubscriptions.forEach(transformer::addSubscription);
            newConsumers.forEach(transformer::addConsumerNode);
            this.minimizeWorkload((SubscriptionAssignmentView)state, (SubscriptionAssignmentView.Transformer)transformer, constraints);
            AvailableWork.stream(state, constraints).forEach(transformer::addAssignment);
            this.equalizeWorkload((SubscriptionAssignmentView)state, (SubscriptionAssignmentView.Transformer)transformer);
        });
    }

    private void minimizeWorkload(SubscriptionAssignmentView state, SubscriptionAssignmentView.Transformer transformer, WorkloadConstraints workloadConstraints) {
        state.getSubscriptions().stream().flatMap(subscriptionName -> this.findRedundantAssignments(state, (SubscriptionName)subscriptionName, workloadConstraints)).forEach(transformer::removeAssignment);
    }

    private Stream<SubscriptionAssignment> findRedundantAssignments(SubscriptionAssignmentView state, SubscriptionName subscriptionName, WorkloadConstraints constraints) {
        int requiredConsumers;
        int assignedConsumers = state.getAssignmentsCountForSubscription(subscriptionName);
        int redundantConsumers = assignedConsumers - (requiredConsumers = constraints.getConsumerCount(subscriptionName));
        if (redundantConsumers > 0) {
            Stream.Builder<SubscriptionAssignment> redundant = Stream.builder();
            Iterator<SubscriptionAssignment> iterator = state.getAssignmentsForSubscription(subscriptionName).iterator();
            while (redundantConsumers > 0 && iterator.hasNext()) {
                SubscriptionAssignment assignment = iterator.next();
                redundant.add(assignment);
                --redundantConsumers;
            }
            return redundant.build();
        }
        return Stream.empty();
    }

    private int countMissingResources(List<SubscriptionName> subscriptions, SubscriptionAssignmentView state, WorkloadConstraints constraints) {
        return subscriptions.stream().mapToInt(s -> {
            int subscriptionAssignments;
            int requiredConsumers = constraints.getConsumerCount((SubscriptionName)s);
            int missing = requiredConsumers - (subscriptionAssignments = state.getAssignmentsCountForSubscription((SubscriptionName)s));
            if (missing != 0) {
                logger.info("Subscription {} has {} != {} (default) assignments", new Object[]{s, subscriptionAssignments, requiredConsumers});
            }
            return missing;
        }).sum();
    }

    private void equalizeWorkload(SubscriptionAssignmentView state, SubscriptionAssignmentView.Transformer transformer) {
        if (state.getSubscriptionsCount() > 1 && !state.getConsumerNodes().isEmpty()) {
            boolean transferred;
            do {
                Optional<SubscriptionName> subscription;
                transferred = false;
                String maxLoaded = this.maxLoadedConsumerNode(state);
                String minLoaded = this.minLoadedConsumerNode(state);
                int maxLoad = state.getAssignmentsCountForConsumerNode(maxLoaded);
                int minLoad = state.getAssignmentsCountForConsumerNode(minLoaded);
                while (maxLoad > minLoad + 1 && (subscription = this.getSubscriptionForTransfer(state, maxLoaded, minLoaded)).isPresent()) {
                    transformer.transferAssignment(maxLoaded, minLoaded, subscription.get());
                    transferred = true;
                    --maxLoad;
                    ++minLoad;
                }
            } while (transferred);
        }
    }

    private String maxLoadedConsumerNode(SubscriptionAssignmentView state) {
        return state.getConsumerNodes().stream().max(Comparator.comparingInt(state::getAssignmentsCountForConsumerNode)).get();
    }

    private String minLoadedConsumerNode(SubscriptionAssignmentView state) {
        return state.getConsumerNodes().stream().min(Comparator.comparingInt(state::getAssignmentsCountForConsumerNode)).get();
    }

    private Optional<SubscriptionName> getSubscriptionForTransfer(SubscriptionAssignmentView state, String maxLoaded, String minLoaded) {
        return state.getSubscriptionsForConsumerNode(maxLoaded).stream().filter(s -> !state.getConsumerNodesForSubscription((SubscriptionName)s).contains(minLoaded)).findAny();
    }

    private List<SubscriptionName> findRemovedSubscriptions(SubscriptionAssignmentView state, List<SubscriptionName> subscriptions) {
        return state.getSubscriptions().stream().filter(s -> !subscriptions.contains(s)).collect(Collectors.toList());
    }

    private List<String> findInactiveConsumers(SubscriptionAssignmentView state, List<String> activeConsumers) {
        return state.getConsumerNodes().stream().filter(c -> !activeConsumers.contains(c)).collect(Collectors.toList());
    }

    private List<SubscriptionName> findNewSubscriptions(SubscriptionAssignmentView state, List<SubscriptionName> subscriptions) {
        return subscriptions.stream().filter(s -> !state.getSubscriptions().contains(s)).collect(Collectors.toList());
    }

    private List<String> findNewConsumers(SubscriptionAssignmentView state, List<String> activeConsumers) {
        return activeConsumers.stream().filter(c -> !state.getConsumerNodes().contains(c)).collect(Collectors.toList());
    }

    private void log(List<SubscriptionName> subscriptions, List<String> activeConsumerNodes, SubscriptionAssignmentView currentState, SubscriptionAssignmentView balancedState) {
        logger.info("Balancing {} subscriptions across {} nodes with previous {} assignments produced {} assignments", new Object[]{subscriptions.size(), activeConsumerNodes.size(), currentState.getAllAssignments().size(), balancedState.getAllAssignments().size()});
    }
}

