package com.github.charithe.kafka;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import org.apache.curator.test.InstanceSpec;
import org.apache.curator.test.TestingServer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.rules.ExternalResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/charithe/kafka/KafkaJunitRule.class */
public class KafkaJunitRule extends ExternalResource {
    private static final int POLL_TIMEOUT_MS = 1000;
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaJunitRule.class);
    private static final int ALLOCATE_RANDOM_PORT = -1;
    private static final String LOCALHOST = "localhost";
    private Properties brokerProperties;
    private TestingServer zookeeper;
    private KafkaServerStartable kafkaServer;
    private int zookeeperPort;
    private String zookeeperConnectionString;
    private int kafkaPort;
    private Path kafkaLogDir;

    public KafkaJunitRule() {
        this(ALLOCATE_RANDOM_PORT);
    }

    public KafkaJunitRule(int i) {
        this.brokerProperties = null;
        this.zookeeperPort = ALLOCATE_RANDOM_PORT;
        if (i == ALLOCATE_RANDOM_PORT) {
            this.kafkaPort = InstanceSpec.getRandomPort();
        } else {
            this.kafkaPort = i;
        }
    }

    public KafkaJunitRule(int i, int i2) {
        this(i);
        this.zookeeperPort = i2;
    }

    public KafkaJunitRule(int i, int i2, Properties properties) {
        this(i, i2);
        this.brokerProperties = properties;
    }

    protected void before() throws Throwable {
        if (this.zookeeperPort == ALLOCATE_RANDOM_PORT) {
            this.zookeeper = new TestingServer(true);
            this.zookeeperPort = this.zookeeper.getPort();
        } else {
            this.zookeeper = new TestingServer(this.zookeeperPort, true);
        }
        this.zookeeperConnectionString = this.zookeeper.getConnectString();
        KafkaConfig buildKafkaConfig = buildKafkaConfig(this.zookeeperConnectionString);
        LOGGER.info("Starting Kafka server with config: {}", buildKafkaConfig.props());
        this.kafkaServer = new KafkaServerStartable(buildKafkaConfig);
        startKafka();
    }

    protected void after() {
        try {
            shutdownKafka();
            if (this.zookeeper != null) {
                LOGGER.info("Shutting down Zookeeper");
                this.zookeeper.close();
            }
            if (Files.exists(this.kafkaLogDir, new LinkOption[0])) {
                LOGGER.info("Deleting the log dir:  {}", this.kafkaLogDir);
                Files.walkFileTree(this.kafkaLogDir, new SimpleFileVisitor<Path>() { // from class: com.github.charithe.kafka.KafkaJunitRule.1
                    @Override // java.nio.file.SimpleFileVisitor, java.nio.file.FileVisitor
                    public FileVisitResult visitFile(Path path, BasicFileAttributes basicFileAttributes) throws IOException {
                        Files.deleteIfExists(path);
                        return FileVisitResult.CONTINUE;
                    }

                    @Override // java.nio.file.SimpleFileVisitor, java.nio.file.FileVisitor
                    public FileVisitResult postVisitDirectory(Path path, IOException iOException) throws IOException {
                        Files.deleteIfExists(path);
                        return FileVisitResult.CONTINUE;
                    }
                });
            }
        } catch (Exception e) {
            LOGGER.error("Failed to clean-up Kafka", e);
        }
    }

    public void shutdownKafka() {
        if (this.kafkaServer != null) {
            LOGGER.info("Shutting down Kafka Server");
            this.kafkaServer.shutdown();
        }
    }

    public void startKafka() {
        if (this.kafkaServer != null) {
            LOGGER.info("Starting Kafka Server");
            this.kafkaServer.startup();
        }
    }

    private KafkaConfig buildKafkaConfig(String str) throws IOException {
        this.kafkaLogDir = Files.createTempDirectory("kafka_junit", new FileAttribute[0]);
        Properties properties = new Properties();
        properties.put("advertised.host.name", LOCALHOST);
        properties.put("port", this.kafkaPort + "");
        properties.put("broker.id", "1");
        properties.put("log.dirs", this.kafkaLogDir.toAbsolutePath().toString());
        properties.put("zookeeper.connect", str);
        properties.put("leader.imbalance.check.interval.seconds", "1");
        properties.put("offsets.topic.num.partitions", "1");
        properties.put("offsets.topic.replication.factor", "1");
        properties.put("default.replication.factor", "1");
        properties.put("group.min.session.timeout.ms", "100");
        if (this.brokerProperties != null) {
            properties.putAll(this.brokerProperties);
        }
        return new KafkaConfig(properties);
    }

    public Properties producerConfig() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:" + this.kafkaPort);
        properties.put("acks", "1");
        properties.put("request.timeout.ms", "500");
        return properties;
    }

    public Properties consumerConfig() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:" + this.kafkaPort);
        properties.put("group.id", "kafka-junit-consumer");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "200");
        properties.put("auto.offset.reset", "earliest");
        properties.put("heartbeat.interval.ms", "100");
        properties.put("session.timeout.ms", "200");
        properties.put("fetch.max.wait.ms", "200");
        return properties;
    }

    public KafkaConsumer<String, String> createStringConsumer() {
        return createConsumer(new StringDeserializer(), new StringDeserializer());
    }

    public <K, V> KafkaConsumer<K, V> createConsumer(Deserializer<K> deserializer, Deserializer<V> deserializer2) {
        return new KafkaConsumer<>(consumerConfig(), deserializer, deserializer2);
    }

    public KafkaProducer<String, String> createStringProducer() {
        return createProducer(new StringSerializer(), new StringSerializer());
    }

    public <K, V> KafkaProducer<K, V> createProducer(Serializer<K> serializer, Serializer<V> serializer2) {
        return new KafkaProducer<>(producerConfig(), serializer, serializer2);
    }

    public List<String> readStringMessages(String str, int i, int i2) throws TimeoutException {
        return readMessages(createStringConsumer(), str, i, i2);
    }

    public <T> List<T> readMessages(KafkaConsumer<T, T> kafkaConsumer, String str, int i, int i2) throws TimeoutException {
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        try {
            try {
                kafkaConsumer.subscribe(Lists.newArrayList(new String[]{str}));
                List<T> list = (List) newSingleThreadExecutor.submit(() -> {
                    ArrayList arrayList = new ArrayList(i);
                    while (arrayList.size() < i) {
                        Iterator it = kafkaConsumer.poll(1000L).iterator();
                        while (it.hasNext()) {
                            ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                            LOGGER.debug("Received message: {} -> {}", consumerRecord.key(), consumerRecord.value());
                            arrayList.add(consumerRecord.value());
                        }
                    }
                    return arrayList;
                }).get(i2, TimeUnit.SECONDS);
                newSingleThreadExecutor.shutdown();
                return list;
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                throw new TimeoutException("Timed out waiting for messages");
            } catch (Exception e2) {
                throw new RuntimeException("Unexpected exception while reading messages", e2);
            }
        } catch (Throwable th) {
            newSingleThreadExecutor.shutdown();
            throw th;
        }
    }

    public Path kafkaLogDir() {
        return this.kafkaLogDir;
    }

    public int kafkaBrokerPort() {
        return this.kafkaPort;
    }

    public int zookeeperPort() {
        return this.zookeeperPort;
    }

    public String zookeeperConnectionString() {
        return this.zookeeperConnectionString;
    }
}
