package io.confluent.parallelconsumer.integrationTests.utils;

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.internal.PCModuleTestEnv;
import io.confluent.parallelconsumer.state.ModelUtils;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import one.util.streamex.IntStreamEx;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;

/**
 * Integration-test helper that builds Kafka consumers, producers, an admin client and
 * {@link ParallelEoSStreamProcessor} instances pointed at a {@link KafkaContainer}.
 *
 * <p>The shared clients returned by {@link #getConsumer()}, {@link #getProducer()} and
 * {@link #getAdmin()} are only created by {@link #open()} and are {@code null} before it runs.
 * Clients returned by the {@code createNew*} factory methods are not tracked by this class, so
 * {@link #close()} does not close them.
 */
public class KafkaClientUtils implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(KafkaClientUtils.class);

    /** Default {@code max.poll.records} applied to consumers built by this class. */
    public static final int MAX_POLL_RECORDS = 10000;

    /** Prefix of every generated consumer group id; a random integer is appended to it. */
    public static final String GROUP_ID_PREFIX = "group-1-";

    /** Container whose bootstrap servers all clients connect to. */
    private final KafkaContainer kContainer;

    /** Shared consumer, created in {@link #open()}. */
    private KafkaConsumer<String, String> consumer;

    /** Shared non-transactional producer, created in {@link #open()}. */
    private KafkaProducer<String, String> producer;

    /** Shared admin client, created in {@link #open()}. */
    private AdminClient admin;

    /** Consumer handed to the most recent processor built by {@code buildPc}. */
    private KafkaConsumer<String, String> lastConsumerConstructed;

    /** Value used for {@code auto.offset.reset} on consumers created after it is set. */
    private OffsetResetStrategy offsetResetPolicy = OffsetResetStrategy.EARLIEST;

    /** Consumer group id used whenever a caller does not supply one explicitly. */
    private String groupId = randomGroupId();

    /** Whether a newly built consumer joins the current group or a freshly generated one. */
    public enum GroupOption {
        REUSE_GROUP,
        NEW_GROUP
    }

    /** Holder for version-string constants. */
    class PCVersion {
        public static final String V051 = "0.5.1";

        PCVersion() {
        }
    }

    /** Whether a producer is configured with a {@code transactional.id}. */
    public enum ProducerMode {
        TRANSACTIONAL,
        NOT_TRANSACTIONAL;

        /**
         * Maps a commit mode to the producer mode it needs.
         *
         * @param commitMode the commit mode to inspect; must not be {@code null}
         * @return {@link #TRANSACTIONAL} for
         *         {@link ParallelConsumerOptions.CommitMode#PERIODIC_TRANSACTIONAL_PRODUCER},
         *         otherwise {@link #NOT_TRANSACTIONAL}
         */
        public static ProducerMode matching(ParallelConsumerOptions.CommitMode commitMode) {
            return commitMode.equals(ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER) ? TRANSACTIONAL : NOT_TRANSACTIONAL;
        }
    }

    /**
     * Adds two transaction-state-log environment settings to the container and starts it.
     *
     * <p>NOTE(review): the settings presumably allow transactions on a single-broker
     * container - confirm against the container image documentation.
     *
     * @param kafkaContainer the container to configure, start and connect clients to
     */
    public KafkaClientUtils(KafkaContainer kafkaContainer) {
        kafkaContainer.addEnv("KAFKA_transaction_state_log_replication_factor", "1");
        kafkaContainer.addEnv("KAFKA_transaction_state_log_min_isr", "1");
        kafkaContainer.start();
        this.kContainer = kafkaContainer;
    }

    /** Generates a new group id: {@link #GROUP_ID_PREFIX} followed by a random integer. */
    private static String randomGroupId() {
        return GROUP_ID_PREFIX + RandomUtils.nextInt();
    }

    /** Builds the properties shared by every client: only the bootstrap servers. */
    private Properties setupCommonProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", this.kContainer.getBootstrapServers());
        return props;
    }

    /** Builds the common properties plus String key/value serializers. */
    private Properties setupProducerProps() {
        Properties props = setupCommonProps();
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        return props;
    }

    /**
     * Builds the default consumer properties for the given group.
     *
     * @param consumerGroupId value for {@code group.id}
     * @return a fresh, mutable {@link Properties} instance
     */
    private Properties setupConsumerProps(String consumerGroupId) {
        Properties props = setupCommonProps();
        props.put("group.id", consumerGroupId);
        props.put("enable.auto.commit", false);
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("isolation.level", IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
        // Locale.ROOT keeps the value stable: a Turkish default locale would lower-case
        // "EARLIEST" with a dotless i and produce an invalid setting.
        props.put("auto.offset.reset", this.offsetResetPolicy.name().toLowerCase(Locale.ROOT));
        props.put("max.poll.records", MAX_POLL_RECORDS);
        return props;
    }

    /** Creates the shared consumer, non-transactional producer and admin client. */
    @BeforeEach
    public void open() {
        log.info("Setting up clients...");
        this.consumer = createNewConsumer();
        this.producer = createNewProducer(ProducerMode.NOT_TRANSACTIONAL);
        this.admin = AdminClient.create(setupCommonProps());
    }

    /**
     * Closes the shared producer, consumer and admin client if they were created.
     *
     * <p>Each client is closed even when closing an earlier one throws.
     */
    @Override
    @AfterEach
    public void close() {
        try {
            if (this.producer != null) {
                this.producer.close();
            }
        } finally {
            try {
                if (this.consumer != null) {
                    this.consumer.close();
                }
            } finally {
                if (this.admin != null) {
                    this.admin.close();
                }
            }
        }
    }

    /**
     * Creates a consumer in the given group using the default consumer properties.
     *
     * @param consumerGroupId value for {@code group.id}
     * @return a new consumer; the caller is responsible for closing it
     */
    public <K, V> KafkaConsumer<K, V> createNewConsumer(String consumerGroupId) {
        return createNewConsumer(consumerGroupId, new Properties());
    }

    /**
     * Creates a consumer, optionally switching this instance to a new random group first.
     *
     * @param groupOption {@link GroupOption#NEW_GROUP} to generate and adopt a new group id
     * @return a new consumer; the caller is responsible for closing it
     */
    public <K, V> KafkaConsumer<K, V> createNewConsumer(GroupOption groupOption) {
        return createNewConsumer(groupOption.equals(GroupOption.NEW_GROUP));
    }

    /**
     * Creates a consumer in the current group (see {@link #getGroupId()}).
     *
     * @return a new consumer; the caller is responsible for closing it
     */
    public <K, V> KafkaConsumer<K, V> createNewConsumer() {
        return createNewConsumer(false);
    }

    /**
     * Creates a consumer, generating a new group id first when requested.
     *
     * @param newConsumerGroup {@code true} to generate and adopt a new group id
     * @return a new consumer; the caller is responsible for closing it
     * @deprecated {@link #createNewConsumer(GroupOption)} expresses the same choice with an enum.
     */
    @Deprecated
    public <K, V> KafkaConsumer<K, V> createNewConsumer(boolean newConsumerGroup) {
        return createNewConsumer(newConsumerGroup, new Properties());
    }

    /**
     * Creates a consumer in the current group with extra properties layered over the defaults.
     *
     * @param overridingOptions properties that override the defaults
     * @return a new consumer; the caller is responsible for closing it
     */
    @Deprecated
    public <K, V> KafkaConsumer<K, V> createNewConsumer(Properties overridingOptions) {
        return createNewConsumer(false, overridingOptions);
    }

    /**
     * Creates a consumer with extra properties, generating a new group id first when requested.
     *
     * @param newConsumerGroup {@code true} to replace this instance's group id with a new random one
     * @param overridingOptions properties that override the defaults
     * @return a new consumer; the caller is responsible for closing it
     */
    public <K, V> KafkaConsumer<K, V> createNewConsumer(boolean newConsumerGroup, Properties overridingOptions) {
        if (newConsumerGroup) {
            this.groupId = randomGroupId();
        }
        return createNewConsumer(this.groupId, overridingOptions);
    }

    /**
     * Creates a consumer in the given group with extra properties layered over the defaults.
     *
     * @param consumerGroupId value for {@code group.id}
     * @param overridingOptions properties applied after, and therefore overriding, the defaults
     * @return a new consumer; the caller is responsible for closing it
     */
    @Deprecated
    public <K, V> KafkaConsumer<K, V> createNewConsumer(String consumerGroupId, Properties overridingOptions) {
        Properties props = setupConsumerProps(consumerGroupId);
        props.putAll(overridingOptions);
        KafkaConsumer<K, V> newConsumer = new KafkaConsumer<>(props);
        log.debug("New consume {}", newConsumer);
        return newConsumer;
    }

    /**
     * Creates a transactional producer and calls {@code initTransactions()} on it.
     *
     * @return a new producer; the caller is responsible for closing it
     */
    public <K, V> KafkaProducer<K, V> createAndInitNewTransactionalProducer() {
        KafkaProducer<K, V> txProducer = createNewProducer(ProducerMode.TRANSACTIONAL);
        txProducer.initTransactions();
        return txProducer;
    }

    /**
     * Creates a producer, transactional or not according to the flag.
     *
     * @param transactional {@code true} for {@link ProducerMode#TRANSACTIONAL}
     * @return a new producer; the caller is responsible for closing it
     * @deprecated {@link #createNewProducer(ProducerMode)} expresses the same choice with an enum.
     */
    @Deprecated
    public <K, V> KafkaProducer<K, V> createNewProducer(boolean transactional) {
        return createNewProducer(transactional ? ProducerMode.TRANSACTIONAL : ProducerMode.NOT_TRANSACTIONAL);
    }

    /**
     * Creates a producer whose mode matches the given commit mode.
     *
     * @param commitMode commit mode passed to {@link ProducerMode#matching}
     * @return a new producer; the caller is responsible for closing it
     */
    public KafkaProducer<String, String> createNewProducer(ParallelConsumerOptions.CommitMode commitMode) {
        return createNewProducer(ProducerMode.matching(commitMode));
    }

    /**
     * Creates a producer with String serializers.
     *
     * <p>In transactional mode a random {@code transactional.id} and a 10 second
     * {@code transaction.timeout.ms} are added.
     *
     * @param producerMode whether to configure the producer for transactions
     * @return a new producer; the caller is responsible for closing it
     */
    public <K, V> KafkaProducer<K, V> createNewProducer(ProducerMode producerMode) {
        Properties props = setupProducerProps();
        if (producerMode.equals(ProducerMode.TRANSACTIONAL)) {
            props.put("transactional.id", getClass().getSimpleName() + ":" + RandomUtils.nextInt());
            props.put("transaction.timeout.ms", Math.toIntExact(Duration.ofSeconds(10L).toMillis()));
        }
        KafkaProducer<K, V> newProducer = new KafkaProducer<>(props);
        log.debug("New producer {}", newProducer);
        return newProducer;
    }

    /**
     * Creates topics named {@code in-<index>-<random int>} and waits for creation to finish.
     *
     * <p>Uses the shared admin client, so {@link #open()} must have run first.
     *
     * @param numTopics upper bound (exclusive) of the index range used to name the topics
     * @return the topic descriptions that were submitted
     * @throws IllegalStateException if creation fails or the wait is interrupted; the
     *         original exception is the cause, and the interrupt flag is restored
     */
    public List<NewTopic> createTopics(int numTopics) {
        List<NewTopic> topics = IntStreamEx.range(numTopics)
                .mapToObj(index -> new NewTopic("in-" + index + "-" + RandomUtils.nextInt(), Optional.empty(), Optional.empty()))
                .toList();
        try {
            getAdmin().createTopics(topics).all().get();
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while creating topics " + topics, e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Failed to create topics " + topics, e);
        }
        return topics;
    }

    /**
     * Produces records to a topic, passing an empty string as the third argument of
     * {@link #produceMessages(String, long, String)}.
     *
     * @param topicName topic to send to
     * @param numberToSend number of records requested
     * @return the keys of the records sent, in send order
     * @throws InterruptedException if interrupted while waiting for an acknowledgement
     * @throws ExecutionException if a send fails
     */
    public List<String> produceMessages(String topicName, long numberToSend) throws InterruptedException, ExecutionException {
        return produceMessages(topicName, numberToSend, "");
    }

    /**
     * Produces records generated by {@link ModelUtils} with a temporary non-transactional
     * producer, then waits for every send to complete.
     *
     * <p>Asserts that each acknowledged record has an offset and that the number of sends
     * equals {@code numberToSend}.
     *
     * @param topicName topic to send to
     * @param numberToSend number of records requested
     * @param prefix forwarded unchanged to {@code ModelUtils#createProducerRecords};
     *        NOTE(review): presumably a key/value prefix - confirm against ModelUtils
     * @return the keys of the records sent, in send order
     * @throws InterruptedException if interrupted while waiting for an acknowledgement
     * @throws ExecutionException if a send fails
     * @throws ArithmeticException if {@code numberToSend} does not fit in an {@code int}
     */
    public List<String> produceMessages(String topicName, long numberToSend, String prefix) throws InterruptedException, ExecutionException {
        log.info("Producing {} messages to {}", numberToSend, topicName);
        List<String> expectedKeys = new ArrayList<>();
        List<Future<RecordMetadata>> sends = new ArrayList<>();
        try (KafkaProducer<String, String> tempProducer = createNewProducer(ProducerMode.NOT_TRANSACTIONAL)) {
            for (ProducerRecord<String, String> record : new ModelUtils(new PCModuleTestEnv()).createProducerRecords(topicName, numberToSend, prefix)) {
                Future<RecordMetadata> send = tempProducer.send(record, (metadata, error) -> {
                    if (error != null) {
                        log.error("Error sending, ", error);
                    }
                });
                sends.add(send);
                expectedKeys.add(record.key());
            }
            log.debug("Finished sending test data");
        }
        log.debug("Waiting for broker acks");
        for (Future<RecordMetadata> send : sends) {
            Assertions.assertThat(send.get().hasOffset()).isTrue();
        }
        Assertions.assertThat(sends).hasSize(Math.toIntExact(numberToSend));
        return expectedKeys;
    }

    /**
     * Builds a processor on a consumer in the current group.
     *
     * @param processingOrder ordering mode for the processor
     * @param commitMode commit mode for the processor
     * @param maxPoll value for the consumer's {@code max.poll.records}
     * @return a new processor; the caller is responsible for closing it
     */
    public ParallelEoSStreamProcessor<String, String> buildPc(ParallelConsumerOptions.ProcessingOrder processingOrder, ParallelConsumerOptions.CommitMode commitMode, int maxPoll) {
        return buildPc(processingOrder, commitMode, maxPoll, GroupOption.REUSE_GROUP);
    }

    /**
     * Builds a processor on a newly created consumer, with a max concurrency of 100 and one
     * second between commits.
     *
     * <p>The consumer is remembered and available from {@link #getLastConsumerConstructed()}.
     *
     * @param processingOrder ordering mode for the processor
     * @param commitMode commit mode for the processor
     * @param maxPoll value for the consumer's {@code max.poll.records}
     * @param groupOption whether the consumer joins the current group or a new one
     * @return a new processor; the caller is responsible for closing it
     */
    public ParallelEoSStreamProcessor<String, String> buildPc(ParallelConsumerOptions.ProcessingOrder processingOrder, ParallelConsumerOptions.CommitMode commitMode, int maxPoll, GroupOption groupOption) {
        Properties consumerProps = new Properties();
        consumerProps.put("max.poll.records", maxPoll);
        KafkaConsumer<String, String> newConsumer = createNewConsumer(groupOption.equals(GroupOption.NEW_GROUP), consumerProps);
        this.lastConsumerConstructed = newConsumer;

        // The explicit type witness keeps the builder chain typed as <String, String>;
        // without it the chain has no target type to infer the type arguments from.
        ParallelConsumerOptions<String, String> options = ParallelConsumerOptions.<String, String>builder()
                .ordering(processingOrder)
                .consumer(newConsumer)
                .commitMode(commitMode)
                .maxConcurrency(100)
                .build();

        ParallelEoSStreamProcessor<String, String> pc = new ParallelEoSStreamProcessor<>(options);
        pc.setTimeBetweenCommits(Duration.ofSeconds(1L));
        return pc;
    }

    /**
     * Builds a processor with asynchronous periodic consumer commits and a
     * {@code max.poll.records} of 500.
     *
     * @param processingOrder ordering mode for the processor
     * @param groupOption whether the consumer joins the current group or a new one
     * @return a new processor; the caller is responsible for closing it
     */
    public ParallelEoSStreamProcessor<String, String> buildPc(ParallelConsumerOptions.ProcessingOrder processingOrder, GroupOption groupOption) {
        return buildPc(processingOrder, ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_ASYNCHRONOUS, 500, groupOption);
    }

    /**
     * Builds a processor in the current group with asynchronous periodic consumer commits and
     * a {@code max.poll.records} of 500.
     *
     * @param processingOrder ordering mode for the processor
     * @return a new processor; the caller is responsible for closing it
     */
    public ParallelEoSStreamProcessor<String, String> buildPc(ParallelConsumerOptions.ProcessingOrder processingOrder) {
        return buildPc(processingOrder, ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_ASYNCHRONOUS, 500);
    }

    /** Returns the consumer given to the most recently built processor, or {@code null} if none. */
    public KafkaConsumer<String, String> getLastConsumerConstructed() {
        return this.lastConsumerConstructed;
    }

    /** Returns the shared consumer, or {@code null} if {@link #open()} has not run. */
    public KafkaConsumer<String, String> getConsumer() {
        return this.consumer;
    }

    /**
     * Sets the {@code auto.offset.reset} policy for consumers created after this call.
     *
     * @param offsetResetStrategy the policy to apply
     */
    public void setOffsetResetPolicy(OffsetResetStrategy offsetResetStrategy) {
        this.offsetResetPolicy = offsetResetStrategy;
    }

    /** Returns the shared producer, or {@code null} if {@link #open()} has not run. */
    public KafkaProducer<String, String> getProducer() {
        return this.producer;
    }

    /** Returns the shared admin client, or {@code null} if {@link #open()} has not run. */
    public AdminClient getAdmin() {
        return this.admin;
    }

    /** Returns the group id used when a consumer is created without an explicit one. */
    public String getGroupId() {
        return this.groupId;
    }

    /**
     * Replaces the group id used when a consumer is created without an explicit one.
     *
     * @param consumerGroupId the new group id
     */
    public void setGroupId(String consumerGroupId) {
        this.groupId = consumerGroupId;
    }
}
