package com.github.ideahut.qms.shared.kafka;

import com.github.ideahut.qms.shared.queue.QueueHeader;
import com.github.ideahut.qms.shared.queue.QueueMessage;
import com.github.ideahut.qms.shared.queue.QueueSender;
import com.github.ideahut.qms.shared.task.TaskHandler;
import io.vertx.core.Vertx;
import io.vertx.kafka.admin.KafkaAdminClient;
import io.vertx.kafka.admin.NewTopic;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import io.vertx.kafka.client.producer.RecordMetadata;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/ideahut/qms/shared/kafka/KafkaSender.class */
class KafkaSender<V> implements QueueSender<V> {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSender.class);
    private final KafkaAdminClient admin;
    private final TaskHandler taskHandler;
    private final KafkaProducer<String, V> producer;
    private final List<NewTopic> topics;

    public KafkaSender(Class<V> cls, KafkaAdminClient kafkaAdminClient, Vertx vertx, Map<String, String> map, TaskHandler taskHandler, KafkaSenderProperties kafkaSenderProperties) {
        String defaultSerializer = KafkaHelper.getDefaultSerializer(cls);
        if (defaultSerializer == null) {
            throw new RuntimeException("Unsupported serializer type: " + cls + ", topic: " + kafkaSenderProperties.getTopicProperties().getName());
        }
        HashMap hashMap = new HashMap(map);
        if (kafkaSenderProperties.getConfig() != null) {
            for (String str : kafkaSenderProperties.getConfig().keySet()) {
                String orDefault = kafkaSenderProperties.getConfig().getOrDefault(str, null);
                if (orDefault != null) {
                    hashMap.put(str, orDefault);
                }
            }
        }
        hashMap.put("key.serializer", KafkaHelper.getDefaultSerializer(String.class));
        hashMap.put("value.serializer", defaultSerializer);
        this.topics = kafkaSenderProperties.getTopicProperties().createNewTopics();
        this.admin = kafkaAdminClient;
        this.taskHandler = taskHandler;
        this.producer = KafkaProducer.create(vertx, hashMap);
    }

    @Override // com.github.ideahut.qms.shared.queue.QueueSender
    public void sendMessage(QueueMessage<V> queueMessage) {
        KafkaHelper.setupTopic(LOGGER, this.admin, this.topics);
        KafkaProducerRecord create = KafkaProducerRecord.create(this.topics.get(0).getName(), queueMessage.getBody());
        QueueHeader header = queueMessage.getHeader();
        if (header != null) {
            for (String str : header.keySet()) {
                if (((String) header.get(str)) != null) {
                    create.addHeader(str, (String) header.get(str));
                }
            }
        }
        this.producer.send(create, asyncResult -> {
            if (asyncResult.succeeded()) {
                RecordMetadata recordMetadata = (RecordMetadata) asyncResult.result();
                LOGGER.debug("### Message " + create.value() + " written on topic=" + recordMetadata.getTopic() + ", partition=" + recordMetadata.getPartition() + ", offset=" + recordMetadata.getOffset());
            }
        });
    }

    @Override // com.github.ideahut.qms.shared.queue.QueueSender
    public void sendMessage(final QueueMessage<V> queueMessage, boolean z) {
        if (!z || this.taskHandler == null) {
            sendMessage(queueMessage);
        } else {
            this.taskHandler.execute(new Runnable() { // from class: com.github.ideahut.qms.shared.kafka.KafkaSender.1
                @Override // java.lang.Runnable
                public void run() {
                    KafkaSender.this.sendMessage(queueMessage);
                }
            });
        }
    }
}
