/*
 * Decompiled with CFR 0.152.
 */
package com.networknt.tram.test;

import com.networknt.service.SingletonServiceFactory;
import com.networknt.tram.test.KafkaTestEnvironment;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;

public abstract class KafkaTestBase {
    protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestBase.class);
    protected static final int NUMBER_OF_KAFKA_SERVERS = 3;
    protected static String brokerConnectionStrings;
    protected static Properties standardProps;
    protected static FiniteDuration timeout;
    protected static KafkaTestEnvironment kafkaServer;
    @ClassRule
    public static TemporaryFolder tempFolder;
    protected static Properties secureProps;

    @BeforeClass
    public static void prepare() throws ClassNotFoundException {
        KafkaTestBase.prepare(true);
    }

    public static void prepare(boolean hideKafkaBehindProxy) throws ClassNotFoundException {
        LOG.info("-------------------------------------------------------------------------");
        LOG.info("    Starting KafkaTestBase ");
        LOG.info("-------------------------------------------------------------------------");
        KafkaTestBase.startClusters(false, hideKafkaBehindProxy);
    }

    @AfterClass
    public static void shutDownServices() throws Exception {
        LOG.info("-------------------------------------------------------------------------");
        LOG.info("    Shut down KafkaTestBase ");
        LOG.info("-------------------------------------------------------------------------");
        KafkaTestBase.shutdownClusters();
        LOG.info("-------------------------------------------------------------------------");
        LOG.info("    KafkaTestBase finished");
        LOG.info("-------------------------------------------------------------------------");
    }

    protected static void startClusters(boolean secureMode, boolean hideKafkaBehindProxy) throws ClassNotFoundException {
        kafkaServer = (KafkaTestEnvironment)SingletonServiceFactory.getBean(KafkaTestEnvironment.class);
        LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion());
        kafkaServer.prepare(KafkaTestEnvironment.createConfig().setKafkaServersNumber(3).setSecureMode(secureMode));
        standardProps = kafkaServer.getStandardProperties();
        brokerConnectionStrings = kafkaServer.getBrokerConnectionString();
        if (secureMode) {
            if (!kafkaServer.isSecureRunSupported()) {
                throw new IllegalStateException("Attempting to test in secure mode but secure mode not supported by the KafkaTestEnvironment.");
            }
            secureProps = kafkaServer.getSecureProperties();
        }
    }

    protected static void shutdownClusters() throws Exception {
        if (secureProps != null) {
            secureProps.clear();
        }
        kafkaServer.shutdown();
    }

    protected static void createTestTopic(String topic, int numberOfPartitions, short replicationFactor) {
        kafkaServer.createTestTopic(topic, numberOfPartitions, replicationFactor);
    }

    protected static void deleteTestTopic(String topic) {
        kafkaServer.deleteTestTopic(topic);
    }

    protected void assertAtLeastOnceForTopic(Properties properties, String topic, int partition, Set<Integer> expectedElements, long timeoutMillis) {
        long startMillis = System.currentTimeMillis();
        HashSet<Integer> actualElements = new HashSet<Integer>();
        while (System.currentTimeMillis() < startMillis + timeoutMillis) {
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
            Collection records = kafkaServer.getAllRecordsFromTopic(properties, topic, partition, 100L);
            for (ConsumerRecord record : records) {
                actualElements.add((Integer)record.value());
            }
            if (!actualElements.containsAll(expectedElements)) continue;
            return;
        }
        Assert.fail((String)String.format("Expected to contain all of: <%s>, but was: <%s>", expectedElements, actualElements));
    }

    protected void assertExactlyOnceForTopic(Properties properties, String topic, int partition, List<Integer> expectedElements) {
        this.assertExactlyOnceForTopic(properties, topic, partition, expectedElements, 30000L);
    }

    protected void assertExactlyOnceForTopic(Properties properties, String topic, int partition, List<Integer> expectedElements, long timeoutMillis) {
        long startMillis = System.currentTimeMillis();
        ArrayList<Integer> actualElements = new ArrayList<Integer>();
        Properties consumerProperties = new Properties();
        consumerProperties.putAll((Map<?, ?>)properties);
        consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        consumerProperties.put("isolation.level", "read_committed");
        while (System.currentTimeMillis() < startMillis + timeoutMillis) {
            Collection records = kafkaServer.getAllRecordsFromTopic(consumerProperties, topic, partition, 1000L);
            for (ConsumerRecord record : records) {
                actualElements.add((Integer)record.value());
            }
            if (actualElements.equals(expectedElements)) {
                return;
            }
            if (actualElements.size() <= expectedElements.size()) continue;
            break;
        }
        Assert.fail((String)String.format("Expected number of elements: <%s>, but was: <%s>", expectedElements.size(), actualElements.size()));
    }

    static {
        timeout = new FiniteDuration(10L, TimeUnit.SECONDS);
        tempFolder = new TemporaryFolder();
        secureProps = new Properties();
    }
}

