/*
 * Decompiled with CFR 0.152.
 */
package fr.maif.reactor.eventsourcing;

import fr.maif.concurrent.CompletionStages;
import fr.maif.eventsourcing.Event;
import fr.maif.eventsourcing.EventEnvelope;
import fr.maif.eventsourcing.EventPublisher;
import fr.maif.eventsourcing.EventStore;
import io.vavr.Tuple;
import io.vavr.Tuple0;
import io.vavr.control.Option;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

public class InMemoryEventStore<E extends Event, Meta, Context>
implements EventStore<Tuple0, E, Meta, Context> {
    private List<EventEnvelope<E, Meta, Context>> eventStore = new ArrayList<EventEnvelope<E, Meta, Context>>();
    private final Sinks.Many<EventEnvelope<E, Meta, Context>> queue;
    private final Flux<EventEnvelope<E, Meta, Context>> realTimeEvents;
    private AtomicLong sequence_num = new AtomicLong(0L);
    private final ConcurrentHashMap<String, Long> offsets = new ConcurrentHashMap();

    public InMemoryEventStore() {
        this.queue = Sinks.many().multicast().onBackpressureBuffer(10000);
        this.realTimeEvents = this.queue.asFlux();
    }

    public static <E extends Event, Meta, Context> InMemoryEventStore<E, Meta, Context> create() {
        return new InMemoryEventStore<E, Meta, Context>();
    }

    public Publisher<EventEnvelope<E, Meta, Context>> loadEventsUnpublished(Tuple0 tx, EventStore.ConcurrentReplayStrategy concurrentReplayStrategy) {
        return Flux.empty();
    }

    public CompletionStage<EventEnvelope<E, Meta, Context>> markAsPublished(Tuple0 tx, EventEnvelope<E, Meta, Context> eventEnvelope) {
        return this.markAsPublished(eventEnvelope);
    }

    public CompletionStage<Tuple0> openTransaction() {
        return CompletableFuture.completedStage(Tuple.empty());
    }

    public CompletionStage<Tuple0> commitOrRollback(Option<Throwable> of, Tuple0 tx) {
        return CompletionStages.empty();
    }

    public CompletionStage<EventEnvelope<E, Meta, Context>> markAsPublished(EventEnvelope<E, Meta, Context> eventEnvelope) {
        return CompletableFuture.completedStage(eventEnvelope.copy().withPublished(Boolean.valueOf(true)).build());
    }

    public CompletionStage<Long> nextSequence(Tuple0 tx) {
        return CompletableFuture.completedStage(this.sequence_num.incrementAndGet());
    }

    public CompletionStage<Tuple0> publish(io.vavr.collection.List<EventEnvelope<E, Meta, Context>> events) {
        events.forEach(arg_0 -> this.queue.tryEmitNext(arg_0));
        return CompletionStages.empty();
    }

    public Publisher<EventEnvelope<E, Meta, Context>> loadEvents(String id) {
        return Flux.fromIterable(this.eventStore);
    }

    public Publisher<EventEnvelope<E, Meta, Context>> loadAllEvents() {
        return Flux.fromIterable(this.eventStore);
    }

    public Publisher<EventEnvelope<E, Meta, Context>> loadEventsByQuery(Tuple0 tx, EventStore.Query query) {
        return this.loadEventsByQuery(query);
    }

    public Publisher<EventEnvelope<E, Meta, Context>> loadEventsByQuery(EventStore.Query query) {
        return Flux.fromIterable(this.eventStore).filter(e -> (Boolean)Option.of((Object)query.entityId).map(id -> id.equals(e.entityId)).getOrElse((Object)true));
    }

    public CompletionStage<Tuple0> persist(Tuple0 transactionContext, io.vavr.collection.List<EventEnvelope<E, Meta, Context>> events) {
        this.eventStore.addAll(events.toJavaList());
        return CompletionStages.empty();
    }

    public EventPublisher<E, Meta, Context> eventPublisher() {
        final InMemoryEventStore _this = this;
        return new EventPublisher<E, Meta, Context>(){

            public CompletionStage<Tuple0> publish(io.vavr.collection.List<EventEnvelope<E, Meta, Context>> events) {
                return _this.publish(events);
            }

            public void close() throws IOException {
            }
        };
    }
}

