/*
 * Decompiled with CFR 0.152.
 */
package pl.touk.nussknacker.engine.kafka;

import java.io.File;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.Properties;
import kafka.server.KafkaConfig;
import kafka.server.KafkaRaftServer;
import kafka.server.KafkaServer;
import kafka.server.KafkaServer$;
import kafka.server.Server;
import kafka.tools.StorageTool$;
import org.apache.commons.io.output.NullOutputStream;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import pl.touk.nussknacker.engine.kafka.EmbeddedKafkaServer;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;

public final class EmbeddedKafkaServer$ {
    public static final EmbeddedKafkaServer$ MODULE$ = new EmbeddedKafkaServer$();
    private static final boolean kRaftEnabled = false;
    private static final String localhost = "127.0.0.1";

    public boolean kRaftEnabled() {
        return kRaftEnabled;
    }

    public String localhost() {
        return localhost;
    }

    public EmbeddedKafkaServer run(int brokerPort, int controllerPort, Map<String, String> kafkaBrokerConfig) {
        EmbeddedKafkaServer embeddedKafkaServer;
        File tempDir = Files.createTempDirectory("embeddedKafka", new FileAttribute[0]).toFile();
        KafkaConfig kafkaConfig = this.prepareServerConfig(brokerPort, controllerPort, tempDir, kafkaBrokerConfig);
        if (this.kRaftEnabled()) {
            this.prepareRaftStorage(tempDir, kafkaConfig);
            embeddedKafkaServer = new EmbeddedKafkaServer((Option<Tuple3<NIOServerCnxnFactory, ZooKeeperServer, File>>)None$.MODULE$, (Server)new KafkaRaftServer(kafkaConfig, Time.SYSTEM), this.localhost() + ":" + brokerPort, tempDir);
        } else {
            Tuple3<NIOServerCnxnFactory, ZooKeeperServer, File> zk = this.createZookeeperServer(controllerPort);
            embeddedKafkaServer = new EmbeddedKafkaServer((Option<Tuple3<NIOServerCnxnFactory, ZooKeeperServer, File>>)new Some(zk), (Server)new KafkaServer(kafkaConfig, Time.SYSTEM, KafkaServer$.MODULE$.$lessinit$greater$default$3(), KafkaServer$.MODULE$.$lessinit$greater$default$4()), this.localhost() + ":" + brokerPort, tempDir);
        }
        EmbeddedKafkaServer server = embeddedKafkaServer;
        server.startup();
        return server;
    }

    private KafkaConfig prepareServerConfig(int brokerPort, int controllerPort, File logDir, Map<String, String> kafkaBrokerConfig) {
        Object object;
        Properties properties = new Properties();
        properties.setProperty("broker.id", "0");
        if (this.kRaftEnabled()) {
            properties.setProperty("process.roles", "broker,controller");
            properties.setProperty("listeners", "PLAINTEXT://" + this.localhost() + ":" + brokerPort + ",CONTROLLER://" + this.localhost() + ":" + controllerPort);
            properties.setProperty("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT");
            properties.setProperty("controller.listener.names", "CONTROLLER");
            object = properties.setProperty("controller.quorum.voters", "0@" + this.localhost() + ":" + controllerPort);
        } else {
            properties.setProperty("zookeeper.connect", this.localhost() + ":" + controllerPort);
            object = properties.setProperty("listeners", "PLAINTEXT://" + this.localhost() + ":" + brokerPort);
        }
        properties.setProperty("num.partitions", "1");
        properties.setProperty("offsets.topic.num.partitions", "1");
        properties.setProperty("offsets.topic.replication.factor", "1");
        properties.setProperty("log.cleaner.dedupe.buffer.size", Long.toString(0x200000L));
        properties.setProperty("transaction.state.log.num.partitions", "1");
        properties.setProperty("transaction.state.log.replication.factor", "1");
        properties.setProperty("transaction.state.log.min.isr", "1");
        properties.setProperty("log.dir", logDir.getAbsolutePath());
        kafkaBrokerConfig.foreach((Function1 & Serializable)x0$1 -> {
            Tuple2 tuple2 = x0$1;
            if (tuple2 != null) {
                String key = (String)tuple2._1();
                String value = (String)tuple2._2();
                return properties.setProperty(key, value);
            }
            throw new MatchError((Object)tuple2);
        });
        return new KafkaConfig((java.util.Map)properties);
    }

    private int prepareRaftStorage(File logDir, KafkaConfig kafkaConfig) {
        Uuid uuid = Uuid.randomUuid();
        return StorageTool$.MODULE$.formatCommand(new PrintStream((OutputStream)NullOutputStream.NULL_OUTPUT_STREAM), (Seq)new .colon.colon((Object)logDir.getAbsolutePath(), (List)Nil$.MODULE$), StorageTool$.MODULE$.buildMetadataProperties(uuid.toString(), kafkaConfig), MetadataVersion.IBP_3_3_IV3, false);
    }

    private Tuple3<NIOServerCnxnFactory, ZooKeeperServer, File> createZookeeperServer(int zkPort) {
        NIOServerCnxnFactory factory = new NIOServerCnxnFactory();
        factory.configure(new InetSocketAddress(this.localhost(), zkPort), 1024);
        File tempDir = Files.createTempDirectory("embeddedKafkaZk", new FileAttribute[0]).toFile();
        ZooKeeperServer zkServer = new ZooKeeperServer(tempDir, tempDir, 3000);
        return new Tuple3((Object)factory, (Object)zkServer, (Object)tempDir);
    }

    private EmbeddedKafkaServer$() {
    }
}

