/*
 * Decompiled with CFR 0.152.
 */
package no.nav.common.kafka.consumer.util;

import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import no.nav.common.kafka.consumer.KafkaConsumerClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FeatureToggledKafkaConsumerClient
implements KafkaConsumerClient {
    private static final Logger log = LoggerFactory.getLogger(FeatureToggledKafkaConsumerClient.class);
    private static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofMinutes(1L);
    private final KafkaConsumerClient kafkaConsumerClient;
    private final Duration pollTimeoutDuration;
    private final Supplier<Boolean> isConsumerToggledOffSupplier;
    private ScheduledExecutorService executorService;

    public FeatureToggledKafkaConsumerClient(KafkaConsumerClient kafkaConsumerClient, Supplier<Boolean> isToggledOffSupplier) {
        this.kafkaConsumerClient = kafkaConsumerClient;
        this.isConsumerToggledOffSupplier = isToggledOffSupplier;
        this.pollTimeoutDuration = DEFAULT_POLL_TIMEOUT;
    }

    public FeatureToggledKafkaConsumerClient(KafkaConsumerClient kafkaConsumerClient, Supplier<Boolean> isToggledOffSupplier, Duration pollTimeoutDuration) {
        this.kafkaConsumerClient = kafkaConsumerClient;
        this.isConsumerToggledOffSupplier = isToggledOffSupplier;
        this.pollTimeoutDuration = pollTimeoutDuration;
    }

    @Override
    public void start() {
        boolean isConsumerToggledOff = this.isConsumerToggledOffSupplier.get();
        if (!this.isRunning() && !isConsumerToggledOff) {
            this.kafkaConsumerClient.start();
        }
        this.shutAndAwaitExecutor();
        if (this.executorService == null || this.executorService.isShutdown()) {
            this.executorService = Executors.newSingleThreadScheduledExecutor();
            this.executorService.scheduleAtFixedRate(this::syncRunningStateWithToggle, this.pollTimeoutDuration.toMillis(), this.pollTimeoutDuration.toMillis(), TimeUnit.MILLISECONDS);
        }
    }

    @Override
    public void stop() {
        this.shutAndAwaitExecutor();
        if (this.isRunning()) {
            this.kafkaConsumerClient.stop();
        }
    }

    @Override
    public boolean isRunning() {
        return this.kafkaConsumerClient.isRunning();
    }

    private void syncRunningStateWithToggle() {
        boolean isConsumerToggledOff = this.isConsumerToggledOffSupplier.get();
        boolean isRunning = this.isRunning();
        if (isConsumerToggledOff && isRunning) {
            log.info("Stopping consumer... Toggle for stopping consumers is on and kafka consumer client is running");
            this.kafkaConsumerClient.stop();
        } else if (!isConsumerToggledOff && !isRunning) {
            log.info("Starting consumer... Toggle for stopping consumers is off and kafka consumer client is not running");
            this.kafkaConsumerClient.start();
        }
    }

    private void shutAndAwaitExecutor() {
        if (this.executorService != null && !this.executorService.isShutdown()) {
            this.executorService.shutdown();
            this.executorService.awaitTermination(10L, TimeUnit.SECONDS);
        }
    }
}

