/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.consumer.receiver;

import java.util.Optional;
import java.util.Set;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.consumers.consumer.Message;
import pl.allegro.tech.hermes.consumers.consumer.idletime.IdleTimeCalculator;
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset;
import pl.allegro.tech.hermes.consumers.consumer.receiver.MessageReceiver;
import pl.allegro.tech.hermes.metrics.HermesTimer;
import pl.allegro.tech.hermes.metrics.HermesTimerContext;

public class ThrottlingMessageReceiver
implements MessageReceiver {
    private final MessageReceiver receiver;
    private final IdleTimeCalculator idleTimeCalculator;
    private final HermesTimer idleTimer;

    public ThrottlingMessageReceiver(MessageReceiver receiver, IdleTimeCalculator idleTimeCalculator, SubscriptionName subscriptionName, MetricsFacade metrics) {
        this.receiver = receiver;
        this.idleTimeCalculator = idleTimeCalculator;
        this.idleTimer = metrics.subscriptions().consumerIdleTimer(subscriptionName);
    }

    @Override
    public Optional<Message> next() {
        Optional<Message> next = this.receiver.next();
        if (next.isPresent()) {
            this.idleTimeCalculator.reset();
        } else {
            this.awaitUntilNextPoll();
        }
        return next;
    }

    private void awaitUntilNextPoll() {
        try (HermesTimerContext ignored = this.idleTimer.time();){
            Thread.sleep(this.idleTimeCalculator.increaseIdleTime());
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void commit(Set<SubscriptionPartitionOffset> offsets) {
        this.receiver.commit(offsets);
    }

    @Override
    public boolean moveOffset(PartitionOffset offset) {
        return this.receiver.moveOffset(offset);
    }

    @Override
    public void stop() {
        this.receiver.stop();
    }

    @Override
    public void update(Subscription newSubscription) {
        this.receiver.update(newSubscription);
    }
}

