package org.occurrent.subscription.reactor.durable;

import io.cloudevents.CloudEvent;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Predicate;
import org.occurrent.subscription.PositionAwareCloudEvent;
import org.occurrent.subscription.StartAt;
import org.occurrent.subscription.SubscriptionFilter;
import org.occurrent.subscription.api.reactor.PositionAwareSubscriptionModel;
import org.occurrent.subscription.api.reactor.SubscriptionPositionStorage;
import org.occurrent.subscription.util.predicate.EveryN;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/occurrent/subscription/reactor/durable/ReactorDurableSubscriptionModel.class */
/**
 * Combines a {@link PositionAwareSubscriptionModel} with a {@link SubscriptionPositionStorage}:
 * a subscription is started from the position found in (or initialised into) the storage, and the
 * position of each handled event that matches the configured predicate is written back to the storage.
 */
public class ReactorDurableSubscriptionModel {
    private static final Logger log = LoggerFactory.getLogger(ReactorDurableSubscriptionModel.class);
    private final PositionAwareSubscriptionModel subscription;
    private final SubscriptionPositionStorage storage;
    private final ReactorDurableSubscriptionModelConfig config;

    /**
     * Creates an instance whose configuration uses the predicate returned by {@link EveryN#everyEvent()}
     * to decide which event positions are persisted.
     *
     * @param positionAwareSubscriptionModel the subscription model to delegate to, must not be {@code null}
     * @param subscriptionPositionStorage    the storage used to read and save subscription positions, must not be {@code null}
     */
    public ReactorDurableSubscriptionModel(PositionAwareSubscriptionModel positionAwareSubscriptionModel, SubscriptionPositionStorage subscriptionPositionStorage) {
        this(positionAwareSubscriptionModel, subscriptionPositionStorage, new ReactorDurableSubscriptionModelConfig((Predicate<CloudEvent>) EveryN.everyEvent()));
    }

    /**
     * Creates an instance with an explicit configuration.
     *
     * @param positionAwareSubscriptionModel        the subscription model to delegate to, must not be {@code null}
     * @param subscriptionPositionStorage           the storage used to read and save subscription positions, must not be {@code null}
     * @param reactorDurableSubscriptionModelConfig the configuration holding the persist-position predicate, must not be {@code null}
     * @throws NullPointerException if any argument is {@code null}
     */
    public ReactorDurableSubscriptionModel(PositionAwareSubscriptionModel positionAwareSubscriptionModel, SubscriptionPositionStorage subscriptionPositionStorage, ReactorDurableSubscriptionModelConfig reactorDurableSubscriptionModelConfig) {
        Objects.requireNonNull(positionAwareSubscriptionModel, PositionAwareSubscriptionModel.class.getSimpleName() + " cannot be null");
        Objects.requireNonNull(subscriptionPositionStorage, SubscriptionPositionStorage.class.getSimpleName() + " cannot be null");
        Objects.requireNonNull(reactorDurableSubscriptionModelConfig, ReactorDurableSubscriptionModelConfig.class.getSimpleName() + " cannot be null");
        this.subscription = positionAwareSubscriptionModel;
        this.storage = subscriptionPositionStorage;
        this.config = reactorDurableSubscriptionModelConfig;
    }

    /**
     * Subscribes without a filter; equivalent to calling
     * {@link #subscribe(String, SubscriptionFilter, Function)} with a {@code null} filter.
     *
     * @param str      the subscription id, must not be {@code null}
     * @param function the action applied to each received event, must not be {@code null}
     * @return a {@link Mono} that represents the running subscription pipeline
     * @throws NullPointerException if {@code str} or {@code function} is {@code null}
     */
    public Mono<Void> subscribe(String str, Function<CloudEvent, Mono<Void>> function) {
        return subscribe(str, null, function);
    }

    /**
     * Builds the subscription pipeline: resolve the start position for the subscription id, subscribe to
     * the underlying model from that position, apply {@code function} to every event and, for events
     * accepted by the configured predicate, save the event's subscription position in the storage.
     *
     * @param str                the subscription id, must not be {@code null}
     * @param subscriptionFilter the filter handed to the underlying subscription model; passed through as-is
     *                           (the two-argument overload passes {@code null})
     * @param function           the action applied to each received event, must not be {@code null}
     * @return a {@link Mono} that represents the running subscription pipeline
     * @throws NullPointerException if {@code str} or {@code function} is {@code null}
     */
    public Mono<Void> subscribe(String str, SubscriptionFilter subscriptionFilter, Function<CloudEvent, Mono<Void>> function) {
        Objects.requireNonNull(str, "Subscription id cannot be null");
        // Fail fast instead of surfacing a NullPointerException only when the first event arrives.
        Objects.requireNonNull(function, "Action cannot be null");
        return findStartAtForSubscription(str)
                .doOnNext(startAt -> log.info("Starting subscription {} from subscription position {}", str, startAt))
                .flatMapMany(startAt -> this.subscription.subscribe(subscriptionFilter, startAt))
                // Run the action and re-emit the event afterwards so its position can be persisted downstream.
                // NOTE(review): flatMap does not guarantee that inner publishers complete in source order; if the
                // action is asynchronous, positions could be saved out of order - confirm whether that matters here.
                .flatMap(cloudEvent -> function.apply(cloudEvent).thenReturn(cloudEvent))
                .filter(this.config.persistCloudEventPositionPredicate)
                .flatMap(cloudEvent -> this.storage
                        .save(str, PositionAwareCloudEvent.getSubscriptionPositionOrThrowIAE(cloudEvent))
                        .thenReturn(cloudEvent))
                .then();
    }

    /**
     * Resolves where the given subscription should start. If the storage holds a position for the id it is
     * used; otherwise the underlying model's global subscription position is fetched, saved in the storage
     * under the id, and the result of that save is used.
     *
     * @param str the subscription id, must not be {@code null}
     * @return a {@link Mono} emitting the {@link StartAt} derived from the resolved subscription position
     * @throws NullPointerException if {@code str} is {@code null}
     */
    public Mono<StartAt> findStartAtForSubscription(String str) {
        Objects.requireNonNull(str, "Subscription id cannot be null");
        return this.storage.read(str)
                .doOnNext(position -> log.info("Found subscription position: {}", position))
                // Deferred so the log statement and the global-position lookup only happen when nothing was stored.
                .switchIfEmpty(Mono.defer(() -> {
                    log.info("No subscription position found for {}, will initialize a new one.", str);
                    return this.subscription.globalSubscriptionPosition()
                            .flatMap(globalPosition -> this.storage.save(str, globalPosition));
                }))
                .map(StartAt::subscriptionPosition);
    }
}
