package org.sdase.commons.spring.boot.kafka;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator;
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.actuate.health.Health;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.stereotype.Component;

@ConditionalOnEnabledHealthIndicator("kafka")
@Component("kafka")
/* loaded from: input_file:org/sdase/commons/spring/boot/kafka/KafkaHealthIndicator.class */
public class KafkaHealthIndicator extends AbstractHealthIndicator {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaHealthIndicator.class);
    private final AdminClient kafkaAdminClient;
    private final Duration kafkaCommandTimeout;

    public KafkaHealthIndicator(@Value("${management.health.kafka.timeout:4s}") Duration duration, KafkaAdmin kafkaAdmin) {
        super("Kafka health check operation failed");
        this.kafkaCommandTimeout = duration;
        this.kafkaAdminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties());
        LOG.info("Kafka health check is initialized with timeout duration {}", duration);
    }

    protected void doHealthCheck(Health.Builder builder) throws Exception {
        this.kafkaAdminClient.listTopics(new ListTopicsOptions().timeoutMs(Integer.valueOf((int) TimeUnit.MILLISECONDS.convert(this.kafkaCommandTimeout)))).names().get();
        builder.up().withDetails(Map.of("info", "Kafka health check operation succeeded")).build();
    }
}
