package io.confluent.parallelconsumer.integrationTests.utils;

import java.time.Duration;
import java.util.Locale;
import java.util.Properties;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;

/* loaded from: input_file:io/confluent/parallelconsumer/integrationTests/utils/KafkaClientUtils.class */
/**
 * Helper that builds Kafka consumer, producer and admin clients pointed at a
 * {@link KafkaContainer}.
 *
 * <p>{@link #open()} creates one default client of each kind, exposed through the getters;
 * {@link #close()} releases them. Additional clients can be created through the
 * {@code createNew*} factory methods; those are owned by the caller and are not closed by
 * this class.
 */
public class KafkaClientUtils {
    private static final Logger log = LoggerFactory.getLogger(KafkaClientUtils.class);

    /** Value used for the consumer {@code max.poll.records} setting. */
    public static final int MAX_POLL_RECORDS = 10000;

    /** Prefix of every consumer group id generated by this class. */
    public static final String GROUP_ID_PREFIX = "group-1-";

    private final KafkaContainer kContainer;
    private KafkaConsumer<String, String> consumer;
    private KafkaProducer<String, String> producer;
    private AdminClient admin;

    /** Default group id of this instance: the shared prefix plus a random suffix. */
    private final String groupId = GROUP_ID_PREFIX + RandomUtils.nextInt();

    /**
     * Configures and starts the given container, then keeps it as the source of the
     * bootstrap servers address.
     *
     * @param kafkaContainer container to configure, start and connect to
     */
    public KafkaClientUtils(KafkaContainer kafkaContainer) {
        // Both transaction-state-log settings are set to "1" before the container starts.
        kafkaContainer.addEnv("KAFKA_transaction_state_log_replication_factor", "1");
        kafkaContainer.addEnv("KAFKA_transaction_state_log_min_isr", "1");
        kafkaContainer.start();
        this.kContainer = kafkaContainer;
    }

    /** Builds the properties shared by every client: the container's bootstrap servers. */
    private Properties setupCommonProps() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.kContainer.getBootstrapServers());
        return properties;
    }

    /** Builds the common properties plus String key/value serializers. */
    private Properties setupProducerProps() {
        Properties properties = setupCommonProps();
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("value.serializer", StringSerializer.class.getName());
        return properties;
    }

    /**
     * Builds the common properties plus the default consumer settings: this instance's group
     * id, earliest offset reset, auto commit disabled, String deserializers and
     * {@link #MAX_POLL_RECORDS}.
     */
    private Properties setupConsumerProps() {
        Properties properties = setupCommonProps();
        properties.put("group.id", this.groupId);
        // Locale.ROOT keeps the value "earliest" regardless of the default locale; a
        // locale-sensitive lower-casing (e.g. Turkish) would turn 'I' into a dotless 'ı'.
        properties.put("auto.offset.reset", OffsetResetStrategy.EARLIEST.name().toLowerCase(Locale.ROOT));
        properties.put("enable.auto.commit", false);
        properties.put("key.deserializer", StringDeserializer.class.getName());
        properties.put("value.deserializer", StringDeserializer.class.getName());
        properties.put("max.poll.records", Integer.valueOf(MAX_POLL_RECORDS));
        return properties;
    }

    /**
     * Creates the default consumer, a non-transactional producer and the admin client that
     * are returned by the getters.
     */
    @BeforeEach
    public void open() {
        log.info("Setting up clients...");
        this.consumer = createNewConsumer();
        this.producer = createNewProducer(false);
        this.admin = AdminClient.create(setupCommonProps());
    }

    /**
     * Closes the default producer, consumer and admin client, skipping any that were never
     * created. Every client gets a close attempt even when closing an earlier one throws;
     * the exception still propagates to the caller afterwards.
     */
    @AfterEach
    public void close() {
        try {
            if (this.producer != null) {
                this.producer.close();
            }
        } finally {
            try {
                if (this.consumer != null) {
                    this.consumer.close();
                }
            } finally {
                if (this.admin != null) {
                    this.admin.close();
                }
            }
        }
    }

    /**
     * Creates a consumer with the default settings and this instance's group id.
     *
     * @return a new consumer owned by the caller
     */
    public <K, V> KafkaConsumer<K, V> createNewConsumer() {
        return createNewConsumer(false);
    }

    /**
     * Creates a consumer with the default settings.
     *
     * @param z when {@code true}, a freshly generated random group id replaces this
     *          instance's group id
     * @return a new consumer owned by the caller
     */
    public <K, V> KafkaConsumer<K, V> createNewConsumer(boolean z) {
        return createNewConsumer(z, new Properties());
    }

    /**
     * Creates a consumer in this instance's group with extra settings applied.
     *
     * @param properties settings applied on top of the defaults
     * @return a new consumer owned by the caller
     */
    public <K, V> KafkaConsumer<K, V> createNewConsumer(Properties properties) {
        return createNewConsumer(false, properties);
    }

    /**
     * Creates a consumer from the default settings, an optional fresh group id, and the
     * supplied overrides, in that order of increasing precedence.
     *
     * @param z          when {@code true}, a freshly generated random group id replaces this
     *                   instance's group id
     * @param properties settings applied last, so they win over both the defaults and the
     *                   generated group id
     * @return a new consumer owned by the caller
     */
    public <K, V> KafkaConsumer<K, V> createNewConsumer(boolean z, Properties properties) {
        Properties consumerProps = setupConsumerProps();
        if (z) {
            consumerProps.put("group.id", GROUP_ID_PREFIX + RandomUtils.nextInt());
        }
        consumerProps.putAll(properties);
        KafkaConsumer<K, V> kafkaConsumer = new KafkaConsumer<>(consumerProps);
        log.debug("New consumer {}", kafkaConsumer);
        return kafkaConsumer;
    }

    /**
     * Creates a producer with String serializers.
     *
     * @param z when {@code true}, the producer is configured with a random
     *          {@code transactional.id} and a 10 second {@code transaction.timeout.ms}
     * @return a new producer owned by the caller
     */
    public <K, V> KafkaProducer<K, V> createNewProducer(boolean z) {
        Properties producerProps = setupProducerProps();
        if (z) {
            producerProps.put("transactional.id", getClass().getSimpleName() + ":" + RandomUtils.nextInt());
            producerProps.put("transaction.timeout.ms",
                    Integer.valueOf(Math.toIntExact(Duration.ofSeconds(10L).toMillis())));
        }
        KafkaProducer<K, V> kafkaProducer = new KafkaProducer<>(producerProps);
        log.debug("New producer {}", kafkaProducer);
        return kafkaProducer;
    }

    /** Returns the default consumer, or {@code null} if {@link #open()} has not run. */
    public KafkaConsumer<String, String> getConsumer() {
        return this.consumer;
    }

    /** Returns the default producer, or {@code null} if {@link #open()} has not run. */
    public KafkaProducer<String, String> getProducer() {
        return this.producer;
    }

    /** Returns the admin client, or {@code null} if {@link #open()} has not run. */
    public AdminClient getAdmin() {
        return this.admin;
    }
}
