/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.supervisor.workload.weighted;

import java.time.Clock;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.common.concurrent.ExecutorServiceFactory;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.consumers.consumer.load.SubscriptionLoadRecorder;
import pl.allegro.tech.hermes.consumers.subscription.id.SubscriptionIds;
import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.ConsumerNodeLoad;
import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.ConsumerNodeLoadDecoder;
import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.ConsumerNodeLoadEncoder;
import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.ConsumerNodeLoadRegistry;
import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.SubscriptionLoad;
import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths;

public class ZookeeperConsumerNodeLoadRegistry
implements ConsumerNodeLoadRegistry {
    private static final Logger logger = LoggerFactory.getLogger(ZookeeperConsumerNodeLoadRegistry.class);
    private final Duration interval;
    private final CuratorFramework curator;
    private final ZookeeperPaths zookeeperPaths;
    private final Clock clock;
    private final String basePath;
    private final String currentConsumerPath;
    private final ConsumerNodeLoadEncoder encoder;
    private final ConsumerNodeLoadDecoder decoder;
    private final ScheduledExecutorService executor;
    private final Set<ZookeeperSubscriptionLoadRecorder> subscriptionLoadRecorders = Collections.newSetFromMap(new ConcurrentHashMap());
    private volatile long lastReset = 0L;
    private volatile double currentOperationsPerSecond = 0.0;

    public ZookeeperConsumerNodeLoadRegistry(CuratorFramework curator, SubscriptionIds subscriptionIds, ZookeeperPaths zookeeperPaths, String currentConsumerId, String clusterName, Duration interval, ExecutorServiceFactory executorServiceFactory, Clock clock, HermesMetrics metrics, int consumerLoadEncoderBufferSizeBytes) {
        this.curator = curator;
        this.zookeeperPaths = zookeeperPaths;
        this.clock = clock;
        this.basePath = zookeeperPaths.join(new String[]{zookeeperPaths.basePath(), "consumers-workload", clusterName, "consumer-load"});
        this.currentConsumerPath = this.resolveConsumerLoadPath(currentConsumerId);
        this.interval = interval;
        this.encoder = new ConsumerNodeLoadEncoder(subscriptionIds, consumerLoadEncoderBufferSizeBytes);
        this.decoder = new ConsumerNodeLoadDecoder(subscriptionIds);
        this.executor = executorServiceFactory.createSingleThreadScheduledExecutor("consumer-node-load-reporter-%d");
        metrics.registerGauge("consumer-workload.weighted.load.ops", () -> this.currentOperationsPerSecond);
    }

    @Override
    public void start() {
        this.executor.scheduleWithFixedDelay(this::report, 0L, this.interval.toMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public void stop() {
        this.executor.shutdown();
    }

    private void report() {
        try {
            ConsumerNodeLoad consumerNodeLoad = this.calculateConsumerNodeLoad();
            this.currentOperationsPerSecond = consumerNodeLoad.getLoadPerSubscription().values().stream().mapToDouble(SubscriptionLoad::getOperationsPerSecond).sum();
            this.persist(consumerNodeLoad);
        }
        catch (Exception e) {
            logger.error("Error while reporting consumer node load", (Throwable)e);
        }
    }

    private ConsumerNodeLoad calculateConsumerNodeLoad() {
        long now = this.clock.millis();
        long elapsedMillis = now - this.lastReset;
        long elapsedSeconds = Math.max(TimeUnit.MILLISECONDS.toSeconds(elapsedMillis), 1L);
        this.lastReset = now;
        Map<SubscriptionName, SubscriptionLoad> loadPerSubscription = this.subscriptionLoadRecorders.stream().collect(Collectors.toMap(ZookeeperSubscriptionLoadRecorder::getSubscriptionName, recorder -> recorder.calculate(elapsedSeconds)));
        return new ConsumerNodeLoad(loadPerSubscription);
    }

    private void persist(ConsumerNodeLoad metrics) throws Exception {
        byte[] encoded = this.encoder.encode(metrics);
        try {
            this.curator.setData().forPath(this.currentConsumerPath, encoded);
        }
        catch (KeeperException.NoNodeException e) {
            try {
                ((ACLBackgroundPathAndBytesable)this.curator.create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL)).forPath(this.currentConsumerPath, encoded);
            }
            catch (KeeperException.NodeExistsException nodeExistsException) {
                // empty catch block
            }
        }
    }

    @Override
    public ConsumerNodeLoad get(String consumerId) {
        String consumerLoadPath = this.resolveConsumerLoadPath(consumerId);
        try {
            if (this.curator.checkExists().forPath(consumerLoadPath) != null) {
                byte[] bytes = (byte[])this.curator.getData().forPath(consumerLoadPath);
                return this.decoder.decode(bytes);
            }
        }
        catch (Exception e) {
            logger.warn("Could not read node data on path " + consumerLoadPath, (Throwable)e);
        }
        return ConsumerNodeLoad.UNDEFINED;
    }

    private String resolveConsumerLoadPath(String consumerId) {
        return this.zookeeperPaths.join(new String[]{this.basePath, consumerId});
    }

    @Override
    public SubscriptionLoadRecorder register(SubscriptionName subscriptionName) {
        return new ZookeeperSubscriptionLoadRecorder(subscriptionName);
    }

    private class ZookeeperSubscriptionLoadRecorder
    implements SubscriptionLoadRecorder {
        private final SubscriptionName subscriptionName;
        private final LongAdder operationsCounter = new LongAdder();

        ZookeeperSubscriptionLoadRecorder(SubscriptionName subscriptionName) {
            this.subscriptionName = subscriptionName;
        }

        @Override
        public void initialize() {
            this.operationsCounter.reset();
            ZookeeperConsumerNodeLoadRegistry.this.subscriptionLoadRecorders.add(this);
        }

        @Override
        public void recordSingleOperation() {
            this.operationsCounter.increment();
        }

        @Override
        public void shutdown() {
            this.operationsCounter.reset();
            ZookeeperConsumerNodeLoadRegistry.this.subscriptionLoadRecorders.remove(this);
        }

        SubscriptionName getSubscriptionName() {
            return this.subscriptionName;
        }

        SubscriptionLoad calculate(long elapsedSeconds) {
            double operationsPerSecond = (double)this.operationsCounter.sumThenReset() / (double)elapsedSeconds;
            return new SubscriptionLoad(operationsPerSecond);
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            ZookeeperSubscriptionLoadRecorder that = (ZookeeperSubscriptionLoadRecorder)o;
            return this.subscriptionName.equals((Object)that.subscriptionName);
        }

        public int hashCode() {
            return Objects.hash(this.subscriptionName);
        }
    }
}

