package io.memoria.jutils.kafka.eventsourcing.event;

import io.memoria.jutils.core.eventsourcing.event.Event;
import io.memoria.jutils.core.eventsourcing.event.EventStore;
import io.memoria.jutils.core.transformer.StringTransformer;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

/* loaded from: input_file:io/memoria/jutils/kafka/eventsourcing/event/KafkaEventStore.class */
public class KafkaEventStore implements EventStore {
    private final KafkaConsumer<String, String> consumer;
    private final KafkaProducer<String, String> producer;
    private final Duration timeout;
    private final Scheduler scheduler;
    private final StringTransformer transformer;
    private final AdminClient adminClient;

    public KafkaEventStore(Map<String, Object> map, Map<String, Object> map2, Duration duration, Scheduler scheduler, StringTransformer stringTransformer) {
        this.consumer = new KafkaConsumer<>(map2);
        this.producer = new KafkaProducer<>(map);
        this.timeout = duration;
        this.scheduler = scheduler;
        this.transformer = stringTransformer;
        Properties properties = new Properties();
        properties.put("bootstrap.servers", map.get("bootstrap.servers").toString());
        this.adminClient = AdminClient.create(properties);
    }

    public Flux<Event> add(String str, Flux<Event> flux) {
        return flux.concatMap(event -> {
            return sendRecord(str, event);
        });
    }

    public Mono<Boolean> exists(String str) {
        return Mono.fromCallable(() -> {
            return Boolean.valueOf(((Set) this.adminClient.listTopics().names().get()).contains(str));
        });
    }

    public Flux<Event> stream(String str) {
        TopicPartition topicPartition = new TopicPartition(str, 0);
        return Mono.fromRunnable(() -> {
            this.consumer.assign(List.of(topicPartition));
            this.consumer.poll(this.timeout);
            this.consumer.seek(topicPartition, 0L);
        }).thenMany(Flux.generate(synchronousSink -> {
            synchronousSink.next(pollOnce(topicPartition));
        }).concatMap(flux -> {
            return flux;
        }).subscribeOn(this.scheduler));
    }

    private Flux<Event> pollOnce(TopicPartition topicPartition) {
        return Mono.fromCallable(() -> {
            return this.consumer.poll(this.timeout);
        }).flux().concatMap(consumerRecords -> {
            return Flux.fromIterable(consumerRecords.records(topicPartition));
        }).map((v0) -> {
            return v0.value();
        }).map(str -> {
            return (Event) this.transformer.deserialize(str, Event.class).get();
        });
    }

    private Mono<Event> sendRecord(String str, Event event) {
        ProducerRecord producerRecord = new ProducerRecord(str, 0, event.id().value(), (String) this.transformer.serialize(event).get());
        return Mono.create(monoSink -> {
            this.producer.send(producerRecord, (recordMetadata, exc) -> {
                if (recordMetadata != null) {
                    monoSink.success(recordMetadata);
                } else {
                    monoSink.error(exc);
                }
            });
        }).subscribeOn(this.scheduler).map(recordMetadata -> {
            return event;
        });
    }
}
