package net.manub.embeddedkafka;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import kafka.server.KafkaServer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.zookeeper.server.ServerCnxnFactory;
import scala.Function0;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.JavaConversions$;
import scala.collection.Seq$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Vector;
import scala.collection.immutable.Vector$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.concurrent.ExecutionContextExecutorService;
import scala.package$;
import scala.reflect.io.Directory;
import scala.reflect.io.Directory$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: EmbeddedKafka.scala */
/* loaded from: input_file:net/manub/embeddedkafka/EmbeddedKafka$.class */
public final class EmbeddedKafka$ implements EmbeddedKafkaSupport {
    public static EmbeddedKafka$ MODULE$;
    private Option<ServerCnxnFactory> factory;
    private Option<KafkaServer> broker;
    private final Buffer<Directory> logsDirs;
    private final ExecutorService net$manub$embeddedkafka$EmbeddedKafkaSupport$$executorService;
    private final ExecutionContextExecutorService net$manub$embeddedkafka$EmbeddedKafkaSupport$$executionContext;
    private final int zkSessionTimeoutMs;
    private final int zkConnectionTimeoutMs;
    private final boolean zkSecurityEnabled;
    private volatile EmbeddedKafkaSupport$aKafkaProducer$ aKafkaProducer$module;

    static {
        new EmbeddedKafka$();
    }

    @Override // net.manub.embeddedkafka.EmbeddedKafkaSupport
    public Object withRunningKafka(Function0<Object> function0, EmbeddedKafkaConfig embeddedKafkaConfig) {
        return withRunningKafka(function0, embeddedKafkaConfig);
    }

    @Override // net.manub.embeddedkafka.EmbeddedKafkaSupport
    public void publishStringMessageToKafka(String str, String str2, EmbeddedKafkaConfig embeddedKafkaConfig) {
        publishStringMessageToKafka(str, str2, embeddedKafkaConfig);
    }

    @Override // net.manub.embeddedkafka.EmbeddedKafkaSupport
    public <T> void publishToKafka(String str, T t, EmbeddedKafkaConfig embeddedKafkaConfig, Serializer<T> serializer) throws KafkaUnavailableException {
        publishToKafka(str, t, embeddedKafkaConfig, serializer);
    }

    @Override // net.manub.embeddedkafka.EmbeddedKafkaSupport
    public <K, T> void publishToKafka(String str, K k, T t, EmbeddedKafkaConfig embeddedKafkaConfig, Serializer<K> serializer, Serializer<T> serializer2) throws KafkaUnavailableException {
        publishToKafka(str, k, t, embeddedKafkaConfig, serializer, serializer2);
    }

    @Override // net.manub.embeddedkafka.EmbeddedKafkaSupport
    public String consumeFirstStringMessageFrom(String str, boolean z, EmbeddedKafkaConfig embeddedKafkaConfig) {
        return consumeFirstStringMessageFrom(str, z, embeddedKafkaConfig);
    }

    @Override // net.manub.embeddedkafka.EmbeddedKafkaSupport
    public <T> T consumeFirstMessageFrom(String str, boolean z, EmbeddedKafkaConfig embeddedKafkaConfig, Deserializer<T> deserializer) throws TimeoutException, KafkaUnavailableException {
        return (T) consumeFirstMessageFrom(str, z, embeddedKafkaConfig, deserializer);
    }

    @Override // net.manub.embeddedkafka.EmbeddedKafkaSupport
    public ServerCnxnFactory startZooKeeper(int i, Directory directory) {
        return startZooKeeper(i, directory);
    }

    @Override // net.manub.embeddedkafka.EmbeddedKafkaSupport
    public KafkaServer startKafka(EmbeddedKafkaConfig embeddedKafkaConfig, Directory directory) {
        return startKafka(embeddedKafkaConfig, directory);
    }

    @Override // net.manub.embeddedkafka.EmbeddedKafkaSupport
    public void createCustomTopic(String str, Map<String, String> map, int i, int i2, EmbeddedKafkaConfig embeddedKafkaConfig) {
        createCustomTopic(str, map, i, i2, embeddedKafkaConfig);
    }

    @Override // net.manub.embeddedkafka.EmbeddedKafkaSupport
    public boolean consumeFirstStringMessageFrom$default$2() {
        return consumeFirstStringMessageFrom$default$2();
    }

    @Override // net.manub.embeddedkafka.EmbeddedKafkaSupport
    public <T> boolean consumeFirstMessageFrom$default$2() {
        return consumeFirstMessageFrom$default$2();
    }

    @Override // net.manub.embeddedkafka.EmbeddedKafkaSupport
    public Map<String, String> createCustomTopic$default$2() {
        return createCustomTopic$default$2();
    }

    @Override // net.manub.embeddedkafka.EmbeddedKafkaSupport
    public int createCustomTopic$default$3() {
        return createCustomTopic$default$3();
    }

    @Override // net.manub.embeddedkafka.EmbeddedKafkaSupport
    public int createCustomTopic$default$4() {
        return createCustomTopic$default$4();
    }

    @Override // net.manub.embeddedkafka.EmbeddedKafkaSupport
    public ExecutorService net$manub$embeddedkafka$EmbeddedKafkaSupport$$executorService() {
        return this.net$manub$embeddedkafka$EmbeddedKafkaSupport$$executorService;
    }

    @Override // net.manub.embeddedkafka.EmbeddedKafkaSupport
    public ExecutionContextExecutorService net$manub$embeddedkafka$EmbeddedKafkaSupport$$executionContext() {
        return this.net$manub$embeddedkafka$EmbeddedKafkaSupport$$executionContext;
    }

    @Override // net.manub.embeddedkafka.EmbeddedKafkaSupport
    public int zkSessionTimeoutMs() {
        return this.zkSessionTimeoutMs;
    }

    @Override // net.manub.embeddedkafka.EmbeddedKafkaSupport
    public int zkConnectionTimeoutMs() {
        return this.zkConnectionTimeoutMs;
    }

    @Override // net.manub.embeddedkafka.EmbeddedKafkaSupport
    public boolean zkSecurityEnabled() {
        return this.zkSecurityEnabled;
    }

    @Override // net.manub.embeddedkafka.EmbeddedKafkaSupport
    public EmbeddedKafkaSupport$aKafkaProducer$ aKafkaProducer() {
        if (this.aKafkaProducer$module == null) {
            aKafkaProducer$lzycompute$1();
        }
        return this.aKafkaProducer$module;
    }

    @Override // net.manub.embeddedkafka.EmbeddedKafkaSupport
    public final void net$manub$embeddedkafka$EmbeddedKafkaSupport$_setter_$net$manub$embeddedkafka$EmbeddedKafkaSupport$$executorService_$eq(ExecutorService executorService) {
        this.net$manub$embeddedkafka$EmbeddedKafkaSupport$$executorService = executorService;
    }

    @Override // net.manub.embeddedkafka.EmbeddedKafkaSupport
    public final void net$manub$embeddedkafka$EmbeddedKafkaSupport$_setter_$net$manub$embeddedkafka$EmbeddedKafkaSupport$$executionContext_$eq(ExecutionContextExecutorService executionContextExecutorService) {
        this.net$manub$embeddedkafka$EmbeddedKafkaSupport$$executionContext = executionContextExecutorService;
    }

    @Override // net.manub.embeddedkafka.EmbeddedKafkaSupport
    public void net$manub$embeddedkafka$EmbeddedKafkaSupport$_setter_$zkSessionTimeoutMs_$eq(int i) {
        this.zkSessionTimeoutMs = i;
    }

    @Override // net.manub.embeddedkafka.EmbeddedKafkaSupport
    public void net$manub$embeddedkafka$EmbeddedKafkaSupport$_setter_$zkConnectionTimeoutMs_$eq(int i) {
        this.zkConnectionTimeoutMs = i;
    }

    @Override // net.manub.embeddedkafka.EmbeddedKafkaSupport
    public void net$manub$embeddedkafka$EmbeddedKafkaSupport$_setter_$zkSecurityEnabled_$eq(boolean z) {
        this.zkSecurityEnabled = z;
    }

    public void start(EmbeddedKafkaConfig embeddedKafkaConfig) {
        Directory makeTemp = Directory$.MODULE$.makeTemp("zookeeper-logs", Directory$.MODULE$.makeTemp$default$2(), Directory$.MODULE$.makeTemp$default$3());
        Directory makeTemp2 = Directory$.MODULE$.makeTemp("kafka-logs", Directory$.MODULE$.makeTemp$default$2(), Directory$.MODULE$.makeTemp$default$3());
        this.factory = Option$.MODULE$.apply(startZooKeeper(embeddedKafkaConfig.zooKeeperPort(), makeTemp));
        this.broker = Option$.MODULE$.apply(startKafka(embeddedKafkaConfig, makeTemp2));
        this.logsDirs.$plus$plus$eq(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Directory[]{makeTemp, makeTemp2})));
    }

    public void startZooKeeper(Directory directory, EmbeddedKafkaConfig embeddedKafkaConfig) {
        this.factory = Option$.MODULE$.apply(startZooKeeper(embeddedKafkaConfig.zooKeeperPort(), directory));
    }

    public void startKafka(Directory directory, EmbeddedKafkaConfig embeddedKafkaConfig) {
        this.broker = Option$.MODULE$.apply(startKafka(embeddedKafkaConfig, directory));
    }

    public void stop() {
        stopKafka();
        stopZooKeeper();
        this.logsDirs.foreach(directory -> {
            return BoxesRunTime.boxToBoolean(directory.deleteRecursively());
        });
        this.logsDirs.clear();
    }

    public void stopZooKeeper() {
        this.factory.foreach(serverCnxnFactory -> {
            serverCnxnFactory.shutdown();
            return BoxedUnit.UNIT;
        });
        this.factory = None$.MODULE$;
    }

    public void stopKafka() {
        this.broker.foreach(kafkaServer -> {
            $anonfun$stopKafka$1(kafkaServer);
            return BoxedUnit.UNIT;
        });
        this.broker = None$.MODULE$;
    }

    public boolean isRunning() {
        return this.factory.nonEmpty() && this.broker.nonEmpty();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v5, types: [net.manub.embeddedkafka.EmbeddedKafka$] */
    /* JADX WARN: Type inference failed for: r1v2, types: [net.manub.embeddedkafka.EmbeddedKafkaSupport$aKafkaProducer$] */
    private final void aKafkaProducer$lzycompute$1() {
        ?? r0 = this;
        synchronized (r0) {
            if (this.aKafkaProducer$module == null) {
                r0 = this;
                r0.aKafkaProducer$module = new Object(this) { // from class: net.manub.embeddedkafka.EmbeddedKafkaSupport$aKafkaProducer$
                    private Vector<KafkaProducer<?, ?>> producers;
                    private final /* synthetic */ EmbeddedKafkaSupport $outer;

                    public <V> KafkaProducer<String, V> thatSerializesValuesWith(Class<? extends Serializer<V>> cls, EmbeddedKafkaConfig embeddedKafkaConfig) {
                        KafkaProducer<String, V> kafkaProducer = new KafkaProducer<>(JavaConversions$.MODULE$.mapAsJavaMap(this.$outer.net$manub$embeddedkafka$EmbeddedKafkaSupport$$baseProducerConfig(embeddedKafkaConfig).$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("key.serializer"), StringSerializer.class.getName()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("value.serializer"), cls.getName()), Predef$.MODULE$.wrapRefArray(new Tuple2[0]))));
                        this.producers = (Vector) this.producers.$colon$plus(kafkaProducer, Vector$.MODULE$.canBuildFrom());
                        return kafkaProducer;
                    }

                    public <V> KafkaProducer<String, V> apply(Serializer<V> serializer, EmbeddedKafkaConfig embeddedKafkaConfig) {
                        KafkaProducer<String, V> kafkaProducer = new KafkaProducer<>(JavaConversions$.MODULE$.mapAsJavaMap(this.$outer.net$manub$embeddedkafka$EmbeddedKafkaSupport$$baseProducerConfig(embeddedKafkaConfig)), new StringSerializer(), serializer);
                        this.producers = (Vector) this.producers.$colon$plus(kafkaProducer, Vector$.MODULE$.canBuildFrom());
                        return kafkaProducer;
                    }

                    {
                        if (this == null) {
                            throw null;
                        }
                        this.$outer = this;
                        this.producers = package$.MODULE$.Vector().empty();
                        scala.sys.package$.MODULE$.addShutdownHook(() -> {
                            this.producers.foreach(kafkaProducer -> {
                                kafkaProducer.close();
                                return BoxedUnit.UNIT;
                            });
                        });
                    }
                };
            }
        }
    }

    public static final /* synthetic */ void $anonfun$stopKafka$1(KafkaServer kafkaServer) {
        kafkaServer.shutdown();
        kafkaServer.awaitShutdown();
    }

    private EmbeddedKafka$() {
        MODULE$ = this;
        EmbeddedKafkaSupport.$init$(this);
        this.factory = None$.MODULE$;
        this.broker = None$.MODULE$;
        this.logsDirs = Buffer$.MODULE$.empty();
    }
}
