package org.axonframework.mongo.eventsourcing.eventstore;

import com.mongodb.BasicDBObject;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.IndexOptions;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.axonframework.common.Assert;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventsourcing.DomainEventMessage;
import org.axonframework.eventsourcing.eventstore.DomainEventData;
import org.axonframework.eventsourcing.eventstore.TrackedEventData;
import org.axonframework.eventsourcing.eventstore.TrackingToken;
import org.axonframework.eventsourcing.eventstore.legacy.LegacyTrackingToken;
import org.axonframework.mongo.eventsourcing.eventstore.documentperevent.EventEntryConfiguration;
import org.axonframework.serialization.Serializer;
import org.bson.BsonDocument;
import org.bson.BsonString;
import org.bson.Document;
import org.bson.conversions.Bson;

/**
 * Skeleton {@link StorageStrategy} for storing events and snapshots in MongoDB collections.
 *
 * <p>This class implements the query, insert, delete and index logic against a
 * {@link MongoCollection} and leaves the mapping between event messages and Mongo
 * {@link Document}s (in both directions), as well as cursor batch sizing, to subclasses.
 * Property names used in queries, sorts and indexes are taken from the
 * {@link EventEntryConfiguration} supplied at construction.
 */
public abstract class AbstractMongoEventStorageStrategy implements StorageStrategy {

    /** Direction value used in sort and index key documents for ascending order. */
    protected static final int ORDER_ASC = 1;

    /** Direction value used in sort and index key documents for descending order. */
    protected static final int ORDER_DESC = -1;

    private final EventEntryConfiguration eventConfiguration;

    /**
     * Creates a strategy that reads its document property names from the given configuration.
     *
     * @param eventEntryConfiguration supplies the aggregate identifier, sequence number and
     *                                timestamp property names used by this class
     */
    public AbstractMongoEventStorageStrategy(EventEntryConfiguration eventEntryConfiguration) {
        this.eventConfiguration = eventEntryConfiguration;
    }

    /**
     * Converts the given events to documents via {@link #createEventDocuments(List, Serializer)}
     * and inserts them into the collection with a single {@code insertMany} call.
     *
     * @param mongoCollection collection to insert the event documents into
     * @param list            events to store
     * @param serializer      serializer handed to {@link #createEventDocuments(List, Serializer)}
     */
    @Override
    public void appendEvents(MongoCollection<Document> mongoCollection, List<? extends EventMessage<?>> list, Serializer serializer) {
        List<Document> documents = createEventDocuments(list, serializer).collect(Collectors.toList());
        mongoCollection.insertMany(documents);
    }

    /**
     * Produces the documents to insert for the given events.
     *
     * @param list       events to convert
     * @param serializer serializer available for the conversion
     * @return stream of documents to be inserted by {@link #appendEvents}
     */
    protected abstract Stream<Document> createEventDocuments(List<? extends EventMessage<?>> list, Serializer serializer);

    /**
     * Converts the snapshot to a document via
     * {@link #createSnapshotDocument(DomainEventMessage, Serializer)} and inserts it.
     *
     * @param mongoCollection    collection to insert the snapshot document into
     * @param domainEventMessage snapshot event to store
     * @param serializer         serializer handed to the document factory method
     */
    @Override
    public void appendSnapshot(MongoCollection<Document> mongoCollection, DomainEventMessage<?> domainEventMessage, Serializer serializer) {
        Document snapshotDocument = createSnapshotDocument(domainEventMessage, serializer);
        mongoCollection.insertOne(snapshotDocument);
    }

    /**
     * Produces the document to insert for the given snapshot event.
     *
     * @param domainEventMessage snapshot event to convert
     * @param serializer         serializer available for the conversion
     * @return document to be inserted by {@link #appendSnapshot}
     */
    protected abstract Document createSnapshotDocument(DomainEventMessage<?> domainEventMessage, Serializer serializer);

    /**
     * Deletes every document in the collection whose aggregate identifier property equals
     * the given value.
     *
     * @param mongoCollection collection to delete from
     * @param str             aggregate identifier whose documents are removed
     */
    @Override
    public void deleteSnapshots(MongoCollection<Document> mongoCollection, String str) {
        BsonDocument byAggregate =
                new BsonDocument(this.eventConfiguration.aggregateIdentifierProperty(), new BsonString(str));
        mongoCollection.deleteMany(byAggregate);
    }

    /**
     * Loads the events of one aggregate, starting at a given sequence number, ordered by
     * ascending sequence number.
     *
     * @param mongoCollection collection to query
     * @param str             aggregate identifier to match
     * @param j               lowest sequence number (inclusive) to return
     * @param i               batch size passed to {@link #applyBatchSize(FindIterable, int)}
     * @return the matching events as extracted by {@link #extractDomainEvents(Document)}
     */
    @Override
    public List<? extends DomainEventData<?>> findDomainEvents(MongoCollection<Document> mongoCollection, String str, long j, int i) {
        FindIterable<Document> cursor = mongoCollection
                .find(Filters.and(
                        Filters.eq(this.eventConfiguration.aggregateIdentifierProperty(), str),
                        Filters.gte(this.eventConfiguration.sequenceNumberProperty(), j)))
                .sort(new BasicDBObject(eventConfiguration().sequenceNumberProperty(), ORDER_ASC));
        cursor = applyBatchSize(cursor, i);
        // The sequence number is checked again per extracted event because a single document
        // may be expanded into several events by extractDomainEvents.
        return StreamSupport.stream(cursor.spliterator(), false)
                .flatMap(this::extractDomainEvents)
                .filter(event -> event.getSequenceNumber() >= j)
                .collect(Collectors.toList());
    }

    /**
     * Extracts the domain events contained in a stored document.
     *
     * @param document document read from the collection
     * @return stream of events found in the document
     */
    protected abstract Stream<? extends DomainEventData<?>> extractDomainEvents(Document document);

    /**
     * Loads up to {@code i} tracked events that come after the given token, ordered by
     * ascending timestamp and then ascending sequence number.
     *
     * <p>When a token is given, the database query only narrows the scan to documents whose
     * timestamp is at or after the token's timestamp; the precise cut-off is decided by
     * {@code trackingToken().isAfter(trackingToken)} on each extracted event. The query
     * deliberately does not constrain the sequence number: combined with the timestamp
     * condition it would drop newer events that carry a lower sequence number than the token
     * (for example the first events of a different aggregate), so they would never be returned.
     *
     * @param mongoCollection collection to query
     * @param trackingToken   token of the last event already seen, or {@code null} to start
     *                        from the beginning; must be a {@link LegacyTrackingToken} when
     *                        not {@code null}, otherwise the {@code Assert.isTrue} check fails
     * @param i               maximum number of events to return; also passed to
     *                        {@link #applyBatchSize(FindIterable, int)}
     * @return at most {@code i} events located after the token
     */
    @Override
    public List<? extends TrackedEventData<?>> findTrackedEvents(MongoCollection<Document> mongoCollection, TrackingToken trackingToken, int i) {
        FindIterable<Document> cursor;
        if (trackingToken == null) {
            cursor = mongoCollection.find();
        } else {
            Assert.isTrue(trackingToken instanceof LegacyTrackingToken,
                    () -> String.format("Token %s is of the wrong type", trackingToken));
            LegacyTrackingToken legacyTrackingToken = (LegacyTrackingToken) trackingToken;
            // Timestamp-only pre-filter; the isAfter check below performs the exact comparison.
            cursor = mongoCollection.find(Filters.gte(
                    this.eventConfiguration.timestampProperty(),
                    legacyTrackingToken.getTimestamp().toString()));
        }
        cursor = cursor.sort(new BasicDBObject(eventConfiguration().timestampProperty(), ORDER_ASC)
                .append(eventConfiguration().sequenceNumberProperty(), ORDER_ASC));
        cursor = applyBatchSize(cursor, i);
        return StreamSupport.stream(cursor.spliterator(), false)
                .flatMap(this::extractTrackedEvents)
                .filter(event -> event.trackingToken().isAfter(trackingToken))
                .limit(i)
                .collect(Collectors.toList());
    }

    /**
     * Applies a batch size to the given cursor.
     *
     * @param findIterable cursor to configure
     * @param i            batch size requested by the caller of the find method
     * @return the cursor to iterate over
     */
    protected abstract FindIterable<Document> applyBatchSize(FindIterable<Document> findIterable, int i);

    /**
     * Extracts the tracked events contained in a stored document.
     *
     * @param document document read from the collection
     * @return stream of tracked events found in the document
     */
    protected abstract Stream<? extends TrackedEventData<?>> extractTrackedEvents(Document document);

    /**
     * Finds the document with the highest sequence number for the given aggregate and
     * converts it with {@link #extractSnapshot(Document)}.
     *
     * @param mongoCollection collection to query
     * @param str             aggregate identifier to match
     * @return the extracted snapshot, or an empty {@link Optional} when no document matches
     */
    @Override
    public Optional<? extends DomainEventData<?>> findLastSnapshot(MongoCollection<Document> mongoCollection, String str) {
        FindIterable<Document> newestFirst = mongoCollection
                .find(Filters.eq(this.eventConfiguration.aggregateIdentifierProperty(), str))
                .sort(new BasicDBObject(this.eventConfiguration.sequenceNumberProperty(), ORDER_DESC))
                .limit(1);
        return StreamSupport.stream(newestFirst.spliterator(), false)
                .findFirst()
                .map(this::extractSnapshot);
    }

    /**
     * Converts a stored document into snapshot event data.
     *
     * @param document document read from the collection
     * @return the snapshot represented by the document
     */
    protected abstract DomainEventData<?> extractSnapshot(Document document);

    /**
     * Creates the indexes this strategy relies on.
     *
     * <p>The first collection receives a unique index on (aggregate identifier, sequence
     * number) named {@code uniqueAggregateIndex} and a non-unique index on (timestamp,
     * sequence number) named {@code orderedEventStreamIndex}. The second collection receives
     * only the unique {@code uniqueAggregateIndex}.
     *
     * @param mongoCollection  collection that gets both indexes (presumably the events
     *                         collection; confirm against {@link StorageStrategy})
     * @param mongoCollection2 collection that gets only the unique aggregate index
     *                         (presumably the snapshots collection; confirm against
     *                         {@link StorageStrategy})
     */
    @Override
    public void ensureIndexes(MongoCollection<Document> mongoCollection, MongoCollection<Document> mongoCollection2) {
        mongoCollection.createIndex(
                aggregateSequenceKeys(),
                new IndexOptions().unique(true).name("uniqueAggregateIndex"));
        mongoCollection.createIndex(
                new BasicDBObject(this.eventConfiguration.timestampProperty(), ORDER_ASC)
                        .append(this.eventConfiguration.sequenceNumberProperty(), ORDER_ASC),
                new IndexOptions().unique(false).name("orderedEventStreamIndex"));
        mongoCollection2.createIndex(
                aggregateSequenceKeys(),
                new IndexOptions().unique(true).name("uniqueAggregateIndex"));
    }

    /** Builds a fresh ascending (aggregate identifier, sequence number) index key document. */
    private BasicDBObject aggregateSequenceKeys() {
        return new BasicDBObject(this.eventConfiguration.aggregateIdentifierProperty(), ORDER_ASC)
                .append(this.eventConfiguration.sequenceNumberProperty(), ORDER_ASC);
    }

    /**
     * Returns the configuration holding the document property names used by this strategy.
     *
     * @return the configuration passed to the constructor
     */
    public EventEntryConfiguration eventConfiguration() {
        return this.eventConfiguration;
    }
}
