/*
 * Decompiled with CFR 0.152.
 */
package no.nav.common.kafka.consumer.util;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import no.nav.common.kafka.consumer.KafkaConsumerClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link KafkaConsumerClient} decorator that keeps a delegate client's running state in line
 * with a boolean toggle.
 *
 * <p>{@link #start()} starts the delegate immediately (without consulting the toggle) and
 * schedules a periodic check; the first check runs one poll interval after {@code start()}.
 * On each check the delegate is started if the toggle is on and the delegate is not running,
 * and stopped if the toggle is off and the delegate is running.
 *
 * <p>{@link #start()}, {@link #stop()} and the periodic check are serialized on this instance's
 * monitor so that a check which is in flight while {@code stop()} is called cannot restart the
 * delegate afterwards.
 */
public class ToggledKafkaConsumerClient
implements KafkaConsumerClient {
    private static final Logger log = LoggerFactory.getLogger(ToggledKafkaConsumerClient.class);
    private static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofMinutes(1L);
    private final KafkaConsumerClient kafkaConsumerClient;
    private final Duration pollTimeoutDuration;
    private final Supplier<Boolean> isToggledOnSupplier;
    // Only read or written while holding this instance's monitor.
    private ScheduledExecutorService executorService;

    /**
     * Creates a toggled client that polls the toggle once per minute.
     *
     * @param kafkaConsumerClient the delegate client to start and stop
     * @param isToggledOnSupplier supplies {@code true} when the delegate should be running
     * @throws NullPointerException if any argument is {@code null}
     */
    public ToggledKafkaConsumerClient(KafkaConsumerClient kafkaConsumerClient, Supplier<Boolean> isToggledOnSupplier) {
        this(kafkaConsumerClient, isToggledOnSupplier, DEFAULT_POLL_TIMEOUT);
    }

    /**
     * Creates a toggled client with a custom toggle poll interval.
     *
     * @param kafkaConsumerClient the delegate client to start and stop
     * @param isToggledOnSupplier supplies {@code true} when the delegate should be running
     * @param pollTimeoutDuration interval between toggle checks; must be at least one millisecond
     * @throws NullPointerException if any argument is {@code null}
     * @throws IllegalArgumentException if {@code pollTimeoutDuration} is shorter than one millisecond
     */
    public ToggledKafkaConsumerClient(KafkaConsumerClient kafkaConsumerClient, Supplier<Boolean> isToggledOnSupplier, Duration pollTimeoutDuration) {
        this.kafkaConsumerClient = Objects.requireNonNull(kafkaConsumerClient, "kafkaConsumerClient");
        this.isToggledOnSupplier = Objects.requireNonNull(isToggledOnSupplier, "isToggledOnSupplier");
        this.pollTimeoutDuration = Objects.requireNonNull(pollTimeoutDuration, "pollTimeoutDuration");
        // scheduleAtFixedRate rejects a non-positive period; fail here rather than in start(),
        // where the delegate would already have been started before the scheduling failed.
        if (pollTimeoutDuration.toMillis() <= 0L) {
            throw new IllegalArgumentException("pollTimeoutDuration must be at least 1 ms, was " + pollTimeoutDuration);
        }
    }

    /**
     * Starts the delegate if it is not already running and (re)schedules the periodic toggle
     * check. Any previously scheduled check is shut down first, so repeated calls never leave
     * more than one active scheduler.
     */
    @Override
    public synchronized void start() {
        if (!this.isRunning()) {
            this.kafkaConsumerClient.start();
        }
        if (this.executorService != null) {
            // No-op if the previous scheduler was already shut down by stop().
            this.executorService.shutdown();
        }
        long periodMillis = this.pollTimeoutDuration.toMillis();
        this.executorService = Executors.newSingleThreadScheduledExecutor();
        this.executorService.scheduleAtFixedRate(this::syncRunningStateWithToggle, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Cancels the periodic toggle check and stops the delegate if it is running.
     */
    @Override
    public synchronized void stop() {
        if (this.executorService != null) {
            this.executorService.shutdown();
        }
        if (this.isRunning()) {
            this.kafkaConsumerClient.stop();
        }
    }

    /**
     * @return whether the delegate client reports itself as running
     */
    @Override
    public boolean isRunning() {
        return this.kafkaConsumerClient.isRunning();
    }

    /**
     * Periodic task: reads the toggle and starts or stops the delegate to match it.
     *
     * <p>Runtime exceptions are caught and logged because a task that throws from
     * {@code scheduleAtFixedRate} has all of its subsequent executions suppressed, which would
     * silently disable toggle handling.
     */
    private void syncRunningStateWithToggle() {
        try {
            // Query the supplier outside the lock; it may be slow and must not block start()/stop().
            Boolean isToggledOn = this.isToggledOnSupplier.get();
            if (isToggledOn == null) {
                log.warn("Toggle supplier returned null, leaving kafka consumer client state unchanged");
                return;
            }
            synchronized (this) {
                if (this.executorService == null || this.executorService.isShutdown()) {
                    // stop() ran while this check was in flight; do not touch the delegate.
                    return;
                }
                boolean isRunning = this.isRunning();
                if (isToggledOn && !isRunning) {
                    log.info("Starting consumer... Toggle is on and kafka consumer client is not running");
                    this.kafkaConsumerClient.start();
                } else if (!isToggledOn && isRunning) {
                    log.info("Stopping consumer... Toggle is off and kafka consumer client is running");
                    this.kafkaConsumerClient.stop();
                }
            }
        } catch (RuntimeException e) {
            log.error("Failed to sync kafka consumer client running state with toggle", e);
        }
    }
}

