package pekko.contrib.persistence.mongodb.driver;

import com.mongodb.CursorType;
import org.apache.pekko.NotUsed;
import org.apache.pekko.persistence.query.Offset;
import org.apache.pekko.stream.KillSwitches$;
import org.apache.pekko.stream.SharedKillSwitch;
import org.apache.pekko.stream.scaladsl.Source;
import org.apache.pekko.stream.scaladsl.Source$;
import org.bson.BsonDocument;
import org.bson.BsonObjectId;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
import org.mongodb.scala.FindObservable;
import org.mongodb.scala.bson.DefaultHelper$DefaultsTo$;
import org.mongodb.scala.model.Filters$;
import pekko.contrib.persistence.mongodb.Event;
import pekko.contrib.persistence.mongodb.JournalStream;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Some;
import scala.Tuple2;
import scala.Tuple2$;
import scala.collection.IterableOnce;
import scala.collection.IterableOnceOps;
import scala.collection.IterableOps;
import scala.collection.JavaConverters$;
import scala.collection.immutable.List;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.package;
import scala.reflect.ClassTag$;
import scala.runtime.ScalaRunTime$;
import scala.runtime.Statics;

/* compiled from: ScalaDriverPersistenceReadJournaller.scala */
/* loaded from: input_file:pekko/contrib/persistence/mongodb/driver/ScalaDriverJournalStream.class */
public class ScalaDriverJournalStream implements JournalStream<Source<Tuple2<Event, Offset>, NotUsed>> {
    private SharedKillSwitch killSwitch;
    public final ScalaMongoDriver pekko$contrib$persistence$mongodb$driver$ScalaDriverJournalStream$$driver;
    private final ExecutionContext ec;
    private final Function1<FindObservable<BsonDocument>, FindObservable<BsonDocument>> cursorBuilder;

    public ScalaDriverJournalStream(ScalaMongoDriver scalaMongoDriver) {
        this.pekko$contrib$persistence$mongodb$driver$ScalaDriverJournalStream$$driver = scalaMongoDriver;
        pekko$contrib$persistence$mongodb$JournalStream$_setter_$killSwitch_$eq(KillSwitches$.MODULE$.shared("realtimeKillSwitch"));
        this.ec = scalaMongoDriver.querySideDispatcher();
        this.cursorBuilder = findObservable -> {
            return findObservable.cursorType(CursorType.TailableAwait).maxAwaitTime(new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(30)).seconds());
        };
        Statics.releaseFence();
    }

    @Override // pekko.contrib.persistence.mongodb.JournalStream
    public SharedKillSwitch killSwitch() {
        return this.killSwitch;
    }

    @Override // pekko.contrib.persistence.mongodb.JournalStream
    public void pekko$contrib$persistence$mongodb$JournalStream$_setter_$killSwitch_$eq(SharedKillSwitch sharedKillSwitch) {
        this.killSwitch = sharedKillSwitch;
    }

    @Override // pekko.contrib.persistence.mongodb.JournalStream
    public /* bridge */ /* synthetic */ void stopAllStreams() {
        stopAllStreams();
    }

    public ExecutionContext ec() {
        return this.ec;
    }

    public Source<Tuple2<Event, Offset>, NotUsed> cursor(Option<Bson> option) {
        return this.pekko$contrib$persistence$mongodb$driver$ScalaDriverJournalStream$$driver.realtimeEnablePersistence() ? Source$.MODULE$.future(this.pekko$contrib$persistence$mongodb$driver$ScalaDriverJournalStream$$driver.realtime()).flatMapConcat(mongoCollection -> {
            return Source$.MODULE$.fromGraph(new ScalaDriverRealtimeGraphStage(this.pekko$contrib$persistence$mongodb$driver$ScalaDriverJournalStream$$driver, ScalaDriverRealtimeGraphStage$.MODULE$.$lessinit$greater$default$2(), option2 -> {
                Tuple2 apply = Tuple2$.MODULE$.apply(option, option2);
                if (apply != null) {
                    Some some = (Option) apply._1();
                    Some some2 = (Option) apply._2();
                    if (some instanceof Some) {
                        Bson bson = (Bson) some.value();
                        if (None$.MODULE$.equals(some2)) {
                            return (FindObservable) this.cursorBuilder.apply(mongoCollection.find(bson, DefaultHelper$DefaultsTo$.MODULE$.default(), ClassTag$.MODULE$.apply(BsonDocument.class)));
                        }
                        if (some2 instanceof Some) {
                            return (FindObservable) this.cursorBuilder.apply(mongoCollection.find(Filters$.MODULE$.and(ScalaRunTime$.MODULE$.wrapRefArray(new Bson[]{bson, Filters$.MODULE$.gte("_id", (BsonObjectId) some2.value())})), DefaultHelper$DefaultsTo$.MODULE$.default(), ClassTag$.MODULE$.apply(BsonDocument.class)));
                        }
                    }
                    if (None$.MODULE$.equals(some)) {
                        if (None$.MODULE$.equals(some2)) {
                            return (FindObservable) this.cursorBuilder.apply(mongoCollection.find(DefaultHelper$DefaultsTo$.MODULE$.default(), ClassTag$.MODULE$.apply(BsonDocument.class)));
                        }
                        if (some2 instanceof Some) {
                            return (FindObservable) this.cursorBuilder.apply(mongoCollection.find(Filters$.MODULE$.gte("_id", (BsonObjectId) some2.value()), DefaultHelper$DefaultsTo$.MODULE$.default(), ClassTag$.MODULE$.apply(BsonDocument.class)));
                        }
                    }
                }
                throw new MatchError(apply);
            }).named("rt-graph-stage").async()).via(killSwitch().flow()).mapConcat(bsonDocument -> {
                ObjectId value = bsonDocument.getObjectId("_id").getValue();
                return (IterableOnce) Option$.MODULE$.apply(bsonDocument.get("events")).filter(bsonValue -> {
                    return bsonValue.isArray();
                }).map(bsonValue2 -> {
                    return bsonValue2.asArray();
                }).map(bsonArray -> {
                    return ((IterableOnceOps) ((IterableOps) JavaConverters$.MODULE$.asScalaBufferConverter(bsonArray.getValues()).asScala()).collect(new ScalaDriverJournalStream$$anon$7(value, this))).toList();
                }).getOrElse(ScalaDriverJournalStream::cursor$$anonfun$1$$anonfun$2$$anonfun$4);
            });
        }).named("rt-cursor-source") : Source$.MODULE$.empty();
    }

    private static final List cursor$$anonfun$1$$anonfun$2$$anonfun$4() {
        return scala.package$.MODULE$.Nil();
    }
}
