package com.daml.platform.store.dao;

import akka.NotUsed;
import akka.stream.scaladsl.Source;
import akka.stream.scaladsl.Source$;
import com.daml.ledger.offset.Offset;
import com.daml.logging.LoggingContext;
import com.daml.metrics.Metrics;
import com.daml.metrics.ServicesMetrics;
import com.daml.metrics.Timed$;
import com.daml.metrics.api.MetricsContext$;
import com.daml.platform.store.cache.InMemoryFanoutBuffer;
import com.daml.platform.store.interfaces.TransactionLogUpdate;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.immutable.Vector;
import scala.concurrent.ExecutionContext;
import scala.concurrent.ExecutionContext$parasitic$;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.reflect.ScalaSignature;

/* compiled from: BufferedStreamsReader.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005Mf\u0001\u0002\t\u0012\u0001qA\u0001\u0002\n\u0001\u0003\u0002\u0003\u0006I!\n\u0005\tW\u0001\u0011\t\u0011)A\u0005Y!Q\u00111\u0001\u0001\u0003\u0002\u0003\u0006I!!\u0002\t\u0015\u0005-\u0001A!A!\u0002\u0013\ti\u0001\u0003\u0006\u0002\u0018\u0001\u0011\t\u0011)A\u0005\u00033A!\"!\u000b\u0001\u0005\u0003\u0005\u000b1BA\u0016\u0011\u0019y\u0004\u0001\"\u0001\u00028!I\u0011\u0011\n\u0001C\u0002\u0013%\u00111\n\u0005\t\u0003_\u0002\u0001\u0015!\u0003\u0002N!1A\n\u0001C\u0001\u0003c:aaO\t\t\u0002UadA\u0002\t\u0012\u0011\u0003)b\bC\u0003@\u0019\u0011\u0005\u0001IB\u0004B\u0019A\u0005\u0019\u0013\u0001\"\t\u000b\u0011sa\u0011A#\u0003+\t+hMZ3sK\u0012\u001cFO]3b[N\u0014V-\u00193fe*\u0011!cE\u0001\u0004I\u0006|'B\u0001\u000b\u0016\u0003\u0015\u0019Ho\u001c:f\u0015\t1r#\u0001\u0005qY\u0006$hm\u001c:n\u0015\tA\u0012$\u0001\u0003eC6d'\"\u0001\u000e\u0002\u0007\r|Wn\u0001\u0001\u0016\tui\u0018\u0011A\n\u0003\u0001y\u0001\"a\b\u0012\u000e\u0003\u0001R\u0011!I\u0001\u0006g\u000e\fG.Y\u0005\u0003G\u0001\u0012a!\u00118z%\u00164\u0017\u0001F5o\u001b\u0016lwN]=GC:|W\u000f\u001e\"vM\u001a,'\u000f\u0005\u0002'S5\tqE\u0003\u0002)'\u0005)1-Y2iK&\u0011!f\n\u0002\u0015\u0013:lU-\\8ss\u001a\u000bgn\\;u\u0005V4g-\u001a:\u0002)\u0019,Go\u00195Ge>l\u0007+\u001a:tSN$XM\\2f!\u0011ic\u0002`@\u000f\u00059ZaBA\u0018;\u001d\t\u0001\u0014H\u0004\u00022q9\u0011!g\u000e\b\u0003gYj\u0011\u0001\u000e\u0006\u0003km\ta\u0001\u0010:p_Rt\u0014\"\u0001\u000e\n\u0005aI\u0012B\u0001\f\u0018\u0013\t!R#\u0003\u0002\u0013'\u0005)\")\u001e4gKJ,Gm\u0015;sK\u0006l7OU3bI\u0016\u0014\bCA\u001f\r\u001b\u0005\t2C\u0001\u0007\u001f\u0003\u0019a\u0014N\\5u}Q\tAH\u0001\u000bGKR\u001c\u0007N\u0012:p[B+'o]5ti\u0016t7-Z\u000b\u0004\u0007jt6C\u0001\b\u001f\u0003\u0015\t\u0007\u000f\u001d7z)\u001115/^<\u0015\u0005\u001d[\u0007\u0003\u0002%P#\u001el\u0011!\u0013\u0006\u0003\u0015.\u000b\u0001b]2bY\u0006$7\u000f\u001c\u0006\u0003\u00196\u000baa\u001d;sK\u0006l'\"\u0001(\u0002\t\u0005\\7.Y\u
0005\u0003!&\u0013aaU8ve\u000e,\u0007\u0003B\u0010S)rK!a\u0015\u0011\u0003\rQ+\b\u000f\\33!\t)&,D\u0001W\u0015\t9\u0006,\u0001\u0004pM\u001a\u001cX\r\u001e\u0006\u00033^\ta\u0001\\3eO\u0016\u0014\u0018BA.W\u0005\u0019yeMZ:fiB\u0011QL\u0018\u0007\u0001\t\u0015yfB1\u0001a\u00051\t\u0005+S0S\u000bN\u0003vJT*F#\t\tG\r\u0005\u0002 E&\u00111\r\t\u0002\b\u001d>$\b.\u001b8h!\tyR-\u0003\u0002gA\t\u0019\u0011I\\=\u0011\u0005!LW\"A'\n\u0005)l%a\u0002(piV\u001bX\r\u001a\u0005\u0006Y>\u0001\u001d!\\\u0001\u000fY><w-\u001b8h\u0007>tG/\u001a=u!\tq\u0017/D\u0001p\u0015\t\u0001x#A\u0004m_\u001e<\u0017N\\4\n\u0005I|'A\u0004'pO\u001eLgnZ\"p]R,\u0007\u0010\u001e\u0005\u0006i>\u0001\r\u0001V\u0001\u000fgR\f'\u000f^#yG2,8/\u001b<f\u0011\u00151x\u00021\u0001U\u00031)g\u000eZ%oG2,8/\u001b<f\u0011\u0015Ax\u00021\u0001z\u0003\u00191\u0017\u000e\u001c;feB\u0011QL\u001f\u0003\u0006w:\u0011\r\u0001\u0019\u0002\u0007\r&cE+\u0012*\u0011\u0005ukH!\u0002@\u0001\u0005\u0004\u0001'A\u0006)F%NK5\u000bV#O\u0007\u0016{f)\u0012+D\u0011~\u000b%kR*\u0011\u0007u\u000b\t\u0001B\u0003`\u0001\t\u0007\u0001-A\u0015ck\u001a4WM]3e'R\u0014X-Y7Fm\u0016tGo\u001d)s_\u000e,7o]5oOB\u000b'/\u00197mK2L7/\u001c\t\u0004?\u0005\u001d\u0011bAA\u0005A\t\u0019\u0011J\u001c;\u0002\u000f5,GO]5dgB!\u0011qBA\n\u001b\t\t\tBC\u0002\u0002\f]IA!!\u0006\u0002\u0012\t9Q*\u001a;sS\u000e\u001c\u0018AC:ue\u0016\fWNT1nKB!\u00111DA\u0012\u001d\u0011\ti\"a\b\u0011\u0005M\u0002\u0013bAA\u0011A\u00051\u0001K]3eK\u001aLA!!\n\u0002(\t11\u000b\u001e:j]\u001eT1!!\t!\u0003A)\u00070Z2vi&|gnQ8oi\u0016DH\u000f\u0005\u0003\u0002.\u0005MRBAA\u0018\u0015\r\t\t\u0004I\u0001\u000bG>t7-\u001e:sK:$\u0018\u0002BA\u001b\u0003_\u0011\u0001#\u0012=fGV$\u0018n\u001c8D_:$X\r\u001f;\u0015\u0019\u0005e\u0012qHA!\u0003\u0007\n)%a\u0012\u0015\t\u0005m\u0012Q\b\t\u0005{\u0001ax\u0010C\u0004\u0002*\u001d\u0001\u001d!a\u000b\t\u000b\u0011:\u0001\u0019A\u0013\t\u000b-:\u0001\u0019\u0001\u0017\t\u000f\u0005\rq\u00011\u0001\u0002\u0006!9\u00111B\u0004A\u0002\u00055\u0001bBA\
f\u000f\u0001\u0007\u0011\u0011D\u0001\u0014EV4g-\u001a:SK\u0006$WM]'fiJL7m]\u000b\u0003\u0003\u001b\u0002B!a\u0014\u0002j9!\u0011\u0011KA1\u001d\u0011\t\u0019&a\u0017\u000f\t\u0005U\u0013\u0011\f\b\u0004\u0003/\"Q\"\u0001\u0001\n\u0007a\t\u0019\"\u0003\u0003\u0002^\u0005}\u0013\u0001C:feZL7-Z:\u000b\u0007a\t\u0019\"\u0003\u0003\u0002d\u0005\u0015\u0014!B5oI\u0016D\u0018\u0002BA4\u0003#\u0011qbU3sm&\u001cWm]'fiJL7m]\u0005\u0005\u0003W\niG\u0001\bCk\u001a4WM]3e%\u0016\fG-\u001a:\u000b\t\u0005\r\u0014QM\u0001\u0015EV4g-\u001a:SK\u0006$WM]'fiJL7m\u001d\u0011\u0016\t\u0005M\u00141\u0015\u000b\r\u0003k\ni(a \u0002\u0002\u0006\u0015\u0015q\u0015\u000b\u0005\u0003o\nY\bE\u0003I\u001f\u0006et\r\u0005\u0003 %R{\b\"\u00027\u000b\u0001\bi\u0007\"\u0002;\u000b\u0001\u0004!\u0006\"\u0002<\u000b\u0001\u0004!\u0006BBAB\u0015\u0001\u0007A0\u0001\u000bqKJ\u001c\u0018n\u001d;f]\u000e,g)\u001a;dQ\u0006\u0013xm\u001d\u0005\b\u0003\u000fS\u0001\u0019AAE\u00031\u0011WO\u001a4fe\u001aKG\u000e^3s!\u001dy\u00121RAH\u00037K1!!$!\u0005%1UO\\2uS>t\u0017\u0007\u0005\u0003\u0002\u0012\u0006]UBAAJ\u0015\r\t)jE\u0001\u000bS:$XM\u001d4bG\u0016\u001c\u0018\u0002BAM\u0003'\u0013A\u0003\u0016:b]N\f7\r^5p]2{w-\u00169eCR,\u0007#B\u0010\u0002\u001e\u0006\u0005\u0016bAAPA\t1q\n\u001d;j_:\u00042!XAR\t\u0019\t)K\u0003b\u0001A\nQ!)\u0016$G\u000bJ{v*\u0016+\t\u000f\u0005%&\u00021\u0001\u0002,\u0006iAo\\!qSJ+7\u000f]8og\u0016\u0004raHAF\u0003C\u000bi\u000bE\u0003\u0002.\u0005=v0\u0003\u0003\u00022\u0006=\"A\u0002$viV\u0014X\r")
/* loaded from: input_file:com/daml/platform/store/dao/BufferedStreamsReader.class */
/*
 * Builds an Akka Source of (Offset, API_RESPONSE) pairs for an offset range by repeatedly
 * slicing the in-memory fan-out buffer. When a slice comes back as a LastBufferChunkSuffix,
 * the elements produced by the FetchFromPersistence callback are emitted first, followed by
 * the buffered slice.
 *
 * This file is decompiler output of BufferedStreamsReader.scala; identifiers containing '$'
 * are compiler-generated.
 */
public class BufferedStreamsReader<PERSISTENCE_FETCH_ARGS, API_RESPONSE> {
    private final InMemoryFanoutBuffer inMemoryFanoutBuffer;
    private final FetchFromPersistence<PERSISTENCE_FETCH_ARGS, API_RESPONSE> fetchFromPersistence;
    private final int bufferedStreamEventsProcessingParallelism;
    private final ExecutionContext executionContext;
    private final ServicesMetrics.index.BufferedReader bufferReaderMetrics;

    /**
     * Callback that produces (Offset, API_RESPONSE) pairs for an offset range from a source
     * other than the in-memory buffer.
     *
     * NOTE(review): the two Offset arguments are presumably an exclusive start and an
     * inclusive end -- the decompiled names do not say; confirm against the Scala source.
     */
    public interface FetchFromPersistence<FILTER, API_RESPONSE> {
        Source<Tuple2<Offset, API_RESPONSE>, NotUsed> apply(Offset offset, Offset offset2, FILTER filter, LoggingContext loggingContext);
    }

    /**
     * @param inMemoryFanoutBuffer buffer that is sliced on every unfold step
     * @param fetchFromPersistence callback used when a slice is a LastBufferChunkSuffix
     * @param i                    parallelism passed to mapAsync when converting buffered entries
     * @param metrics              root metrics; only daml().services().index() is read here
     * @param str                  second constructor argument of the BufferedReader metrics group
     * @param executionContext     context on which each buffer-slicing step is scheduled
     */
    public BufferedStreamsReader(InMemoryFanoutBuffer inMemoryFanoutBuffer, FetchFromPersistence<PERSISTENCE_FETCH_ARGS, API_RESPONSE> fetchFromPersistence, int i, Metrics metrics, String str, ExecutionContext executionContext) {
        this.inMemoryFanoutBuffer = inMemoryFanoutBuffer;
        this.fetchFromPersistence = fetchFromPersistence;
        this.bufferedStreamEventsProcessingParallelism = i;
        this.executionContext = executionContext;
        this.bufferReaderMetrics = new ServicesMetrics.index.BufferedReader(metrics.daml().services().index(), str);
    }

    /** Accessor for the metrics group created in the constructor. */
    private ServicesMetrics.index.BufferedReader bufferReaderMetrics() {
        return this.bufferReaderMetrics;
    }

    /**
     * Streams (Offset, API_RESPONSE) pairs for the requested range.
     *
     * The stream is an unfoldAsync whose state is the current offset, starting at
     * {@code offset}. While the state is less than {@code offset2}, one chunk is computed
     * on the configured execution context; once it is not, the unfold completes. The chunk
     * sources are flattened in order with flatMapConcat, the result is wrapped with the
     * fetchTimer metric, and the fetchedTotal counter is incremented per emitted element.
     *
     * @param offset                 initial unfold state
     * @param offset2                bound at which unfolding stops; also passed as the upper
     *                               bound to the buffer slice call
     * @param persistence_fetch_args forwarded unchanged to the FetchFromPersistence callback
     * @param function1              forwarded to the buffer's slice call
     * @param function12             asynchronous conversion applied to each buffered element
     * @param loggingContext         forwarded to the FetchFromPersistence callback
     * @return the flattened, metered stream of converted elements
     */
    public <BUFFER_OUT> Source<Tuple2<Offset, API_RESPONSE>, NotUsed> stream(Offset offset, Offset offset2, PERSISTENCE_FETCH_ARGS persistence_fetch_args, Function1<TransactionLogUpdate, Option<BUFFER_OUT>> function1, Function1<BUFFER_OUT, Future<API_RESPONSE>> function12, LoggingContext loggingContext) {
        Source chunkedStream = Source$.MODULE$.unfoldAsync(offset, cursor -> {
            if (!cursor.$less(offset2)) {
                // Nothing left to read: signal end of the unfold.
                return Future$.MODULE$.successful(None$.MODULE$);
            }
            return Future$.MODULE$.apply(() -> {
                return this.nextChunk(cursor, offset2, persistence_fetch_args, function1, function12, loggingContext);
            }, this.executionContext);
        }).flatMapConcat(chunk -> {
            return (Source) Predef$.MODULE$.identity(chunk);
        });
        return Timed$.MODULE$.source(bufferReaderMetrics().fetchTimer(), () -> {
            return chunkedStream;
        }).map(element -> {
            this.bufferReaderMetrics().fetchedTotal().inc(this.bufferReaderMetrics().fetchedTotal().inc$default$1());
            return element;
        });
    }

    /**
     * Computes one unfold step: the next state paired with the source to emit for this step.
     *
     * The buffer slice call is timed with the slice metric and its size is recorded in
     * sliceSize before the slice kind is inspected.
     *
     * @return Some((nextOffset, source)); never None -- termination is decided by the caller
     * @throws MatchError if the slice is neither Inclusive nor LastBufferChunkSuffix
     */
    private <BUFFER_OUT> Some nextChunk(Offset cursor, Offset endOffset, PERSISTENCE_FETCH_ARGS fetchArgs, Function1<TransactionLogUpdate, Option<BUFFER_OUT>> bufferFilter, Function1<BUFFER_OUT, Future<API_RESPONSE>> toApiResponse, LoggingContext loggingContext) {
        InMemoryFanoutBuffer.BufferSlice sliced = (InMemoryFanoutBuffer.BufferSlice) Timed$.MODULE$.value(this.bufferReaderMetrics().slice(), () -> {
            return this.inMemoryFanoutBuffer.slice(cursor, endOffset, bufferFilter);
        });
        this.bufferReaderMetrics().sliceSize().update(sliced.slice().size(), MetricsContext$.MODULE$.Empty());

        if (sliced instanceof InMemoryFanoutBuffer.BufferSlice.Inclusive) {
            Vector bufferedEntries = ((InMemoryFanoutBuffer.BufferSlice.Inclusive) sliced).slice();
            // Continue from the last buffered offset; an empty slice jumps straight to the end bound.
            Offset nextCursor = (Offset) bufferedEntries.lastOption().map(entry -> {
                return (Offset) entry._1();
            }).getOrElse(() -> {
                return endOffset;
            });
            return new Some(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(nextCursor), this.toApiResponseStream$1(bufferedEntries, toApiResponse)));
        } else if (sliced instanceof InMemoryFanoutBuffer.BufferSlice.LastBufferChunkSuffix) {
            InMemoryFanoutBuffer.BufferSlice.LastBufferChunkSuffix suffix = (InMemoryFanoutBuffer.BufferSlice.LastBufferChunkSuffix) sliced;
            // The callback covers the range from the cursor up to where the buffered suffix
            // begins; the buffered suffix is appended after it. The next state is the end
            // bound, so the caller's unfold stops after this chunk.
            return new Some(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(endOffset), this.fetchFromPersistence.apply(cursor, suffix.bufferedStartExclusive(), fetchArgs, loggingContext).concat(this.toApiResponseStream$1(suffix.slice(), toApiResponse))));
        }
        throw new MatchError(sliced);
    }

    /**
     * Converts buffered (Offset, element) entries into a source of (Offset, converted) pairs.
     *
     * Each entry increments the fetchedBuffered counter, and the conversion future is timed
     * with the conversion metric. Conversions run through mapAsync with the configured
     * parallelism. An empty vector yields an empty source.
     *
     * @param vector    buffered entries, each a Tuple2 whose first component is an Offset
     * @param function1 conversion returning a Future of the response for an entry's second component
     * @throws MatchError (inside the stream) if an entry is null
     */
    private final Source toApiResponseStream$1(Vector vector, Function1 function1) {
        if (vector.isEmpty()) {
            return Source$.MODULE$.empty();
        }
        return Source$.MODULE$.apply(vector).mapAsync(this.bufferedStreamEventsProcessingParallelism, entry -> {
            if (entry != null) {
                Offset entryOffset = (Offset) entry._1();
                Object bufferedElement = entry._2();
                this.bufferReaderMetrics().fetchedBuffered().inc(this.bufferReaderMetrics().fetchedBuffered().inc$default$1());
                return Timed$.MODULE$.future(this.bufferReaderMetrics().conversion(), () -> {
                    // Re-attach the entry's offset to the converted response on the parasitic context.
                    return ((Future) function1.apply(bufferedElement)).map(response -> {
                        return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(entryOffset), response);
                    }, ExecutionContext$parasitic$.MODULE$);
                });
            }
            throw new MatchError(entry);
        });
    }
}
