package dev.lydtech.component.framework.kafka;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dev/lydtech/component/framework/kafka/KafkaAdminClient.class */
public final class KafkaAdminClient {
    private static final Logger log = LoggerFactory.getLogger(KafkaAdminClient.class);
    private static String kafkaBrokerUrl;
    private static KafkaAdminClient instance;

    private KafkaAdminClient() {
    }

    public static synchronized KafkaAdminClient getInstance() {
        if (instance == null) {
            kafkaBrokerUrl = "http://" + ((String) Optional.ofNullable(System.getProperty("kafka.host")).orElse("localhost")) + ":" + ((String) Optional.ofNullable(System.getProperty("kafka.mapped.port")).orElseThrow(() -> {
                return new RuntimeException("kafka.mapped.port property not found");
            }));
            log.info("Kafka broker URL is: " + kafkaBrokerUrl);
            instance = new KafkaAdminClient();
        }
        return instance;
    }

    public Consumer createConsumer(String str, String str2) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", kafkaBrokerUrl);
        properties.put("group.id", str + "-" + str2);
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", StringDeserializer.class);
        properties.put("auto.offset.reset", "latest");
        properties.put("metadata.max.age.ms", 1000);
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
        kafkaConsumer.subscribe(Collections.singletonList(str2));
        return kafkaConsumer;
    }

    public Producer<Long, String> createProducer() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", kafkaBrokerUrl);
        properties.put("key.serializer", StringSerializer.class);
        properties.put("value.serializer", StringSerializer.class);
        return new KafkaProducer(properties);
    }

    public void sendMessage(String str, String str2, String str3) throws Exception {
        ProducerRecord producerRecord = new ProducerRecord(str, (Integer) null, str2, str3);
        RecordMetadata recordMetadata = (RecordMetadata) createProducer().send(producerRecord).get();
        log.debug(String.format("Sent record(key=%s value=%s) meta(topic=%s, partition=%d, offset=%d)", producerRecord.key(), producerRecord.value(), recordMetadata.topic(), Integer.valueOf(recordMetadata.partition()), Long.valueOf(recordMetadata.offset())));
    }

    public List<ConsumerRecord<String, String>> consumeAndAssert(String str, Consumer consumer, int i, int i2) throws Exception {
        AtomicInteger atomicInteger = new AtomicInteger();
        AtomicInteger atomicInteger2 = new AtomicInteger(-1);
        AtomicInteger atomicInteger3 = new AtomicInteger();
        ArrayList arrayList = new ArrayList();
        Awaitility.await().atMost(30L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> {
            consumer.poll(Duration.ofMillis(100L)).forEach(consumerRecord -> {
                log.info(str + " - received: " + ((String) consumerRecord.value()));
                atomicInteger.incrementAndGet();
                arrayList.add(consumerRecord);
            });
            if (atomicInteger.get() == i) {
                atomicInteger2.incrementAndGet();
            }
            atomicInteger3.getAndIncrement();
            log.info(str + " - poll count: " + atomicInteger3.get() + " - received count: " + atomicInteger.get());
            return Boolean.valueOf(atomicInteger.get() == i && atomicInteger2.get() == i2);
        });
        return arrayList;
    }
}
