package io.streamthoughts.kafka.tests;

import io.streamthoughts.kafka.tests.TestingEmbeddedKafka;
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import kafka.cluster.EndPoint;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.TestUtils;
import kotlin.Metadata;
import kotlin.Pair;
import kotlin.TypeCastException;
import kotlin.collections.ArraysKt;
import kotlin.collections.CollectionsKt;
import kotlin.collections.MapsKt;
import kotlin.io.CloseableKt;
import kotlin.jdk7.AutoCloseableKt;
import kotlin.jvm.JvmOverloads;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.SystemTime;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* compiled from: TestingEmbeddedKafka.kt */
@Metadata(mv = {1, 1, 16}, bv = {1, 0, 3}, k = 1, d1 = {"��\u008a\u0001\n\u0002\u0018\u0002\n\u0002\u0010��\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010%\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\u0011\n\u0002\u0010\u000e\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\u0002\n��\n\u0002\u0010 \n\u0002\u0018\u0002\n\u0002\b\u0004\n\u0002\u0018\u0002\n��\n\u0002\u0010\b\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010$\n��\n\u0002\u0018\u0002\n\u0002\b\u0007\n\u0002\u0010\u000b\n\u0002\b\u0003\n\u0002\u0010\t\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0010\"\n\u0002\b\u0006\u0018�� <2\u00020\u0001:\u0001<B\u000f\u0012\b\b\u0002\u0010\u0002\u001a\u00020\u0003¢\u0006\u0002\u0010\u0004J\u000e\u0010\b\u001a\n \n*\u0004\u0018\u00010\t0\tJ\u001d\u0010\u000b\u001a\b\u0012\u0004\u0012\u00020\r0\f2\n\b\u0002\u0010\u000e\u001a\u0004\u0018\u00010\u000f¢\u0006\u0002\u0010\u0010J\b\u0010\u0011\u001a\u00020\u0012H\u0002J|\u0010\u0013\u001a\u0014\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u0002H\u0016\u0012\u0004\u0012\u0002H\u00170\u00150\u0014\"\u0004\b��\u0010\u0016\"\u0004\b\u0001\u0010\u00172\u0006\u0010\u0018\u001a\u00020\r2\b\b\u0002\u0010\u0019\u001a\u00020\u001a2\b\b\u0002\u0010\u001b\u001a\u00020\u001c2\u0010\b\u0002\u0010\u001d\u001a\n\u0012\u0004\u0012\u0002H\u0016\u0018\u00010\u001e2\u0010\b\u0002\u0010\u001f\u001a\n\u0012\u0004\u0012\u0002H\u0017\u0018\u00010\u001e2\u0016\b\u0002\u0010 
\u001a\u0010\u0012\u0004\u0012\u00020\r\u0012\u0006\u0012\u0004\u0018\u00010\u00010!JZ\u0010\"\u001a\u000e\u0012\u0004\u0012\u0002H\u0016\u0012\u0004\u0012\u0002H\u00170#\"\u0004\b��\u0010\u0016\"\u0004\b\u0001\u0010\u00172\u0016\b\u0002\u0010\u0002\u001a\u0010\u0012\u0004\u0012\u00020\r\u0012\u0006\u0012\u0004\u0018\u00010\u00010!2\u0010\b\u0002\u0010\u001d\u001a\n\u0012\u0004\u0012\u0002H\u0016\u0018\u00010\u001e2\u0010\b\u0002\u0010\u001f\u001a\n\u0012\u0004\u0012\u0002H\u0017\u0018\u00010\u001eJ@\u0010$\u001a\u00020\u00122\u0006\u0010\u0018\u001a\u00020\r2\b\b\u0002\u0010%\u001a\u00020\u001c2\b\b\u0002\u0010&\u001a\u00020\u001c2\u001a\b\u0002\u0010\u0002\u001a\u0014\u0012\u0006\u0012\u0004\u0018\u00010\r\u0012\u0006\u0012\u0004\u0018\u00010\r\u0018\u00010!H\u0007J\u001f\u0010'\u001a\u00020\u00122\u0012\u0010(\u001a\n\u0012\u0006\b\u0001\u0012\u00020\r0\f\"\u00020\r¢\u0006\u0002\u0010)J7\u0010*\u001a\u00020+2\f\u0010,\u001a\b\u0012\u0004\u0012\u00020\r0\f2\b\b\u0002\u0010-\u001a\u00020\u001a2\b\b\u0002\u0010.\u001a\u00020/2\u0006\u0010\b\u001a\u00020\tH\u0002¢\u0006\u0002\u00100J\b\u00101\u001a\u00020\rH\u0002J*\u00102\u001a\u000e\u0012\u0004\u0012\u00020\u0001\u0012\u0004\u0012\u00020\u0001032\u0016\b\u0002\u0010\u0002\u001a\u0010\u0012\u0004\u0012\u00020\r\u0012\u0006\u0012\u0004\u0018\u00010\u00010!J\u001c\u00104\u001a\u00020\u00122\u0014\b\u0002\u00105\u001a\u000e\u0012\u0004\u0012\u00020\u0001\u0012\u0004\u0012\u00020\u00010!J\u0006\u00106\u001a\u00020\u0012J\f\u0010,\u001a\b\u0012\u0004\u0012\u00020\r07J)\u00108\u001a\u00020+2\u0012\u0010(\u001a\n\u0012\u0006\b\u0001\u0012\u00020\r0\f\"\u00020\r2\b\b\u0002\u0010\u0019\u001a\u00020\u001a¢\u0006\u0002\u00109J)\u0010:\u001a\u00020+2\u0012\u0010(\u001a\n\u0012\u0006\b\u0001\u0012\u00020\r0\f\"\u00020\r2\b\b\u0002\u0010\u0019\u001a\u00020\u001a¢\u0006\u0002\u00109J\b\u0010;\u001a\u00020\rH\u0002R\u001a\u0010\u0002\u001a\u000e\u0012\u0004\u0012\u00020\u0001\u0012\u0004\u0012\u00020\u00010\u0005X\u0082\u0004¢\u000
6\u0002\n��R\u000e\u0010\u0006\u001a\u00020\u0007X\u0082.¢\u0006\u0002\n��¨\u0006="}, d2 = {"Lio/streamthoughts/kafka/tests/TestingEmbeddedKafka;", "", "config", "Ljava/util/Properties;", "(Ljava/util/Properties;)V", "", "kafka", "Lkafka/server/KafkaServer;", "adminClient", "Lorg/apache/kafka/clients/admin/AdminClient;", "kotlin.jvm.PlatformType", "bootstrapServers", "", "", "securityProtocol", "Lorg/apache/kafka/common/security/auth/SecurityProtocol;", "(Lorg/apache/kafka/common/security/auth/SecurityProtocol;)[Ljava/lang/String;", "clearLogsDir", "", "consumeUntilMinRecordsOrTimeout", "", "Lorg/apache/kafka/clients/consumer/ConsumerRecord;", "K", "V", "topic", "timeout", "Ljava/time/Duration;", "expectedNumRecords", "", "keyDeserializer", "Lorg/apache/kafka/common/serialization/Deserializer;", "valueDeserializer", "consumerConfig", "", "consumerClient", "Lorg/apache/kafka/clients/consumer/Consumer;", "createTopic", "partitions", "replication", "deleteTopics", "topicNames", "([Ljava/lang/String;)V", "doWaitForTopicsToBeAbsent", "", "topics", "until", "now", "", "([Ljava/lang/String;Ljava/time/Duration;JLorg/apache/kafka/clients/admin/AdminClient;)Z", "logDir", "producerClient", "Lorg/apache/kafka/clients/producer/Producer;", "start", "overrides", "stop", "", "waitForTopicsToBeAbsent", "([Ljava/lang/String;Ljava/time/Duration;)Z", "waitForTopicsToBePresent", "zookeeperConnect", "Companion", "kafka-clients-kotlin-tests"})
/* loaded from: input_file:io/streamthoughts/kafka/tests/TestingEmbeddedKafka.class */
public final class TestingEmbeddedKafka {
    // Broker configuration: seeded from the constructor's Properties and extended by start().
    private final Map<Object, Object> config;
    // Embedded broker; assigned in start(). Methods that use it fail with an
    // uninitialized-property error when it is still null.
    private KafkaServer kafka;

    // Class-wide SLF4J logger, assigned in the static initializer.
    @NotNull
    private static final Logger Log;
    // Single instance of the nested Companion class, which hosts the shared helpers.
    public static final Companion Companion = new Companion(null);

    /* compiled from: TestingEmbeddedKafka.kt */
    @Metadata(mv = {1, 1, 16}, bv = {1, 0, 3}, k = 1, d1 = {"��<\n\u0002\u0018\u0002\n\u0002\u0010��\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0010#\n\u0002\u0010\u000e\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\u000b\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\t\n��\n\u0002\u0018\u0002\n��\b\u0086\u0003\u0018��2\u00020\u0001B\u0007\b\u0002¢\u0006\u0002\u0010\u0002J\u0016\u0010\u0007\u001a\b\u0012\u0004\u0012\u00020\t0\b2\u0006\u0010\n\u001a\u00020\u000bH\u0002J(\u0010\f\u001a\u00020\r2\u0006\u0010\u000e\u001a\u00020\u000f2\b\b\u0002\u0010\u0010\u001a\u00020\u00112\f\u0010\u0012\u001a\b\u0012\u0004\u0012\u00020\r0\u0013H\u0002R\u0011\u0010\u0003\u001a\u00020\u0004¢\u0006\b\n��\u001a\u0004\b\u0005\u0010\u0006¨\u0006\u0014"}, d2 = {"Lio/streamthoughts/kafka/tests/TestingEmbeddedKafka$Companion;", "", "()V", "Log", "Lorg/slf4j/Logger;", "getLog", "()Lorg/slf4j/Logger;", "listTopicNames", "", "", "adminClient", "Lorg/apache/kafka/clients/admin/Admin;", "waitForTrue", "", "timeout", "Ljava/time/Duration;", "time", "", "action", "Lkotlin/Function0;", "kafka-clients-kotlin-tests"})
    /* loaded from: input_file:io/streamthoughts/kafka/tests/TestingEmbeddedKafka$Companion.class */
    // Companion of TestingEmbeddedKafka: exposes the shared logger and hosts the
    // topic-listing and polling helpers used by the enclosing class.
    public static final class Companion {
        /**
         * Returns the logger shared by the enclosing {@code TestingEmbeddedKafka} class.
         */
        @NotNull
        public final Logger getLog() {
            return TestingEmbeddedKafka.Log;
        }

        /**
         * Lists the topic names visible to the given admin client, blocking until the
         * request completes.
         *
         * <p>Declared private in the Kotlin source; the decompiler widened the access.
         *
         * @param admin client used to query the cluster
         * @return the topic names reported by the cluster
         * @throws RuntimeException wrapping any failure; if the calling thread was
         *     interrupted, its interrupt flag is restored before the exception is thrown
         */
        public final Set<String> listTopicNames(Admin admin) {
            try {
                Set<String> names = admin.listTopics().names().get();
                Intrinsics.checkExpressionValueIsNotNull(names, "adminClient.listTopics().names().get()");
                return names;
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Wrapping must not hide the interruption from the caller's thread.
                    Thread.currentThread().interrupt();
                }
                throw new RuntimeException("Failed to get topic names", e);
            }
        }

        /**
         * Repeatedly invokes {@code function0} until it returns {@code true} or the
         * timeout, measured from {@code j}, has elapsed. There is no pause between
         * invocations.
         *
         * <p>Declared private in the Kotlin source; the decompiler widened the access.
         *
         * @param duration maximum time to keep trying
         * @param j start of the waiting period, in epoch milliseconds
         * @param function0 condition to evaluate
         * @return the last value returned by the condition, or {@code false} when the
         *     timeout had already elapsed before the first evaluation
         */
        public final boolean waitForTrue(Duration duration, long j, Function0<Boolean> function0) {
            final long timeoutMs = duration.toMillis();
            boolean satisfied = false;
            while (System.currentTimeMillis() - j < timeoutMs && !satisfied) {
                satisfied = ((Boolean) function0.invoke()).booleanValue();
            }
            return satisfied;
        }

        // Compiler-generated bridge for waitForTrue's default argument: when bit 2 of the
        // mask is set, the start time defaults to the current time.
        static /* synthetic */ boolean waitForTrue$default(Companion companion, Duration duration, long j, Function0 function0, int i, Object obj) {
            if ((i & 2) != 0) {
                j = System.currentTimeMillis();
            }
            return companion.waitForTrue(duration, j, function0);
        }

        private Companion() {
        }

        // Compiler-generated constructor used to create the single Companion instance.
        public /* synthetic */ Companion(DefaultConstructorMarker defaultConstructorMarker) {
            this();
        }
    }

    /**
     * Builds the bootstrap address of the embedded broker as a one-element array of
     * {@code host:port}.
     *
     * @param securityProtocol protocol whose listener port should be used; when
     *     {@code null}, the broker's first advertised listener is used
     * @return a single-element array holding {@code <configured host name>:<bound port>}
     */
    @NotNull
    public final String[] bootstrapServers(@Nullable SecurityProtocol securityProtocol) {
        // The broker only exists after start(); fail the same way an uninitialized
        // Kotlin lateinit property would.
        if (this.kafka == null) {
            Intrinsics.throwUninitializedPropertyAccessException("kafka");
        }
        final KafkaServer broker = this.kafka;
        final ListenerName listener = securityProtocol != null
                ? new ListenerName(securityProtocol.toString())
                : ((EndPoint) broker.config().advertisedListeners().apply(0)).listenerName();
        final int port = broker.boundPort(listener);
        return new String[] {broker.config().hostName() + ':' + port};
    }

    // Compiler-generated bridge for bootstrapServers' default argument: when bit 1 of
    // the mask is set, the security protocol defaults to null.
    public static /* synthetic */ String[] bootstrapServers$default(TestingEmbeddedKafka testingEmbeddedKafka, SecurityProtocol securityProtocol, int i, Object obj) {
        final boolean protocolOmitted = (i & 1) != 0;
        return testingEmbeddedKafka.bootstrapServers(protocolOmitted ? null : securityProtocol);
    }

    /**
     * Starts the embedded broker.
     *
     * <p>The overrides are merged into the stored configuration, defaults are filled in
     * for any key still unset, and the server is created through
     * {@code TestUtils.createServer}.
     *
     * @param map configuration overrides applied on top of the constructor-provided config
     */
    public final void start(@NotNull Map<Object, ? extends Object> map) {
        Intrinsics.checkParameterIsNotNull(map, "overrides");
        final Map<Object, Object> settings = this.config;
        settings.putAll(map);

        // Defaults apply only where neither the constructor config nor the overrides set a value.
        settings.putIfAbsent(KafkaConfig.LogDirProp(), "/tmp/kafka-logs");
        settings.putIfAbsent(KafkaConfig.DeleteTopicEnableProp(), Boolean.TRUE);
        settings.putIfAbsent(KafkaConfig.LogCleanerDedupeBufferSizeProp(), Long.valueOf(2L * 1024L * 1024L));
        settings.putIfAbsent(KafkaConfig.GroupMinSessionTimeoutMsProp(), Integer.valueOf(0));
        settings.putIfAbsent(KafkaConfig.GroupInitialRebalanceDelayMsProp(), Integer.valueOf(0));
        settings.putIfAbsent(KafkaConfig.OffsetsTopicReplicationFactorProp(), Short.valueOf((short) 1));
        settings.putIfAbsent(KafkaConfig.OffsetsTopicPartitionsProp(), Integer.valueOf(5));
        settings.putIfAbsent(KafkaConfig.TransactionsTopicPartitionsProp(), Integer.valueOf(5));
        settings.putIfAbsent(KafkaConfig.AutoCreateTopicsEnableProp(), Boolean.TRUE);

        final KafkaConfig brokerConfig = new KafkaConfig(settings, true);
        Log.debug("Starting embedded Kafka broker (with log.dirs={} and ZK ensemble at {}) ...", logDir(), zookeeperConnect());
        final KafkaServer broker = TestUtils.createServer(brokerConfig, new SystemTime());
        Intrinsics.checkExpressionValueIsNotNull(broker, "TestUtils.createServer(kafkaConfig, SystemTime())");
        this.kafka = broker;
        Log.debug("Startup of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...", bootstrapServers(null), zookeeperConnect());
    }

    // Compiler-generated bridge for start's default argument: when bit 1 of the mask is
    // set, the overrides default to an empty map.
    public static /* synthetic */ void start$default(TestingEmbeddedKafka testingEmbeddedKafka, Map map, int i, Object obj) {
        final Map overrides = (i & 1) != 0 ? MapsKt.emptyMap() : map;
        testingEmbeddedKafka.start(overrides);
    }

    /**
     * Shuts the embedded broker down, waits for the shutdown to complete, and then
     * deletes the contents of the broker's log directory.
     */
    public final void stop() {
        Log.debug("Shutting down embedded Kafka broker at {} (with ZK ensemble at {}) ...", bootstrapServers(null), zookeeperConnect());
        if (this.kafka == null) {
            Intrinsics.throwUninitializedPropertyAccessException("kafka");
        }
        final KafkaServer broker = this.kafka;
        broker.shutdown();
        broker.awaitShutdown();
        clearLogsDir();
        Log.debug("Shutdown of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...", bootstrapServers(null), zookeeperConnect());
    }

    /**
     * Deletes the configured log directory and everything beneath it.
     *
     * <p>Entries are visited in reverse path order so that the contents of a directory
     * are removed before the directory itself. A failure to delete a single entry is
     * logged and does not stop the traversal; a failure to open the directory tree
     * propagates to the caller.
     */
    private final void clearLogsDir() {
        Log.debug("Deleting logs.dir at {} ...", logDir());
        // Files.walk holds open directory handles until the stream is closed, so the
        // stream is scoped with try-with-resources.
        try (java.util.stream.Stream<Path> entries = Files.walk(Paths.get(logDir(), new String[0]), new FileVisitOption[0])) {
            entries.sorted(Comparator.reverseOrder()).forEach(path -> {
                try {
                    Files.delete(path);
                } catch (IOException e) {
                    Log.error("Failed to delete entry in log dir {}", logDir(), e);
                }
            });
        }
    }

    /**
     * Creates a topic on the embedded broker and blocks until the broker has answered
     * the request.
     *
     * @param str name of the topic
     * @param i number of partitions
     * @param i2 replication factor (narrowed to {@code short})
     * @param map topic-level configuration, may be {@code null}
     */
    @JvmOverloads
    public final void createTopic(@NotNull String str, int i, int i2, @Nullable Map<String, String> map) {
        Intrinsics.checkParameterIsNotNull(str, "topic");
        Log.debug("Creating topic { name: {}, partitions: {}, replication: {}, config: {} }", new Object[]{str, Integer.valueOf(i), Integer.valueOf(i2), map});
        try (AdminClient client = adminClient()) {
            NewTopic newTopic = new NewTopic(str, i, (short) i2);
            newTopic.configs(map);
            try {
                // Submit the request and wait for the result; without this call the
                // topic definition is built but never sent to the broker.
                client.createTopics(CollectionsKt.listOf(newTopic)).all().get();
            } catch (ExecutionException e) {
                // Surface the underlying failure rather than the future's wrapper.
                Throwable cause = e.getCause();
                if (cause != null) {
                    throw cause;
                }
                throw new TypeCastException("null cannot be cast to non-null type kotlin.Throwable");
            }
        }
    }

    // Compiler-generated bridge for createTopic's default arguments. Mask bits:
    // 2 -> partitions = 1, 4 -> replication = 1, 8 -> config = empty map.
    public static /* synthetic */ void createTopic$default(TestingEmbeddedKafka testingEmbeddedKafka, String str, int i, int i2, Map map, int i3, Object obj) {
        final int partitions = (i3 & 2) != 0 ? 1 : i;
        final int replication = (i3 & 4) != 0 ? 1 : i2;
        final Map topicConfig = (i3 & 8) != 0 ? MapsKt.emptyMap() : map;
        testingEmbeddedKafka.createTopic(str, partitions, replication, topicConfig);
    }

    /**
     * Creates a topic with the given partition count and replication factor and an
     * empty topic configuration.
     */
    @JvmOverloads
    public final void createTopic(@NotNull String str, int i, int i2) {
        createTopic(str, i, i2, MapsKt.<String, String>emptyMap());
    }

    /**
     * Creates a topic with the given partition count, a replication factor of 1 and an
     * empty topic configuration.
     */
    @JvmOverloads
    public final void createTopic(@NotNull String str, int i) {
        createTopic(str, i, 1, MapsKt.<String, String>emptyMap());
    }

    /**
     * Creates a topic with one partition, a replication factor of 1 and an empty topic
     * configuration.
     */
    @JvmOverloads
    public final void createTopic(@NotNull String str) {
        createTopic(str, 1, 1, MapsKt.<String, String>emptyMap());
    }

    /**
     * Lists the names of the topics currently known to the embedded broker, using a
     * short-lived admin client.
     *
     * @return the topic names reported by the broker
     */
    @NotNull
    public final Set<String> topics() {
        // try-with-resources closes the client exactly once and keeps a close() failure
        // from masking an error raised while listing.
        try (AdminClient client = adminClient()) {
            Intrinsics.checkExpressionValueIsNotNull(client, "adminClient");
            return Companion.listTopicNames(client);
        }
    }

    /**
     * Polls the broker until all the given topics are listed or the timeout elapses.
     *
     * @param strArr names of the topics expected to exist
     * @param duration maximum time to wait, measured from the moment this method is called
     * @return {@code true} if every topic was listed before the timeout, {@code false} otherwise
     */
    public final boolean waitForTopicsToBePresent(@NotNull String[] strArr, @NotNull final Duration duration) {
        Intrinsics.checkParameterIsNotNull(strArr, "topicNames");
        Intrinsics.checkParameterIsNotNull(duration, "timeout");
        final long startedAt = System.currentTimeMillis();
        final List<String> expected = CollectionsKt.mutableListOf(strArr.clone());
        // try-with-resources closes the client exactly once and keeps a close() failure
        // from masking an error raised while polling.
        try (AdminClient client = adminClient()) {
            Intrinsics.checkExpressionValueIsNotNull(client, "client");
            return Companion.waitForTrue(duration, startedAt, () -> Companion.listTopicNames(client).containsAll(expected));
        }
    }

    // Compiler-generated bridge for waitForTopicsToBePresent's default argument: when
    // bit 2 of the mask is set, the timeout defaults to 30 seconds.
    public static /* synthetic */ boolean waitForTopicsToBePresent$default(TestingEmbeddedKafka testingEmbeddedKafka, String[] strArr, Duration duration, int i, Object obj) {
        Duration effectiveTimeout = duration;
        if ((i & 2) != 0) {
            effectiveTimeout = Duration.ofSeconds(30L);
            Intrinsics.checkExpressionValueIsNotNull(effectiveTimeout, "Duration.ofSeconds(30)");
        }
        return testingEmbeddedKafka.waitForTopicsToBePresent(strArr, effectiveTimeout);
    }

    /**
     * Polls the broker until none of the given topics is listed any more or the timeout
     * elapses.
     *
     * @param strArr names of the topics expected to disappear
     * @param duration maximum time to wait
     * @return {@code true} if all topics were gone before the timeout, {@code false} otherwise
     */
    public final boolean waitForTopicsToBeAbsent(@NotNull String[] strArr, @NotNull Duration duration) {
        Intrinsics.checkParameterIsNotNull(strArr, "topicNames");
        Intrinsics.checkParameterIsNotNull(duration, "timeout");
        // try-with-resources closes the client exactly once and keeps a close() failure
        // from masking an error raised while polling.
        try (AdminClient client = adminClient()) {
            Intrinsics.checkExpressionValueIsNotNull(client, "it");
            return doWaitForTopicsToBeAbsent(strArr.clone(), duration, System.currentTimeMillis(), client);
        }
    }

    // Compiler-generated bridge for waitForTopicsToBeAbsent's default argument: when
    // bit 2 of the mask is set, the timeout defaults to 30 seconds.
    public static /* synthetic */ boolean waitForTopicsToBeAbsent$default(TestingEmbeddedKafka testingEmbeddedKafka, String[] strArr, Duration duration, int i, Object obj) {
        Duration effectiveTimeout = duration;
        if ((i & 2) != 0) {
            effectiveTimeout = Duration.ofSeconds(30L);
            Intrinsics.checkExpressionValueIsNotNull(effectiveTimeout, "Duration.ofSeconds(30)");
        }
        return testingEmbeddedKafka.waitForTopicsToBeAbsent(strArr, effectiveTimeout);
    }

    /**
     * Polls the topic listing through the supplied client until none of the given
     * topics remains, or the timeout measured from {@code j} elapses.
     *
     * @param strArr names of the topics expected to disappear
     * @param duration maximum time to wait
     * @param j start of the waiting period, in epoch milliseconds
     * @param adminClient client used for the listings; not closed by this method
     * @return {@code true} if all topics were gone before the timeout
     */
    private final boolean doWaitForTopicsToBeAbsent(String[] strArr, Duration duration, long j, final AdminClient adminClient) {
        final List<String> stillPresent = CollectionsKt.mutableListOf(strArr.clone());
        return Companion.waitForTrue(duration, j, () -> {
            // Keep only the names the broker still reports; done once none is left.
            stillPresent.retainAll(Companion.listTopicNames(adminClient));
            return stillPresent.isEmpty();
        });
    }

    // Compiler-generated bridge for doWaitForTopicsToBeAbsent's default arguments. Mask
    // bits: 2 -> timeout = Long.MAX_VALUE milliseconds, 4 -> start time = current time.
    static /* synthetic */ boolean doWaitForTopicsToBeAbsent$default(TestingEmbeddedKafka testingEmbeddedKafka, String[] strArr, Duration duration, long j, AdminClient adminClient, int i, Object obj) {
        Duration effectiveTimeout = duration;
        if ((i & 2) != 0) {
            effectiveTimeout = Duration.ofMillis(Long.MAX_VALUE);
            Intrinsics.checkExpressionValueIsNotNull(effectiveTimeout, "Duration.ofMillis(Long.MAX_VALUE)");
        }
        final long startedAt = (i & 4) != 0 ? System.currentTimeMillis() : j;
        return testingEmbeddedKafka.doWaitForTopicsToBeAbsent(strArr, effectiveTimeout, startedAt, adminClient);
    }

    /**
     * Creates a new admin client connected to the embedded broker, with a request
     * timeout of 60000 ms. The caller is responsible for closing it.
     */
    public final AdminClient adminClient() {
        final Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", String.join(", ", bootstrapServers(null)));
        props.put("request.timeout.ms", Integer.valueOf(60000));
        return AdminClient.create(props);
    }

    /**
     * Creates a producer connected to the embedded broker.
     *
     * @param map producer configuration; any {@code bootstrap.servers} entry is
     *     replaced with the embedded broker's address
     * @return a new producer; the caller is responsible for closing it
     */
    @NotNull
    public final Producer<Object, Object> producerClient(@NotNull Map<String, ? extends Object> map) {
        Intrinsics.checkParameterIsNotNull(map, "config");
        final Map<String, Object> props = new HashMap<>(map);
        final String servers = String.join(", ", bootstrapServers(null));
        props.put("bootstrap.servers", servers);
        return new KafkaProducer<>(props);
    }

    // Compiler-generated bridge for producerClient's default argument: when bit 1 of the
    // mask is set, the configuration defaults to an empty map.
    public static /* synthetic */ Producer producerClient$default(TestingEmbeddedKafka testingEmbeddedKafka, Map map, int i, Object obj) {
        final Map producerConfig = (i & 1) != 0 ? MapsKt.emptyMap() : map;
        return testingEmbeddedKafka.producerClient(producerConfig);
    }

    /**
     * Creates a consumer connected to the embedded broker.
     *
     * <p>{@code bootstrap.servers} is always set to the embedded broker's address. The
     * following entries are filled in only when the supplied configuration leaves them
     * unset: byte-array key and value deserializers, {@code auto.offset.reset=earliest},
     * {@code enable.auto.commit=true}, and a random {@code group.id}.
     *
     * @param map consumer configuration
     * @param deserializer key deserializer instance, may be {@code null}
     * @param deserializer2 value deserializer instance, may be {@code null}
     * @return a new consumer; the caller is responsible for closing it
     */
    @NotNull
    public final <K, V> org.apache.kafka.clients.consumer.Consumer<K, V> consumerClient(@NotNull Map<String, ? extends Object> map, @Nullable Deserializer<K> deserializer, @Nullable Deserializer<V> deserializer2) {
        Intrinsics.checkParameterIsNotNull(map, "config");
        final Map<String, Object> props = new HashMap<>(map);
        props.put("bootstrap.servers", String.join(", ", bootstrapServers(null)));

        final String byteArrayDeserializer = ByteArrayDeserializer.class.getName();
        props.putIfAbsent("key.deserializer", byteArrayDeserializer);
        props.putIfAbsent("value.deserializer", byteArrayDeserializer);
        props.putIfAbsent("auto.offset.reset", "earliest");
        props.putIfAbsent("enable.auto.commit", "true");
        props.putIfAbsent("group.id", UUID.randomUUID().toString());
        return new KafkaConsumer<>(props, deserializer, deserializer2);
    }

    // Compiler-generated bridge for consumerClient's default arguments. Mask bits:
    // 1 -> config = empty map, 2 -> key deserializer = null, 4 -> value deserializer = null.
    public static /* synthetic */ org.apache.kafka.clients.consumer.Consumer consumerClient$default(TestingEmbeddedKafka testingEmbeddedKafka, Map map, Deserializer deserializer, Deserializer deserializer2, int i, Object obj) {
        final Map consumerConfig = (i & 1) != 0 ? MapsKt.emptyMap() : map;
        final Deserializer keyDeserializer = (i & 2) != 0 ? null : deserializer;
        final Deserializer valueDeserializer = (i & 4) != 0 ? null : deserializer2;
        return testingEmbeddedKafka.consumerClient(consumerConfig, keyDeserializer, valueDeserializer);
    }

    /**
     * Consumes from a topic until at least {@code i} records have been received or the
     * timeout elapses, whichever comes first.
     *
     * <p>A fresh consumer is created for the call, polled in 100 ms steps, and closed
     * before returning. Because whole poll batches are appended, the result may hold
     * more than {@code i} records.
     *
     * @param str topic to subscribe to
     * @param duration maximum time to keep polling
     * @param i number of records after which polling stops
     * @param deserializer key deserializer instance, may be {@code null}
     * @param deserializer2 value deserializer instance, may be {@code null}
     * @param map additional consumer configuration
     * @return the records received, in the order they were polled
     */
    @NotNull
    public final <K, V> List<ConsumerRecord<K, V>> consumeUntilMinRecordsOrTimeout(@NotNull String str, @NotNull Duration duration, int i, @Nullable Deserializer<K> deserializer, @Nullable Deserializer<V> deserializer2, @NotNull Map<String, ? extends Object> map) {
        Intrinsics.checkParameterIsNotNull(str, "topic");
        Intrinsics.checkParameterIsNotNull(duration, "timeout");
        Intrinsics.checkParameterIsNotNull(map, "consumerConfig");
        // try-with-resources closes the consumer exactly once and keeps a close()
        // failure from masking an error raised while polling.
        try (org.apache.kafka.clients.consumer.Consumer<K, V> client = consumerClient(map, deserializer, deserializer2)) {
            client.subscribe(CollectionsKt.listOf(str));
            final List<ConsumerRecord<K, V>> received = new ArrayList<>();
            final long startedAt = System.currentTimeMillis();
            final long timeoutMs = duration.toMillis();
            while (System.currentTimeMillis() - startedAt < timeoutMs && received.size() < i) {
                Iterable<ConsumerRecord<K, V>> batch = client.poll(Duration.ofMillis(100L));
                Intrinsics.checkExpressionValueIsNotNull(batch, "client.poll(Duration.ofMillis(100))");
                for (ConsumerRecord<K, V> consumed : batch) {
                    Intrinsics.checkExpressionValueIsNotNull(consumed, "it");
                    received.add(consumed);
                }
            }
            return received;
        }
    }

    // Compiler-generated bridge for consumeUntilMinRecordsOrTimeout's default arguments.
    // Mask bits: 2 -> timeout = 1 minute, 4 -> expected records = Integer.MAX_VALUE,
    // 8 -> key deserializer = null, 16 -> value deserializer = null,
    // 32 -> consumer config = empty map.
    public static /* synthetic */ List consumeUntilMinRecordsOrTimeout$default(TestingEmbeddedKafka testingEmbeddedKafka, String str, Duration duration, int i, Deserializer deserializer, Deserializer deserializer2, Map map, int i2, Object obj) {
        Duration effectiveTimeout = duration;
        if ((i2 & 2) != 0) {
            effectiveTimeout = Duration.ofMinutes(1L);
            Intrinsics.checkExpressionValueIsNotNull(effectiveTimeout, "Duration.ofMinutes(1)");
        }
        final int expectedRecords = (i2 & 4) != 0 ? Integer.MAX_VALUE : i;
        final Deserializer keyDeserializer = (i2 & 8) != 0 ? null : deserializer;
        final Deserializer valueDeserializer = (i2 & 16) != 0 ? null : deserializer2;
        final Map consumerConfig = (i2 & 32) != 0 ? MapsKt.emptyMap() : map;
        return testingEmbeddedKafka.consumeUntilMinRecordsOrTimeout(str, effectiveTimeout, expectedRecords, keyDeserializer, valueDeserializer, consumerConfig);
    }

    /**
     * Deletes the given topics and blocks until the broker no longer lists any of them.
     * The wait uses a timeout of {@code Long.MAX_VALUE} milliseconds.
     *
     * @param strArr names of the topics to delete
     * @throws RuntimeException wrapping any failure; if the calling thread was
     *     interrupted, its interrupt flag is restored before the exception is thrown
     */
    public final void deleteTopics(@NotNull String... strArr) {
        Intrinsics.checkParameterIsNotNull(strArr, "topicNames");
        final List<String> names = CollectionsKt.mutableListOf(strArr.clone());
        // try-with-resources closes the client exactly once and keeps a close() failure
        // from masking an error raised during deletion.
        try (AdminClient client = adminClient()) {
            client.deleteTopics(names).all().get();
            Intrinsics.checkExpressionValueIsNotNull(client, "client");
            doWaitForTopicsToBeAbsent(strArr.clone(), Duration.ofMillis(Long.MAX_VALUE), System.currentTimeMillis(), client);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Wrapping must not hide the interruption from the caller's thread.
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException("Failed to delete topics: " + names, e);
        }
    }

    // Returns the configured ZooKeeper connect setting as text ("null" when unset).
    private final String zookeeperConnect() {
        final Object configured = this.config.get(KafkaConfig.ZkConnectProp());
        return String.valueOf(configured);
    }

    // Returns the configured broker log directory as text ("null" when unset).
    /* JADX INFO: Access modifiers changed from: private */
    public final String logDir() {
        final Object configured = this.config.get(KafkaConfig.LogDirProp());
        return String.valueOf(configured);
    }

    /**
     * Creates an embedded Kafka helper whose broker configuration starts as a copy of
     * the given properties.
     *
     * @param properties initial broker configuration
     */
    public TestingEmbeddedKafka(@NotNull Properties properties) {
        Intrinsics.checkParameterIsNotNull(properties, "config");
        this.config = new HashMap<>(properties);
    }

    // Compiler-generated bridge for the constructor's default argument: when bit 1 of
    // the mask is set, the configuration defaults to empty Properties.
    public /* synthetic */ TestingEmbeddedKafka(Properties properties, int i, DefaultConstructorMarker defaultConstructorMarker) {
        this((i & 1) != 0 ? new Properties() : properties);
    }

    /**
     * Creates an embedded Kafka helper with an initially empty broker configuration.
     */
    public TestingEmbeddedKafka() {
        this(new Properties());
    }

    static {
        Logger logger = LoggerFactory.getLogger(TestingEmbeddedKafka.class);
        Intrinsics.checkExpressionValueIsNotNull(logger, "LoggerFactory.getLogger(…mbeddedKafka::class.java)");
        Log = logger;
    }
}
