package fr.maif.reactor.eventsourcing;

import fr.maif.concurrent.CompletionStages;
import fr.maif.eventsourcing.Event;
import fr.maif.eventsourcing.EventEnvelope;
import fr.maif.eventsourcing.EventStore;
import io.vavr.Tuple;
import io.vavr.Tuple0;
import io.vavr.control.Option;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

/* loaded from: input_file:fr/maif/reactor/eventsourcing/InMemoryEventStore.class */
/**
 * {@link EventStore} implementation that keeps every event envelope in a plain in-memory list.
 *
 * <p>The transaction context type is {@link Tuple0}: there is no real transaction, so
 * {@link #openTransaction()} and {@link #commitOrRollback(Option, Tuple0)} do no work.
 *
 * <p>The backing list is an unsynchronized {@link ArrayList}; this class adds no locking around it.
 *
 * @param <E> event type
 * @param <Meta> metadata type carried by the envelopes
 * @param <Context> context type carried by the envelopes
 */
public class InMemoryEventStore<E extends Event, Meta, Context> implements EventStore<Tuple0, E, Meta, Context> {
    /** Every envelope handed to {@link #persist}, in insertion order. */
    private final List<EventEnvelope<E, Meta, Context>> eventStore = new ArrayList<>();
    /** Source of the values returned by {@link #nextSequence}; the first value handed out is 1. */
    private final AtomicLong sequenceNum = new AtomicLong(0);
    private final ConcurrentHashMap<String, Long> offsets = new ConcurrentHashMap<>();
    /** Multicast sink (buffer size 10000) that receives every envelope passed to {@link #publish}. */
    private final Sinks.Many<EventEnvelope<E, Meta, Context>> queue = Sinks.many().multicast().onBackpressureBuffer(10000);
    private final Flux<EventEnvelope<E, Meta, Context>> realTimeEvents = this.queue.asFlux();

    /**
     * Creates a new, empty store.
     *
     * @return a fresh {@code InMemoryEventStore}
     */
    public static <E extends Event, Meta, Context> InMemoryEventStore<E, Meta, Context> create() {
        return new InMemoryEventStore<>();
    }

    /**
     * Always returns an empty publisher: this store does not track unpublished events.
     *
     * @param tuple0 ignored transaction context
     * @param concurrentReplayStrategy ignored
     * @return an empty {@link Flux}
     */
    public Publisher<EventEnvelope<E, Meta, Context>> loadEventsUnpublished(Tuple0 tuple0, EventStore.ConcurrentReplayStrategy concurrentReplayStrategy) {
        return Flux.empty();
    }

    /**
     * Transactional variant of {@link #markAsPublished(EventEnvelope)}; the context is ignored.
     *
     * @param tuple0 ignored transaction context
     * @param eventEnvelope envelope to mark
     * @return a completed stage holding a copy of the envelope with {@code published} set to true
     */
    public CompletionStage<EventEnvelope<E, Meta, Context>> markAsPublished(Tuple0 tuple0, EventEnvelope<E, Meta, Context> eventEnvelope) {
        return markAsPublished(eventEnvelope);
    }

    /**
     * Opens a no-op transaction.
     *
     * @return a completed stage holding the empty tuple
     */
    public CompletionStage<Tuple0> openTransaction() {
        return CompletableFuture.completedStage(Tuple.empty());
    }

    /**
     * Does nothing: there is no transaction to commit or roll back.
     *
     * @param option ignored failure, if any
     * @param tuple0 ignored transaction context
     * @return the result of {@code CompletionStages.empty()}
     */
    public CompletionStage<Void> commitOrRollback(Option<Throwable> option, Tuple0 tuple0) {
        return CompletionStages.empty();
    }

    /**
     * Returns a copy of the envelope flagged as published. The stored list is not touched.
     *
     * @param eventEnvelope envelope to mark
     * @return a completed stage holding the rebuilt envelope
     */
    public CompletionStage<EventEnvelope<E, Meta, Context>> markAsPublished(EventEnvelope<E, Meta, Context> eventEnvelope) {
        EventEnvelope<E, Meta, Context> published = eventEnvelope.copy().withPublished(true).build();
        return CompletableFuture.completedStage(published);
    }

    /**
     * Hands out the next sequence number by atomically incrementing the internal counter.
     *
     * @param tuple0 ignored transaction context
     * @return a completed stage holding the incremented value
     */
    public CompletionStage<Long> nextSequence(Tuple0 tuple0) {
        return CompletableFuture.completedStage(this.sequenceNum.incrementAndGet());
    }

    /**
     * Pushes each envelope into the real-time sink.
     *
     * <p>The result of {@code tryEmitNext} is discarded, so an emission the sink rejects is
     * dropped without any error being reported to the caller.
     *
     * @param list envelopes to emit
     * @return the result of {@code CompletionStages.empty()}
     */
    public CompletionStage<Void> publish(io.vavr.collection.List<EventEnvelope<E, Meta, Context>> list) {
        list.forEach(this.queue::tryEmitNext);
        return CompletionStages.empty();
    }

    /**
     * Returns every stored envelope.
     *
     * <p>NOTE(review): the {@code str} argument is not used, so no filtering is applied here.
     * Confirm against the {@code EventStore} contract whether this should filter by entity id.
     *
     * @param str unused
     * @return a publisher over all stored envelopes
     */
    public Publisher<EventEnvelope<E, Meta, Context>> loadEvents(String str) {
        return Flux.fromIterable(this.eventStore);
    }

    /**
     * Returns every stored envelope.
     *
     * @return a publisher over all stored envelopes
     */
    public Publisher<EventEnvelope<E, Meta, Context>> loadAllEvents() {
        return Flux.fromIterable(this.eventStore);
    }

    /**
     * Transactional variant of {@link #loadEventsByQuery(EventStore.Query)}; the context is ignored.
     *
     * @param tuple0 ignored transaction context
     * @param query query to apply
     * @return a publisher over the matching envelopes
     */
    public Publisher<EventEnvelope<E, Meta, Context>> loadEventsByQuery(Tuple0 tuple0, EventStore.Query query) {
        return loadEventsByQuery(query);
    }

    /**
     * Filters the stored envelopes by the query's {@code entityId}.
     *
     * <p>Only {@code entityId} is consulted; a {@code null} entity id matches every envelope.
     *
     * @param query query whose {@code entityId} is used as the filter
     * @return a publisher over the matching envelopes
     */
    public Publisher<EventEnvelope<E, Meta, Context>> loadEventsByQuery(EventStore.Query query) {
        return Flux.fromIterable(this.eventStore)
                .filter(envelope -> Option.of(query.entityId)
                        .map(wantedId -> wantedId.equals(envelope.entityId))
                        .getOrElse(true));
    }

    /**
     * Appends the given envelopes to the in-memory list.
     *
     * @param tuple0 ignored transaction context
     * @param list envelopes to store
     * @return the result of {@code CompletionStages.empty()}
     */
    public CompletionStage<Void> persist(Tuple0 tuple0, io.vavr.collection.List<EventEnvelope<E, Meta, Context>> list) {
        this.eventStore.addAll(list.toJavaList());
        return CompletionStages.empty();
    }
}
