/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.supervisor.workload.weighted;

import java.time.Clock;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.consumers.supervisor.workload.BalancingListener;
import pl.allegro.tech.hermes.consumers.supervisor.workload.WorkDistributionChanges;
import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.ConsumerNodeLoad;
import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.ConsumerNodeLoadRegistry;
import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.CurrentLoadProvider;
import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.SubscriptionProfile;
import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.SubscriptionProfileRegistry;
import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.SubscriptionProfiles;
import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.SubscriptionProfilesCalculator;
import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.WeightedWorkloadMetrics;

/**
 * {@link BalancingListener} that refreshes the data held by the {@link CurrentLoadProvider}
 * around a balancing round.
 *
 * <p>Before balancing it loads the {@link ConsumerNodeLoad} of every active consumer and
 * recalculates the {@link SubscriptionProfiles}. After balancing it re-stamps the profiles of
 * the rebalanced subscriptions, persists the result and publishes it to the load provider.
 * When balancing is skipped it drops the leader metrics and clears the load provider.
 */
public class WeightedWorkBalancingListener implements BalancingListener {

    private final ConsumerNodeLoadRegistry consumerNodeLoadRegistry;
    private final SubscriptionProfileRegistry subscriptionProfileRegistry;
    private final CurrentLoadProvider currentLoadProvider;
    private final WeightedWorkloadMetrics weightedWorkloadMetrics;
    private final SubscriptionProfilesCalculator subscriptionProfilesCalculator;
    private final Clock clock;

    /**
     * Creates the listener and its internal {@link SubscriptionProfilesCalculator}.
     *
     * @param consumerNodeLoadRegistry source of the per-consumer {@link ConsumerNodeLoad}
     * @param subscriptionProfileRegistry store from which previous profiles are fetched and to
     *     which updated profiles are persisted
     * @param currentLoadProvider holder that receives the refreshed loads and profiles
     * @param weightedWorkloadMetrics metrics whose registrations are pruned by this listener
     * @param clock clock used for rebalance timestamps; also handed to the calculator
     * @param weightWindowSize passed straight to the {@link SubscriptionProfilesCalculator}
     *     (presumably the averaging window for weights; confirm against the calculator)
     */
    public WeightedWorkBalancingListener(
            ConsumerNodeLoadRegistry consumerNodeLoadRegistry,
            SubscriptionProfileRegistry subscriptionProfileRegistry,
            CurrentLoadProvider currentLoadProvider,
            WeightedWorkloadMetrics weightedWorkloadMetrics,
            Clock clock,
            Duration weightWindowSize) {
        this.consumerNodeLoadRegistry = consumerNodeLoadRegistry;
        this.subscriptionProfileRegistry = subscriptionProfileRegistry;
        this.currentLoadProvider = currentLoadProvider;
        this.weightedWorkloadMetrics = weightedWorkloadMetrics;
        this.clock = clock;
        this.subscriptionProfilesCalculator = new SubscriptionProfilesCalculator(clock, weightWindowSize);
    }

    /**
     * Prunes metrics of consumers that are no longer active, then refreshes the consumer loads
     * and the subscription profiles exposed by the {@link CurrentLoadProvider}.
     *
     * @param activeConsumers identifiers of the consumers taking part in this balancing round
     */
    @Override
    public void onBeforeBalancing(List<String> activeConsumers) {
        Set<String> activeConsumerIds = new HashSet<>(activeConsumers);
        weightedWorkloadMetrics.unregisterMetricsForConsumersOtherThan(activeConsumerIds);

        Map<String, ConsumerNodeLoad> loadPerConsumer = loadsOf(activeConsumers);
        currentLoadProvider.updateConsumerNodeLoads(loadPerConsumer);

        // New profiles are derived from the fresh loads and the previously stored profiles.
        Collection<ConsumerNodeLoad> loads = loadPerConsumer.values();
        SubscriptionProfiles storedProfiles = subscriptionProfileRegistry.fetch();
        SubscriptionProfiles recalculated = subscriptionProfilesCalculator.calculate(loads, storedProfiles);
        currentLoadProvider.updateProfiles(recalculated);
    }

    /**
     * Re-stamps the profiles of the subscriptions that were rebalanced, persists the resulting
     * profiles and publishes them to the {@link CurrentLoadProvider}.
     *
     * @param changes outcome of the balancing round; only its rebalanced subscriptions are read
     */
    @Override
    public void onAfterBalancing(WorkDistributionChanges changes) {
        Set<SubscriptionName> rebalanced = changes.getRebalancedSubscriptions();
        SubscriptionProfiles before = currentLoadProvider.getProfiles();

        // Work on a copy so the map returned by the current profiles is left untouched.
        Map<SubscriptionName, SubscriptionProfile> updated = new HashMap<>(before.getProfiles());
        for (SubscriptionName name : rebalanced) {
            SubscriptionProfile existing = updated.get(name);
            if (existing != null) {
                updated.put(name, stampedWithCurrentInstant(existing));
            }
        }

        // The overall update timestamp is carried over unchanged from the current profiles.
        SubscriptionProfiles after = new SubscriptionProfiles(updated, before.getUpdateTimestamp());
        subscriptionProfileRegistry.persist(after);
        currentLoadProvider.updateProfiles(after);
    }

    /** Unregisters the leader metrics and clears the {@link CurrentLoadProvider}. */
    @Override
    public void onBalancingSkipped() {
        weightedWorkloadMetrics.unregisterLeaderMetrics();
        currentLoadProvider.clear();
    }

    /**
     * Looks up the load of every given consumer in the {@link ConsumerNodeLoadRegistry}.
     *
     * <p>Collected with {@link Collectors#toMap}, which rejects duplicate keys and null values.
     */
    private Map<String, ConsumerNodeLoad> loadsOf(List<String> consumerIds) {
        Function<String, ConsumerNodeLoad> lookup = consumerNodeLoadRegistry::get;
        return consumerIds.stream().collect(Collectors.toMap(Function.identity(), lookup));
    }

    /** Returns a copy of the profile that keeps its weight but carries the clock's current instant. */
    private SubscriptionProfile stampedWithCurrentInstant(SubscriptionProfile profile) {
        return new SubscriptionProfile(clock.instant(), profile.getWeight());
    }
}

