package io.openmessaging.chaos.driver.kafka;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import io.openmessaging.chaos.driver.kafka.config.KafkaBrokerConfig;
import io.openmessaging.chaos.driver.kafka.config.KafkaClientConfig;
import io.openmessaging.chaos.driver.kafka.config.KafkaConfig;
import io.openmessaging.chaos.driver.mq.ConsumerCallback;
import io.openmessaging.chaos.driver.mq.MQChaosDriver;
import io.openmessaging.chaos.driver.mq.MQChaosNode;
import io.openmessaging.chaos.driver.mq.MQChaosProducer;
import io.openmessaging.chaos.driver.mq.MQChaosPullConsumer;
import io.openmessaging.chaos.driver.mq.MQChaosPushConsumer;
import java.io.File;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/openmessaging/chaos/driver/kafka/KafkaChaosDriver.class */
/**
 * Kafka implementation of {@link MQChaosDriver}.
 *
 * <p>The driver reads a single YAML file into three configuration views
 * ({@link KafkaConfig}, {@link KafkaBrokerConfig}, {@link KafkaClientConfig}) and derives
 * three {@link Properties} sets from the client configuration: one for the admin client
 * used to create topics, one for producers and one for consumers. Each set starts from the
 * shared {@code commonConfig} section and is then overlaid with its own section.
 *
 * <p>No synchronization is performed by this class.
 */
public class KafkaChaosDriver implements MQChaosDriver {
    /** YAML mapper that ignores properties it does not know, so one file can feed several config classes. */
    private static final ObjectMapper MAPPER = new ObjectMapper(new YAMLFactory())
        .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    private static final Logger log = LoggerFactory.getLogger(KafkaChaosDriver.class);

    private KafkaConfig kafkaConfig;
    private KafkaClientConfig kafkaClientConfig;
    private KafkaBrokerConfig kafkaBrokerConfig;
    /** Lazily created on the first {@link #createTopic} call and closed by {@link #shutdown()}. */
    private AdminClient admin;
    private Properties topicProperties;
    private Properties producerProperties;
    private Properties consumerProperties;

    /** Parses {@code file} as YAML into a {@link KafkaClientConfig}. */
    private static KafkaClientConfig readConfigForClient(File file) throws IOException {
        return MAPPER.readValue(file, KafkaClientConfig.class);
    }

    /** Parses {@code file} as YAML into a {@link KafkaConfig}. */
    private static KafkaConfig readConfigForKafka(File file) throws IOException {
        return MAPPER.readValue(file, KafkaConfig.class);
    }

    /** Parses {@code file} as YAML into a {@link KafkaBrokerConfig}. */
    private static KafkaBrokerConfig readConfigForBroker(File file) throws IOException {
        return MAPPER.readValue(file, KafkaBrokerConfig.class);
    }

    /**
     * Returns a new {@link Properties} holding every entry of {@code base}, overlaid with the
     * entries parsed from {@code overrides} (standard {@code .properties} syntax).
     *
     * @param base entries to start from; not modified
     * @param overrides properties text to apply on top, or {@code null} for no overrides
     * @return a fresh, independent property set
     * @throws IOException if {@code overrides} cannot be read as properties text
     */
    private static Properties mergeProperties(Properties base, String overrides) throws IOException {
        Properties merged = new Properties();
        merged.putAll(base);
        // A section missing from the YAML file is treated as empty rather than
        // failing with a NullPointerException inside StringReader.
        if (overrides != null) {
            merged.load(new StringReader(overrides));
        }
        return merged;
    }

    /**
     * Loads the driver configuration and prepares the topic, producer and consumer properties.
     *
     * <p>The producer is always configured with a String key serializer and a byte-array value
     * serializer; the consumer with the matching deserializers. These settings are applied
     * last, so they override anything given in the configuration file.
     *
     * @param configurationFile YAML file containing the Kafka, broker and client settings
     * @param nodes accepted for interface compatibility; not used by this method
     * @throws IOException if the file cannot be read or parsed
     */
    public void initialize(File configurationFile, List<String> nodes) throws IOException {
        this.kafkaConfig = readConfigForKafka(configurationFile);
        this.kafkaBrokerConfig = readConfigForBroker(configurationFile);
        this.kafkaClientConfig = readConfigForClient(configurationFile);

        Properties commonProperties = mergeProperties(new Properties(), this.kafkaClientConfig.commonConfig);

        this.topicProperties = mergeProperties(commonProperties, this.kafkaClientConfig.topicConfig);

        this.producerProperties = mergeProperties(commonProperties, this.kafkaClientConfig.producerConfig);
        this.producerProperties.put("key.serializer", StringSerializer.class.getName());
        this.producerProperties.put("value.serializer", ByteArraySerializer.class.getName());

        this.consumerProperties = mergeProperties(commonProperties, this.kafkaClientConfig.consumerConfig);
        this.consumerProperties.put("key.deserializer", StringDeserializer.class.getName());
        this.consumerProperties.put("value.deserializer", ByteArrayDeserializer.class.getName());
    }

    /**
     * Creates a topic and blocks until the create request has completed.
     *
     * <p>The replication factor is taken from the client configuration. The admin client is
     * created on first use and reused by later calls.
     *
     * @param topic name of the topic to create
     * @param partitions number of partitions for the new topic
     * @throws RuntimeException if the topic could not be created or the wait was interrupted;
     *     the underlying failure is attached as the cause
     */
    public void createTopic(String topic, int partitions) {
        try {
            List<NewTopic> newTopics = new ArrayList<>(1);
            newTopics.add(new NewTopic(topic, partitions, this.kafkaClientConfig.replicationFactor));
            // Reuse a single admin client; creating one per call would leak every
            // client except the last, since shutdown() only closes this field.
            if (this.admin == null) {
                this.admin = AdminClient.create(this.topicProperties);
            }
            this.admin.createTopics(newTopics).all().get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            log.error("Interrupted while creating topic [{}]", topic, e);
            throw new RuntimeException(String.format("Failed to create topic [%s] to cluster", topic), e);
        } catch (Exception e) {
            log.error("Failed to create topic [{}]", topic, e);
            throw new RuntimeException(String.format("Failed to create topic [%s] to cluster", topic), e);
        }
    }

    /**
     * Creates a producer bound to {@code topic} using the producer properties prepared by
     * {@link #initialize}.
     *
     * @param topic topic the returned producer is associated with
     * @return a new producer wrapper
     */
    public MQChaosProducer createProducer(String topic) {
        return new KafkaChaosProducer(new KafkaProducer<>(this.producerProperties), topic);
    }

    /**
     * Creates a push consumer subscribed to {@code topic} in the given consumer group.
     *
     * @param topic topic to subscribe to
     * @param subscriptionName value used as the Kafka {@code group.id}
     * @param consumerCallback callback handed to the returned consumer
     * @return a new push consumer wrapper
     */
    public MQChaosPushConsumer createPushConsumer(String topic, String subscriptionName,
        ConsumerCallback consumerCallback) {
        // Work on a private copy so the group id of one consumer never leaks into
        // the shared consumer properties.
        Properties groupProperties = new Properties();
        groupProperties.putAll(this.consumerProperties);
        groupProperties.put("group.id", subscriptionName);

        KafkaConsumer<String, byte[]> kafkaConsumer = new KafkaConsumer<>(groupProperties);
        try {
            kafkaConsumer.subscribe(Arrays.asList(topic));
        } catch (RuntimeException e) {
            // Do not leak the consumer's network resources if subscribing fails.
            kafkaConsumer.close();
            throw e;
        }
        return new KafkaChaosPushConsumer(kafkaConsumer, consumerCallback);
    }

    /**
     * Pull consumers are not supported by this driver.
     *
     * @throws UnsupportedOperationException always
     */
    public MQChaosPullConsumer createPullConsumer(String topic, String subscriptionName) {
        throw new UnsupportedOperationException("Unsupport create a pull consumer currently");
    }

    /**
     * Creates the chaos node handle for {@code node}.
     *
     * @param node the node the handle is created for
     * @param nodes node list passed through to the handle
     * @return a new {@link KafkaChaosNode} built with the loaded Kafka and broker configuration
     */
    public MQChaosNode createChaosNode(String node, List<String> nodes) {
        return new KafkaChaosNode(node, nodes, this.kafkaConfig, this.kafkaBrokerConfig);
    }

    /** Closes the admin client if one was created. Safe to call more than once. */
    public void shutdown() {
        if (this.admin != null) {
            this.admin.close();
            // Drop the reference so a later createTopic() builds a fresh client
            // instead of using a closed one.
            this.admin = null;
        }
    }
}
