/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.integrationTests;

import io.confluent.csid.testcontainers.FilteredTestContainerSlf4jLogConsumer;
import io.confluent.parallelconsumer.integrationTests.LoadTest;
import io.confluent.parallelconsumer.integrationTests.utils.KafkaClientUtils;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import pl.tlinkowski.unij.api.UniLists;

/**
 * Base class for integration tests that talk to a Kafka broker running in a
 * Testcontainers-managed container.
 *
 * <p>A single {@link KafkaContainer} is created and started from the static initializer of this
 * class and is shared through {@link #kafkaContainer}. Each test instance owns a
 * {@link KafkaClientUtils} bound to that container, which is opened before and closed after
 * every test method.
 *
 * @param <K> not referenced by this class itself; available to subclasses
 * @param <V> not referenced by this class itself; available to subclasses
 */
@Testcontainers
public abstract class BrokerIntegrationTest<K, V> {
    private static final Logger log = LoggerFactory.getLogger(BrokerIntegrationTest.class);

    /** Docker image used for the broker container. */
    private static final String KAFKA_IMAGE = "confluentinc/cp-kafka:7.3.0";

    /** Replication factor for topics created by {@link #ensureTopic(String, int)}. */
    private static final short TOPIC_REPLICATION_FACTOR = 1;

    /** Maximum time, in seconds, to wait for a topic creation request to complete. */
    private static final long TOPIC_CREATION_TIMEOUT_SECONDS = 1L;

    /** Partition count passed to {@link #ensureTopic(String, int)} by {@link #setupTopic(String)}. */
    int numPartitions = 1;

    /** Not read or written by this class; kept for subclasses in this package. */
    int partitionNumber = 0;

    /** Name of the topic most recently created via {@link #setupTopic(String)}; {@code null} until then. */
    String topic;

    /** The shared broker container; assigned and started in the static initializer below. */
    public static KafkaContainer kafkaContainer;

    private final KafkaClientUtils kcu = new KafkaClientUtils(kafkaContainer);

    /**
     * Builds (but does not start) a Kafka container configured with single-replica,
     * single-partition transaction state log settings, a 500 ms initial rebalance delay, and
     * with container reuse requested.
     *
     * @param logSegmentSize value for the {@code KAFKA_LOG_SEGMENT_BYTES} environment variable;
     *                       when {@code null} or blank the variable is not set
     * @return the configured, not yet started container
     */
    public static KafkaContainer createKafkaContainer(String logSegmentSize) {
        KafkaContainer base = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE))
                .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
                .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
                .withEnv("KAFKA_TRANSACTION_STATE_LOG_NUM_PARTITIONS", "1")
                .withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "500")
                .withReuse(true);
        if (StringUtils.isNotBlank(logSegmentSize)) {
            base = base.withEnv("KAFKA_LOG_SEGMENT_BYTES", logSegmentSize);
        }
        return base;
    }

    /**
     * Attaches a filtered SLF4J log consumer to the broker container's output, but only when
     * debug logging is enabled for this class.
     */
    @BeforeAll
    static void followKafkaLogs() {
        if (log.isDebugEnabled()) {
            FilteredTestContainerSlf4jLogConsumer logConsumer = new FilteredTestContainerSlf4jLogConsumer(log);
            // The cast chain is retained as found: the consumer's type hierarchy is declared
            // outside this file, so it cannot be confirmed here that the cast is redundant.
            kafkaContainer.followOutput((Consumer)((Object)logConsumer));
        }
    }

    /** Opens the Kafka client utilities before each test. */
    @BeforeEach
    void open() {
        this.kcu.open();
    }

    /** Closes the Kafka client utilities after each test. */
    @AfterEach
    void close() {
        this.kcu.close();
    }

    /**
     * Creates a topic whose name is prefixed with the simple name of {@code LoadTest}.
     *
     * <p>NOTE(review): the prefix is always {@code LoadTest}, regardless of which subclass is
     * running; this looks like it was meant to be the running test's class name - confirm
     * before changing, since it alters the generated topic names.
     */
    protected void setupTopic() {
        String name = LoadTest.class.getSimpleName();
        this.setupTopic(name);
    }

    /**
     * Asserts that the broker container is running, then creates a topic named
     * {@code name + "-" + <random int>} with {@link #numPartitions} partitions and remembers it
     * as the current {@link #getTopic() topic}.
     *
     * @param name prefix for the generated topic name
     * @return the generated topic name
     */
    protected String setupTopic(String name) {
        Assertions.assertThat(kafkaContainer.isRunning()).isTrue();
        this.topic = name + "-" + RandomUtils.nextInt();
        this.ensureTopic(this.topic, this.numPartitions);
        return this.topic;
    }

    /**
     * Requests creation of the given topic and waits up to
     * {@value #TOPIC_CREATION_TIMEOUT_SECONDS} second(s) for the request to complete.
     *
     * <p>A failure reported by the broker (surfaced as an {@link ExecutionException}) is
     * deliberately tolerated so that this method stays best-effort; it is logged at debug level
     * rather than discarded silently.
     *
     * @param topic         name of the topic to create
     * @param numPartitions number of partitions for the new topic
     * @return the result handle of the create-topics request
     * @throws RuntimeException if the wait is interrupted (the thread's interrupt flag is
     *                          restored first), times out, or fails in any other way
     */
    protected CreateTopicsResult ensureTopic(String topic, int numPartitions) {
        // NewTopic takes the replication factor as a short; an int literal does not compile here.
        NewTopic newTopic = new NewTopic(topic, numPartitions, TOPIC_REPLICATION_FACTOR);
        CreateTopicsResult topics = this.kcu.getAdmin().createTopics(UniLists.of(newTopic));
        try {
            topics.all().get(TOPIC_CREATION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        catch (ExecutionException e) {
            // Best-effort by design: keep going, but leave a trace of what went wrong.
            log.debug("Ignoring failure while creating topic {}", topic, e);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        return topics;
    }

    /**
     * Produces messages to the current topic with an empty prefix.
     *
     * @param quantity number of messages, passed through to {@link KafkaClientUtils}
     * @return whatever {@link KafkaClientUtils} returns for the produced messages
     */
    protected List<String> produceMessages(int quantity) {
        return this.produceMessages(quantity, "");
    }

    /**
     * Produces messages to the current topic by delegating to {@link KafkaClientUtils}.
     *
     * @param quantity number of messages, passed through to {@link KafkaClientUtils}
     * @param prefix   prefix argument, passed through to {@link KafkaClientUtils}
     * @return whatever {@link KafkaClientUtils} returns for the produced messages
     */
    protected List<String> produceMessages(int quantity, String prefix) {
        return this.getKcu().produceMessages(this.getTopic(), quantity, prefix);
    }

    /**
     * @return the name of the topic most recently created via {@link #setupTopic(String)}, or
     *         {@code null} if none has been created yet
     */
    public String getTopic() {
        return this.topic;
    }

    /** @return the Kafka client utilities bound to the shared broker container */
    protected KafkaClientUtils getKcu() {
        return this.kcu;
    }

    // Runs once when this class is initialized: selects the SLF4J backend for Flogger, then
    // creates and starts the shared broker container before any test instance is constructed.
    static {
        System.setProperty("flogger.backend_factory", "com.google.common.flogger.backend.slf4j.Slf4jBackendFactory#getInstance");
        kafkaContainer = BrokerIntegrationTest.createKafkaContainer(null);
        kafkaContainer.start();
    }
}

