package org.occurrent.subscription.mongodb.spring.reactor;

import io.cloudevents.CloudEvent;
import java.util.Objects;
import org.bson.BsonValue;
import org.bson.Document;
import org.occurrent.mongodb.timerepresentation.TimeRepresentation;
import org.occurrent.subscription.PositionAwareCloudEvent;
import org.occurrent.subscription.StartAt;
import org.occurrent.subscription.SubscriptionFilter;
import org.occurrent.subscription.SubscriptionPosition;
import org.occurrent.subscription.api.reactor.PositionAwareReactorSubscription;
import org.occurrent.subscription.mongodb.MongoDBOperationTimeBasedSubscriptionPosition;
import org.occurrent.subscription.mongodb.MongoDBResumeTokenBasedSubscriptionPosition;
import org.occurrent.subscription.mongodb.internal.MongoDBCloudEventsToJsonDeserializer;
import org.occurrent.subscription.mongodb.internal.MongoDBCommons;
import org.occurrent.subscription.mongodb.spring.internal.ApplyFilterToChangeStreamOptionsBuilder;
import org.springframework.data.mongodb.core.ChangeStreamOptions;
import org.springframework.data.mongodb.core.ReactiveMongoOperations;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/occurrent/subscription/mongodb/spring/reactor/SpringReactorSubscriptionForMongoDB.class */
/**
 * Reactive subscription that opens a MongoDB change stream on the configured event collection
 * (through Spring's {@link ReactiveMongoOperations}) and publishes the changes as
 * {@link CloudEvent}s.
 *
 * <p>Every emitted element is a {@link PositionAwareCloudEvent} whose position is a
 * {@link MongoDBResumeTokenBasedSubscriptionPosition} built from the resume token of the change
 * stream event it originated from.
 */
public class SpringReactorSubscriptionForMongoDB implements PositionAwareReactorSubscription {
    private final ReactiveMongoOperations mongo;
    private final String eventCollection;
    private final TimeRepresentation timeRepresentation;

    /**
     * Creates a subscription bound to one collection.
     *
     * @param reactiveMongoOperations the reactive Mongo operations used to open change streams and run commands
     * @param str                     the name of the collection whose change stream is subscribed to
     * @param timeRepresentation      passed to the filter builder and to the CloudEvent deserializer
     */
    public SpringReactorSubscriptionForMongoDB(ReactiveMongoOperations reactiveMongoOperations, String str, TimeRepresentation timeRepresentation) {
        this.mongo = reactiveMongoOperations;
        this.eventCollection = str;
        this.timeRepresentation = timeRepresentation;
    }

    /**
     * Opens a change stream on the event collection and maps each change to a position-aware CloudEvent.
     *
     * <p>Changes for which the deserializer yields no CloudEvent are skipped. If a change that does
     * deserialize carries no resume token, a {@link NullPointerException} is raised while mapping
     * that change.
     *
     * @param subscriptionFilter the filter applied to the change stream options
     * @param startAt            where the stream should start; handed to {@code MongoDBCommons.applyStartPosition}
     * @return a flux of {@link PositionAwareCloudEvent}s
     */
    public Flux<CloudEvent> subscribe(SubscriptionFilter subscriptionFilter, StartAt startAt) {
        // Let MongoDBCommons decide which of the two builder callbacks to use for the given start position.
        ChangeStreamOptions.ChangeStreamOptionsBuilder positionedBuilder = MongoDBCommons.applyStartPosition(
                ChangeStreamOptions.builder(),
                ChangeStreamOptions.ChangeStreamOptionsBuilder::startAfter,
                ChangeStreamOptions.ChangeStreamOptionsBuilder::resumeAt,
                startAt);
        ChangeStreamOptions changeStreamOptions = ApplyFilterToChangeStreamOptionsBuilder.applyFilter(
                this.timeRepresentation, subscriptionFilter, positionedBuilder);

        return this.mongo.changeStream(this.eventCollection, changeStreamOptions, Document.class)
                .flatMap(changeStreamEvent -> MongoDBCloudEventsToJsonDeserializer
                        .deserializeToCloudEvent(changeStreamEvent.getRaw(), this.timeRepresentation)
                        // The resume token is only inspected when deserialization produced an event.
                        .map(cloudEvent -> new PositionAwareCloudEvent(
                                cloudEvent,
                                new MongoDBResumeTokenBasedSubscriptionPosition(
                                        Objects.requireNonNull(
                                                changeStreamEvent.getResumeToken(),
                                                "Change stream event is missing a resume token").asDocument())))
                        .map(Mono::just)
                        // No CloudEvent for this change: emit nothing instead of failing the stream.
                        .orElse(Mono.empty()));
    }

    /**
     * Resolves the current global position by running the {@code hostInfo} command and converting
     * the operation time extracted from its reply (via {@code MongoDBCommons.getServerOperationTime})
     * into a {@link MongoDBOperationTimeBasedSubscriptionPosition}.
     *
     * @return a mono emitting the operation-time based subscription position
     */
    public Mono<SubscriptionPosition> globalSubscriptionPosition() {
        Document hostInfoCommand = new Document("hostInfo", 1);
        return this.mongo.executeCommand(hostInfoCommand)
                .map(MongoDBCommons::getServerOperationTime)
                .map(MongoDBOperationTimeBasedSubscriptionPosition::new);
    }
}
