package io.simplesource.kafka.util;

import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.KafkaStreams;
import org.slf4j.Logger;

/* loaded from: input_file:io/simplesource/kafka/util/KafkaStreamsUtils.class */
public final class KafkaStreamsUtils {
    public static void registerExceptionHandler(Logger logger, KafkaStreams kafkaStreams) {
        kafkaStreams.setUncaughtExceptionHandler((thread, th) -> {
            logger.error("Unhandled exception in " + thread.getName() + ", exiting. {}", kafkaStreams, th);
            System.exit(1);
        });
    }

    public static void addShutdownHook(Logger logger, KafkaStreams kafkaStreams) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("Kafka Streams [{}] is shutting down", kafkaStreams);
            kafkaStreams.close(15L, TimeUnit.SECONDS);
        }));
    }

    public static void waitUntilStable(Logger logger, KafkaStreams kafkaStreams) {
        boolean z = false;
        do {
            logger.info("KafkaStreams now in state {} waiting until RUNNING for 5 seconds", kafkaStreams.state());
            while (!Objects.equals(kafkaStreams.state(), KafkaStreams.State.RUNNING)) {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    logger.warn("wait was interrupted", e);
                }
            }
            long currentTimeMillis = System.currentTimeMillis();
            while (Objects.equals(kafkaStreams.state(), KafkaStreams.State.RUNNING) && System.currentTimeMillis() - currentTimeMillis < 5000) {
                Thread.sleep(1000L);
            }
            if (Objects.equals(kafkaStreams.state(), KafkaStreams.State.RUNNING)) {
                z = true;
            }
        } while (!z);
        logger.info("Streams app stable for 5 seconds. Considered up.");
    }

    public static void waitForShutdown(Logger logger, KafkaStreams kafkaStreams) {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Runtime runtime = Runtime.getRuntime();
        Objects.requireNonNull(countDownLatch);
        runtime.addShutdownHook(new Thread(countDownLatch::countDown));
        try {
            countDownLatch.await();
        } catch (Exception e) {
            logger.warn("Error waiting for shutdown: {}", e);
            System.exit(1);
        }
    }
}
