package org.axonframework.eventsourcing.eventstore;

import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;
import org.axonframework.eventhandling.AbstractEventBus;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventsourcing.DomainEventMessage;
import org.axonframework.monitoring.MessageMonitor;
import org.axonframework.monitoring.NoOpMessageMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/axonframework/eventsourcing/eventstore/AbstractEventStore.class */
/**
 * Base {@link EventStore} implementation that combines an {@link AbstractEventBus} with an
 * {@link EventStorageEngine}.
 *
 * <p>Events handed to {@link #prepareCommit(List)} are appended to the storage engine before the
 * inherited commit preparation runs. Read operations return the events held by the storage engine,
 * followed by the matching domain events found in {@code queuedMessages()} (a method not defined in
 * this class; presumably inherited from {@link AbstractEventBus}).
 */
public abstract class AbstractEventStore extends AbstractEventBus implements EventStore {
    // Log under this class's own name rather than that of another event store class.
    private static final Logger logger = LoggerFactory.getLogger(AbstractEventStore.class);

    private final EventStorageEngine storageEngine;

    /**
     * Creates an event store backed by the given storage engine, using
     * {@link NoOpMessageMonitor#INSTANCE} as the message monitor.
     *
     * @param storageEngine the engine used to append and read events and snapshots
     */
    protected AbstractEventStore(EventStorageEngine storageEngine) {
        this(storageEngine, NoOpMessageMonitor.INSTANCE);
    }

    /**
     * Creates an event store backed by the given storage engine and message monitor.
     *
     * <p>NOTE: the decompiler reported that this constructor was originally {@code protected};
     * the visibility is kept as found so existing callers keep compiling.
     *
     * @param storageEngine  the engine used to append and read events and snapshots
     * @param messageMonitor the monitor passed to the {@link AbstractEventBus} constructor
     */
    public AbstractEventStore(EventStorageEngine storageEngine,
                              MessageMonitor<? super EventMessage<?>> messageMonitor) {
        super(messageMonitor);
        this.storageEngine = storageEngine;
    }

    /**
     * Appends the given events to the storage engine and then runs the inherited commit
     * preparation.
     *
     * <p>NOTE: the decompiler reported that this method was originally {@code protected}.
     *
     * @param events the events that are about to be committed
     */
    @Override
    public void prepareCommit(List<? extends EventMessage<?>> events) {
        // Append first: if appending throws, the inherited preparation is never reached.
        storageEngine.appendEvents(events);
        super.prepareCommit(events);
    }

    /**
     * Opens a stream of the events of the aggregate with the given identifier.
     *
     * <p>If the storage engine returns a snapshot, the stream starts with that snapshot and
     * continues with the stored events whose sequence number follows the snapshot's. If no snapshot
     * is returned, or reading it fails, all stored events are read. In every case the staged
     * domain events for the aggregate are appended at the end.
     *
     * @param aggregateIdentifier the identifier of the aggregate whose events to read
     * @return the snapshot (if any), the stored events and the staged events, in that order
     */
    @Override
    public DomainEventStream readEvents(String aggregateIdentifier) {
        Optional<DomainEventMessage<?>> snapshot;
        try {
            snapshot = storageEngine.readSnapshot(aggregateIdentifier);
        } catch (Exception | LinkageError e) {
            // A broken snapshot must not prevent loading; fall back to the full event stream.
            // The throwable is passed as the trailing argument so SLF4J also logs the stack trace.
            logger.warn("Error reading snapshot. Reconstructing aggregate from entire event stream. Caused by: {} {}",
                        e.getClass().getName(), e.getMessage(), e);
            snapshot = Optional.empty();
        }

        DomainEventStream storedEvents;
        if (snapshot.isPresent()) {
            DomainEventMessage<?> snapshotEvent = snapshot.get();
            // Resume reading right after the event the snapshot represents.
            storedEvents = DomainEventStream.concat(
                    DomainEventStream.of(snapshotEvent),
                    storageEngine.readEvents(aggregateIdentifier, snapshotEvent.getSequenceNumber() + 1));
        } else {
            storedEvents = storageEngine.readEvents(aggregateIdentifier);
        }
        return DomainEventStream.concat(storedEvents,
                                        DomainEventStream.of(stagedDomainEventMessages(aggregateIdentifier)));
    }

    /**
     * Returns the domain events among {@code queuedMessages()} that belong to the given aggregate.
     *
     * @param aggregateIdentifier the identifier of the aggregate to select events for; must not be
     *                            {@code null} because it is the receiver of the {@code equals} call
     * @return a stream of the queued {@link DomainEventMessage}s whose aggregate identifier equals
     *         the given one
     */
    protected Stream<? extends DomainEventMessage<?>> stagedDomainEventMessages(String aggregateIdentifier) {
        return queuedMessages().stream()
                .filter(message -> message instanceof DomainEventMessage)
                // Wildcard cast instead of a raw-type cast keeps the stream's element type generic.
                .map(message -> (DomainEventMessage<?>) message)
                .filter(domainEvent -> aggregateIdentifier.equals(domainEvent.getAggregateIdentifier()));
    }

    /**
     * Opens a stream of the events of the given aggregate, starting at the given sequence number.
     *
     * <p>No snapshot is consulted. Stored events are followed by the staged domain events whose
     * sequence number is at least {@code firstSequenceNumber}.
     *
     * @param aggregateIdentifier the identifier of the aggregate whose events to read
     * @param firstSequenceNumber the sequence number passed to the storage engine and used as the
     *                            inclusive lower bound for staged events
     * @return the stored events followed by the matching staged events
     */
    @Override
    public DomainEventStream readEvents(String aggregateIdentifier, long firstSequenceNumber) {
        Stream<? extends DomainEventMessage<?>> stagedEvents = stagedDomainEventMessages(aggregateIdentifier)
                .filter(domainEvent -> domainEvent.getSequenceNumber() >= firstSequenceNumber);
        return DomainEventStream.concat(storageEngine.readEvents(aggregateIdentifier, firstSequenceNumber),
                                        DomainEventStream.of(stagedEvents));
    }

    /**
     * Stores the given snapshot by delegating to the storage engine.
     *
     * @param snapshot the snapshot event to store
     */
    @Override
    public void storeSnapshot(DomainEventMessage<?> snapshot) {
        storageEngine.storeSnapshot(snapshot);
    }

    /**
     * Returns the storage engine this event store delegates to.
     *
     * <p>NOTE: the decompiler reported that this method was originally {@code protected}.
     *
     * @return the storage engine supplied at construction time
     */
    public EventStorageEngine storageEngine() {
        return storageEngine;
    }
}
