package io.memoria.jutils.pulsar;

import io.memoria.jutils.core.eventsourcing.Event;
import io.memoria.jutils.core.eventsourcing.EventStream;
import io.memoria.jutils.core.transformer.StringTransformer;
import java.util.Objects;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/memoria/jutils/pulsar/PulsarEventStream.class */
public class PulsarEventStream implements EventStream {
    private final PulsarClient client;
    private final PulsarAdmin admin;
    private final StringTransformer transformer;

    public PulsarEventStream(String str, String str2, StringTransformer stringTransformer) throws PulsarClientException {
        this.client = PulsarClient.builder().serviceUrl(str).build();
        this.admin = PulsarAdmin.builder().serviceHttpUrl(str2).build();
        this.transformer = stringTransformer;
    }

    public <E extends Event> Flux<E> add(String str, Flux<E> flux) {
        return Mono.fromCallable(() -> {
            return createProducer(str);
        }).flatMapMany(producer -> {
            return flux.concatMap(event -> {
                return send(producer, event);
            });
        });
    }

    public Mono<Boolean> exists(String str) {
        return Mono.fromFuture(this.admin.topics().getStatsAsync(str)).map(topicStats -> {
            return true;
        }).onErrorReturn(PulsarAdminException.NotFoundException.class, false);
    }

    public <E extends Event> Flux<E> stream(String str, Class<E> cls) {
        return Mono.fromCallable(() -> {
            return createConsumer(str);
        }).flatMapMany(consumer -> {
            return receive(consumer, cls);
        });
    }

    private Consumer<String> createConsumer(String str) throws PulsarClientException {
        Consumer<String> subscribe = this.client.newConsumer(Schema.STRING).topic(new String[]{str}).subscriptionName(str + "_subscription").subscribe();
        subscribe.seek(0L);
        return subscribe;
    }

    private Producer<String> createProducer(String str) throws PulsarClientException {
        return this.client.newProducer(Schema.STRING).topic(str).create();
    }

    private <E extends Event> Flux<E> receive(Consumer<String> consumer, Class<E> cls) {
        Objects.requireNonNull(consumer);
        return Mono.fromFuture(consumer::receiveAsync).map((v0) -> {
            return v0.getValue();
        }).map(str -> {
            return (Event) this.transformer.deserialize(str, cls).get();
        }).repeat();
    }

    private <E extends Event> Mono<E> send(Producer<String> producer, E e) {
        return Mono.fromCallable(() -> {
            return (String) this.transformer.serialize(e).get();
        }).flatMap(str -> {
            return Mono.fromFuture(() -> {
                return producer.sendAsync(str);
            });
        }).map(messageId -> {
            return e;
        });
    }
}
