package org.enodeframework.kafka;

import java.util.concurrent.CompletableFuture;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.enodeframework.common.exception.IORuntimeException;
import org.enodeframework.queue.QueueMessage;
import org.enodeframework.queue.SendMessageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;

/* loaded from: input_file:org/enodeframework/kafka/KafkaSendMessageService.class */
public class KafkaSendMessageService implements SendMessageService {
    private static final Logger logger = LoggerFactory.getLogger(KafkaSendMessageService.class);
    private final KafkaTemplate<String, String> producer;

    public KafkaSendMessageService(KafkaTemplate<String, String> kafkaTemplate) {
        this.producer = kafkaTemplate;
    }

    public CompletableFuture<Boolean> sendMessageAsync(QueueMessage queueMessage) {
        return this.producer.send(covertToProducerRecord(queueMessage)).handle((sendResult, th) -> {
            if (th != null) {
                logger.error("Async send message has exception, message: {}", queueMessage, th);
                throw new IORuntimeException(th);
            }
            if (logger.isDebugEnabled()) {
                logger.debug("Async send message success, sendResult: {}, message: {}", sendResult, queueMessage);
            }
            return true;
        });
    }

    private ProducerRecord<String, String> covertToProducerRecord(QueueMessage queueMessage) {
        return new ProducerRecord<>(queueMessage.getTopic(), queueMessage.getRouteKey(), queueMessage.getBodyAndType());
    }
}
