/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.consumer.offset;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.util.ArrayList;
import java.util.List;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.common.config.ConfigFactory;
import pl.allegro.tech.hermes.common.config.Configs;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.common.time.Clock;
import pl.allegro.tech.hermes.consumers.consumer.Message;
import pl.allegro.tech.hermes.consumers.consumer.offset.OffsetCommitQueue;
import pl.allegro.tech.hermes.consumers.consumer.offset.OffsetCommitQueueMonitor;
import pl.allegro.tech.hermes.consumers.consumer.offset.TopicPartition;

public class SubscriptionOffsetCommitQueues {
    private final LoadingCache<TopicPartition, OffsetCommitQueue> queues;

    public SubscriptionOffsetCommitQueues(Subscription subscription, HermesMetrics hermesMetrics, Clock clock, ConfigFactory configFactory) {
        this.queues = CacheBuilder.newBuilder().build((CacheLoader)new OffsetCommitQueueLoader(subscription, hermesMetrics, clock, configFactory));
    }

    public void put(Message message) {
        OffsetCommitQueue helper = (OffsetCommitQueue)this.queues.getUnchecked((Object)new TopicPartition(message.getKafkaTopic(), message.getPartition()));
        helper.put(message.getOffset());
    }

    public void remove(Message message) {
        OffsetCommitQueue offsetCommitQueue = (OffsetCommitQueue)this.queues.getUnchecked((Object)new TopicPartition(message.getKafkaTopic(), message.getPartition()));
        offsetCommitQueue.markDelivered(message.getOffset());
    }

    public List<PartitionOffset> getOffsetsToCommit() {
        ArrayList<PartitionOffset> offsets = new ArrayList<PartitionOffset>();
        this.queues.asMap().forEach((topicAndPartition, queue) -> queue.poll().ifPresent(offset -> offsets.add(new PartitionOffset(topicAndPartition.getTopic(), offset.longValue(), topicAndPartition.getPartition()))));
        return offsets;
    }

    private static final class OffsetCommitQueueLoader
    extends CacheLoader<TopicPartition, OffsetCommitQueue> {
        private final Subscription subscription;
        private final HermesMetrics hermesMetrics;
        private final Clock clock;
        private final ConfigFactory configFactory;

        public OffsetCommitQueueLoader(Subscription subscription, HermesMetrics hermesMetrics, Clock clock, ConfigFactory configFactory) {
            this.subscription = subscription;
            this.hermesMetrics = hermesMetrics;
            this.clock = clock;
            this.configFactory = configFactory;
        }

        public OffsetCommitQueue load(TopicPartition topicPartition) throws Exception {
            return new OffsetCommitQueue(new OffsetCommitQueueMonitor(this.subscription, topicPartition, this.hermesMetrics, this.clock, this.configFactory.getIntProperty(Configs.CONSUMER_OFFSET_COMMIT_QUEUE_ALERT_MINIMAL_IDLE_PERIOD), this.configFactory.getIntProperty(Configs.CONSUMER_OFFSET_COMMIT_QUEUE_ALERT_SIZE)));
        }
    }
}

