package com.daml.platform.store.appendonlydao.events;

import akka.NotUsed;
import akka.stream.BoundedSourceQueue;
import akka.stream.Materializer;
import akka.stream.scaladsl.Source;
import akka.stream.scaladsl.Source$;
import com.daml.error.ContextualizedErrorLogger;
import com.daml.logging.ContextualizedLogger;
import com.daml.logging.ContextualizedLogger$;
import com.daml.logging.LoggingContext;
import com.daml.metrics.Metrics;
import com.daml.metrics.Timed$;
import com.daml.platform.store.appendonlydao.events.FilterTableACSReader;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.immutable.Vector;
import scala.concurrent.Future;
import scala.math.Ordering;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: ACSReader.scala */
/* loaded from: input_file:com/daml/platform/store/appendonlydao/events/FilterTableACSReader$.class */
public final class FilterTableACSReader$ {
    public static final FilterTableACSReader$ MODULE$ = new FilterTableACSReader$();
    private static final ContextualizedLogger logger = ContextualizedLogger$.MODULE$.get(MODULE$.getClass());

    private ContextualizedLogger logger() {
        return logger;
    }

    public <TASK, RESULT> Source<Tuple2<TASK, RESULT>, NotUsed> pullWorkerSource(int i, Materializer materializer, Function1<TASK, Future<Tuple2<RESULT, Option<TASK>>>> function1, Iterable<TASK> iterable, Ordering<TASK> ordering, ContextualizedErrorLogger contextualizedErrorLogger) {
        if (iterable.isEmpty()) {
            return Source$.MODULE$.empty();
        }
        Tuple2 preMaterialize = Source$.MODULE$.queue(iterable.size()).preMaterialize(materializer);
        if (preMaterialize == null) {
            throw new MatchError(preMaterialize);
        }
        Tuple2 tuple2 = new Tuple2((BoundedSourceQueue) preMaterialize._1(), (Source) preMaterialize._2());
        BoundedSourceQueue boundedSourceQueue = (BoundedSourceQueue) tuple2._1();
        Source source = (Source) tuple2._2();
        FilterTableACSReader.QueueState queueState = new FilterTableACSReader.QueueState(boundedSourceQueue, iterable, ordering, contextualizedErrorLogger);
        return source.mapAsyncUnordered(i, boxedUnit -> {
            Object startTask = queueState.startTask();
            return ((Future) function1.apply(startTask)).map(tuple22 -> {
                if (tuple22 == null) {
                    throw new MatchError(tuple22);
                }
                Object _1 = tuple22._1();
                queueState.finishTask((Option) tuple22._2());
                return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(startTask), _1);
            }, com.daml.dec.package$.MODULE$.DirectExecutionContext());
        });
    }

    public <TASK> Function0<Function1<Tuple2<TASK, Iterable<Object>>, Vector<Vector<Object>>>> mergeIdStreams(Iterable<TASK> iterable, int i, int i2, Metrics metrics, LoggingContext loggingContext) {
        return () -> {
            FilterTableACSReader.BatchedDistinctOutputQueue batchedDistinctOutputQueue = new FilterTableACSReader.BatchedDistinctOutputQueue(i);
            FilterTableACSReader.MergingTaskQueue mergingTaskQueue = new FilterTableACSReader.MergingTaskQueue(j -> {
                batchedDistinctOutputQueue.push(j);
            });
            FilterTableACSReader.TaskTracker taskTracker = new FilterTableACSReader.TaskTracker(iterable, i2);
            return tuple2 -> {
                if (tuple2 == null) {
                    throw new MatchError(tuple2);
                }
                Object _1 = tuple2._1();
                Iterable iterable2 = (Iterable) tuple2._2();
                return (Vector) Timed$.MODULE$.value(metrics.daml().index().acsRetrievalSequentialProcessing(), () -> {
                    this.go$1(taskTracker.add(_1, iterable2), mergingTaskQueue, taskTracker, batchedDistinctOutputQueue);
                    Vector<Vector<Object>> flushOutput = batchedDistinctOutputQueue.flushOutput();
                    MODULE$.logger().debug().apply(() -> {
                        return new StringBuilder(65).append("acsRetrievalSequentialProcessing received ").append(_1).append(" with #{").append(iterable2.size()).append("} ").append(iterable2.lastOption().map(obj -> {
                            return $anonfun$mergeIdStreams$7(BoxesRunTime.unboxToLong(obj));
                        }).getOrElse(() -> {
                            return "";
                        })).append("and produced ").append(flushOutput.size()).toString();
                    }, loggingContext);
                    return flushOutput;
                });
            };
        };
    }

    private final void go$1(Tuple2 tuple2, FilterTableACSReader.MergingTaskQueue mergingTaskQueue, FilterTableACSReader.TaskTracker taskTracker, FilterTableACSReader.BatchedDistinctOutputQueue batchedDistinctOutputQueue) {
        while (true) {
            ((Option) tuple2._1()).foreach(tuple22 -> {
                mergingTaskQueue.push(tuple22);
                return BoxedUnit.UNIT;
            });
            if (!tuple2._2$mcZ$sp()) {
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
                return;
            }
            Some runUntilATaskEmpty = mergingTaskQueue.runUntilATaskEmpty();
            if (!(runUntilATaskEmpty instanceof Some)) {
                if (!None$.MODULE$.equals(runUntilATaskEmpty)) {
                    throw new MatchError(runUntilATaskEmpty);
                }
                batchedDistinctOutputQueue.flushPartialBatch();
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
                return;
            }
            tuple2 = taskTracker.finished(runUntilATaskEmpty.value());
        }
    }

    public static final /* synthetic */ String $anonfun$mergeIdStreams$7(long j) {
        return new StringBuilder(7).append("until ").append(j).append(" ").toString();
    }

    private FilterTableACSReader$() {
    }
}
