/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.supervisor.workload.weighted;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.concurrent.NotThreadSafe;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.consumers.supervisor.workload.BalancingListener;
import pl.allegro.tech.hermes.consumers.supervisor.workload.WorkDistributionChanges;
import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.ConsumerNodeLoad;
import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.ConsumerNodeLoadRegistry;
import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.SubscriptionLoad;
import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.SubscriptionProfile;
import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.SubscriptionProfileProvider;
import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.SubscriptionProfileRegistry;
import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.SubscriptionProfiles;
import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.Weight;

@NotThreadSafe
public class SubscriptionProfilesCalculator
implements SubscriptionProfileProvider,
BalancingListener {
    private final ConsumerNodeLoadRegistry consumerNodeLoadRegistry;
    private final SubscriptionProfileRegistry subscriptionProfileRegistry;
    private final Clock clock;
    private final Duration weightWindowSize;
    private final SubscriptionProfiles profiles = new SubscriptionProfiles();

    public SubscriptionProfilesCalculator(ConsumerNodeLoadRegistry consumerNodeLoadRegistry, SubscriptionProfileRegistry subscriptionProfileRegistry, Clock clock, Duration weightWindowSize) {
        this.consumerNodeLoadRegistry = consumerNodeLoadRegistry;
        this.subscriptionProfileRegistry = subscriptionProfileRegistry;
        this.clock = clock;
        this.weightWindowSize = weightWindowSize;
    }

    @Override
    public void onBeforeBalancing(List<String> activeConsumers) {
        SubscriptionProfiles subscriptionProfiles = this.subscriptionProfileRegistry.fetch();
        Map<SubscriptionName, WeightCalculator> currentWeights = this.createWeightCalculators(activeConsumers, subscriptionProfiles);
        this.profiles.reset(this.clock.instant());
        for (Map.Entry<SubscriptionName, WeightCalculator> entry : currentWeights.entrySet()) {
            SubscriptionName subscriptionName = entry.getKey();
            WeightCalculator weightCalculator = entry.getValue();
            SubscriptionProfile previousProfile = subscriptionProfiles.getProfile(subscriptionName);
            SubscriptionProfile newProfile = new SubscriptionProfile(previousProfile != null ? previousProfile.getLastRebalanceTimestamp() : null, weightCalculator.calculateExponentiallyWeightedMovingAverage(this.profiles.getUpdateTimestamp()));
            this.profiles.updateProfile(subscriptionName, newProfile);
        }
    }

    private Map<SubscriptionName, WeightCalculator> createWeightCalculators(List<String> activeConsumers, SubscriptionProfiles subscriptionProfiles) {
        HashMap<SubscriptionName, WeightCalculator> weightCalculators = new HashMap<SubscriptionName, WeightCalculator>();
        for (String consumerId : activeConsumers) {
            ConsumerNodeLoad consumerNodeLoad = this.consumerNodeLoadRegistry.get(consumerId);
            for (Map.Entry<SubscriptionName, SubscriptionLoad> entry : consumerNodeLoad.getLoadPerSubscription().entrySet()) {
                SubscriptionName subscriptionName = entry.getKey();
                Weight currentWeight = new Weight(entry.getValue().getOperationsPerSecond());
                WeightCalculator weightCalculator = weightCalculators.computeIfAbsent(subscriptionName, subscription -> this.createWeightCalculator(subscriptionProfiles, subscriptionName));
                weightCalculator.update(currentWeight);
            }
        }
        return weightCalculators;
    }

    private WeightCalculator createWeightCalculator(SubscriptionProfiles subscriptionProfiles, SubscriptionName subscriptionName) {
        SubscriptionProfile subscriptionProfile = subscriptionProfiles.getProfile(subscriptionName);
        return new WeightCalculator(this.weightWindowSize, subscriptionProfile != null ? subscriptionProfile.getWeight() : null, subscriptionProfiles.getUpdateTimestamp());
    }

    @Override
    public void onAfterBalancing(WorkDistributionChanges changes) {
        for (SubscriptionName subscriptionName : changes.getRebalancedSubscriptions()) {
            SubscriptionProfile profile = this.profiles.getProfile(subscriptionName);
            if (profile == null) continue;
            this.profiles.updateProfile(subscriptionName, new SubscriptionProfile(this.clock.instant(), profile.getWeight()));
        }
        this.subscriptionProfileRegistry.persist(this.profiles);
    }

    @Override
    public SubscriptionProfile get(SubscriptionName subscriptionName) {
        return this.profiles.getProfileOrUndefined(subscriptionName);
    }

    private static class WeightCalculator {
        private final Duration weightWindowSize;
        private final Weight previousWeight;
        private final Instant previousUpdateTimestamp;
        private Weight currentWeight = Weight.ZERO;

        WeightCalculator(Duration weightWindowSize, Weight previousWeight, Instant previousUpdateTimestamp) {
            this.previousWeight = previousWeight;
            this.weightWindowSize = weightWindowSize;
            this.previousUpdateTimestamp = previousUpdateTimestamp;
        }

        void update(Weight weight) {
            this.currentWeight = Weight.max(weight, this.currentWeight);
        }

        Weight calculateExponentiallyWeightedMovingAverage(Instant now) {
            if (this.previousWeight == null || this.previousUpdateTimestamp == null) {
                return this.currentWeight;
            }
            Duration elapsed = Duration.between(this.previousUpdateTimestamp, now);
            long elapsedMillis = Math.max(elapsed.toMillis(), 0L);
            double alpha = 1.0 - Math.exp(-1.0 * ((double)elapsedMillis / (double)this.weightWindowSize.toMillis()));
            return this.currentWeight.multiply(alpha).add(this.previousWeight.multiply(1.0 - alpha));
        }
    }
}

