/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.consumer.rate.maxrate;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.common.config.ConfigFactory;
import pl.allegro.tech.hermes.common.config.Configs;
import pl.allegro.tech.hermes.consumers.consumer.rate.maxrate.ConsumerInstance;
import pl.allegro.tech.hermes.consumers.consumer.rate.maxrate.ConsumerMaxRates;
import pl.allegro.tech.hermes.consumers.consumer.rate.maxrate.ConsumerMaxRatesDecoder;
import pl.allegro.tech.hermes.consumers.consumer.rate.maxrate.ConsumerMaxRatesEncoder;
import pl.allegro.tech.hermes.consumers.consumer.rate.maxrate.ConsumerRateHistoriesDecoder;
import pl.allegro.tech.hermes.consumers.consumer.rate.maxrate.ConsumerRateHistoriesEncoder;
import pl.allegro.tech.hermes.consumers.consumer.rate.maxrate.ConsumerRateHistory;
import pl.allegro.tech.hermes.consumers.consumer.rate.maxrate.ConsumerRateInfo;
import pl.allegro.tech.hermes.consumers.consumer.rate.maxrate.FlatBinaryMaxRateRegistryPaths;
import pl.allegro.tech.hermes.consumers.consumer.rate.maxrate.MaxRate;
import pl.allegro.tech.hermes.consumers.consumer.rate.maxrate.MaxRateRegistry;
import pl.allegro.tech.hermes.consumers.consumer.rate.maxrate.RateHistory;
import pl.allegro.tech.hermes.consumers.consumer.rate.maxrate.RateInfo;
import pl.allegro.tech.hermes.consumers.consumer.rate.maxrate.ZookeeperOperations;
import pl.allegro.tech.hermes.consumers.subscription.id.SubscriptionIds;
import pl.allegro.tech.hermes.consumers.supervisor.workload.ClusterAssignmentCache;
import pl.allegro.tech.hermes.consumers.supervisor.workload.ConsumerAssignmentCache;
import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths;

/**
 * {@link MaxRateRegistry} that stores max rates and rate histories of consumers as binary-encoded
 * Zookeeper nodes, using one max-rate node and one rate-history node per consumer.
 *
 * <p>The registry also listens (through a {@link NodeCache}) on the max-rate node of the current consumer
 * and mirrors its decoded content in {@code currentConsumerMaxRates}.
 *
 * <p>NOTE(review): {@code consumersMaxRates} and {@code consumersRateHistories} are plain {@link HashMap}s;
 * presumably they are only touched by the thread driving the max-rate calculation - confirm with callers.
 */
class FlatBinaryMaxRateRegistry implements MaxRateRegistry, NodeCacheListener {
    private static final Logger logger = LoggerFactory.getLogger(FlatBinaryMaxRateRegistry.class);

    private final ZookeeperOperations zookeeper;
    /** Max rates of consumers, keyed by consumer id, as last read from Zookeeper or set via {@link #update}. */
    private final Map<String, ConsumerMaxRates> consumersMaxRates = new HashMap<>();
    /** Rate histories of consumers, keyed by consumer id, as last read from Zookeeper. */
    private final Map<String, ConsumerRateHistory> consumersRateHistories = new HashMap<>();
    /** Rate histories reported by the current consumer; flushed in {@link #onAfterWriteRateHistories()}. */
    private final ConsumerRateHistory currentConsumerRateHistories;
    /** Max rates of the current consumer, refreshed from the watched max-rate node. */
    private final ConsumerMaxRates currentConsumerMaxRates;
    private final String consumerId;
    private final ClusterAssignmentCache clusterAssignmentCache;
    private final ConsumerAssignmentCache consumerAssignmentCache;
    private final ConsumerRateHistoriesEncoder consumerRateHistoriesEncoder;
    private final ConsumerRateHistoriesDecoder consumerRateHistoriesDecoder;
    private final ConsumerMaxRatesDecoder consumerMaxRatesDecoder;
    private final ConsumerMaxRatesEncoder consumerMaxRatesEncoder;
    private final NodeCache maxRateNodeCache;
    private final FlatBinaryMaxRateRegistryPaths registryPaths;

    /**
     * Wires the registry together and registers it as a listener of the current consumer's max-rate node.
     * The node cache itself is not started here; see {@link #start()}.
     *
     * @param configFactory           source of the consumer id, Kafka cluster name and encoder buffer sizes
     * @param clusterAssignmentCache  provides assigned consumers and their subscriptions
     * @param consumerAssignmentCache provides subscriptions of the current consumer
     * @param curator                 Zookeeper client used for reads, writes and the node cache
     * @param zookeeperPaths          base paths used to build the registry paths
     * @param subscriptionIds         passed to the binary encoders and decoders
     */
    FlatBinaryMaxRateRegistry(ConfigFactory configFactory,
                              ClusterAssignmentCache clusterAssignmentCache,
                              ConsumerAssignmentCache consumerAssignmentCache,
                              CuratorFramework curator,
                              ZookeeperPaths zookeeperPaths,
                              SubscriptionIds subscriptionIds) {
        this.consumerId = configFactory.getStringProperty(Configs.CONSUMER_WORKLOAD_NODE_ID);
        this.clusterAssignmentCache = clusterAssignmentCache;
        this.consumerAssignmentCache = consumerAssignmentCache;
        String clusterName = configFactory.getStringProperty(Configs.KAFKA_CLUSTER_NAME);

        this.currentConsumerRateHistories = new ConsumerRateHistory();
        this.currentConsumerMaxRates = new ConsumerMaxRates();
        this.registryPaths = new FlatBinaryMaxRateRegistryPaths(zookeeperPaths, consumerId, clusterName);
        this.zookeeper = new ZookeeperOperations(curator);

        int historiesEncoderBufferSize = configFactory.getIntProperty(
                Configs.CONSUMER_MAXRATE_REGISTRY_BINARY_ENCODER_HISTORY_BUFFER_SIZE_BYTES);
        this.consumerRateHistoriesEncoder = new ConsumerRateHistoriesEncoder(subscriptionIds, historiesEncoderBufferSize);
        this.consumerRateHistoriesDecoder = new ConsumerRateHistoriesDecoder(subscriptionIds);

        int maxRateEncoderBufferSize = configFactory.getIntProperty(
                Configs.CONSUMER_MAXRATE_REGISTRY_BINARY_ENCODER_MAX_RATE_BUFFER_SIZE_BYTES);
        this.consumerMaxRatesEncoder = new ConsumerMaxRatesEncoder(subscriptionIds, maxRateEncoderBufferSize);
        this.consumerMaxRatesDecoder = new ConsumerMaxRatesDecoder(subscriptionIds);

        this.maxRateNodeCache = new NodeCache(curator, registryPaths.consumerMaxRatePath(consumerId));
        this.maxRateNodeCache.getListenable().addListener(this);
    }

    /**
     * Starts the node cache watching the current consumer's max-rate node and then loads whatever
     * data the cache already holds.
     *
     * @throws IllegalStateException if the node cache could not be started
     */
    @Override
    public void start() {
        try {
            logger.info("Starting flat binary max rate registry at {}, watching current consumer path at {}",
                    registryPaths.consumersRateCurrentClusterRuntimeBinaryPath(),
                    registryPaths.consumerMaxRatePath(consumerId));
            maxRateNodeCache.start();
        } catch (Exception e) {
            throw new IllegalStateException("Could not start node cache for consumer max rate", e);
        }
        refreshConsumerMaxRates();
    }

    /**
     * Decodes the content currently held by the node cache into {@code currentConsumerMaxRates}.
     * Does nothing when the cache holds no node; a node without data is reported and skipped,
     * leaving the previously known max rates in place.
     */
    private void refreshConsumerMaxRates() {
        ChildData nodeData = maxRateNodeCache.getCurrentData();
        if (nodeData == null) {
            return;
        }
        byte[] data = nodeData.getData();
        if (data == null) {
            // Without this guard the log statement below would fail on data.length.
            logger.warn("Max rate node of current consumer {} holds no data, keeping previously known max rates",
                    consumerId);
            return;
        }
        ConsumerMaxRates decodedMaxRates = consumerMaxRatesDecoder.decode(data);
        logger.info("Decoded {} bytes of max rates for current node with {} subscription entries",
                data.length, decodedMaxRates.size());
        currentConsumerMaxRates.setAllMaxRates(decodedMaxRates);
    }

    /**
     * Closes the node cache.
     *
     * @throws RuntimeException wrapping the {@link IOException} if closing the cache fails
     */
    @Override
    public void stop() {
        try {
            logger.info("Stopping flat binary max rate registry");
            maxRateNodeCache.close();
        } catch (IOException e) {
            throw new RuntimeException("Could not stop node cache for consumer max rate", e);
        }
    }

    /**
     * Drops cached entries of consumers that are no longer assigned and re-reads max rates and
     * rate histories of the assigned ones from Zookeeper.
     */
    @Override
    public void onBeforeMaxRateCalculation() {
        Set<String> assignedConsumers = clusterAssignmentCache.getAssignedConsumers();
        clearCacheFromInactiveConsumers(assignedConsumers);
        refreshRateCachesOfConsumers(assignedConsumers);
    }

    /** Removes from both local maps every consumer that is not in {@code assignedConsumers}. */
    private void clearCacheFromInactiveConsumers(Set<String> assignedConsumers) {
        consumersMaxRates.keySet().removeIf(cachedConsumerId -> !assignedConsumers.contains(cachedConsumerId));
        consumersRateHistories.keySet().removeIf(cachedConsumerId -> !assignedConsumers.contains(cachedConsumerId));
    }

    /**
     * For every consumer node found in Zookeeper: refreshes its cached data when the consumer is
     * assigned, otherwise deletes the consumer's node subtree.
     */
    private void refreshRateCachesOfConsumers(Set<String> assignedConsumers) {
        for (String nodeConsumerId : getMaxRateConsumerNodes()) {
            if (assignedConsumers.contains(nodeConsumerId)) {
                refreshConsumerRateHistory(nodeConsumerId);
                refreshConsumerMaxRate(nodeConsumerId);
            } else {
                removeConsumerRateRootNode(nodeConsumerId);
            }
        }
    }

    /**
     * Lists children of the registry root path.
     *
     * @return child node names, or an empty list when the root does not exist or the read fails
     */
    private List<String> getMaxRateConsumerNodes() {
        String path = registryPaths.consumersRateCurrentClusterRuntimeBinaryPath();
        try {
            if (zookeeper.exists(path)) {
                return zookeeper.getNodeChildren(path);
            }
        } catch (Exception e) {
            // Best effort: a failed listing only means nothing gets refreshed in this round.
            logger.warn("Could not get max rate consumer nodes list", e);
        }
        return Collections.emptyList();
    }

    /**
     * Reads and decodes the max-rate node of the given consumer, strips entries of subscriptions the
     * consumer is not assigned to, and stores the result in {@code consumersMaxRates}.
     * Nothing is stored when the node yields no data.
     */
    private void refreshConsumerMaxRate(String consumerId) {
        logger.info("Refreshing max rate of {}", consumerId);
        String consumerMaxRatePath = registryPaths.consumerMaxRatePath(consumerId);
        zookeeper.getNodeData(consumerMaxRatePath)
                .map(consumerMaxRatesDecoder::decode)
                .ifPresent(maxRates -> {
                    int decodedSize = maxRates.size();
                    maxRates.cleanup(clusterAssignmentCache.getConsumerSubscriptions(consumerId));
                    int cleanedSize = maxRates.size();
                    if (decodedSize > cleanedSize) {
                        logger.info("Refreshed max rates of {} with {} subscriptions ({} stale entries omitted)",
                                consumerId, cleanedSize, decodedSize - cleanedSize);
                    } else {
                        logger.info("Refreshed max rates of {} with {} subscriptions", consumerId, cleanedSize);
                    }
                    consumersMaxRates.put(consumerId, maxRates);
                });
    }

    /**
     * Reads and decodes the rate-history node of the given consumer and stores the result in
     * {@code consumersRateHistories}. Nothing is stored when the node yields no data.
     */
    private void refreshConsumerRateHistory(String consumerId) {
        logger.info("Refreshing rate history of {}", consumerId);
        String consumerRateHistoryPath = registryPaths.consumerRateHistoryPath(consumerId);
        zookeeper.getNodeData(consumerRateHistoryPath)
                .map(consumerRateHistoriesDecoder::decode)
                .ifPresent(rateHistories -> {
                    logger.info("Refreshed rate history of {} with {} subscriptions", consumerId, rateHistories.size());
                    consumersRateHistories.put(consumerId, rateHistories);
                });
    }

    /** Recursively deletes the Zookeeper subtree of a consumer that is no longer assigned; failures are logged. */
    private void removeConsumerRateRootNode(String consumerId) {
        logger.info("Deleting max rate node of stale consumer {}", consumerId);
        String path = registryPaths.consumerRateParentRuntimePath(consumerId);
        try {
            zookeeper.deleteNodeRecursively(path);
        } catch (Exception e) {
            logger.warn("Could not delete stale consumer max rate node {}", path, e);
        }
    }

    /** Writes the locally held max rates of every consumer back to Zookeeper. */
    @Override
    public void onAfterMaxRateCalculation() {
        persistMaxRatesForAllConsumers();
    }

    /**
     * Encodes and writes max rates consumer by consumer. A failed write for one consumer is logged
     * and does not prevent writes for the remaining ones.
     */
    private void persistMaxRatesForAllConsumers() {
        consumersMaxRates.forEach((consumerId, maxRates) -> {
            byte[] encoded = consumerMaxRatesEncoder.encode(maxRates);
            String consumerMaxRatePath = registryPaths.consumerMaxRatePath(consumerId);
            try {
                zookeeper.writeOrCreatePersistent(consumerMaxRatePath, encoded);
            } catch (Exception e) {
                logger.warn("Could not write max rates for consumer {}", consumerId, e);
            }
        });
    }

    /**
     * Collects, for each of the given consumers, the locally cached max rate and rate history of the
     * subscription.
     *
     * @param subscriptionName subscription to look up
     * @param currentConsumers ids of consumers to build rate info for
     * @return one {@link ConsumerRateInfo} per consumer; a missing max rate is an empty {@link Optional},
     *         a consumer without cached histories gets {@link RateHistory#empty()}
     */
    @Override
    public Set<ConsumerRateInfo> ensureCorrectAssignments(SubscriptionName subscriptionName,
                                                          Set<String> currentConsumers) {
        Set<ConsumerRateInfo> rateInfos = new HashSet<>();
        for (String consumerId : currentConsumers) {
            Optional<MaxRate> maxRate = Optional.ofNullable(consumersMaxRates.get(consumerId))
                    .flatMap(rates -> rates.getMaxRate(subscriptionName));
            RateHistory rateHistory = Optional.ofNullable(consumersRateHistories.get(consumerId))
                    .map(histories -> histories.getRateHistory(subscriptionName))
                    .orElse(RateHistory.empty());
            rateInfos.add(new ConsumerRateInfo(consumerId, new RateInfo(maxRate, rateHistory)));
        }
        return rateInfos;
    }

    /**
     * Sets the max rate of the subscription for each consumer in {@code newMaxRates}, creating the
     * consumer's local entry when it does not exist yet. Nothing is written to Zookeeper here; that
     * happens in {@link #onAfterMaxRateCalculation()}.
     *
     * @param subscriptionName subscription the rates apply to
     * @param newMaxRates      max rate per consumer id
     */
    @Override
    public void update(SubscriptionName subscriptionName, Map<String, MaxRate> newMaxRates) {
        newMaxRates.forEach((consumerId, maxRate) ->
                consumersMaxRates
                        .computeIfAbsent(consumerId, absentConsumerId -> new ConsumerMaxRates())
                        .setMaxRate(subscriptionName, maxRate));
    }

    /**
     * @param consumer consumer instance; must belong to the current consumer node
     * @return max rate of the consumer's subscription as last read from the watched node
     * @throws IllegalStateException if {@code consumer} belongs to a different consumer node
     */
    @Override
    public Optional<MaxRate> getMaxRate(ConsumerInstance consumer) {
        Preconditions.checkState(consumer.getConsumerId().equals(consumerId),
                "Reading max rate is allowed only for current consumer");
        return currentConsumerMaxRates.getMaxRate(consumer.getSubscription());
    }

    /**
     * @param consumer consumer instance; must belong to the current consumer node
     * @return rate history of the consumer's subscription held locally for the current consumer
     * @throws IllegalStateException if {@code consumer} belongs to a different consumer node
     */
    @Override
    public RateHistory getRateHistory(ConsumerInstance consumer) {
        Preconditions.checkState(consumer.getConsumerId().equals(consumerId),
                "Reading rate history is allowed only for current consumer");
        return currentConsumerRateHistories.getRateHistory(consumer.getSubscription());
    }

    /**
     * Records the rate history locally; it is persisted by {@link #onAfterWriteRateHistories()}.
     *
     * @param consumer    consumer instance; must belong to the current consumer node
     * @param rateHistory history to store for the consumer's subscription
     * @throws IllegalStateException if {@code consumer} belongs to a different consumer node
     */
    @Override
    public void writeRateHistory(ConsumerInstance consumer, RateHistory rateHistory) {
        Preconditions.checkState(consumer.getConsumerId().equals(consumerId),
                "Saving rate history is allowed only for current consumer");
        currentConsumerRateHistories.setRateHistory(consumer.getSubscription(), rateHistory);
    }

    /**
     * Restricts the current consumer's rate histories to its assigned subscriptions, encodes them and
     * writes them to the current consumer's rate-history node. A failed write is logged, not rethrown.
     */
    @Override
    public void onAfterWriteRateHistories() {
        Set<SubscriptionName> subscriptions = consumerAssignmentCache.getConsumerSubscriptions();
        currentConsumerRateHistories.cleanup(subscriptions);
        byte[] encoded = consumerRateHistoriesEncoder.encode(currentConsumerRateHistories);
        logger.info("Writing rate history of {} subscriptions, saving {} bytes",
                currentConsumerRateHistories.size(), encoded.length);
        try {
            zookeeper.writeOrCreatePersistent(registryPaths.currentConsumerRateHistoryPath(), encoded);
        } catch (Exception e) {
            // Pass the exception to the logger so the cause of the failed write is not lost.
            logger.error("An error while saving consumers rate histories", e);
        }
    }

    /** {@link NodeCacheListener} callback: reloads the current consumer's max rates from the node cache. */
    @Override
    public void nodeChanged() {
        refreshConsumerMaxRates();
    }
}

