/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.integrationTests.utils;

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.internal.PCModuleTestEnv;
import io.confluent.parallelconsumer.state.ModelUtils;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import one.util.streamex.IntStreamEx;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;

/**
 * Integration-test helper that builds Kafka consumers, producers, an admin client and
 * {@link ParallelEoSStreamProcessor} instances against a {@link KafkaContainer}.
 *
 * <p>The instance keeps one shared consumer, producer and admin client (created by
 * {@link #open()}, released by {@link #close()}) and can additionally hand out freshly
 * constructed clients whose lifecycle is the caller's responsibility.
 *
 * <p>The class holds mutable, unsynchronised state (current group id, offset reset policy,
 * client references).
 */
public class KafkaClientUtils implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(KafkaClientUtils.class);

    /** Value used for {@code max.poll.records} by {@link #setupConsumerProps(String)}. */
    public static final int MAX_POLL_RECORDS = 10000;

    /** Prefix of every generated consumer group id; a random integer is appended. */
    public static final String GROUP_ID_PREFIX = "group-1-";

    /** {@code max.poll.records} used by the {@code buildPc} overloads that do not take one. */
    private static final int DEFAULT_PC_MAX_POLL = 500;

    /** Max concurrency configured on every processor built by {@code buildPc}. */
    private static final int PC_MAX_CONCURRENCY = 100;

    /** Commit interval configured on every processor built by {@code buildPc}. */
    private static final Duration PC_TIME_BETWEEN_COMMITS = Duration.ofSeconds(1L);

    /** {@code transaction.timeout.ms} applied to transactional producers. */
    private static final Duration TRANSACTION_TIMEOUT = Duration.ofSeconds(10L);

    private final KafkaContainer kContainer;
    private KafkaConsumer<String, String> consumer;
    private OffsetResetStrategy offsetResetPolicy = OffsetResetStrategy.EARLIEST;
    private KafkaProducer<String, String> producer;
    private AdminClient admin;
    private String groupId = newRandomGroupId();
    private KafkaConsumer<String, String> lastConsumerConstructed;

    /**
     * Configures the given container for transactions and starts it.
     *
     * <p>Two environment entries are added before start, setting the transaction state log
     * replication factor and min ISR to {@code 1} (presumably so transactions work on a
     * single-broker container).
     *
     * @param kafkaContainer the container to start and connect to
     */
    public KafkaClientUtils(KafkaContainer kafkaContainer) {
        kafkaContainer.addEnv("KAFKA_transaction_state_log_replication_factor", "1");
        kafkaContainer.addEnv("KAFKA_transaction_state_log_min_isr", "1");
        kafkaContainer.start();
        this.kContainer = kafkaContainer;
    }

    /** Generates a fresh group id: {@link #GROUP_ID_PREFIX} followed by a random integer. */
    private static String newRandomGroupId() {
        return GROUP_ID_PREFIX + RandomUtils.nextInt();
    }

    /** Builds the properties shared by all clients: only {@code bootstrap.servers}. */
    private Properties setupCommonProps() {
        Properties commonProps = new Properties();
        commonProps.put("bootstrap.servers", this.kContainer.getBootstrapServers());
        return commonProps;
    }

    /** Builds producer properties: the common properties plus String key/value serializers. */
    private Properties setupProducerProps() {
        Properties producerProps = this.setupCommonProps();
        producerProps.put("key.serializer", StringSerializer.class.getName());
        producerProps.put("value.serializer", StringSerializer.class.getName());
        return producerProps;
    }

    /**
     * Builds consumer properties for the given group.
     *
     * <p>Auto commit is disabled, String deserializers are used, isolation level is
     * {@code read_committed}, {@code auto.offset.reset} follows the current offset reset
     * policy and {@code max.poll.records} is {@link #MAX_POLL_RECORDS}.
     *
     * @param groupIdToUse value for {@code group.id}
     * @return a new, mutable {@link Properties} instance
     */
    public Properties setupConsumerProps(String groupIdToUse) {
        Properties consumerProps = this.setupCommonProps();
        consumerProps.put("group.id", groupIdToUse);
        consumerProps.put("enable.auto.commit", false);
        consumerProps.put("key.deserializer", StringDeserializer.class.getName());
        consumerProps.put("value.deserializer", StringDeserializer.class.getName());
        // Locale.ROOT keeps the config values stable regardless of the JVM default locale
        // (e.g. a Turkish locale would otherwise lower-case 'I' to a dotless 'ı').
        consumerProps.put("isolation.level", IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
        consumerProps.put("auto.offset.reset", this.offsetResetPolicy.name().toLowerCase(Locale.ROOT));
        consumerProps.put("max.poll.records", MAX_POLL_RECORDS);
        return consumerProps;
    }

    /**
     * Creates the shared consumer (current group), a non-transactional producer and the
     * admin client. Previously opened clients are not closed by this method.
     */
    @BeforeEach
    public void open() {
        log.info("Setting up clients...");
        this.consumer = this.createNewConsumer(false, new Properties());
        this.producer = this.createNewProducer(ProducerMode.NOT_TRANSACTIONAL);
        this.admin = AdminClient.create(this.setupCommonProps());
    }

    /**
     * Closes the shared producer, consumer and admin client, skipping any that were never
     * opened. Each close is attempted even if an earlier one throws.
     *
     * <p>Consumers and producers returned by the {@code createNew...} methods, and the
     * consumer given to a processor by {@code buildPc}, are not closed here.
     */
    @Override
    @AfterEach
    public void close() {
        try {
            if (this.producer != null) {
                this.producer.close();
            }
        } finally {
            try {
                if (this.consumer != null) {
                    this.consumer.close();
                }
            } finally {
                if (this.admin != null) {
                    this.admin.close();
                }
            }
        }
    }

    /**
     * Creates a consumer in the given group with the default consumer properties.
     *
     * @param groupId value for {@code group.id}; does not change this instance's group id
     */
    public <K, V> KafkaConsumer<K, V> createNewConsumer(String groupId) {
        return this.createNewConsumer(groupId, new Properties());
    }

    /**
     * Creates a consumer, either in the current group or in a newly generated one.
     *
     * @param reuseGroup {@link GroupOption#NEW_GROUP} to generate (and remember) a new group
     *                   id first; any other value uses the current group id
     */
    public <K, V> KafkaConsumer<K, V> createNewConsumer(GroupOption reuseGroup) {
        return this.createNewConsumer(reuseGroup.equals(GroupOption.NEW_GROUP), new Properties());
    }

    /** Creates a consumer in the current group with the default consumer properties. */
    public <K, V> KafkaConsumer<K, V> createNewConsumer() {
        return this.createNewConsumer(false, new Properties());
    }

    /**
     * Creates a consumer, optionally in a newly generated group.
     *
     * @param newConsumerGroup {@code true} to generate (and remember) a new group id first
     * @deprecated {@link #createNewConsumer(GroupOption)} expresses the same choice with an enum
     */
    @Deprecated
    public <K, V> KafkaConsumer<K, V> createNewConsumer(boolean newConsumerGroup) {
        return this.createNewConsumer(newConsumerGroup, new Properties());
    }

    /**
     * Creates a consumer in the current group, with {@code options} overriding the defaults.
     *
     * @param options extra or overriding consumer properties
     * @deprecated marked deprecated in the original class; no replacement is indicated there
     */
    @Deprecated
    public <K, V> KafkaConsumer<K, V> createNewConsumer(Properties options) {
        return this.createNewConsumer(false, options);
    }

    /**
     * Creates a consumer, optionally in a newly generated group, with {@code options}
     * overriding the defaults.
     *
     * @param newConsumerGroup {@code true} to replace this instance's group id with a freshly
     *                         generated one before creating the consumer
     * @param options          extra or overriding consumer properties
     */
    public <K, V> KafkaConsumer<K, V> createNewConsumer(boolean newConsumerGroup, Properties options) {
        if (newConsumerGroup) {
            this.groupId = newRandomGroupId();
        }
        return this.createNewConsumer(this.groupId, options);
    }

    /**
     * Creates a consumer in the given group; entries in {@code options} are applied on top of
     * {@link #setupConsumerProps(String)} and therefore win on conflict.
     *
     * @param groupId value for {@code group.id}; does not change this instance's group id
     * @param options extra or overriding consumer properties
     * @deprecated marked deprecated in the original class; no replacement is indicated there
     */
    @Deprecated
    public <K, V> KafkaConsumer<K, V> createNewConsumer(String groupId, Properties options) {
        Properties properties = this.setupConsumerProps(groupId);
        properties.putAll(options);
        KafkaConsumer<K, V> kvKafkaConsumer = new KafkaConsumer<>(properties);
        log.debug("New consume {}", kvKafkaConsumer);
        return kvKafkaConsumer;
    }

    /** Creates a transactional producer and calls {@code initTransactions()} on it. */
    public <K, V> KafkaProducer<K, V> createAndInitNewTransactionalProducer() {
        KafkaProducer<K, V> txProd = this.createNewProducer(ProducerMode.TRANSACTIONAL);
        txProd.initTransactions();
        return txProd;
    }

    /**
     * Creates a producer.
     *
     * @param transactional {@code true} for {@link ProducerMode#TRANSACTIONAL}
     * @deprecated use {@link #createNewProducer(ProducerMode)}
     */
    @Deprecated
    public <K, V> KafkaProducer<K, V> createNewProducer(boolean transactional) {
        ProducerMode mode = transactional ? ProducerMode.TRANSACTIONAL : ProducerMode.NOT_TRANSACTIONAL;
        return this.createNewProducer(mode);
    }

    /**
     * Creates a producer whose mode is derived from the commit mode via
     * {@link ProducerMode#matching(ParallelConsumerOptions.CommitMode)}.
     */
    public KafkaProducer<String, String> createNewProducer(ParallelConsumerOptions.CommitMode commitMode) {
        return this.createNewProducer(ProducerMode.matching(commitMode));
    }

    /**
     * Creates a producer with String serializers.
     *
     * <p>In {@link ProducerMode#TRANSACTIONAL} mode a random {@code transactional.id} and a
     * 10 second {@code transaction.timeout.ms} are added; {@code initTransactions()} is not
     * called here.
     *
     * @param mode whether the producer is configured for transactions
     */
    public <K, V> KafkaProducer<K, V> createNewProducer(ProducerMode mode) {
        Properties producerProps = this.setupProducerProps();
        if (mode.equals(ProducerMode.TRANSACTIONAL)) {
            producerProps.put("transactional.id", this.getClass().getSimpleName() + ":" + RandomUtils.nextInt());
            producerProps.put("transaction.timeout.ms", Math.toIntExact(TRANSACTION_TIMEOUT.toMillis()));
        }
        KafkaProducer<K, V> kvKafkaProducer = new KafkaProducer<>(producerProps);
        log.debug("New producer {}", kvKafkaProducer);
        return kvKafkaProducer;
    }

    /**
     * Creates {@code numTopics} topics named {@code in-<index>-<random int>} using the shared
     * admin client and waits for the creation to complete. Partition count and replication
     * factor are left unspecified.
     *
     * @param numTopics number of topics to create
     * @return the topic descriptions that were submitted
     * @throws IllegalStateException if topic creation fails or the wait is interrupted (the
     *                               thread's interrupt flag is restored in the latter case)
     */
    public List<NewTopic> createTopics(int numTopics) {
        List<NewTopic> newTopics = IntStreamEx.range(numTopics)
                .mapToObj(i -> new NewTopic("in-" + i + "-" + RandomUtils.nextInt(), Optional.empty(), Optional.empty()))
                .toList();
        try {
            this.getAdmin().createTopics(newTopics).all().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while creating " + numTopics + " topics", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Failed to create " + numTopics + " topics", e);
        }
        return newTopics;
    }

    /**
     * Produces {@code numberToSend} records to the topic with an empty prefix.
     *
     * @see #produceMessages(String, long, String)
     */
    public List<String> produceMessages(String topicName, long numberToSend) throws InterruptedException, ExecutionException {
        return this.produceMessages(topicName, numberToSend, "");
    }

    /**
     * Produces records generated by {@link ModelUtils#createProducerRecords} to the topic using
     * a temporary non-transactional producer, then asserts that every send was acknowledged
     * with an offset and that the number of sends equals {@code numberToSend}.
     *
     * @param topicName    destination topic
     * @param numberToSend number of records to generate; must fit in an {@code int}
     * @param prefix       passed through to the record generator
     * @return the keys of the sent records, in send order
     * @throws InterruptedException if interrupted while waiting for an acknowledgement
     * @throws ExecutionException   if a send failed
     */
    public List<String> produceMessages(String topicName, long numberToSend, String prefix) throws InterruptedException, ExecutionException {
        log.info("Producing {} messages to {}", numberToSend, topicName);
        List<String> expectedKeys = new ArrayList<>();
        List<Future<RecordMetadata>> sends = new ArrayList<>();
        // The futures are inspected after the producer is closed; per the KafkaProducer#close()
        // contract, close blocks until previously sent requests complete.
        try (KafkaProducer<String, String> kafkaProducer = this.createNewProducer(ProducerMode.NOT_TRANSACTIONAL)) {
            ModelUtils mu = new ModelUtils(new PCModuleTestEnv());
            List<ProducerRecord<String, String>> recs = mu.createProducerRecords(topicName, numberToSend, prefix);
            for (ProducerRecord<String, String> record : recs) {
                Future<RecordMetadata> send = kafkaProducer.send(record, (meta, exception) -> {
                    if (exception != null) {
                        log.error("Error sending, ", exception);
                    }
                });
                sends.add(send);
                expectedKeys.add(record.key());
            }
            log.debug("Finished sending test data");
        }
        log.debug("Waiting for broker acks");
        for (Future<RecordMetadata> send : sends) {
            RecordMetadata recordMetadata = send.get();
            Assertions.assertThat(recordMetadata.hasOffset()).isTrue();
        }
        Assertions.assertThat(sends).hasSize(Math.toIntExact(numberToSend));
        return expectedKeys;
    }

    /**
     * Builds a processor on a new consumer in the current group.
     *
     * @see #buildPc(ParallelConsumerOptions.ProcessingOrder, ParallelConsumerOptions.CommitMode, int, GroupOption)
     */
    public ParallelEoSStreamProcessor<String, String> buildPc(ParallelConsumerOptions.ProcessingOrder order, ParallelConsumerOptions.CommitMode commitMode, int maxPoll) {
        return this.buildPc(order, commitMode, maxPoll, GroupOption.REUSE_GROUP);
    }

    /**
     * Builds a processor on a newly created consumer.
     *
     * <p>The consumer is created with {@code max.poll.records = maxPoll} and is remembered as
     * {@link #getLastConsumerConstructed()}. The processor is configured with a max
     * concurrency of 100 and a one second interval between commits.
     *
     * @param order       processing order for the processor
     * @param commitMode  commit mode for the processor
     * @param maxPoll     {@code max.poll.records} for the new consumer
     * @param groupOption {@link GroupOption#NEW_GROUP} to generate (and remember) a new group
     *                    id first; otherwise the current group id is used
     */
    public ParallelEoSStreamProcessor<String, String> buildPc(ParallelConsumerOptions.ProcessingOrder order, ParallelConsumerOptions.CommitMode commitMode, int maxPoll, GroupOption groupOption) {
        Properties consumerProps = new Properties();
        consumerProps.put("max.poll.records", maxPoll);
        boolean newConsumerGroup = groupOption.equals(GroupOption.NEW_GROUP);
        KafkaConsumer<String, String> newConsumer = this.createNewConsumer(newConsumerGroup, consumerProps);
        this.lastConsumerConstructed = newConsumer;
        ParallelEoSStreamProcessor<String, String> pc = new ParallelEoSStreamProcessor<>(
                ParallelConsumerOptions.<String, String>builder()
                        .ordering(order)
                        .consumer(newConsumer)
                        .commitMode(commitMode)
                        .maxConcurrency(PC_MAX_CONCURRENCY)
                        .build());
        pc.setTimeBetweenCommits(PC_TIME_BETWEEN_COMMITS);
        return pc;
    }

    /**
     * Builds a processor with {@code PERIODIC_CONSUMER_ASYNCHRONOUS} commits and
     * {@code max.poll.records = 500}, using the given group option.
     */
    public ParallelEoSStreamProcessor<String, String> buildPc(ParallelConsumerOptions.ProcessingOrder key, GroupOption groupOption) {
        return this.buildPc(key, ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_ASYNCHRONOUS, DEFAULT_PC_MAX_POLL, groupOption);
    }

    /**
     * Builds a processor with {@code PERIODIC_CONSUMER_ASYNCHRONOUS} commits and
     * {@code max.poll.records = 500}, in the current group.
     */
    public ParallelEoSStreamProcessor<String, String> buildPc(ParallelConsumerOptions.ProcessingOrder key) {
        return this.buildPc(key, ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_ASYNCHRONOUS, DEFAULT_PC_MAX_POLL);
    }

    /** Returns the consumer created by the most recent {@code buildPc} call, or {@code null}. */
    public KafkaConsumer<String, String> getLastConsumerConstructed() {
        return this.lastConsumerConstructed;
    }

    /** Returns the shared consumer created by {@link #open()}, or {@code null} before that. */
    public KafkaConsumer<String, String> getConsumer() {
        return this.consumer;
    }

    /** Sets the policy used for {@code auto.offset.reset} by consumers created afterwards. */
    public void setOffsetResetPolicy(OffsetResetStrategy offsetResetPolicy) {
        this.offsetResetPolicy = offsetResetPolicy;
    }

    /** Returns the shared producer created by {@link #open()}, or {@code null} before that. */
    public KafkaProducer<String, String> getProducer() {
        return this.producer;
    }

    /** Returns the shared admin client created by {@link #open()}, or {@code null} before that. */
    public AdminClient getAdmin() {
        return this.admin;
    }

    /** Returns the group id currently used for consumers created without an explicit one. */
    public String getGroupId() {
        return this.groupId;
    }

    /** Replaces the group id used for consumers created without an explicit one. */
    public void setGroupId(String groupId) {
        this.groupId = groupId;
    }

    /** Whether a new consumer joins the current group or a freshly generated one. */
    public enum GroupOption {
        REUSE_GROUP,
        NEW_GROUP
    }

    /** Whether a producer is configured with a transactional id. */
    public enum ProducerMode {
        TRANSACTIONAL,
        NOT_TRANSACTIONAL;

        /**
         * Maps a commit mode to the producer mode it needs.
         *
         * @return {@link #TRANSACTIONAL} for {@code PERIODIC_TRANSACTIONAL_PRODUCER},
         *         otherwise {@link #NOT_TRANSACTIONAL}
         */
        public static ProducerMode matching(ParallelConsumerOptions.CommitMode commitMode) {
            return commitMode.equals(ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER) ? TRANSACTIONAL : NOT_TRANSACTIONAL;
        }
    }

    /** Version string constants. */
    class PCVersion {
        public static final String V051 = "0.5.1";
    }
}

