package akka.contrib.persistence.mongodb;

import akka.stream.stage.AsyncCallback;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.OutHandler;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactivemongo.api.bson.BSONDocument;
import reactivemongo.api.bson.BSONObjectID;
import reactivemongo.api.bson.package$;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Tuple2;
import scala.collection.immutable.$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.runtime.BoxedUnit;

/* compiled from: RxMongoReadJournaller.scala */
/* loaded from: input_file:akka/contrib/persistence/mongodb/RxMongoRealtimeGraphStage$$anon$1.class */
/**
 * Stage logic for {@code RxMongoRealtimeGraphStage} (decompiled from the Scala anonymous class).
 *
 * <p>On start it obtains a {@code Publisher<BSONDocument>} from the enclosing stage's factory,
 * passing the last seen {@code _id} (initially {@code None}), and subscribes to it. Every
 * Reactive Streams signal is forwarded through an {@link AsyncCallback}. Documents are pushed
 * to the stage outlet when it is available and are otherwise kept in {@code buffer} until the
 * next pull. When the publisher completes, a new publisher is requested from the factory using
 * the current {@code lastId} and a fresh subscriber is attached.
 *
 * <p>NOTE(review): this is JADX output; several constructs below (marked individually) are
 * decompiler artifacts and are not valid Java as written.
 */
public final class RxMongoRealtimeGraphStage$$anon$1 extends GraphStageLogic {
    // "_id" of the most recently received document; None until the first document arrives.
    private Option<BSONObjectID> lastId;
    // Subscription handed over by the current publisher; None until onSubscribe was processed.
    private Option<Subscription> akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$anon$$subscription;
    // Publisher currently subscribed to; written in preStart and when the publisher completes.
    private Option<Publisher<BSONDocument>> cursor;
    // Documents received while the outlet was not available, drained head-first in onPull.
    private List<BSONDocument> akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$anon$$buffer;
    // Enclosing stage; supplies the outlet, the publisher factory and the initial demand (bufsz).
    private final /* synthetic */ RxMongoRealtimeGraphStage $outer;

    /** Returns the {@code _id} of the last received document, if any. */
    private Option<BSONObjectID> lastId() {
        return this.lastId;
    }

    /** Replaces the last seen {@code _id}. */
    private void lastId_$eq(Option<BSONObjectID> option) {
        this.lastId = option;
    }

    /** Returns the current subscription, if one has been received. Name-mangled Scala accessor. */
    public Option<Subscription> akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$anon$$subscription() {
        return this.akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$anon$$subscription;
    }

    /** Replaces the current subscription. */
    private void akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$anon$$subscription_$eq(Option<Subscription> option) {
        this.akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$anon$$subscription = option;
    }

    /** Returns the publisher currently in use, if any. */
    private Option<Publisher<BSONDocument>> cursor() {
        return this.cursor;
    }

    /** Replaces the publisher currently in use. */
    private void cursor_$eq(Option<Publisher<BSONDocument>> option) {
        this.cursor = option;
    }

    /** Returns the documents waiting for downstream demand. Name-mangled Scala accessor. */
    public List<BSONDocument> akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$anon$$buffer() {
        return this.akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$anon$$buffer;
    }

    /** Replaces the buffer of documents waiting for downstream demand. */
    public void akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$anon$$buffer_$eq(List<BSONDocument> list) {
        this.akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$anon$$buffer = list;
    }

    /** Creates a subscriber, subscribes it to a freshly built publisher and stores that publisher. */
    public void preStart() {
        cursor_$eq(Option$.MODULE$.apply(buildCursor(buildSubscriber())));
    }

    /**
     * Builds a new async callback for {@code onSubscribe}; see {@code $anonfun$subAc$1}.
     * A new callback is created on every call.
     */
    public AsyncCallback<Subscription> akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$anon$$subAc() {
        return getAsyncCallback(subscription -> {
            $anonfun$subAc$1(this, subscription);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Builds a new async callback for {@code onNext}; see {@code $anonfun$nxtAc$1}.
     * A new callback is created on every call.
     */
    public AsyncCallback<BSONDocument> akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$anon$$nxtAc() {
        return getAsyncCallback(bSONDocument -> {
            $anonfun$nxtAc$1(this, bSONDocument);
            return BoxedUnit.UNIT;
        });
    }

    /** Builds a new async callback for {@code onError} that fails the stage with the given throwable. */
    public AsyncCallback<Throwable> akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$anon$$errAc() {
        return getAsyncCallback(th -> {
            this.failStage(th);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Builds a new async callback for {@code onComplete}; see {@code $anonfun$cmpAc$1}.
     * A new callback is created on every call.
     */
    public AsyncCallback<BoxedUnit> akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$anon$$cmpAc() {
        return getAsyncCallback(boxedUnit -> {
            $anonfun$cmpAc$1(this, boxedUnit);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Creates a Reactive Streams subscriber whose four signal methods do nothing but invoke the
     * corresponding async callback. The callbacks are obtained once, in the instance initializer.
     *
     * NOTE(review): {@code Subscriber} is instantiated with a constructor argument and the
     * initializer calls the callback factories on {@code this}; both look like decompiler
     * artifacts, presumably standing for the enclosing logic instance — confirm against the
     * Scala source.
     */
    private Subscriber<BSONDocument> buildSubscriber() {
        return new Subscriber<BSONDocument>(this) { // from class: akka.contrib.persistence.mongodb.RxMongoRealtimeGraphStage$$anon$1$$anon$2
            private final AsyncCallback<Subscription> subAcImpl;
            private final AsyncCallback<BSONDocument> nxtAcImpl;
            private final AsyncCallback<Throwable> errAcImpl;
            private final AsyncCallback<BoxedUnit> cmpAcImpl;

            private AsyncCallback<Subscription> subAcImpl() {
                return this.subAcImpl;
            }

            private AsyncCallback<BSONDocument> nxtAcImpl() {
                return this.nxtAcImpl;
            }

            private AsyncCallback<Throwable> errAcImpl() {
                return this.errAcImpl;
            }

            private AsyncCallback<BoxedUnit> cmpAcImpl() {
                return this.cmpAcImpl;
            }

            // Each signal is only forwarded; the handling itself lives in the callbacks' bodies.
            public void onSubscribe(Subscription subscription) {
                subAcImpl().invoke(subscription);
            }

            public void onNext(BSONDocument bSONDocument) {
                nxtAcImpl().invoke(bSONDocument);
            }

            public void onError(Throwable th) {
                errAcImpl().invoke(th);
            }

            public void onComplete() {
                cmpAcImpl().invoke(BoxedUnit.UNIT);
            }

            {
                this.subAcImpl = this.akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$anon$$subAc();
                this.nxtAcImpl = this.akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$anon$$nxtAc();
                this.errAcImpl = this.akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$anon$$errAc();
                this.cmpAcImpl = this.akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$anon$$cmpAc();
            }
        };
    }

    /**
     * Cancels the current subscription (if any), asks the enclosing stage's factory for a
     * publisher based on the current {@code lastId}, and subscribes {@code subscriber} to it.
     *
     * @param subscriber subscriber to attach to the new publisher
     * @return the publisher returned by the factory
     */
    private Publisher<BSONDocument> buildCursor(Subscriber<BSONDocument> subscriber) {
        akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$anon$$subscription().foreach(subscription -> {
            subscription.cancel();
            return BoxedUnit.UNIT;
        });
        Publisher<BSONDocument> publisher = (Publisher) this.$outer.akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$factory.apply(lastId());
        publisher.subscribe(subscriber);
        return publisher;
    }

    /** Returns the enclosing stage. Synthetic accessor used by the nested handler class. */
    public /* synthetic */ RxMongoRealtimeGraphStage akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$anon$$$outer() {
        return this.$outer;
    }

    /**
     * Body of the {@code onSubscribe} callback: requests the enclosing stage's {@code bufsz}
     * elements as initial demand, then stores the subscription.
     */
    public static final /* synthetic */ void $anonfun$subAc$1(RxMongoRealtimeGraphStage$$anon$1 rxMongoRealtimeGraphStage$$anon$1, Subscription subscription) {
        subscription.request(rxMongoRealtimeGraphStage$$anon$1.$outer.akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$bufsz);
        rxMongoRealtimeGraphStage$$anon$1.akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$anon$$subscription_$eq(Option$.MODULE$.apply(subscription));
    }

    /**
     * Body of the {@code onNext} callback. If the outlet is available the document is pushed and
     * one more element is requested; otherwise the document is added to the buffer and no
     * further demand is signalled. In both cases {@code lastId} is then set from the document's
     * {@code "_id"} field ({@code None} when {@code getAsOpt} yields nothing).
     */
    public static final /* synthetic */ void $anonfun$nxtAc$1(RxMongoRealtimeGraphStage$$anon$1 rxMongoRealtimeGraphStage$$anon$1, BSONDocument bSONDocument) {
        if (rxMongoRealtimeGraphStage$$anon$1.isAvailable(rxMongoRealtimeGraphStage$$anon$1.$outer.akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$out())) {
            rxMongoRealtimeGraphStage$$anon$1.push(rxMongoRealtimeGraphStage$$anon$1.$outer.akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$out(), bSONDocument);
            rxMongoRealtimeGraphStage$$anon$1.akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$anon$$subscription().foreach(subscription -> {
                subscription.request(1L);
                return BoxedUnit.UNIT;
            });
        } else {
            // Desugared Scala `buffer ::: List(doc)`: presumably appends the document at the end
            // of the buffer so that onPull emits in arrival order — confirm against List.:::.
            rxMongoRealtimeGraphStage$$anon$1.akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$anon$$buffer_$eq(new $colon.colon(bSONDocument, Nil$.MODULE$).$colon$colon$colon(rxMongoRealtimeGraphStage$$anon$1.akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$anon$$buffer()));
        }
        rxMongoRealtimeGraphStage$$anon$1.lastId_$eq(bSONDocument.getAsOpt("_id", package$.MODULE$.BSONObjectIDIdentity()));
    }

    /**
     * Body of the {@code onComplete} callback: cancels the stored subscription, clears the
     * cursor, then builds a new subscriber and publisher (resuming from the current
     * {@code lastId}) and stores the new publisher.
     *
     * NOTE(review): the subscription field is not cleared here, so {@code buildCursor} calls
     * {@code cancel()} on the same subscription a second time.
     */
    public static final /* synthetic */ void $anonfun$cmpAc$1(RxMongoRealtimeGraphStage$$anon$1 rxMongoRealtimeGraphStage$$anon$1, BoxedUnit boxedUnit) {
        rxMongoRealtimeGraphStage$$anon$1.akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$anon$$subscription().foreach(subscription -> {
            subscription.cancel();
            return BoxedUnit.UNIT;
        });
        rxMongoRealtimeGraphStage$$anon$1.cursor_$eq(None$.MODULE$);
        rxMongoRealtimeGraphStage$$anon$1.cursor_$eq(Option$.MODULE$.apply(rxMongoRealtimeGraphStage$$anon$1.buildCursor(rxMongoRealtimeGraphStage$$anon$1.buildSubscriber())));
    }

    /**
     * Creates the logic for the given stage: initialises all state to empty and installs the
     * outlet handler.
     *
     * NOTE(review): per the JADX warning below, the {@code super} call was moved ahead of the
     * null check, so a null argument is dereferenced by {@code m12shape()} before the check runs.
     *
     * @param rxMongoRealtimeGraphStage enclosing stage; must not be null
     */
    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public RxMongoRealtimeGraphStage$$anon$1(RxMongoRealtimeGraphStage rxMongoRealtimeGraphStage) {
        super(rxMongoRealtimeGraphStage.m12shape());
        if (rxMongoRealtimeGraphStage == null) {
            // `throw null` raises a NullPointerException; standard scalac outer-reference check.
            throw null;
        }
        this.$outer = rxMongoRealtimeGraphStage;
        this.lastId = None$.MODULE$;
        this.akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$anon$$subscription = None$.MODULE$;
        this.cursor = None$.MODULE$;
        this.akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$anon$$buffer = Nil$.MODULE$;
        setHandler(rxMongoRealtimeGraphStage.akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$out(), new OutHandler(this) { // from class: akka.contrib.persistence.mongodb.RxMongoRealtimeGraphStage$$anon$1$$anon$3
            private final /* synthetic */ RxMongoRealtimeGraphStage$$anon$1 $outer;

            /**
             * Drains the buffer while it is non-empty and the outlet is available: pushes the
             * head, keeps the tail as the new buffer and requests one more element upstream.
             * NOTE(review): presumably at most one iteration runs per pull, since the outlet
             * should stop being available after a push — confirm against Akka's isAvailable.
             */
            public void onPull() {
                while (this.$outer.akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$anon$$buffer().nonEmpty() && this.$outer.isAvailable(this.$outer.akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$anon$$$outer().akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$out())) {
                    $colon.colon akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$anon$$buffer = this.$outer.akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$anon$$buffer();
                    // Desugared Scala pattern match `head :: tail`; the loop condition already
                    // checked nonEmpty, so the MatchError branch is not expected to be taken.
                    if (!(akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$anon$$buffer instanceof $colon.colon)) {
                        throw new MatchError(akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$anon$$buffer);
                    }
                    $colon.colon colonVar = akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$anon$$buffer;
                    Tuple2 tuple2 = new Tuple2((BSONDocument) colonVar.head(), colonVar.tl$access$1());
                    BSONDocument bSONDocument = (BSONDocument) tuple2._1();
                    List<BSONDocument> list = (List) tuple2._2();
                    this.$outer.push(this.$outer.akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$anon$$$outer().akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$out(), bSONDocument);
                    this.$outer.akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$anon$$buffer_$eq(list);
                    this.$outer.akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$anon$$subscription().foreach(subscription -> {
                        subscription.request(1L);
                        return BoxedUnit.UNIT;
                    });
                }
            }

            /** Cancels the upstream subscription (if any) and completes the stage. */
            public void onDownstreamFinish() {
                this.$outer.akka$contrib$persistence$mongodb$RxMongoRealtimeGraphStage$$anon$$subscription().foreach(subscription -> {
                    subscription.cancel();
                    return BoxedUnit.UNIT;
                });
                this.$outer.completeStage();
            }

            {
                // NOTE(review): decompiler artifact — `this` here presumably stands for the
                // enclosing logic instance passed to the constructor, not the handler itself.
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                OutHandler.$init$(this);
            }
        });
    }
}
