package io.memoria.jutils.messaging.adapter.pulsar;

import io.memoria.jutils.core.utils.yaml.YamlConfigMap;
import io.memoria.jutils.messaging.domain.Message;
import io.memoria.jutils.messaging.domain.MessageFilter;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/memoria/jutils/messaging/adapter/pulsar/PulsarUtils.class */
public class PulsarUtils {
    public static Mono<Message> consume(Consumer<String> consumer) {
        return Mono.fromFuture(consumer.receiveAsync()).map(message -> {
            return new Message((String) message.getValue()).withId(message.getKey());
        });
    }

    public static Consumer<String> createConsumer(PulsarClient pulsarClient, MessageFilter messageFilter) throws PulsarClientException {
        Consumer<String> subscribe = pulsarClient.newConsumer(Schema.STRING).topic(new String[]{messageFilter.topic()}).subscriptionName(messageFilter.topic() + "subscription").subscribe();
        subscribe.seek(messageFilter.offset());
        return subscribe;
    }

    public static Producer<String> createProducer(PulsarClient pulsarClient, MessageFilter messageFilter) throws PulsarClientException {
        return pulsarClient.newProducer(Schema.STRING).topic(messageFilter.topic()).create();
    }

    public static PulsarClient pulsarClient(YamlConfigMap yamlConfigMap) throws PulsarClientException {
        return PulsarClient.builder().serviceUrl((String) ((YamlConfigMap) yamlConfigMap.asYamlConfigMap("pulsar").get()).asString("serviceUrl").get()).build();
    }

    private PulsarUtils() {
    }
}
