/*
 * Decompiled with CFR 0.152.
 */
package com.networknt.tram.test;

import com.google.common.collect.Maps;
import com.networknt.tram.test.KafkaTestEnvironment;
import com.networknt.utility.NetUtils;
import java.io.File;
import java.net.BindException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import kafka.common.KafkaException;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import org.apache.commons.collections.list.UnmodifiableList;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingServer;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.Seq;
import scala.collection.mutable.ArraySeq;

public class KafkaTestEnvironmentImpl
implements KafkaTestEnvironment {
    protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class);
    private File tmpZkDir;
    private File tmpKafkaParent;
    private List<File> tmpKafkaDirs;
    private List<KafkaServer> brokers;
    private TestingServer zookeeper;
    private String zookeeperConnectionString;
    private String brokerConnectionString = "";
    private Properties standardProps;
    private int zkTimeout = 30000;
    private KafkaTestEnvironment.Config config;

    @Override
    public String getBrokerConnectionString() {
        return this.brokerConnectionString;
    }

    @Override
    public Properties getStandardProperties() {
        return this.standardProps;
    }

    @Override
    public Properties getSecureProperties() {
        Properties prop = new Properties();
        if (this.config.isSecureMode()) {
            prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
            prop.put("security.protocol", "SASL_PLAINTEXT");
            prop.put("sasl.kerberos.service.name", "kafka");
            prop.setProperty("zookeeper.session.timeout.ms", String.valueOf(this.zkTimeout));
            prop.setProperty("zookeeper.connection.timeout.ms", String.valueOf(this.zkTimeout));
            prop.setProperty("metadata.fetch.timeout.ms", "120000");
        }
        return prop;
    }

    public Map<String, Object> getDefaultClientConfig() {
        HashMap defaultClientConfig = Maps.newHashMap();
        defaultClientConfig.put("bootstrap.servers", this.getBrokerConnectionString());
        defaultClientConfig.put("client.id", "test-consumer-id");
        defaultClientConfig.put("enable.auto.commit", "true");
        defaultClientConfig.put("metadata.max.age.ms", "3000");
        defaultClientConfig.put("auto.commit.interval.ms", "1000");
        defaultClientConfig.put("session.timeout.ms", "30000");
        return defaultClientConfig;
    }

    @Override
    public String getVersion() {
        return "2.1.0";
    }

    @Override
    public List<KafkaServer> getBrokers() {
        return this.brokers;
    }

    @Override
    public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties properties, String topic, int partition, long timeout) {
        ArrayList<ConsumerRecord> result = new ArrayList<ConsumerRecord>();
        try (KafkaConsumer consumer = new KafkaConsumer(properties);){
            boolean processedAtLeastOneRecord;
            consumer.assign(Arrays.asList(new TopicPartition(topic, partition)));
            do {
                processedAtLeastOneRecord = false;
                for (ConsumerRecord record : consumer.poll(timeout)) {
                    result.add(record);
                    processedAtLeastOneRecord = true;
                }
            } while (processedAtLeastOneRecord);
            consumer.commitSync();
        }
        return UnmodifiableList.decorate(result);
    }

    @Override
    public KafkaTestEnvironment.KafkaOffsetHandler createOffsetHandler() {
        return new KafkaOffsetHandlerImpl();
    }

    @Override
    public void restartBroker(int leaderId) throws Exception {
        this.brokers.set(leaderId, this.getKafkaServer(leaderId, this.tmpKafkaDirs.get(leaderId)));
    }

    @Override
    public int getLeaderToShutDown(String topic) throws Exception {
        try (AdminClient adminClient = KafkaAdminClient.create(this.getDefaultClientConfig());){
            DescribeTopicsResult result = adminClient.describeTopics(Collections.singleton(topic));
            TopicDescription description = (TopicDescription)((KafkaFuture)result.values().get(topic)).get();
            List partitions = description.partitions();
            int n = ((TopicPartitionInfo)partitions.get(0)).leader().id();
            return n;
        }
    }

    @Override
    public int getBrokerId(KafkaServer server) {
        return server.config().brokerId();
    }

    @Override
    public boolean isSecureRunSupported() {
        return true;
    }

    @Override
    public void prepare(KafkaTestEnvironment.Config config) {
        if (config.isSecureMode()) {
            config.setKafkaServersNumber(1);
            this.zkTimeout *= 15;
        }
        this.config = config;
        File tempDir = new File(System.getProperty("java.io.tmpdir"));
        this.tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + UUID.randomUUID().toString());
        Assert.assertTrue((String)"cannot create zookeeper temp dir", (boolean)this.tmpZkDir.mkdirs());
        this.tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + UUID.randomUUID().toString());
        Assert.assertTrue((String)"cannot create kafka temp dir", (boolean)this.tmpKafkaParent.mkdirs());
        this.tmpKafkaDirs = new ArrayList<File>(config.getKafkaServersNumber());
        for (int i = 0; i < config.getKafkaServersNumber(); ++i) {
            File tmpDir = new File(this.tmpKafkaParent, "server-" + i);
            Assert.assertTrue((String)"cannot create kafka temp dir", (boolean)tmpDir.mkdir());
            this.tmpKafkaDirs.add(tmpDir);
        }
        this.zookeeper = null;
        this.brokers = null;
        try {
            this.zookeeper = new TestingServer(-1, this.tmpZkDir);
            this.zookeeperConnectionString = this.zookeeper.getConnectString();
            LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", (Object)this.zookeeperConnectionString);
            LOG.info("Starting KafkaServer");
            this.brokers = new ArrayList<KafkaServer>(config.getKafkaServersNumber());
            ListenerName listenerName = ListenerName.forSecurityProtocol((SecurityProtocol)(config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT));
            for (int i = 0; i < config.getKafkaServersNumber(); ++i) {
                KafkaServer kafkaServer = this.getKafkaServer(i, this.tmpKafkaDirs.get(i));
                this.brokers.add(kafkaServer);
                this.brokerConnectionString = this.brokerConnectionString + NetUtils.hostAndPortToUrlString((String)"localhost", (int)kafkaServer.socketServer().boundPort(listenerName));
                this.brokerConnectionString = this.brokerConnectionString + ",";
            }
            LOG.info("ZK and KafkaServer started.");
        }
        catch (Throwable t) {
            t.printStackTrace();
            Assert.fail((String)("Test setup failed: " + t.getMessage()));
        }
        this.standardProps = new Properties();
        this.standardProps.setProperty("zookeeper.connect", this.zookeeperConnectionString);
        this.standardProps.setProperty("bootstrap.servers", this.brokerConnectionString);
        this.standardProps.setProperty("group.id", "tram-tests");
        this.standardProps.setProperty("enable.auto.commit", "false");
        this.standardProps.setProperty("zookeeper.session.timeout.ms", String.valueOf(this.zkTimeout));
        this.standardProps.setProperty("zookeeper.connection.timeout.ms", String.valueOf(this.zkTimeout));
        this.standardProps.setProperty("auto.offset.reset", "earliest");
        this.standardProps.setProperty("max.partition.fetch.bytes", "256");
    }

    @Override
    public void shutdown() {
        for (KafkaServer broker : this.brokers) {
            if (broker == null) continue;
            broker.shutdown();
        }
        this.brokers.clear();
        if (this.zookeeper != null) {
            try {
                this.zookeeper.stop();
            }
            catch (Exception e) {
                LOG.warn("ZK.stop() failed", (Throwable)e);
            }
            this.zookeeper = null;
        }
        if (this.tmpKafkaParent != null && this.tmpKafkaParent.exists()) {
            try {
                FileUtils.deleteDirectory((File)this.tmpKafkaParent);
            }
            catch (Exception exception) {
                // empty catch block
            }
        }
        if (this.tmpZkDir != null && this.tmpZkDir.exists()) {
            try {
                FileUtils.deleteDirectory((File)this.tmpZkDir);
            }
            catch (Exception exception) {
                // empty catch block
            }
        }
    }

    @Override
    public void createTestTopic(String topic, int numberOfPartitions, short replicationFactor) {
        this.createTestTopic(topic, numberOfPartitions, replicationFactor, new Properties());
    }

    @Override
    public void createTestTopic(String topic, int numberOfPartitions, short replicationFactor, Properties topicConfig) {
        LOG.info("Creating topic {}", (Object)topic);
        try (AdminClient adminClient = KafkaAdminClient.create(this.getDefaultClientConfig());){
            try {
                NewTopic newTopic = new NewTopic(topic, numberOfPartitions, replicationFactor);
                CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));
                ((KafkaFuture)createTopicsResult.values().get(topic)).get();
            }
            catch (InterruptedException | ExecutionException e) {
                if (!(e.getCause() instanceof TopicExistsException)) {
                    throw new RuntimeException(e.getMessage(), e);
                }
            }
        }
    }

    @Override
    public void deleteTestTopic(String topic) {
        try (AdminClient adminClient = KafkaAdminClient.create(this.getDefaultClientConfig());){
            try {
                DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Collections.singleton(topic));
            }
            catch (Exception e) {
                if (!(e.getCause() instanceof TopicExistsException)) {
                    throw new RuntimeException(e.getMessage(), e);
                }
            }
        }
    }

    protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Exception {
        Properties kafkaProperties = new Properties();
        kafkaProperties.put("advertised.host.name", "localhost");
        kafkaProperties.put("broker.id", Integer.toString(brokerId));
        kafkaProperties.put("log.dir", tmpFolder.toString());
        kafkaProperties.put("zookeeper.connect", this.zookeeperConnectionString);
        kafkaProperties.put("message.max.bytes", String.valueOf(0x3200000));
        kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(0x3200000));
        kafkaProperties.put("transaction.max.timeout.ms", Integer.toString(0x6DDD00));
        kafkaProperties.put("zookeeper.session.timeout.ms", (Object)this.zkTimeout);
        kafkaProperties.put("zookeeper.connection.timeout.ms", (Object)this.zkTimeout);
        if (this.config.getKafkaServerProperties() != null) {
            kafkaProperties.putAll((Map<?, ?>)this.config.getKafkaServerProperties());
        }
        int numTries = 5;
        for (int i = 1; i <= 5; ++i) {
            int kafkaPort = NetUtils.getAvailablePort();
            kafkaProperties.put("port", Integer.toString(kafkaPort));
            if (this.config.isSecureMode()) {
                LOG.info("Adding Kafka secure configurations");
                kafkaProperties.put("listeners", "SASL_PLAINTEXT://localhost:" + kafkaPort);
                kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://localhost:" + kafkaPort);
                kafkaProperties.putAll((Map<?, ?>)this.getSecureProperties());
            }
            KafkaConfig kafkaConfig = new KafkaConfig((Map)kafkaProperties);
            try {
                Option stringNone = Option.apply(null);
                KafkaServer server = new KafkaServer(kafkaConfig, Time.SYSTEM, stringNone, (Seq)new ArraySeq(0));
                server.startup();
                return server;
            }
            catch (KafkaException e) {
                if (!(e.getCause() instanceof BindException)) {
                    throw e;
                }
                LOG.info("Port conflict when starting Kafka Broker. Retrying...");
                continue;
            }
        }
        throw new Exception("Could not start Kafka after 5 retries due to port conflicts.");
    }

    private class KafkaOffsetHandlerImpl
    implements KafkaTestEnvironment.KafkaOffsetHandler {
        private final KafkaConsumer<byte[], byte[]> offsetClient;

        public KafkaOffsetHandlerImpl() {
            Properties props = new Properties();
            props.putAll((Map<?, ?>)KafkaTestEnvironmentImpl.this.standardProps);
            props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            this.offsetClient = new KafkaConsumer(props);
        }

        @Override
        public Long getCommittedOffset(String topicName, int partition) {
            OffsetAndMetadata committed = this.offsetClient.committed(new TopicPartition(topicName, partition));
            return committed != null ? Long.valueOf(committed.offset()) : null;
        }

        @Override
        public void setCommittedOffset(String topicName, int partition, long offset) {
            HashMap<TopicPartition, OffsetAndMetadata> partitionAndOffset = new HashMap<TopicPartition, OffsetAndMetadata>();
            partitionAndOffset.put(new TopicPartition(topicName, partition), new OffsetAndMetadata(offset));
            this.offsetClient.commitSync(partitionAndOffset);
        }

        @Override
        public void close() {
            this.offsetClient.close();
        }
    }
}

