package org.enode.pulsar.message;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.pulsar.client.api.Producer;
import org.enodeframework.common.exception.IORuntimeException;
import org.enodeframework.common.utils.Assert;
import org.enodeframework.queue.QueueMessage;
import org.enodeframework.queue.SendMessageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/enode/pulsar/message/PulsarSendMessageService.class */
/**
 * {@link SendMessageService} implementation that publishes queue messages through
 * Apache Pulsar producers.
 *
 * <p>Producers are indexed by the topic they report at construction time; each outgoing
 * message is handed to the producer registered for the message's topic.
 */
public class PulsarSendMessageService implements SendMessageService {
    private static final Logger logger = LoggerFactory.getLogger(PulsarSendMessageService.class);

    /** Producers keyed by the value each one returns from {@code getTopic()}. */
    private final Map<String, Producer<byte[]>> producerMap;

    /**
     * Builds the topic-to-producer lookup table.
     *
     * @param list producers available for sending; validated with {@code Assert.nonEmpty}
     *             before use
     * @throws IllegalStateException if two producers report the same topic (raised by
     *                               {@link Collectors#toMap} on duplicate keys)
     */
    public PulsarSendMessageService(List<Producer<byte[]>> list) {
        Assert.nonEmpty(list, "Pulsar producers");
        // No raw (Map) cast: let the compiler check the collector's result type.
        this.producerMap = list.stream()
                .collect(Collectors.toMap(producer -> producer.getTopic(), producer -> producer));
    }

    /**
     * Sends {@code queueMessage} asynchronously via the producer registered for its topic.
     *
     * <p>The route key is used as the Pulsar message key, the body-and-type string as the
     * payload, and the message key as the ordering key. Payload and ordering key are encoded
     * as UTF-8 so the bytes on the wire do not depend on the JVM's default charset.
     *
     * @param queueMessage the message to publish
     * @return a future that completes with {@code true} once the send succeeds, or completes
     *         exceptionally with an {@link IORuntimeException} when no producer is registered
     *         for the topic or the underlying send fails
     */
    public CompletableFuture<Boolean> sendMessageAsync(QueueMessage queueMessage) {
        String topic = queueMessage.getTopic();
        Producer<byte[]> producer = this.producerMap.get(topic);
        if (producer == null) {
            // Report the problem through the returned future rather than throwing,
            // so callers handle every failure on the same path.
            logger.error("No pulsar producer for topic [{}]", topic);
            CompletableFuture<Boolean> failed = new CompletableFuture<>();
            failed.completeExceptionally(
                    new IORuntimeException(String.format("No producer for topic: [%s]", topic)));
            return failed;
        }
        return producer.newMessage()
                .key(queueMessage.getRouteKey())
                .value(queueMessage.getBodyAndType().getBytes(StandardCharsets.UTF_8))
                .orderingKey(queueMessage.getKey().getBytes(StandardCharsets.UTF_8))
                .sendAsync()
                .exceptionally(th -> {
                    logger.error("Async send message has exception, message: {}", queueMessage, th);
                    // Rethrow so the failure keeps propagating to the caller's future.
                    throw new IORuntimeException(th);
                })
                .thenApply(messageId -> {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Async send message success, sendResult: {}, message: {}", messageId, queueMessage);
                    }
                    return true;
                });
    }
}
