package pekko.contrib.persistence.mongodb;

import org.apache.pekko.NotUsed;
import org.apache.pekko.event.Logging$;
import org.apache.pekko.persistence.query.EventEnvelope;
import org.apache.pekko.persistence.query.Offset;
import org.apache.pekko.persistence.query.scaladsl.CurrentEventsByPersistenceIdQuery;
import org.apache.pekko.persistence.query.scaladsl.CurrentEventsByTagQuery;
import org.apache.pekko.persistence.query.scaladsl.CurrentPersistenceIdsQuery;
import org.apache.pekko.persistence.query.scaladsl.EventsByPersistenceIdQuery;
import org.apache.pekko.persistence.query.scaladsl.EventsByTagQuery;
import org.apache.pekko.persistence.query.scaladsl.PersistenceIdsQuery;
import org.apache.pekko.stream.Attributes;
import org.apache.pekko.stream.Attributes$;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.scaladsl.Flow;
import org.apache.pekko.stream.scaladsl.Flow$;
import org.apache.pekko.stream.scaladsl.Source;
import scala.Predef$;
import scala.Tuple2;
import scala.concurrent.ExecutionContext;

/* compiled from: MongoReadJournal.scala */
/* loaded from: input_file:pekko/contrib/persistence/mongodb/ScalaDslMongoReadJournal.class */
/**
 * Scala-DSL read journal that serves persistence queries from a
 * {@link MongoPersistenceReadJournallingApi}.
 *
 * <p>Every query delegates to {@code impl}, passing along the materializer and
 * execution context supplied at construction time. The {@code current*} queries are
 * built from the corresponding {@code impl.current*} source only; the other queries
 * concatenate that source with the matching {@code impl.live*} source and run the
 * result through a de-duplication stage.
 *
 * <p>NOTE(review): this file looks like decompiler output of a Scala class, so the
 * stream operator calls (for example {@code $plus$plus}) use the JVM-encoded names of
 * the Scala methods.
 */
public class ScalaDslMongoReadJournal implements CurrentPersistenceIdsQuery, CurrentEventsByPersistenceIdQuery, CurrentEventsByTagQuery, PersistenceIdsQuery, EventsByPersistenceIdQuery, EventsByTagQuery {
    /** Message used when a persistence-id argument is {@code null}. */
    private static final String PERSISTENCE_ID_REQUIRED = "PersistenceId must not be null";

    /** Message used when a tag argument is {@code null}. */
    private static final String TAG_REQUIRED = "Tag must not be null";

    private final MongoPersistenceReadJournallingApi impl;
    private final Materializer m;
    private final ExecutionContext ec;

    /* compiled from: MongoReadJournal.scala */
    /* loaded from: input_file:pekko/contrib/persistence/mongodb/ScalaDslMongoReadJournal$RichFlow.class */
    /**
     * Wrapper around a source of {@code Event}s that adds a conversion to
     * {@link EventEnvelope}s.
     *
     * @param <Mat> materialized value type of the wrapped source
     */
    public static class RichFlow<Mat> {
        private final Source<Event, Mat> source;

        /**
         * @param source the event source to wrap
         */
        public RichFlow(Source<Event, Mat> source) {
            this.source = source;
        }

        /**
         * Pipes the wrapped source through the companion object's
         * {@code eventToEventEnvelope} flow.
         *
         * @return a source of envelopes with the wrapped source's materialized value
         */
        public Source<EventEnvelope, Mat> toEventEnvelopes() {
            return this.source.via(ScalaDslMongoReadJournal$.MODULE$.eventToEventEnvelope());
        }
    }

    /* compiled from: MongoReadJournal.scala */
    /* loaded from: input_file:pekko/contrib/persistence/mongodb/ScalaDslMongoReadJournal$RichFlowWithOffsets.class */
    /**
     * Wrapper around a source of {@code (Event, Offset)} pairs that adds a conversion
     * to {@link EventEnvelope}s.
     *
     * @param <Mat> materialized value type of the wrapped source
     */
    public static class RichFlowWithOffsets<Mat> {
        private final Source<Tuple2<Event, Offset>, Mat> source;

        /**
         * @param source the source of event/offset pairs to wrap
         */
        public RichFlowWithOffsets(Source<Tuple2<Event, Offset>, Mat> source) {
            this.source = source;
        }

        /**
         * Pipes the wrapped source through the companion object's
         * {@code eventPlusOffsetToEventEnvelope} flow.
         *
         * @return a source of envelopes with the wrapped source's materialized value
         */
        public Source<EventEnvelope, Mat> toEventEnvelopes() {
            return this.source.via(ScalaDslMongoReadJournal$.MODULE$.eventPlusOffsetToEventEnvelope());
        }
    }

    /**
     * Static forwarder to the companion object's {@code RichFlow} factory.
     *
     * @param source the event source to wrap
     * @param <Mat> materialized value type of {@code source}
     * @return the wrapper produced by the companion object
     */
    public static <Mat> RichFlow<Mat> RichFlow(Source<Event, Mat> source) {
        return ScalaDslMongoReadJournal$.MODULE$.RichFlow(source);
    }

    /**
     * Static forwarder to the companion object's {@code RichFlowWithOffsets} factory.
     *
     * @param source the source of event/offset pairs to wrap
     * @param <Mat> materialized value type of {@code source}
     * @return the wrapper produced by the companion object
     */
    public static <Mat> RichFlowWithOffsets<Mat> RichFlowWithOffsets(Source<Tuple2<Event, Offset>, Mat> source) {
        return ScalaDslMongoReadJournal$.MODULE$.RichFlowWithOffsets(source);
    }

    /**
     * Static forwarder to the companion object's flow that turns event/offset pairs
     * into envelopes.
     *
     * @return the flow held by the companion object
     */
    public static Flow<Tuple2<Event, Offset>, EventEnvelope, NotUsed> eventPlusOffsetToEventEnvelope() {
        return ScalaDslMongoReadJournal$.MODULE$.eventPlusOffsetToEventEnvelope();
    }

    /**
     * Static forwarder to the companion object's flow that turns events into envelopes.
     *
     * @return the flow held by the companion object
     */
    public static Flow<Event, EventEnvelope, NotUsed> eventToEventEnvelope() {
        return ScalaDslMongoReadJournal$.MODULE$.eventToEventEnvelope();
    }

    /**
     * Creates a read journal on top of the given journalling API.
     *
     * @param mongoPersistenceReadJournallingApi backend every query delegates to
     * @param materializer materializer handed to every backend call
     * @param executionContext execution context handed to every backend call
     */
    public ScalaDslMongoReadJournal(MongoPersistenceReadJournallingApi mongoPersistenceReadJournallingApi, Materializer materializer, ExecutionContext executionContext) {
        this.impl = mongoPersistenceReadJournallingApi;
        this.m = materializer;
        this.ec = executionContext;
    }

    /**
     * Returns the backend's {@code currentAllEvents} source converted to envelopes.
     *
     * @return envelopes for the events reported by {@code impl.currentAllEvents}
     */
    public Source<EventEnvelope, NotUsed> currentAllEvents() {
        return ScalaDslMongoReadJournal$.MODULE$.RichFlow(this.impl.currentAllEvents(this.m, this.ec)).toEventEnvelopes();
    }

    /**
     * Returns the backend's {@code currentPersistenceIds} source unchanged.
     *
     * @return the persistence ids reported by {@code impl.currentPersistenceIds}
     */
    @Override
    public Source<String, NotUsed> currentPersistenceIds() {
        return this.impl.currentPersistenceIds(this.m, this.ec);
    }

    /**
     * Returns the backend's {@code currentEventsByPersistenceId} source converted to
     * envelopes.
     *
     * @param str persistence id to query; must not be {@code null}
     * @param j lower sequence-number bound, forwarded to the backend
     * @param j2 upper sequence-number bound, forwarded to the backend
     * @return envelopes for the matching events
     * @throws IllegalArgumentException if {@code str} is {@code null}
     */
    @Override
    public Source<EventEnvelope, NotUsed> currentEventsByPersistenceId(String str, long j, long j2) {
        requirePersistenceId(str);
        return ScalaDslMongoReadJournal$.MODULE$.RichFlow(this.impl.currentEventsByPersistenceId(str, j, j2, this.m, this.ec)).toEventEnvelopes();
    }

    /**
     * Returns the backend's {@code currentEventsByTag} source converted to envelopes.
     *
     * @param str tag to query; must not be {@code null}
     * @param offset starting offset; must be accepted by
     *     {@code impl.checkOffsetIsSupported}
     * @return envelopes for the matching events
     * @throws IllegalArgumentException if {@code str} is {@code null} or the offset is
     *     not supported
     */
    @Override
    public Source<EventEnvelope, NotUsed> currentEventsByTag(String str, Offset offset) {
        requireTagAndOffset(str, offset);
        return ScalaDslMongoReadJournal$.MODULE$.RichFlowWithOffsets(this.impl.currentEventsByTag(str, offset, this.m, this.ec)).toEventEnvelopes();
    }

    /**
     * Concatenates the backend's {@code currentAllEvents} and {@code liveEvents}
     * sources, passes them through {@code RemoveDuplicatedEventsByPersistenceId} and
     * converts the result to envelopes.
     *
     * @return envelopes for the combined, de-duplicated event stream
     */
    public Source<EventEnvelope, NotUsed> allEvents() {
        return ScalaDslMongoReadJournal$.MODULE$.RichFlow(
                this.impl.currentAllEvents(this.m, this.ec)
                        .$plus$plus(this.impl.liveEvents(this.m, this.ec))
                        .via(new RemoveDuplicatedEventsByPersistenceId()))
                .toEventEnvelopes();
    }

    /**
     * Concatenates the backend's current and live event sources for one persistence
     * id, keeps only events whose pid equals {@code str} and whose sequence number is
     * at least {@code j}, then applies {@code StopAtSeq(j2)} and
     * {@code RemoveDuplicatedEventsByPersistenceId} before converting to envelopes.
     *
     * @param str persistence id to query; must not be {@code null}
     * @param j lower sequence-number bound (inclusive, see the {@code sn() >= j} filter)
     * @param j2 sequence number handed to {@code StopAtSeq}
     * @return envelopes for the matching events
     * @throws IllegalArgumentException if {@code str} is {@code null}
     */
    @Override
    public Source<EventEnvelope, NotUsed> eventsByPersistenceId(String str, long j, long j2) {
        requirePersistenceId(str);
        Source currentEvents = this.impl.currentEventsByPersistenceId(str, j, j2, this.m, this.ec)
                .withAttributes(infoLoggedAttributes("events-by-pid-current"));
        Source liveEvents = this.impl.liveEventsByPersistenceId(str, this.m, this.ec)
                .withAttributes(infoLoggedAttributes("events-by-pid-realtime"));
        // str is known to be non-null here, so a plain equals() covers a null pid too.
        return ScalaDslMongoReadJournal$.MODULE$.RichFlow(currentEvents.concat(liveEvents).via(
                Flow$.MODULE$.<Event>apply()
                        .filter(event -> str.equals(event.pid()))
                        .filter(event -> event.sn() >= j)
                        .via(new StopAtSeq(j2))
                        .via(new RemoveDuplicatedEventsByPersistenceId())))
                .toEventEnvelopes();
    }

    /**
     * Concatenates the backend's {@code currentPersistenceIds} and
     * {@code livePersistenceIds} sources and passes them through
     * {@code RemoveDuplicates}.
     *
     * @return the combined persistence-id stream
     */
    @Override
    public Source<String, NotUsed> persistenceIds() {
        return this.impl.currentPersistenceIds(this.m, this.ec)
                .$plus$plus(this.impl.livePersistenceIds(this.m, this.ec))
                .via(new RemoveDuplicates());
    }

    /**
     * Concatenates the envelopes of the backend's {@code currentEventsByTag} and
     * {@code liveEventsByTag} sources and passes them through
     * {@code RemoveDuplicatedEventEnvelopes}.
     *
     * @param str tag to query; must not be {@code null}
     * @param offset starting offset; must be accepted by
     *     {@code impl.checkOffsetIsSupported}
     * @return the combined envelope stream
     * @throws IllegalArgumentException if {@code str} is {@code null} or the offset is
     *     not supported
     */
    @Override
    public Source<EventEnvelope, NotUsed> eventsByTag(String str, Offset offset) {
        requireTagAndOffset(str, offset);
        Source<EventEnvelope, NotUsed> currentEnvelopes = ScalaDslMongoReadJournal$.MODULE$
                .RichFlowWithOffsets(this.impl.currentEventsByTag(str, offset, this.m, this.ec))
                .toEventEnvelopes();
        return currentEnvelopes
                .$plus$plus(ScalaDslMongoReadJournal$.MODULE$
                        .RichFlowWithOffsets(this.impl.liveEventsByTag(str, offset, this.m, this.ec, package$OffsetOrdering$.MODULE$))
                        .toEventEnvelopes())
                .via(new RemoveDuplicatedEventEnvelopes());
    }

    /** Rejects a {@code null} persistence id with an {@link IllegalArgumentException}. */
    private static void requirePersistenceId(String persistenceId) {
        Predef$.MODULE$.require(persistenceId != null, () -> PERSISTENCE_ID_REQUIRED);
    }

    /** Rejects a {@code null} tag, then an offset the backend reports as unsupported. */
    private void requireTagAndOffset(String tag, Offset offset) {
        Predef$.MODULE$.require(tag != null, () -> TAG_REQUIRED);
        // The message is built lazily and must capture the caller's offset.
        Predef$.MODULE$.require(this.impl.checkOffsetIsSupported(offset), () -> unsupportedOffsetMessage(offset));
    }

    /** Builds the message reported for an offset the backend does not support. */
    private static String unsupportedOffsetMessage(Offset offset) {
        return "Offset " + offset + " is not supported by read journal";
    }

    /** Builds the log-level attributes (info/info/default) combined with the given stage name. */
    private static Attributes infoLoggedAttributes(String stageName) {
        return Attributes$.MODULE$
                .logLevels(Logging$.MODULE$.InfoLevel(), Logging$.MODULE$.InfoLevel(), Attributes$.MODULE$.logLevels$default$3())
                .and(Attributes$.MODULE$.name(stageName));
    }
}
