/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.consumer.rate.maxrate;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;
import javax.inject.Inject;
import org.apache.commons.collections4.ListUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.common.config.ConfigFactory;
import pl.allegro.tech.hermes.common.config.Configs;
import pl.allegro.tech.hermes.common.exception.InternalProcessingException;
import pl.allegro.tech.hermes.consumers.consumer.rate.maxrate.ConsumerInstance;
import pl.allegro.tech.hermes.consumers.consumer.rate.maxrate.ConsumerRateInfo;
import pl.allegro.tech.hermes.consumers.consumer.rate.maxrate.MaxRate;
import pl.allegro.tech.hermes.consumers.consumer.rate.maxrate.MaxRatePathSerializer;
import pl.allegro.tech.hermes.consumers.consumer.rate.maxrate.RateHistory;
import pl.allegro.tech.hermes.consumers.consumer.rate.maxrate.RateInfo;
import pl.allegro.tech.hermes.consumers.subscription.cache.SubscriptionsCache;
import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths;
import pl.allegro.tech.hermes.infrastructure.zookeeper.cache.HierarchicalCache;

public class MaxRateRegistry {
    private static final Logger logger = LoggerFactory.getLogger(MaxRateRegistry.class);
    private static final int LEVEL_SUBSCRIPTION = 0;
    private static final int LEVEL_CONSUMER = 1;
    private static final int LEVEL_CONTENT = 2;
    private final CuratorFramework curator;
    private final ObjectMapper objectMapper;
    private final ZookeeperPaths zookeeperPaths;
    private final MaxRatePathSerializer pathSerializer;
    private final HierarchicalCache cache;
    private final SubscriptionsCache subscriptionsCache;
    private final String cluster;
    private final Map<ConsumerInstance, RateInfo> rateInfos = new ConcurrentHashMap<ConsumerInstance, RateInfo>();

    @Inject
    public MaxRateRegistry(ConfigFactory configFactory, CuratorFramework curator, ObjectMapper objectMapper, ZookeeperPaths zookeeperPaths, MaxRatePathSerializer pathSerializer, SubscriptionsCache subscriptionsCache) {
        this.curator = curator;
        this.objectMapper = objectMapper;
        this.zookeeperPaths = zookeeperPaths;
        this.pathSerializer = pathSerializer;
        this.subscriptionsCache = subscriptionsCache;
        this.cluster = configFactory.getStringProperty(Configs.KAFKA_CLUSTER_NAME);
        ThreadFactory cacheThreadFactory = new ThreadFactoryBuilder().setNameFormat("max-rate-registry-%d").build();
        this.cache = new HierarchicalCache(curator, Executors.newSingleThreadExecutor(cacheThreadFactory), zookeeperPaths.consumersRateRuntimePath(this.cluster), 3, Collections.emptyList());
        this.handleContentUpdates();
        this.handleConsumerUpdates();
    }

    public void start() throws Exception {
        long startNanos = System.nanoTime();
        this.loadExistingEntries();
        this.cache.start();
        long elapsedMillis = Duration.ofNanos(System.nanoTime() - startNanos).toMillis();
        logger.info("Started in {}ms", (Object)elapsedMillis);
    }

    public void stop() throws Exception {
        this.cache.stop();
    }

    Set<ConsumerRateInfo> ensureCorrectAssignments(SubscriptionName subscriptionName, Set<String> currentConsumers) {
        HashSet<ConsumerRateInfo> rateInfos = new HashSet<ConsumerRateInfo>();
        try {
            this.cleanupRegistry(subscriptionName, new ArrayList<String>(currentConsumers));
            for (String consumerId : currentConsumers) {
                ConsumerInstance consumerInstance = new ConsumerInstance(consumerId, subscriptionName);
                RateInfo rateInfo = this.rateInfos.getOrDefault(consumerInstance, RateInfo.empty());
                rateInfos.add(new ConsumerRateInfo(consumerId, rateInfo));
            }
        }
        catch (Exception e) {
            throw new InternalProcessingException("Trouble ensuring assignments in zookeeper", (Throwable)e);
        }
        return rateInfos;
    }

    void update(SubscriptionName subscriptionName, Map<String, MaxRate> newMaxRates) {
        try {
            for (Map.Entry<String, MaxRate> entry : newMaxRates.entrySet()) {
                String maxRatePath = this.zookeeperPaths.consumersMaxRatePath(this.cluster, subscriptionName, entry.getKey());
                this.writeOrCreate(maxRatePath, this.objectMapper.writeValueAsBytes((Object)entry.getValue()));
            }
        }
        catch (Exception e) {
            throw new InternalProcessingException((Throwable)e);
        }
    }

    Optional<MaxRate> getMaxRate(ConsumerInstance consumer) {
        RateInfo rateInfo = this.rateInfos.get(consumer);
        return Optional.ofNullable(rateInfo).map(RateInfo::getMaxRate).orElse(Optional.empty());
    }

    RateHistory getRateHistory(ConsumerInstance consumer) {
        return Optional.ofNullable(this.rateInfos.get(consumer)).map(RateInfo::getRateHistory).orElse(RateHistory.empty());
    }

    void writeRateHistory(ConsumerInstance consumer, RateHistory rateHistory) {
        String path = this.zookeeperPaths.consumersRateHistoryPath(this.cluster, consumer.getSubscription(), consumer.getConsumerId());
        try {
            byte[] serialized = this.objectMapper.writeValueAsBytes((Object)rateHistory);
            this.writeOrCreate(path, serialized);
        }
        catch (Exception e) {
            throw new InternalProcessingException((Throwable)e);
        }
    }

    private void cleanupRegistry(SubscriptionName subscriptionName, List<String> currentConsumers) throws Exception {
        List previousConsumers = this.rateInfos.keySet().stream().filter(c -> c.getSubscription().equals((Object)subscriptionName)).map(c -> c.getConsumerId()).collect(Collectors.toList());
        List toRemove = ListUtils.subtract(previousConsumers, currentConsumers);
        if (!toRemove.isEmpty()) {
            logger.info("Removing consumers for max rates for subscription {}: {}", (Object)subscriptionName, (Object)toRemove);
        }
        toRemove.forEach(removedConsumer -> this.removeConsumerEntries(subscriptionName, (String)removedConsumer));
    }

    private void removeConsumerEntries(SubscriptionName subscriptionName, String consumerId) {
        try {
            this.curator.delete().deletingChildrenIfNeeded().forPath(this.zookeeperPaths.consumersRatePath(this.cluster, subscriptionName, consumerId));
        }
        catch (KeeperException.NoNodeException noNodeException) {
        }
        catch (Exception e) {
            throw new InternalProcessingException((Throwable)e);
        }
    }

    private void writeOrCreate(String path, byte[] serializedData) throws Exception {
        try {
            this.curator.setData().forPath(path, serializedData);
        }
        catch (KeeperException.NoNodeException e) {
            try {
                this.curator.create().creatingParentContainersIfNeeded().forPath(path, serializedData);
            }
            catch (KeeperException.NodeExistsException nodeExistsException) {
                // empty catch block
            }
        }
    }

    private void handleConsumerUpdates() {
        this.cache.registerCallback(1, event -> {
            String path = event.getData().getPath();
            ConsumerInstance consumer = this.pathSerializer.consumerInstanceFromConsumerPath(path);
            switch (event.getType()) {
                case CHILD_REMOVED: {
                    this.rateInfos.remove(consumer);
                }
            }
        });
    }

    private void handleContentUpdates() {
        this.cache.registerCallback(2, event -> {
            String path = event.getData().getPath();
            byte[] bytes = event.getData().getData();
            ConsumerInstance consumer = this.pathSerializer.consumerInstanceFromContentPath(path);
            String content = this.pathSerializer.content(path);
            block0 : switch (event.getType()) {
                case CHILD_ADDED: 
                case CHILD_UPDATED: {
                    switch (content) {
                        case "max-rate": {
                            this.updateMaxRateInCache(consumer, bytes);
                            break block0;
                        }
                        case "history": {
                            this.updateRateHistoryInCache(consumer, bytes);
                        }
                    }
                }
            }
        });
    }

    private void updateMaxRateInCache(ConsumerInstance consumer, byte[] bytes) {
        try {
            MaxRate maxRate = (MaxRate)this.objectMapper.readValue(bytes, MaxRate.class);
            this.rateInfos.compute(consumer, (key, oldValue) -> oldValue == null ? RateInfo.withNoHistory(maxRate) : oldValue.copyWithNewMaxRate(maxRate));
        }
        catch (Exception e) {
            logger.warn("Problem updating max rate for consumer {}", (Object)consumer, (Object)e);
        }
    }

    private void updateRateHistoryInCache(ConsumerInstance consumer, byte[] bytes) {
        try {
            RateHistory rateHistory = (RateHistory)this.objectMapper.readValue(bytes, RateHistory.class);
            this.rateInfos.compute(consumer, (key, oldValue) -> oldValue == null ? RateInfo.withNoMaxRate(rateHistory) : oldValue.copyWithNewRateHistory(rateHistory));
        }
        catch (Exception e) {
            logger.warn("Problem updating rate history for consumer {}", (Object)consumer, (Object)e);
        }
    }

    private void loadExistingEntries() {
        List<SubscriptionName> subscriptions = this.subscriptionsCache.listActiveSubscriptionNames();
        int loadedMaxRates = 0;
        for (SubscriptionName subscriptionName : subscriptions) {
            try {
                List<String> assignedConsumers = this.consumersInRegistry(subscriptionName);
                if (!this.setInitialMaxRates(subscriptionName, assignedConsumers)) continue;
                ++loadedMaxRates;
            }
            catch (Exception e) {
                logger.warn("Exception occurred when initializing cache for subscription {}", (Object)subscriptionName, (Object)e);
            }
        }
        logger.info("Loaded max-rates of {} out of {} subscriptions", (Object)loadedMaxRates, (Object)subscriptions.size());
    }

    private List<String> consumersInRegistry(SubscriptionName subscriptionName) throws Exception {
        String subscriptionConsumersPath = this.zookeeperPaths.consumersRateSubscriptionPath(this.cluster, subscriptionName);
        return (List)this.curator.getChildren().forPath(subscriptionConsumersPath);
    }

    private boolean setInitialMaxRates(SubscriptionName subscriptionName, List<String> consumerIds) {
        boolean atLeastOneConsumerInitialized = false;
        for (String consumerId : consumerIds) {
            try {
                ConsumerInstance consumer = new ConsumerInstance(consumerId, subscriptionName);
                byte[] rawMaxRate = (byte[])this.curator.getData().forPath(this.zookeeperPaths.consumersMaxRatePath(this.cluster, subscriptionName, consumerId));
                MaxRate maxRate = (MaxRate)this.objectMapper.readValue(rawMaxRate, MaxRate.class);
                this.rateInfos.put(consumer, RateInfo.withNoHistory(maxRate));
                atLeastOneConsumerInitialized = true;
            }
            catch (Exception e) {
                logger.warn("Exception occurred when initializing cache for subscription {} and consumer {}", new Object[]{subscriptionName, consumerId, e});
            }
        }
        return atLeastOneConsumerInitialized;
    }
}

