package pekko.contrib.persistence.mongodb.driver;

import org.apache.pekko.NotUsed;
import org.apache.pekko.persistence.query.Offset;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.scaladsl.Source;
import org.mongodb.scala.model.Filters$;
import pekko.contrib.persistence.mongodb.Event;
import pekko.contrib.persistence.mongodb.MongoPersistenceReadJournallingApi;
import scala.MatchError;
import scala.None$;
import scala.Option$;
import scala.PartialFunction$;
import scala.Tuple2;
import scala.collection.immutable.List;
import scala.concurrent.ExecutionContext;
import scala.math.Ordering;
import scala.runtime.BoxedUnit;
import scala.runtime.ScalaRunTime$;

/* compiled from: ScalaDriverPersistenceReadJournaller.scala */
/* loaded from: input_file:pekko/contrib/persistence/mongodb/driver/ScalaDriverPersistenceReadJournaller.class */
/**
 * Read-journal implementation of {@link MongoPersistenceReadJournallingApi} backed by a
 * {@link ScalaMongoDriver}.
 *
 * <p>The "current" queries delegate to the corresponding source factories
 * ({@code CurrentAllEvents}, {@code CurrentPersistenceIds}, ...), while the "live" queries are
 * all built on top of cursors obtained from a single {@link ScalaDriverJournalStream}.
 *
 * <p>None of the query methods in this class use the {@link Materializer} or
 * {@link ExecutionContext} arguments they receive; those exist to satisfy the interface.
 */
public class ScalaDriverPersistenceReadJournaller implements MongoPersistenceReadJournallingApi {
    private final ScalaMongoDriver driver;
    private final ScalaDriverJournalStream journalStream;

    /**
     * Creates the journaller and a journal stream for the given driver, and registers a
     * termination callback on the driver's actor system that calls
     * {@code stopAllStreams()} on that journal stream.
     *
     * @param scalaMongoDriver driver used by every query issued from this instance
     */
    public ScalaDriverPersistenceReadJournaller(ScalaMongoDriver scalaMongoDriver) {
        this.driver = scalaMongoDriver;
        final ScalaDriverJournalStream stream = new ScalaDriverJournalStream(scalaMongoDriver);
        this.journalStream = stream;
        // The callback captures the local reference rather than the field.
        scalaMongoDriver.actorSystem().registerOnTermination(() -> {
            stream.stopAllStreams();
            return BoxedUnit.UNIT;
        });
    }

    /**
     * @return the journal stream created in the constructor; all live queries obtain their
     *         cursors through this accessor
     */
    public ScalaDriverJournalStream journalStream() {
        return journalStream;
    }

    /**
     * Delegates to {@code CurrentAllEvents.source} with this instance's driver.
     *
     * @param materializer     not used by this implementation
     * @param executionContext not used by this implementation
     * @return the source produced by {@code CurrentAllEvents}
     */
    @Override // pekko.contrib.persistence.mongodb.MongoPersistenceReadJournallingApi
    public Source<Event, NotUsed> currentAllEvents(Materializer materializer, ExecutionContext executionContext) {
        return CurrentAllEvents$.MODULE$.source(driver);
    }

    /**
     * Delegates to {@code CurrentPersistenceIds.source} with this instance's driver.
     *
     * @param materializer     not used by this implementation
     * @param executionContext not used by this implementation
     * @return the source produced by {@code CurrentPersistenceIds}
     */
    @Override // pekko.contrib.persistence.mongodb.MongoPersistenceReadJournallingApi
    public Source<String, NotUsed> currentPersistenceIds(Materializer materializer, ExecutionContext executionContext) {
        return CurrentPersistenceIds$.MODULE$.source(driver);
    }

    /**
     * Delegates to {@code CurrentEventsByPersistenceId.source}, forwarding the persistence id
     * and both {@code long} bounds unchanged.
     *
     * @param str              persistence id to query
     * @param j                first bound, passed through as-is (presumably the "from" sequence
     *                         number; confirm against the interface)
     * @param j2               second bound, passed through as-is (presumably the "to" sequence
     *                         number; confirm against the interface)
     * @param materializer     not used by this implementation
     * @param executionContext not used by this implementation
     * @return the source produced by {@code CurrentEventsByPersistenceId}
     */
    @Override // pekko.contrib.persistence.mongodb.MongoPersistenceReadJournallingApi
    public Source<Event, NotUsed> currentEventsByPersistenceId(String str, long j, long j2, Materializer materializer, ExecutionContext executionContext) {
        return CurrentEventsByPersistenceId$.MODULE$.source(driver, str, j, j2);
    }

    /**
     * Delegates to {@code CurrentEventsByTag.source}, forwarding the tag and offset unchanged.
     *
     * @param str              tag to query
     * @param offset           offset handed to the underlying source
     * @param materializer     not used by this implementation
     * @param executionContext not used by this implementation
     * @return the source produced by {@code CurrentEventsByTag}
     */
    @Override // pekko.contrib.persistence.mongodb.MongoPersistenceReadJournallingApi
    public Source<Tuple2<Event, Offset>, NotUsed> currentEventsByTag(String str, Offset offset, Materializer materializer, ExecutionContext executionContext) {
        return CurrentEventsByTag$.MODULE$.source(driver, str, offset);
    }

    /**
     * Reports whether the given offset is accepted by the partial function compiled as
     * {@code ScalaDriverPersistenceReadJournaller$$anon$8}.
     *
     * @param offset offset to test
     * @return the result of {@code PartialFunction.cond} for {@code offset} and that partial
     *         function
     */
    @Override // pekko.contrib.persistence.mongodb.MongoPersistenceReadJournallingApi
    public boolean checkOffsetIsSupported(Offset offset) {
        boolean supported = PartialFunction$.MODULE$.cond(offset, new ScalaDriverPersistenceReadJournaller$$anon$8());
        return supported;
    }

    /**
     * Opens an unfiltered journal-stream cursor and emits the first component (the event) of
     * every tuple it produces.
     *
     * @param materializer     not used by this implementation
     * @param executionContext not used by this implementation
     * @return a source of events taken from the cursor's tuples
     * @throws MatchError from within the stream if the cursor yields a {@code null} tuple
     */
    @Override // pekko.contrib.persistence.mongodb.MongoPersistenceReadJournallingApi
    public Source<Event, NotUsed> liveEvents(Materializer materializer, ExecutionContext executionContext) {
        return journalStream().cursor(None$.MODULE$).map(pair -> {
            if (pair == null) {
                throw new MatchError(pair);
            }
            return (Event) pair._1();
        });
    }

    /**
     * Opens an unfiltered journal-stream cursor and emits {@code pid()} of the event in every
     * tuple it produces. No de-duplication is applied here.
     *
     * @param materializer     not used by this implementation
     * @param executionContext not used by this implementation
     * @return a source of persistence ids, one per tuple produced by the cursor
     * @throws MatchError from within the stream if the cursor yields a {@code null} tuple
     */
    @Override // pekko.contrib.persistence.mongodb.MongoPersistenceReadJournallingApi
    public Source<String, NotUsed> livePersistenceIds(Materializer materializer, ExecutionContext executionContext) {
        return journalStream().cursor(None$.MODULE$).map(pair -> {
            if (pair == null) {
                throw new MatchError(pair);
            }
            Event event = (Event) pair._1();
            return event.pid();
        });
    }

    /**
     * Opens a journal-stream cursor filtered with {@code equal("pid", str)} and emits the
     * events whose {@code pid()} equals {@code str}.
     *
     * <p>Each tuple's event is wrapped in a one-element list which is then filtered on the
     * persistence id, so a non-matching event contributes nothing to the output.
     *
     * @param str              persistence id to follow
     * @param materializer     not used by this implementation
     * @param executionContext not used by this implementation
     * @return a source of events for the given persistence id
     * @throws MatchError from within the stream if the cursor yields a {@code null} tuple
     */
    @Override // pekko.contrib.persistence.mongodb.MongoPersistenceReadJournallingApi
    public Source<Event, NotUsed> liveEventsByPersistenceId(String str, Materializer materializer, ExecutionContext executionContext) {
        return journalStream().cursor(Option$.MODULE$.apply(Filters$.MODULE$.equal("pid", str))).mapConcat(pair -> {
            if (pair != null) {
                Event[] single = new Event[]{(Event) pair._1()};
                return ((List) scala.package$.MODULE$.List().apply(ScalaRunTime$.MODULE$.wrapRefArray(single))).filter(candidate -> {
                    String candidatePid = candidate.pid();
                    // Null-safe equality: two nulls are considered equal.
                    if (candidatePid == null) {
                        return str == null;
                    }
                    return candidatePid.equals(str);
                });
            }
            throw new MatchError(pair);
        });
    }

    /**
     * Opens a journal-stream cursor filtered with {@code equal("_tg", str)} and keeps only the
     * tuples whose event's {@code tags()} contain {@code str} and whose offset is strictly
     * greater than {@code offset} according to {@code ordering}.
     *
     * @param str              tag to follow
     * @param offset           exclusive lower bound for the emitted offsets
     * @param materializer     not used by this implementation
     * @param executionContext not used by this implementation
     * @param ordering         ordering used to compare each tuple's offset with {@code offset}
     * @return a source of (event, offset) tuples passing both checks
     * @throws MatchError from within the stream if the cursor yields a {@code null} tuple
     */
    @Override // pekko.contrib.persistence.mongodb.MongoPersistenceReadJournallingApi
    public Source<Tuple2<Event, Offset>, NotUsed> liveEventsByTag(String str, Offset offset, Materializer materializer, ExecutionContext executionContext, Ordering<Offset> ordering) {
        return journalStream().cursor(Option$.MODULE$.apply(Filters$.MODULE$.equal("_tg", str))).filter(pair -> {
            if (pair == null) {
                throw new MatchError(pair);
            }
            // The tag check runs first; the offset comparison is skipped when it fails.
            if (!((Event) pair._1()).tags().contains(str)) {
                return false;
            }
            return ordering.gt((Offset) pair._2(), offset);
        });
    }
}
