package io.memoria.jutils.nats;

import io.memoria.jutils.core.eventsourcing.event.Event;
import io.memoria.jutils.core.eventsourcing.event.EventStore;
import io.memoria.jutils.core.transformer.StringTransformer;
import io.nats.client.Connection;
import io.nats.streaming.Options;
import io.nats.streaming.StreamingConnection;
import io.nats.streaming.StreamingConnectionFactory;
import io.nats.streaming.SubscriptionOptions;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

/* loaded from: input_file:io/memoria/jutils/nats/NatsEventStore.class */
public class NatsEventStore implements EventStore {
    private static final Logger log = LoggerFactory.getLogger(NatsEventStore.class.getName());
    private final StreamingConnection sc;
    private final Duration timeout;
    private final Scheduler scheduler;
    private final StringTransformer transformer;

    public NatsEventStore(Connection connection, Duration duration, Scheduler scheduler, StringTransformer stringTransformer) throws IOException, InterruptedException {
        this.sc = new StreamingConnectionFactory(new Options.Builder().natsConn(connection).clientId("jutils").clusterId("test-cluster").build()).createConnection();
        this.timeout = duration;
        this.scheduler = scheduler;
        this.transformer = stringTransformer;
    }

    public Flux<Event> add(String str, Flux<Event> flux) {
        return flux.concatMap(event -> {
            return publish(str, event);
        }).subscribeOn(this.scheduler).timeout(this.timeout);
    }

    public Mono<Boolean> exists(String str) {
        return stream(str).take(1L).timeout(this.timeout).count().map(l -> {
            return true;
        }).subscribeOn(this.scheduler).onErrorReturn(false);
    }

    public Flux<Event> stream(String str) {
        Flux create = Flux.create(fluxSink -> {
            try {
                this.sc.subscribe(str, message -> {
                    fluxSink.next((Event) this.transformer.deserialize(new String(message.getData()), Event.class).get());
                }, new SubscriptionOptions.Builder().deliverAllAvailable().build());
            } catch (IOException | InterruptedException | TimeoutException e) {
                e.printStackTrace();
                fluxSink.error(e);
            }
            log.info("subscribing to: " + str);
            fluxSink.onDispose(() -> {
                log.info("Dispose signal, Unsubscribing now from subject: " + str);
            });
            fluxSink.onCancel(() -> {
                log.info("Cancellation signal to subject:" + str);
            });
        });
        return Flux.defer(() -> {
            return create.subscribeOn(this.scheduler);
        });
    }

    private Mono<Event> publish(String str, Event event) {
        return Mono.fromCallable(() -> {
            this.sc.publish(str, ((String) this.transformer.serialize(event).get()).getBytes(StandardCharsets.UTF_8));
            return event;
        }).subscribeOn(this.scheduler);
    }
}
