package org.enode.pulsar.message;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.pulsar.client.api.Producer;
import org.enodeframework.common.exception.IORuntimeException;
import org.enodeframework.queue.QueueMessage;
import org.enodeframework.queue.SendMessageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/enode/pulsar/message/PulsarSendMessageService.class */
public class PulsarSendMessageService implements SendMessageService {
    private static final Logger logger = LoggerFactory.getLogger(PulsarSendMessageService.class);
    private final Map<String, Producer<byte[]>> producers;

    public PulsarSendMessageService(List<Producer<byte[]>> list) {
        this.producers = (Map) ((List) Optional.ofNullable(list).orElse(new ArrayList())).stream().collect(Collectors.toMap((v0) -> {
            return v0.getTopic();
        }, producer -> {
            return producer;
        }));
        if (this.producers.isEmpty()) {
            throw new IllegalArgumentException("producers can not empty.");
        }
    }

    public CompletableFuture<Boolean> sendMessageAsync(QueueMessage queueMessage) {
        Producer<byte[]> producer = this.producers.get(queueMessage.getTopic());
        if (producer != null) {
            return producer.newMessage().key(queueMessage.getRouteKey()).value(queueMessage.getBody().getBytes()).orderingKey(queueMessage.getKey().getBytes()).sendAsync().exceptionally(th -> {
                logger.error("Enode message async send has exception, message: {}, routingKey: {}", new Object[]{queueMessage.getBody(), queueMessage.getRouteKey(), th});
                throw new IORuntimeException(th);
            }).thenApply(messageId -> {
                if (logger.isDebugEnabled()) {
                    logger.debug("Enode message async send success, sendResult: {}, message: {}", messageId, queueMessage.getBody());
                }
                return true;
            });
        }
        logger.error("can not find pulsar producer for topic [{}]", queueMessage.getTopic());
        return CompletableFuture.completedFuture(false);
    }
}
