package org.reactivecommons.async.kafka.health;

import java.beans.ConstructorProperties;
import lombok.Generated;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.reactivecommons.async.starter.config.health.RCHealth;
import org.reactivecommons.async.starter.config.health.RCHealthIndicator;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/reactivecommons/async/kafka/health/KafkaReactiveHealthIndicator.class */
/**
 * Reactive health indicator that probes a Kafka cluster through an {@link AdminClient}.
 *
 * <p>The probe asks the cluster for its id via {@code describeCluster().clusterId()}. When the
 * id is obtained the health is reported as up, with the id stored under the {@code "version"}
 * detail; when the lookup fails the health is reported as down. In both cases the configured
 * domain is attached under the {@code "domain"} detail.
 */
public class KafkaReactiveHealthIndicator extends RCHealthIndicator {

    @Generated
    private static final Logger log = LogManager.getLogger(KafkaReactiveHealthIndicator.class);

    /** Domain name attached to every health result and included in the error log. */
    private final String domain;

    /** Admin client used to query the cluster id. */
    private final AdminClient adminClient;

    /**
     * Runs the Kafka health check.
     *
     * @param rCHealthBuilder builder that receives the {@code "domain"} detail and the final
     *     up/down status
     * @return a {@link Mono} emitting an up result (with the cluster id as {@code "version"})
     *     when the cluster id lookup succeeds, or a down result when it signals an error
     */
    public Mono<RCHealth> doHealthCheck(RCHealth.RCHealthBuilder rCHealthBuilder) {
        rCHealthBuilder.withDetail("domain", this.domain);
        return checkKafkaHealth()
                .map(clusterId -> rCHealthBuilder.up().withDetail("version", clusterId).build())
                // Build the DOWN result lazily: onErrorReturn(value) would evaluate
                // rCHealthBuilder.down().build() while the pipeline is being assembled,
                // i.e. on every call, even when the check later succeeds.
                .onErrorResume(failure -> Mono.just(rCHealthBuilder.down().build()));
    }

    /**
     * Requests the cluster id from Kafka and logs any failure.
     *
     * @return a {@link Mono} backed by the future returned from
     *     {@code describeCluster().clusterId()}; errors are logged and then propagated
     */
    private Mono<String> checkKafkaHealth() {
        // Mono.defer postpones the admin call until subscription and converts an exception
        // thrown synchronously by describeCluster() into an error signal, so it is logged
        // below and mapped to DOWN by the caller instead of escaping doHealthCheck.
        return Mono.defer(() -> Mono.fromFuture(
                        this.adminClient.describeCluster()
                                .clusterId()
                                .toCompletionStage()
                                .toCompletableFuture()))
                .doOnError(failure ->
                        log.error("Error checking Kafka health in domain {}", this.domain, failure));
    }

    /**
     * Creates the indicator.
     *
     * @param str the domain name reported in the health details
     * @param adminClient the Kafka admin client used to query the cluster
     */
    @Generated
    @ConstructorProperties({"domain", "adminClient"})
    public KafkaReactiveHealthIndicator(String str, AdminClient adminClient) {
        this.domain = str;
        this.adminClient = adminClient;
    }
}
