package akka.contrib.persistence.mongodb;

import akka.NotUsed;
import akka.persistence.query.Offset;
import akka.stream.Materializer;
import akka.stream.SharedKillSwitch;
import akka.stream.scaladsl.Source;
import akka.stream.scaladsl.Source$;
import reactivemongo.akkastream.AkkaStreamCursor;
import reactivemongo.api.QueryOpts;
import reactivemongo.api.QueryOpts$;
import reactivemongo.api.collections.GenericQueryBuilder;
import reactivemongo.bson.BSONDocument;
import reactivemongo.bson.BSONDocument$;
import reactivemongo.bson.BSONElement$;
import reactivemongo.bson.BSONObjectID;
import reactivemongo.bson.Producer;
import reactivemongo.bson.package$;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.reflect.ScalaSignature;

/* compiled from: RxMongoReadJournaller.scala */
@ScalaSignature(bytes = "\u0006\u0001m3A!\u0001\u0002\u0001\u0017\t!\"\u000b_'p]\u001e|'j\\;s]\u0006d7\u000b\u001e:fC6T!a\u0001\u0003\u0002\u000f5|gnZ8eE*\u0011QAB\u0001\fa\u0016\u00148/[:uK:\u001cWM\u0003\u0002\b\u0011\u000591m\u001c8ue&\u0014'\"A\u0005\u0002\t\u0005\\7.Y\u0002\u0001'\r\u0001AB\u0005\t\u0003\u001bAi\u0011A\u0004\u0006\u0002\u001f\u0005)1oY1mC&\u0011\u0011C\u0004\u0002\u0007\u0003:L(+\u001a4\u0011\u0007M!b#D\u0001\u0003\u0013\t)\"AA\u0007K_V\u0014h.\u00197TiJ,\u0017-\u001c\t\u0005/qq2&D\u0001\u0019\u0015\tI\"$\u0001\u0005tG\u0006d\u0017\rZ:m\u0015\tY\u0002\"\u0001\u0004tiJ,\u0017-\\\u0005\u0003;a\u0011aaU8ve\u000e,\u0007\u0003B\u0007 C\u0011J!\u0001\t\b\u0003\rQ+\b\u000f\\33!\t\u0019\"%\u0003\u0002$\u0005\t)QI^3oiB\u0011Q%K\u0007\u0002M)\u0011q\u0005K\u0001\u0006cV,'/\u001f\u0006\u0003\u000b!I!A\u000b\u0014\u0003\r=3gm]3u!\taS&D\u0001\t\u0013\tq\u0003BA\u0004O_R,6/\u001a3\t\u0011A\u0002!\u0011!Q\u0001\nE\na\u0001\u001a:jm\u0016\u0014\bCA\n3\u0013\t\u0019$AA\u0007Sq6{gnZ8Ee&4XM\u001d\u0005\tk\u0001\u0011\t\u0011)A\u0006m\u0005\tQ\u000e\u0005\u00028q5\t!$\u0003\u0002:5\taQ*\u0019;fe&\fG.\u001b>fe\")1\b\u0001C\u0001y\u00051A(\u001b8jiz\"\"!\u0010!\u0015\u0005yz\u0004CA\n\u0001\u0011\u0015)$\bq\u00017\u0011\u0015\u0001$\b1\u00012\u0011\u001d\u0011\u0005A1A\u0005\u0004\r\u000b!!Z2\u0016\u0003\u0011\u0003\"!\u0012%\u000e\u0003\u0019S!a\u0012\b\u0002\u0015\r|gnY;se\u0016tG/\u0003\u0002J\r\n\u0001R\t_3dkRLwN\\\"p]R,\u0007\u0010\u001e\u0005\u0007\u0017\u0002\u0001\u000b\u0011\u0002#\u0002\u0007\u0015\u001c\u0007\u0005C\u0003N\u0001\u0011\u0005a*\u0001\u0004dkJ\u001cxN\u001d\u000b\u0003-=CQa\n'A\u0002A\u00032!D)T\u0013\t\u0011fB\u0001\u0004PaRLwN\u001c\t\u0003)fk\u0011!\u0016\u0006\u0003-^\u000bAAY:p]*\t\u0001,A\u0007sK\u0006\u001cG/\u001b<f[>twm\\\u0005\u00035V\u0013ABQ*P\u001d\u0012{7-^7f]R\u0004")
/* loaded from: input_file:akka/contrib/persistence/mongodb/RxMongoJournalStream.class */
public class RxMongoJournalStream implements JournalStream<Source<Tuple2<Event, Offset>, NotUsed>> {
    public final RxMongoDriver akka$contrib$persistence$mongodb$RxMongoJournalStream$$driver;
    private final Materializer m;
    private final ExecutionContext ec;
    private final SharedKillSwitch killSwitch;

    public void stopAllStreams() {
        JournalStream.stopAllStreams$(this);
    }

    public SharedKillSwitch killSwitch() {
        return this.killSwitch;
    }

    public void akka$contrib$persistence$mongodb$JournalStream$_setter_$killSwitch_$eq(SharedKillSwitch sharedKillSwitch) {
        this.killSwitch = sharedKillSwitch;
    }

    public ExecutionContext ec() {
        return this.ec;
    }

    public Source<Tuple2<Event, Offset>, NotUsed> cursor(Option<BSONDocument> option) {
        return this.akka$contrib$persistence$mongodb$RxMongoJournalStream$$driver.realtimeEnablePersistence() ? Source$.MODULE$.fromFuture((Future) this.akka$contrib$persistence$mongodb$RxMongoJournalStream$$driver.realtime(ec())).flatMapConcat(bSONCollection -> {
            return Source$.MODULE$.fromGraph(new RxMongoRealtimeGraphStage(this.akka$contrib$persistence$mongodb$RxMongoJournalStream$$driver, RxMongoRealtimeGraphStage$.MODULE$.$lessinit$greater$default$2(), option2 -> {
                GenericQueryBuilder find;
                Tuple2 tuple2 = new Tuple2(option, option2);
                if (tuple2 != null) {
                    Option option2 = (Option) tuple2._1();
                    Option option3 = (Option) tuple2._2();
                    if (None$.MODULE$.equals(option2) && None$.MODULE$.equals(option3)) {
                        find = bSONCollection.find(BSONDocument$.MODULE$.empty(), package$.MODULE$.BSONDocumentIdentity());
                        GenericQueryBuilder options = find.options(new QueryOpts(QueryOpts$.MODULE$.apply$default$1(), QueryOpts$.MODULE$.apply$default$2(), QueryOpts$.MODULE$.apply$default$3()).tailable().awaitData());
                        AkkaStreamCursor cursor = options.cursor(options.cursor$default$1(), options.cursor$default$2(), package$.MODULE$.BSONDocumentIdentity(), reactivemongo.akkastream.package$.MODULE$.cursorProducer());
                        return cursor.documentPublisher(cursor.documentPublisher$default$1(), cursor.documentPublisher$default$2(), cursor.documentPublisher$default$3(), this.m);
                    }
                }
                if (tuple2 != null) {
                    Option option4 = (Option) tuple2._1();
                    Some some = (Option) tuple2._2();
                    if (None$.MODULE$.equals(option4) && (some instanceof Some)) {
                        find = bSONCollection.find(BSONDocument$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Producer[]{BSONElement$.MODULE$.provided(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("_id"), BSONDocument$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Producer[]{BSONElement$.MODULE$.provided(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("$gt"), (BSONObjectID) some.value()))}))))})), package$.MODULE$.BSONDocumentIdentity());
                        GenericQueryBuilder options2 = find.options(new QueryOpts(QueryOpts$.MODULE$.apply$default$1(), QueryOpts$.MODULE$.apply$default$2(), QueryOpts$.MODULE$.apply$default$3()).tailable().awaitData());
                        AkkaStreamCursor cursor2 = options2.cursor(options2.cursor$default$1(), options2.cursor$default$2(), package$.MODULE$.BSONDocumentIdentity(), reactivemongo.akkastream.package$.MODULE$.cursorProducer());
                        return cursor2.documentPublisher(cursor2.documentPublisher$default$1(), cursor2.documentPublisher$default$2(), cursor2.documentPublisher$default$3(), this.m);
                    }
                }
                if (tuple2 != null) {
                    Some some2 = (Option) tuple2._1();
                    Option option5 = (Option) tuple2._2();
                    if (some2 instanceof Some) {
                        BSONDocument bSONDocument = (BSONDocument) some2.value();
                        if (None$.MODULE$.equals(option5)) {
                            find = bSONCollection.find(bSONDocument, package$.MODULE$.BSONDocumentIdentity());
                            GenericQueryBuilder options22 = find.options(new QueryOpts(QueryOpts$.MODULE$.apply$default$1(), QueryOpts$.MODULE$.apply$default$2(), QueryOpts$.MODULE$.apply$default$3()).tailable().awaitData());
                            AkkaStreamCursor cursor22 = options22.cursor(options22.cursor$default$1(), options22.cursor$default$2(), package$.MODULE$.BSONDocumentIdentity(), reactivemongo.akkastream.package$.MODULE$.cursorProducer());
                            return cursor22.documentPublisher(cursor22.documentPublisher$default$1(), cursor22.documentPublisher$default$2(), cursor22.documentPublisher$default$3(), this.m);
                        }
                    }
                }
                if (tuple2 != null) {
                    Some some3 = (Option) tuple2._1();
                    Some some4 = (Option) tuple2._2();
                    if (some3 instanceof Some) {
                        BSONDocument bSONDocument2 = (BSONDocument) some3.value();
                        if (some4 instanceof Some) {
                            find = bSONCollection.find(bSONDocument2.$plus$plus(BSONDocument$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Producer[]{BSONElement$.MODULE$.provided(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("_id"), BSONDocument$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Producer[]{BSONElement$.MODULE$.provided(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("$gt"), (BSONObjectID) some4.value()))}))))}))), package$.MODULE$.BSONDocumentIdentity());
                            GenericQueryBuilder options222 = find.options(new QueryOpts(QueryOpts$.MODULE$.apply$default$1(), QueryOpts$.MODULE$.apply$default$2(), QueryOpts$.MODULE$.apply$default$3()).tailable().awaitData());
                            AkkaStreamCursor cursor222 = options222.cursor(options222.cursor$default$1(), options222.cursor$default$2(), package$.MODULE$.BSONDocumentIdentity(), reactivemongo.akkastream.package$.MODULE$.cursorProducer());
                            return cursor222.documentPublisher(cursor222.documentPublisher$default$1(), cursor222.documentPublisher$default$2(), cursor222.documentPublisher$default$3(), this.m);
                        }
                    }
                }
                throw new MatchError(tuple2);
            })).via(this.killSwitch().flow()).mapConcat(bSONDocument -> {
                BSONObjectID bSONObjectID = (BSONObjectID) bSONDocument.getAs("_id", package$.MODULE$.BSONObjectIDIdentity()).get();
                return (List) bSONDocument.getAs("events", package$.MODULE$.BSONArrayIdentity()).map(bSONArray -> {
                    return (List) ((List) bSONArray.elements().map(bSONElement -> {
                        return bSONElement.value();
                    }, List$.MODULE$.canBuildFrom())).collect(new RxMongoJournalStream$$anonfun$$nestedInanonfun$cursor$4$1(this, bSONObjectID), List$.MODULE$.canBuildFrom());
                }).getOrElse(() -> {
                    return Nil$.MODULE$;
                });
            });
        }) : Source$.MODULE$.empty();
    }

    public RxMongoJournalStream(RxMongoDriver rxMongoDriver, Materializer materializer) {
        this.akka$contrib$persistence$mongodb$RxMongoJournalStream$$driver = rxMongoDriver;
        this.m = materializer;
        JournalStream.$init$(this);
        this.ec = rxMongoDriver.querySideDispatcher();
    }
}
