/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.consumer.offset;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.consumers.consumer.offset.MessageState;
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset;
import pl.allegro.tech.hermes.consumers.consumer.rate.AdjustableSemaphore;

/**
 * Tracks the state of message offsets for one subscription and bounds how many of them may be
 * outstanding at a time.
 *
 * <p>Two independent limits are enforced through semaphores:
 * <ul>
 *   <li>{@code inflightSemaphore} - resizable via {@link #setInflightSize(int)}; a permit is taken
 *       in {@link #tryAcquireSlot(Duration)} and given back in
 *       {@link #markAsProcessed(SubscriptionPartitionOffset)} or {@link #releaseSlot()}.</li>
 *   <li>{@code maxPendingOffsetsSemaphore} - fixed size; a permit is taken in
 *       {@link #tryAcquireSlot(Duration)} and given back only when the corresponding
 *       {@code PROCESSED} entry is drained by
 *       {@link #getOffsetsSnapshotAndReleaseProcessedSlots()}, or by {@link #releaseSlot()}.</li>
 * </ul>
 *
 * <p>Offset states are kept in a {@link ConcurrentHashMap}, so individual map operations are safe
 * to call from multiple threads. Which threads actually call which methods is not visible from
 * this class.
 */
public class PendingOffsets {
    /** Last recorded state for every offset that has not yet been drained by a snapshot. */
    private final ConcurrentHashMap<SubscriptionPartitionOffset, MessageState> slots = new ConcurrentHashMap<>();
    private final AdjustableSemaphore inflightSemaphore;
    private final Semaphore maxPendingOffsetsSemaphore;

    /**
     * Creates the tracker and registers a gauge reporting how much of the pending-offsets
     * capacity is currently in use.
     *
     * @param subscriptionName subscription the gauge is registered for
     * @param metrics facade used to register the gauge
     * @param inflightQueueSize initial number of inflight permits
     * @param maxPendingOffsets number of permits of the pending-offsets semaphore; also the
     *     denominator of the gauge value
     */
    public PendingOffsets(SubscriptionName subscriptionName, MetricsFacade metrics, int inflightQueueSize, int maxPendingOffsets) {
        this.maxPendingOffsetsSemaphore = new Semaphore(maxPendingOffsets);
        this.inflightSemaphore = new AdjustableSemaphore(inflightQueueSize);
        // Gauge value = (permits taken) / (total permits). The semaphore is passed with its real
        // static type so the lambda parameter is typed as Semaphore.
        metrics.subscriptions().registerPendingOffsetsGauge(
                subscriptionName,
                this.maxPendingOffsetsSemaphore,
                semaphore -> ((double) maxPendingOffsets - (double) semaphore.availablePermits()) / (double) maxPendingOffsets);
    }

    /**
     * Changes the capacity of the inflight semaphore.
     *
     * @param inflightQueueSize new maximum number of inflight permits, forwarded to
     *     {@code AdjustableSemaphore.setMaxPermits}
     */
    public void setInflightSize(int inflightQueueSize) {
        this.inflightSemaphore.setMaxPermits(inflightQueueSize);
    }

    /**
     * Gives back one inflight permit and records the offset as {@code PROCESSED}.
     *
     * <p>The pending-offsets permit is NOT released here; it is released when the entry is
     * removed in {@link #getOffsetsSnapshotAndReleaseProcessedSlots()}.
     *
     * @param subscriptionPartitionOffset offset whose processing has finished
     */
    public void markAsProcessed(SubscriptionPartitionOffset subscriptionPartitionOffset) {
        this.inflightSemaphore.release();
        this.slots.put(subscriptionPartitionOffset, MessageState.PROCESSED);
    }

    /**
     * Tries to reserve one permit from each semaphore.
     *
     * <p>The inflight permit is attempted first, then the pending-offsets permit, each with a
     * timeout of {@code processingInterval}. The two waits are sequential, so the call may block
     * for up to roughly twice {@code processingInterval} before returning {@code false}. If the
     * second acquisition fails, the inflight permit obtained in the first step is returned.
     *
     * @param processingInterval timeout applied to each of the two acquisitions
     * @return {@code true} if both permits were acquired; {@code false} otherwise, in which case
     *     no permit is held
     * @throws InterruptedException if the thread is interrupted while waiting for a permit
     */
    public boolean tryAcquireSlot(Duration processingInterval) throws InterruptedException {
        long timeoutMillis = processingInterval.toMillis();
        if (!this.inflightSemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
            return false;
        }
        if (this.maxPendingOffsetsSemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
            return true;
        }
        // Could not get the second permit: undo the first so no permit leaks.
        this.inflightSemaphore.release();
        return false;
    }

    /**
     * Records the offset as {@code INFLIGHT}. No permits are acquired or released here.
     *
     * @param subscriptionPartitionOffset offset that is being processed
     */
    public void markAsInflight(SubscriptionPartitionOffset subscriptionPartitionOffset) {
        this.slots.put(subscriptionPartitionOffset, MessageState.INFLIGHT);
    }

    /**
     * Copies all currently tracked offsets into a new map, then forgets the ones already
     * {@code PROCESSED} and releases one pending-offsets permit for each of them.
     *
     * <p>The returned snapshot still contains the {@code PROCESSED} entries that were removed from
     * the internal map. Iteration over a {@link ConcurrentHashMap} is weakly consistent, so
     * entries added or changed concurrently may or may not be included.
     *
     * @return a new, caller-owned map of offset to the state observed during this call
     */
    public Map<SubscriptionPartitionOffset, MessageState> getOffsetsSnapshotAndReleaseProcessedSlots() {
        int permitsReleased = 0;
        Map<SubscriptionPartitionOffset, MessageState> offsetSnapshot = new HashMap<>();
        for (Map.Entry<SubscriptionPartitionOffset, MessageState> entry : this.slots.entrySet()) {
            SubscriptionPartitionOffset offset = entry.getKey();
            MessageState state = entry.getValue();
            offsetSnapshot.put(offset, state);
            if (state == MessageState.PROCESSED) {
                // NOTE(review): removal is by key only, so an entry re-put with a different state
                // between the read above and this call would also be dropped - confirm callers
                // never reuse an offset key.
                this.slots.remove(offset);
                ++permitsReleased;
            }
        }
        // Release in one batch after the scan rather than once per removed entry.
        this.maxPendingOffsetsSemaphore.release(permitsReleased);
        return offsetSnapshot;
    }

    /**
     * Gives back one permit to each semaphore without touching the tracked offsets.
     *
     * <p>Presumably the counterpart of a successful {@link #tryAcquireSlot(Duration)} whose slot
     * ends up unused - confirm against callers.
     */
    public void releaseSlot() {
        this.inflightSemaphore.release();
        this.maxPendingOffsetsSemaphore.release();
    }
}

