package fr.maif.akka.eventsourcing;

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.Materializer;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.AsPublisher;
import akka.stream.javadsl.BroadcastHub;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.SourceQueueWithComplete;
import fr.maif.concurrent.CompletionStages;
import fr.maif.eventsourcing.Event;
import fr.maif.eventsourcing.EventEnvelope;
import fr.maif.eventsourcing.EventPublisher;
import fr.maif.eventsourcing.EventStore;
import io.vavr.Tuple;
import io.vavr.Tuple0;
import io.vavr.control.Option;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Publisher;

/* loaded from: input_file:fr/maif/akka/eventsourcing/InMemoryEventStore.class */
/**
 * In-memory {@link EventStore} backed by a plain {@link java.util.List} and an Akka Streams queue.
 *
 * <p>Persisted envelopes are appended to an in-memory list; published envelopes are offered to a
 * queue whose output is fanned out through a {@link BroadcastHub}. The transaction context type is
 * {@link Tuple0}, and every transaction-related argument is ignored by this implementation.
 *
 * <p>NOTE(review): the backing list is an unsynchronized {@link ArrayList}; nothing in this class
 * guards it against concurrent {@code persist} and load calls.
 *
 * @param <E> event type
 * @param <Meta> type of the metadata carried by an envelope
 * @param <Context> type of the context carried by an envelope
 */
public class InMemoryEventStore<E extends Event, Meta, Context> implements EventStore<Tuple0, E, Meta, Context> {
    private final ActorSystem system;
    private final Materializer materializer;
    private final SourceQueueWithComplete<EventEnvelope> queue;
    private final Source<EventEnvelope, NotUsed> realTimeEvents;
    private final List<EventEnvelope<E, Meta, Context>> eventStore = new ArrayList<>();
    private final AtomicLong sequenceNum = new AtomicLong(0);
    // Not read or written anywhere in this class.
    private final ConcurrentHashMap<String, Long> offsets = new ConcurrentHashMap<>();

    /**
     * Builds the store and materializes the publishing pipeline: a queue source (buffer of 500,
     * back-pressure overflow strategy) feeding a {@link BroadcastHub} (buffer of 256).
     *
     * @param actorSystem actor system used to create the materializer and to run the streams
     */
    public InMemoryEventStore(ActorSystem actorSystem) {
        this.system = actorSystem;
        this.materializer = Materializer.createMaterializer(actorSystem);
        Pair<SourceQueueWithComplete<EventEnvelope>, Source<EventEnvelope, NotUsed>> queueAndHub = Source
                .<EventEnvelope>queue(500, OverflowStrategy.backpressure())
                .toMat(BroadcastHub.of(EventEnvelope.class, 256), Keep.both())
                .run(this.materializer);
        this.queue = queueAndHub.first();
        this.realTimeEvents = queueAndHub.second();
        // Attach a consumer that discards everything; presumably so the hub keeps draining
        // when nobody else is subscribed - TODO confirm.
        this.realTimeEvents.runWith(Sink.ignore(), this.materializer);
    }

    /**
     * Static factory equivalent to calling the constructor.
     *
     * @param actorSystem actor system handed to the constructor
     * @return a new, empty store
     */
    public static <E extends Event, Meta, Context> InMemoryEventStore<E, Meta, Context> create(ActorSystem actorSystem) {
        return new InMemoryEventStore<>(actorSystem);
    }

    /**
     * Returns the highest {@code sequenceNum} among stored envelopes whose {@code published} flag
     * is true, or {@code 0} when there is none.
     *
     * <p>NOTE(review): {@code markAsPublished} in this class does not modify the stored list, so
     * only envelopes that were persisted already flagged as published are counted.
     */
    public CompletionStage<Long> lastPublishedSequence() {
        return CompletionStages.completedStage(this.eventStore.stream()
                .filter(envelope -> envelope.published)
                .map(envelope -> envelope.sequenceNum)
                .max(Comparator.comparingLong(Long::longValue))
                .orElse(0L));
    }

    /**
     * Always returns an empty publisher; both arguments are ignored.
     */
    public Publisher<EventEnvelope<E, Meta, Context>> loadEventsUnpublished(Tuple0 tuple0, EventStore.ConcurrentReplayStrategy concurrentReplayStrategy) {
        return Source.<EventEnvelope<E, Meta, Context>>empty()
                .runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), this.system);
    }

    /**
     * Transactional variant; ignores the transaction context and delegates to
     * {@link #markAsPublished(EventEnvelope)}.
     */
    public CompletionStage<EventEnvelope<E, Meta, Context>> markAsPublished(Tuple0 tuple0, EventEnvelope<E, Meta, Context> eventEnvelope) {
        return markAsPublished(eventEnvelope);
    }

    /**
     * There is no real transaction here; simply returns {@code CompletionStages.empty()}.
     */
    public CompletionStage<Tuple0> openTransaction() {
        return CompletionStages.empty();
    }

    /**
     * No-op: both arguments are ignored and an empty tuple is produced asynchronously.
     */
    public CompletionStage<Tuple0> commitOrRollback(Option<Throwable> option, Tuple0 tuple0) {
        return CompletableFuture.supplyAsync(Tuple::empty);
    }

    /**
     * Returns a copy of the envelope with {@code published} set to true. The stored list is not
     * touched.
     *
     * @param eventEnvelope envelope to copy
     * @return a completed stage holding the published copy
     */
    public CompletionStage<EventEnvelope<E, Meta, Context>> markAsPublished(EventEnvelope<E, Meta, Context> eventEnvelope) {
        return CompletionStages.completedStage(eventEnvelope.copy().withPublished(true).build());
    }

    /**
     * Increments the internal counter and returns the new value (the first value handed out is 1).
     */
    public CompletionStage<Long> nextSequence(Tuple0 tuple0) {
        return CompletionStages.completedStage(this.sequenceNum.incrementAndGet());
    }

    /**
     * Reserves {@code num} consecutive counter values, one increment per element.
     *
     * @param tuple0 ignored
     * @param num how many sequence numbers to generate
     * @return a completed stage holding the generated numbers in generation order
     */
    public CompletionStage<io.vavr.collection.List<Long>> nextSequences(Tuple0 tuple0, Integer num) {
        return CompletionStages.completedStage(
                io.vavr.collection.List.range(0, num.intValue())
                        .map(ignored -> this.sequenceNum.incrementAndGet()));
    }

    /**
     * Offers every envelope to the internal queue. The result of each {@code offer} is not
     * inspected, and the returned stage does not depend on it.
     *
     * @param list envelopes to publish
     * @return a stage completing with an empty tuple
     */
    public CompletionStage<Tuple0> publish(io.vavr.collection.List<EventEnvelope<E, Meta, Context>> list) {
        list.forEach(this.queue::offer);
        return CompletableFuture.supplyAsync(Tuple::empty);
    }

    /**
     * Streams every stored envelope.
     *
     * <p>NOTE(review): {@code str} is ignored, so this returns the same events as
     * {@link #loadAllEvents()} - confirm whether filtering on the entity id is expected here.
     */
    public Publisher<EventEnvelope<E, Meta, Context>> loadEvents(String str) {
        return Source.from(this.eventStore)
                .runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), this.system);
    }

    /**
     * Streams every stored envelope.
     */
    public Publisher<EventEnvelope<E, Meta, Context>> loadAllEvents() {
        return Source.from(this.eventStore)
                .runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), this.system);
    }

    /**
     * Transactional variant; ignores the transaction context and delegates to
     * {@link #loadEventsByQuery(EventStore.Query)}.
     */
    public Publisher<EventEnvelope<E, Meta, Context>> loadEventsByQuery(Tuple0 tuple0, EventStore.Query query) {
        return loadEventsByQuery(query);
    }

    /**
     * Streams the stored envelopes matching the query. Only {@code query.entityId} is consulted:
     * when it is null every envelope matches, otherwise only envelopes with an equal
     * {@code entityId} do.
     *
     * @param query query whose {@code entityId} is used as the filter
     * @return a publisher over the matching envelopes
     */
    public Publisher<EventEnvelope<E, Meta, Context>> loadEventsByQuery(EventStore.Query query) {
        return Source.from(this.eventStore)
                .filter(envelope -> Option.of(query.entityId)
                        .map(id -> id.equals(envelope.entityId))
                        .getOrElse(true))
                .runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), this.system);
    }

    /**
     * Appends the envelopes to the in-memory list; the transaction context is ignored.
     *
     * @param tuple0 ignored
     * @param list envelopes to store
     * @return a stage completing with an empty tuple
     */
    public CompletionStage<Tuple0> persist(Tuple0 tuple0, io.vavr.collection.List<EventEnvelope<E, Meta, Context>> list) {
        this.eventStore.addAll(list.toJavaList());
        return CompletableFuture.supplyAsync(Tuple::empty);
    }

    /**
     * Returns an {@link EventPublisher} that forwards to this store's
     * {@link #publish(io.vavr.collection.List)} and whose {@code close()} does nothing.
     */
    public EventPublisher<E, Meta, Context> eventPublisher() {
        return new EventPublisher<E, Meta, Context>() {
            public CompletionStage<Tuple0> publish(io.vavr.collection.List<EventEnvelope<E, Meta, Context>> list) {
                // Must be qualified: a bare this.publish(...) would call this anonymous
                // class's own method and recurse until the stack overflows.
                return InMemoryEventStore.this.publish(list);
            }

            public void close() throws IOException {
                // Nothing to release.
            }
        };
    }
}
