package org.axonframework.eventsourcing.eventstore.inmemory;

import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventsourcing.DomainEventMessage;
import org.axonframework.eventsourcing.eventstore.DomainEventStream;
import org.axonframework.eventsourcing.eventstore.EventStorageEngine;
import org.axonframework.eventsourcing.eventstore.EventUtils;
import org.axonframework.eventsourcing.eventstore.GlobalSequenceTrackingToken;
import org.axonframework.eventsourcing.eventstore.TrackingToken;

/* loaded from: input_file:org/axonframework/eventsourcing/eventstore/inmemory/InMemoryEventStorageEngine.class */
/**
 * {@link EventStorageEngine} implementation that keeps all events and snapshots in memory.
 *
 * <p>Events are held in a {@link ConcurrentSkipListMap} keyed by their tracking token, so iteration
 * follows the token ordering. Snapshots are held in a {@link ConcurrentHashMap} keyed by aggregate
 * identifier; at most one snapshot is retained per aggregate.
 *
 * <p>Appends are serialized by synchronizing on the event map. Reads do not take that lock and rely
 * on the concurrent map implementations instead.
 */
public class InMemoryEventStorageEngine implements EventStorageEngine {

    /** All appended events, ordered by tracking token. Also used as the append lock. */
    private final NavigableMap<TrackingToken, TrackedEventMessage<?>> events = new ConcurrentSkipListMap<>();

    /** Most recently stored snapshot per aggregate identifier. */
    private final Map<String, DomainEventMessage<?>> snapshots = new ConcurrentHashMap<>();

    /**
     * Appends the given events, assigning each a consecutive tracking token that continues from the
     * last token currently stored.
     *
     * <p>The whole batch is converted to tracked messages before anything is stored, so a failure
     * during conversion leaves the store untouched. The messages are then inserted one by one in
     * ascending token order; collecting them into an unordered map first (as a plain
     * {@code Collectors.toMap} would) gives no guarantee about the order in which they become
     * visible to readers that do not hold the append lock.
     *
     * @param eventsToAppend the events to store, in the order they should be sequenced
     */
    @Override
    public void appendEvents(List<? extends EventMessage<?>> eventsToAppend) {
        synchronized (this.events) {
            GlobalSequenceTrackingToken firstToken = nextTrackingToken();
            List<TrackedEventMessage<?>> trackedMessages = IntStream.range(0, eventsToAppend.size())
                    .<TrackedEventMessage<?>>mapToObj(i -> EventUtils.asTrackedEventMessage(
                            (EventMessage<?>) eventsToAppend.get(i), firstToken.offsetBy(i)))
                    .collect(Collectors.toList());
            for (TrackedEventMessage<?> trackedMessage : trackedMessages) {
                this.events.put(trackedMessage.trackingToken(), trackedMessage);
            }
        }
    }

    /**
     * Stores the given snapshot under its aggregate identifier, replacing any snapshot previously
     * stored for that aggregate.
     *
     * @param snapshot the snapshot event to store
     */
    @Override
    public void storeSnapshot(DomainEventMessage<?> snapshot) {
        this.snapshots.put(snapshot.getAggregateIdentifier(), snapshot);
    }

    /**
     * Streams the stored events that come strictly after the given tracking token.
     *
     * @param trackingToken the token of the last event already seen, or {@code null} to stream all
     *                      stored events
     * @param mayBlock      not consulted by this implementation
     * @return a stream over the matching events in tracking-token order
     */
    @Override
    public Stream<? extends TrackedEventMessage<?>> readEvents(TrackingToken trackingToken, boolean mayBlock) {
        if (trackingToken == null) {
            return this.events.values().stream();
        }
        // 'false' makes the bound exclusive: the event carrying trackingToken itself is skipped.
        return this.events.tailMap(trackingToken, false).values().stream();
    }

    /**
     * Streams the domain events of a single aggregate, starting at the given sequence number.
     *
     * <p>Stored events that are not {@link DomainEventMessage}s are skipped. The returned stream is
     * lazy; the sequence number handed to {@link DomainEventStream#of} is updated as events are
     * consumed and is {@code null} until the first matching event has been consumed.
     *
     * @param aggregateIdentifier the identifier of the aggregate whose events are read
     * @param firstSequenceNumber the lowest sequence number (inclusive) to include
     * @return a stream of the aggregate's events with sequence number at or above the given one
     */
    @Override
    public DomainEventStream readEvents(String aggregateIdentifier, long firstSequenceNumber) {
        AtomicReference<Long> lastSequenceNumber = new AtomicReference<>();
        Stream<? extends DomainEventMessage<?>> aggregateEvents = this.events.values().stream()
                .filter(event -> event instanceof DomainEventMessage)
                .map(event -> (DomainEventMessage<?>) event)
                .filter(event -> aggregateIdentifier.equals(event.getAggregateIdentifier())
                        && event.getSequenceNumber() >= firstSequenceNumber)
                // Deliberate side effect: record the sequence number of each event as it is consumed
                // so the supplier below can report the last one seen.
                .peek(event -> lastSequenceNumber.set(event.getSequenceNumber()));
        return DomainEventStream.of(aggregateEvents, lastSequenceNumber::get);
    }

    /**
     * Returns the snapshot stored for the given aggregate, if any.
     *
     * @param aggregateIdentifier the identifier of the aggregate
     * @return the stored snapshot, or an empty {@link Optional} when none has been stored
     */
    @Override
    public Optional<DomainEventMessage<?>> readSnapshot(String aggregateIdentifier) {
        return Optional.ofNullable(this.snapshots.get(aggregateIdentifier));
    }

    /**
     * Computes the tracking token for the next event to append: token {@code 0} when the store is
     * empty, otherwise the successor of the highest stored token.
     *
     * <p>This method does not lock by itself; {@link #appendEvents(List)} calls it while holding
     * the lock on the event map. The cast assumes every stored key is a
     * {@link GlobalSequenceTrackingToken}, which holds as long as events are only added through
     * {@link #appendEvents(List)}.
     *
     * @return the token to assign to the next appended event
     */
    protected GlobalSequenceTrackingToken nextTrackingToken() {
        if (this.events.isEmpty()) {
            return new GlobalSequenceTrackingToken(0L);
        }
        return ((GlobalSequenceTrackingToken) this.events.lastKey()).next();
    }
}
