package com.noleme.flow.connect.kafka.vault;

import com.noleme.flow.connect.kafka.config.KafkaConfig;
import com.noleme.vault.container.Invocation;
import com.noleme.vault.container.definition.ServiceInstantiation;
import com.noleme.vault.container.definition.ServiceValue;
import com.noleme.vault.container.register.Definitions;
import com.noleme.vault.parser.module.GenericModule;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;

/* loaded from: input_file:com/noleme/flow/connect/kafka/vault/KafkaModule.class */
/**
 * Module that turns a {@link KafkaConfig} into service definitions.
 *
 * <p>Depending on what the configuration provides, the following definitions are registered:
 * <ul>
 *   <li>{@code kafka_config}: the {@link KafkaConfig} instance itself (always);</li>
 *   <li>{@code kafka_topic_creator}: a {@link KafkaTopicCreator}, when {@code topics} is set;</li>
 *   <li>{@code kafka_default_producer}: a {@link KafkaProducer}, when {@code producers} is set
 *       and {@code provideDefaultProducer} is enabled;</li>
 *   <li>{@code kafka_default_consumer}: a {@link KafkaConsumer}, when {@code consumers} is set
 *       and {@code provideDefaultConsumer} is enabled.</li>
 * </ul>
 */
public class KafkaModule extends GenericModule<KafkaConfig> {
    private static final String CONFIG_ID = "kafka_config";
    private static final String TOPIC_CREATOR_ID = "kafka_topic_creator";
    private static final String DEFAULT_PRODUCER_ID = "kafka_default_producer";
    private static final String DEFAULT_CONSUMER_ID = "kafka_default_consumer";
    /** Kafka client property holding the broker list. */
    private static final String BOOTSTRAP_SERVERS_KEY = "bootstrap.servers";

    /**
     * Creates the module, bound to the {@code "kafka"} identifier and to {@link KafkaConfig}
     * as its configuration type.
     */
    public KafkaModule() {
        super("kafka", KafkaConfig.class, KafkaModule::processConfig);
    }

    /**
     * Registers every definition derived from the given configuration.
     *
     * <p>The producer and consumer properties inherit the global {@code bootstrapServers} value
     * only when they do not already define their own {@code bootstrap.servers} entry.
     *
     * @param kafkaConfig the parsed Kafka configuration; its producer/consumer properties may be
     *                    mutated to receive the global bootstrap servers
     * @param definitions the registry receiving the service definitions
     */
    private static void processConfig(KafkaConfig kafkaConfig, Definitions definitions) {
        if (kafkaConfig.topics != null) {
            addTopicCreator(kafkaConfig, definitions);
        }

        if (kafkaConfig.producers != null) {
            // containsKey, not contains: Hashtable#contains looks at values, which made the
            // previous check always fail and overwrite a client-specific broker list.
            // The null guard avoids the NullPointerException setProperty raises on a null value.
            if (kafkaConfig.bootstrapServers != null
                && !kafkaConfig.producers.containsKey(BOOTSTRAP_SERVERS_KEY)) {
                kafkaConfig.producers.setProperty(BOOTSTRAP_SERVERS_KEY, kafkaConfig.bootstrapServers);
            }
            if (kafkaConfig.provideDefaultProducer) {
                addDefaultProducer(kafkaConfig, definitions);
            }
        }

        if (kafkaConfig.consumers != null) {
            // Same key-based lookup and null guard as for the producer properties.
            if (kafkaConfig.bootstrapServers != null
                && !kafkaConfig.consumers.containsKey(BOOTSTRAP_SERVERS_KEY)) {
                kafkaConfig.consumers.setProperty(BOOTSTRAP_SERVERS_KEY, kafkaConfig.bootstrapServers);
            }
            if (kafkaConfig.provideDefaultConsumer) {
                addDefaultConsumer(kafkaConfig, definitions);
            }
        }

        definitions.services().set(CONFIG_ID, new ServiceValue(CONFIG_ID, kafkaConfig));
    }

    /**
     * Registers a {@link KafkaTopicCreator} definition that receives a reference to the
     * {@code kafka_config} service as its constructor argument. When {@code createTopics} is
     * enabled, a {@code createTopics} invocation is attached to the definition.
     *
     * @param kafkaConfig the parsed Kafka configuration
     * @param definitions the registry receiving the definition
     */
    private static void addTopicCreator(KafkaConfig kafkaConfig, Definitions definitions) {
        Object[] constructorArgs = new Object[]{definitions.services().reference(CONFIG_ID)};
        ServiceInstantiation topicCreator = new ServiceInstantiation(
            TOPIC_CREATOR_ID,
            KafkaTopicCreator.class.getName(),
            constructorArgs
        );
        if (kafkaConfig.createTopics) {
            topicCreator.addInvocation(new Invocation("createTopics"));
        }
        definitions.services().set(topicCreator.getIdentifier(), topicCreator);
    }

    /**
     * Registers a closeable {@link KafkaProducer} definition built from the producer properties.
     *
     * @param kafkaConfig the parsed Kafka configuration, with non-null {@code producers}
     * @param definitions the registry receiving the definition
     */
    private static void addDefaultProducer(KafkaConfig kafkaConfig, Definitions definitions) {
        definitions.services().set(
            DEFAULT_PRODUCER_ID,
            new ServiceInstantiation(
                DEFAULT_PRODUCER_ID,
                KafkaProducer.class.getName(),
                new Object[]{kafkaConfig.producers}
            ).setCloseable(true)
        );
    }

    /**
     * Registers a closeable {@link KafkaConsumer} definition built from the consumer properties.
     *
     * @param kafkaConfig the parsed Kafka configuration, with non-null {@code consumers}
     * @param definitions the registry receiving the definition
     */
    private static void addDefaultConsumer(KafkaConfig kafkaConfig, Definitions definitions) {
        definitions.services().set(
            DEFAULT_CONSUMER_ID,
            new ServiceInstantiation(
                DEFAULT_CONSUMER_ID,
                KafkaConsumer.class.getName(),
                new Object[]{kafkaConfig.consumers}
            ).setCloseable(true)
        );
    }
}
