package org.mandas.kafka;

import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.FileVisitor;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;

/* loaded from: input_file:org/mandas/kafka/KafkaCluster.class */
public class KafkaCluster {
    private final Zk zk;
    private final Map<Integer, KafkaBroker> brokers;
    private final Path baseLogPath;
    private FileVisitor<Path> recursiveDeleteVisitor;

    /* loaded from: input_file:org/mandas/kafka/KafkaCluster$KafkaClusterBuilder.class */
    public static class KafkaClusterBuilder {
        private Zk zk;
        private Map<Integer, KafkaBroker> brokers = new HashMap();
        private final Path base;

        KafkaClusterBuilder(Path path) {
            this.base = path;
        }

        public KafkaClusterBuilder withZookeeper(String str, int i) {
            return withZookeeper(str, i, 10);
        }

        public KafkaClusterBuilder withZookeeper(String str, int i, int i2) {
            this.zk = new Zk(this.base.resolve("zk").resolve("data"), this.base.resolve("zk").resolve("log"), str, i, i2);
            return this;
        }

        public KafkaClusterBuilder withBroker(int i, String str, int i2) {
            if (this.zk == null) {
                throw new IllegalStateException("A Kafka broker needs a Zookeeper connection");
            }
            this.brokers.put(Integer.valueOf(i), new KafkaBroker(this.base.resolve("kafka").resolve(String.valueOf(i)).resolve("log"), i, this.zk.getConnectionString(), str, i2));
            return this;
        }

        public KafkaClusterBuilder withBroker(int i, String str, int i2, Map<String, Object> map) {
            if (this.zk == null) {
                throw new IllegalStateException("A Kafka broker needs a Zookeeper connection");
            }
            this.brokers.put(Integer.valueOf(i), new KafkaBroker(this.base.resolve("kafka").resolve(String.valueOf(i)).resolve("log"), i, this.zk.getConnectionString(), str, i2, map));
            return this;
        }

        public KafkaCluster build() {
            if (this.zk == null) {
                throw new IllegalStateException("You cannot build a KafkaCluster without a Zookeeper server");
            }
            if (this.brokers.isEmpty()) {
                throw new IllegalStateException("You cannot build a KafkaCluster without Kafka brokers");
            }
            return new KafkaCluster(this.zk, this.brokers, this.base);
        }
    }

    public static KafkaClusterBuilder builder() {
        return new KafkaClusterBuilder(Paths.get(System.getProperty("java.io.tmpdir"), new String[0]).resolve("kafka-cluster-unit").resolve(String.valueOf(new Random().nextInt())));
    }

    public String brokers() {
        return (String) this.brokers.values().stream().map((v0) -> {
            return v0.getListener();
        }).collect(Collectors.joining(","));
    }

    private Properties defaultProperties() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", brokers());
        return properties;
    }

    public <K, V> Consumer<K, V> consumer(Properties properties) {
        Properties properties2 = new Properties();
        properties2.putAll(defaultProperties());
        properties2.putAll(properties);
        return new KafkaConsumer(properties2);
    }

    public <K, V> Consumer<K, V> consumer(Properties properties, Deserializer<K> deserializer, Deserializer<V> deserializer2) {
        Properties properties2 = new Properties();
        properties2.putAll(defaultProperties());
        properties2.putAll(properties);
        return new KafkaConsumer(properties2, deserializer, deserializer2);
    }

    public AdminClient adminClient(Properties properties) {
        Properties properties2 = new Properties();
        properties2.putAll(defaultProperties());
        properties2.putAll(properties);
        return AdminClient.create(properties2);
    }

    public AdminClient adminClient() {
        return AdminClient.create(defaultProperties());
    }

    public <K, V> Producer<K, V> producer(Properties properties) {
        Properties properties2 = new Properties();
        properties2.putAll(defaultProperties());
        properties2.putAll(properties);
        return new KafkaProducer(properties2);
    }

    public <K, V> Producer<K, V> producer(Properties properties, Serializer<K> serializer, Serializer<V> serializer2) {
        Properties properties2 = new Properties();
        properties2.putAll(defaultProperties());
        properties2.putAll(properties);
        return new KafkaProducer(properties2, serializer, serializer2);
    }

    public void stop(int i) {
        KafkaBroker kafkaBroker = this.brokers.get(Integer.valueOf(i));
        if (kafkaBroker == null) {
            throw new IllegalArgumentException(String.format("Broker with id %d does not exist", kafkaBroker));
        }
        kafkaBroker.stop();
    }

    public void shutdown() {
        Iterator<KafkaBroker> it = this.brokers.values().iterator();
        while (it.hasNext()) {
            it.next().stop();
        }
        this.zk.stop();
        try {
            Files.walkFileTree(this.baseLogPath, this.recursiveDeleteVisitor);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void start() {
        try {
            this.zk.start();
            Iterator<KafkaBroker> it = this.brokers.values().iterator();
            while (it.hasNext()) {
                it.next().start(this.brokers.values().size());
            }
            initialize();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private void initialize() {
        StringDeserializer stringDeserializer = new StringDeserializer();
        Properties properties = new Properties();
        properties.put("group.id", "group");
        Consumer consumer = consumer(properties, stringDeserializer, stringDeserializer);
        Throwable th = null;
        try {
            try {
                consumer.subscribe(Arrays.asList("__consumer_offsets"));
                consumer.poll(0L);
                if (consumer != null) {
                    if (0 == 0) {
                        consumer.close();
                        return;
                    }
                    try {
                        consumer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (consumer != null) {
                if (th != null) {
                    try {
                        consumer.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    consumer.close();
                }
            }
            throw th4;
        }
    }

    public void createTopic(String str, int i, int i2) {
        try {
            AdminClient adminClient = adminClient();
            Throwable th = null;
            try {
                try {
                    adminClient.createTopics(Arrays.asList(new NewTopic(str, i, (short) i2))).all().get();
                    if (adminClient != null) {
                        if (0 != 0) {
                            try {
                                adminClient.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            adminClient.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    void deleteTopic(String str) {
        try {
            AdminClient adminClient = adminClient();
            Throwable th = null;
            try {
                try {
                    adminClient.deleteTopics(Arrays.asList(str)).all().get();
                    if (adminClient != null) {
                        if (0 != 0) {
                            try {
                                adminClient.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            adminClient.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void createTopic(String str, int i) {
        createTopic(str, i, this.brokers.size());
    }

    private KafkaCluster(Zk zk, Map<Integer, KafkaBroker> map, Path path) {
        this.recursiveDeleteVisitor = new SimpleFileVisitor<Path>() { // from class: org.mandas.kafka.KafkaCluster.1
            @Override // java.nio.file.SimpleFileVisitor, java.nio.file.FileVisitor
            public FileVisitResult visitFile(Path path2, BasicFileAttributes basicFileAttributes) throws IOException {
                FileVisitResult visitFile = super.visitFile((AnonymousClass1) path2, basicFileAttributes);
                if (visitFile != FileVisitResult.CONTINUE) {
                    return visitFile;
                }
                Files.delete(path2);
                return FileVisitResult.CONTINUE;
            }

            @Override // java.nio.file.SimpleFileVisitor, java.nio.file.FileVisitor
            public FileVisitResult postVisitDirectory(Path path2, IOException iOException) throws IOException {
                FileVisitResult postVisitDirectory = super.postVisitDirectory((AnonymousClass1) path2, iOException);
                if (postVisitDirectory != FileVisitResult.CONTINUE) {
                    return postVisitDirectory;
                }
                Files.delete(path2);
                return FileVisitResult.CONTINUE;
            }
        };
        this.zk = zk;
        this.brokers = map;
        this.baseLogPath = path;
    }
}
