package com.daml.platform.indexer.parallel;

import akka.NotUsed;
import akka.stream.scaladsl.Flow;
import com.daml.ledger.offset.Offset;
import com.daml.ledger.offset.Offset$;
import com.daml.ledger.participant.state.v2.Update;
import com.daml.logging.ContextualizedLogger;
import com.daml.logging.ContextualizedLogger$;
import com.daml.logging.LoggingContext;
import com.daml.logging.LoggingContext$;
import com.daml.logging.entries.LoggingValue$;
import com.daml.logging.entries.ToLoggingValue$;
import com.daml.metrics.Metrics;
import com.daml.metrics.Timed$;
import com.daml.metrics.api.MetricsContext$;
import com.daml.platform.indexer.parallel.ParallelIndexerSubscription;
import com.daml.platform.store.backend.DbDto;
import com.daml.platform.store.backend.DbDto$StringInterningDto$;
import com.daml.platform.store.backend.IngestionStorageBackend;
import com.daml.platform.store.backend.ParameterStorageBackend;
import com.daml.platform.store.dao.DbDispatcher;
import com.daml.platform.store.dao.events.CompressionStrategy;
import com.daml.platform.store.dao.events.LfValueTranslation;
import com.daml.platform.store.interning.StringInterning;
import java.io.Serializable;
import java.sql.Connection;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple15;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.IterableOps;
import scala.collection.Iterator;
import scala.collection.StrictOptimizedIterableOps;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Vector;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;
import scala.runtime.ModuleSerializationProxy;
import scala.util.ChainingOps$;
import scala.util.package$chaining$;

/* compiled from: ParallelIndexerSubscription.scala */
/* loaded from: input_file:com/daml/platform/indexer/parallel/ParallelIndexerSubscription$.class */
public final class ParallelIndexerSubscription$ implements Serializable {
    /** The single instance of this module. */
    public static final ParallelIndexerSubscription$ MODULE$ = new ParallelIndexerSubscription$();

    /** Logger obtained for this module's class; shared by all helpers below. */
    private static final ContextualizedLogger logger = ContextualizedLogger$.MODULE$.get(ParallelIndexerSubscription$.class);

    /** Accessor for the module-level logger. */
    private ContextualizedLogger logger() {
        return ParallelIndexerSubscription$.logger;
    }

    /**
     * Builds the input-mapping function: it converts one group of {@code (Offset, Update)} pairs
     * into a {@code Batch} whose payload is a {@code Vector} of {@code DbDto}s.
     *
     * <p>For each invocation the returned function:
     * <ol>
     *   <li>reports the group size to the {@code inputMapping.batchSize} metric,</li>
     *   <li>runs the per-pair callback {@code $anonfun$inputMapper$2} over every pair,</li>
     *   <li>expands every pair through {@code function1} and appends the DTOs produced by
     *       {@code function12} for the whole group,</li>
     *   <li>wraps the result in a {@code Batch} carrying the last pair's offset and record time
     *       (epoch millis), with the second and third constructor slots set to {@code 0L} / {@code 0}.</li>
     * </ol>
     *
     * <p>NOTE(review): the last pair is read via {@code last()}, so an empty input is presumably
     * rejected by the collection itself — confirm callers never pass an empty group.
     *
     * @param metrics        metrics registry used to record the group size
     * @param function1      curried converter: offset, then update, to an iterator of DTOs
     * @param function12     converter producing additional DTOs for the whole group
     * @param loggingContext logging context forwarded to the per-pair callback
     * @return the mapping function described above
     */
    public Function1<Iterable<Tuple2<Offset, Update>>, ParallelIndexerSubscription.Batch<Vector<DbDto>>> inputMapper(Metrics metrics, Function1<Offset, Function1<Update, Iterator<DbDto>>> function1, Function1<Iterable<Tuple2<Offset, Update>>, Vector<DbDto.TransactionMetering>> function12, LoggingContext loggingContext) {
        return updates -> {
            metrics.daml().parallelIndexer().inputMapping().batchSize().update(updates.size(), MetricsContext$.MODULE$.Empty());
            updates.foreach(entry -> {
                $anonfun$inputMapper$2(loggingContext, entry);
                return BoxedUnit.UNIT;
            });

            // The final pair of the group determines the batch's offset and record time.
            Offset lastOffset = (Offset) ((Tuple2) updates.last())._1();
            long lastRecordTimeMillis = ((Update) ((Tuple2) updates.last())._2()).recordTime().toInstant().toEpochMilli();

            // DTOs derived from each individual update, in input order.
            Vector perUpdateDtos = (Vector) updates.iterator().flatMap(entry -> {
                if (entry == null) {
                    throw new MatchError(entry);
                }
                Offset offset = (Offset) entry._1();
                Update update = (Update) entry._2();
                return (Iterator) ((Function1) function1.apply(offset)).apply(update);
            }).toVector();

            // DTOs derived from the group as a whole are appended after the per-update ones.
            Vector allDtos = (Vector) perUpdateDtos.$plus$plus((Vector) function12.apply(updates));

            return new ParallelIndexerSubscription.Batch(lastOffset, 0L, 0, lastRecordTimeMillis, allDtos, updates.size(), updates.toVector());
        };
    }

    /**
     * Creates the seed {@code Batch} for the sequential-mapping fold: no offset ({@code null}),
     * the supplied ids, a zero record time, an empty payload, a batch size of zero and no
     * offset/update pairs.
     *
     * @param j value placed in the batch's second slot (read back as {@code lastSeqEventId} by {@code seqMapper})
     * @param i value placed in the batch's third slot (read back as {@code lastStringInterningId} by {@code seqMapper})
     * @return the empty starting batch
     */
    public ParallelIndexerSubscription.Batch<Vector<DbDto>> seqMapperZero(long j, int i) {
        Vector<DbDto> noDtos = package$.MODULE$.Vector().empty();
        Vector<Tuple2<Offset, Update>> noOffsetsUpdates = package$.MODULE$.Vector().empty();
        return new ParallelIndexerSubscription.Batch<>(null, j, i, 0L, noDtos, 0, noOffsetsUpdates);
    }

    /**
     * Sequential mapping step: rewrites the DTOs of {@code batch2} so that they carry ids that
     * continue from the previous batch {@code batch}, and appends string-interning DTOs.
     *
     * <p>The whole computation is timed under the {@code seqMapping.duration} metric.
     *
     * @param function1 computes new {@code (id, string)} interning entries for the rewritten DTOs
     * @param metrics   metrics registry providing the timer
     * @param batch     the previously processed batch; supplies the starting {@code lastSeqEventId}
     *                  and the fallback {@code lastStringInterningId}
     * @param batch2    the batch to rewrite
     * @return a copy of {@code batch2} with its second slot set to the final counter value, its
     *         third slot set to the resulting string-interning id, and its payload replaced by the
     *         rewritten DTOs (plus interning DTOs, if any)
     */
    public ParallelIndexerSubscription.Batch<Vector<DbDto>> seqMapper(Function1<Iterable<DbDto>, Iterable<Tuple2<Object, String>>> function1, Metrics metrics, ParallelIndexerSubscription.Batch<Vector<DbDto>> batch, ParallelIndexerSubscription.Batch<Vector<DbDto>> batch2) {
        return (ParallelIndexerSubscription.Batch) Timed$.MODULE$.value(metrics.daml().parallelIndexer().seqMapping().duration(), () -> {
            // Running counter, seeded from the previous batch; bumped once per event DTO below.
            LongRef create = LongRef.create(batch.lastSeqEventId());
            // Counter value as of the most recent TransactionMeta (initially the same seed).
            LongRef create2 = LongRef.create(create.elem);
            // NOTE: the mapping lambda mutates both counters, so element order matters.
            Vector vector = (Vector) ((StrictOptimizedIterableOps) batch2.batch()).map(dbDto -> {
                // Event DTOs: advance the counter, then store the new value in the DTO
                // (presumably its event sequential id field — TODO confirm field names).
                if (dbDto instanceof DbDto.EventCreate) {
                    DbDto.EventCreate eventCreate = (DbDto.EventCreate) dbDto;
                    create.elem++;
                    return eventCreate.copy(eventCreate.copy$default$1(), eventCreate.copy$default$2(), eventCreate.copy$default$3(), eventCreate.copy$default$4(), eventCreate.copy$default$5(), eventCreate.copy$default$6(), eventCreate.copy$default$7(), eventCreate.copy$default$8(), eventCreate.copy$default$9(), eventCreate.copy$default$10(), eventCreate.copy$default$11(), eventCreate.copy$default$12(), eventCreate.copy$default$13(), eventCreate.copy$default$14(), eventCreate.copy$default$15(), eventCreate.copy$default$16(), eventCreate.copy$default$17(), eventCreate.copy$default$18(), eventCreate.copy$default$19(), eventCreate.copy$default$20(), eventCreate.copy$default$21(), create.elem, eventCreate.copy$default$23());
                }
                if (dbDto instanceof DbDto.EventExercise) {
                    DbDto.EventExercise eventExercise = (DbDto.EventExercise) dbDto;
                    create.elem++;
                    return eventExercise.copy(eventExercise.copy$default$1(), eventExercise.copy$default$2(), eventExercise.copy$default$3(), eventExercise.copy$default$4(), eventExercise.copy$default$5(), eventExercise.copy$default$6(), eventExercise.copy$default$7(), eventExercise.copy$default$8(), eventExercise.copy$default$9(), eventExercise.copy$default$10(), eventExercise.copy$default$11(), eventExercise.copy$default$12(), eventExercise.copy$default$13(), eventExercise.copy$default$14(), eventExercise.copy$default$15(), eventExercise.copy$default$16(), eventExercise.copy$default$17(), eventExercise.copy$default$18(), eventExercise.copy$default$19(), eventExercise.copy$default$20(), eventExercise.copy$default$21(), eventExercise.copy$default$22(), eventExercise.copy$default$23(), create.elem);
                }
                if (dbDto instanceof DbDto.EventDivulgence) {
                    DbDto.EventDivulgence eventDivulgence = (DbDto.EventDivulgence) dbDto;
                    create.elem++;
                    return eventDivulgence.copy(eventDivulgence.copy$default$1(), eventDivulgence.copy$default$2(), eventDivulgence.copy$default$3(), eventDivulgence.copy$default$4(), eventDivulgence.copy$default$5(), eventDivulgence.copy$default$6(), eventDivulgence.copy$default$7(), eventDivulgence.copy$default$8(), eventDivulgence.copy$default$9(), eventDivulgence.copy$default$10(), create.elem);
                }
                // Id-filter DTOs: stamped with the CURRENT counter value (no increment), i.e. the
                // value assigned to the most recently seen event DTO.
                if (dbDto instanceof DbDto.IdFilterCreateStakeholder) {
                    DbDto.IdFilterCreateStakeholder idFilterCreateStakeholder = (DbDto.IdFilterCreateStakeholder) dbDto;
                    return idFilterCreateStakeholder.copy(create.elem, idFilterCreateStakeholder.copy$default$2(), idFilterCreateStakeholder.copy$default$3());
                }
                if (dbDto instanceof DbDto.IdFilterCreateNonStakeholderInformee) {
                    DbDto.IdFilterCreateNonStakeholderInformee idFilterCreateNonStakeholderInformee = (DbDto.IdFilterCreateNonStakeholderInformee) dbDto;
                    return idFilterCreateNonStakeholderInformee.copy(create.elem, idFilterCreateNonStakeholderInformee.copy$default$2());
                }
                if (dbDto instanceof DbDto.IdFilterConsumingStakeholder) {
                    DbDto.IdFilterConsumingStakeholder idFilterConsumingStakeholder = (DbDto.IdFilterConsumingStakeholder) dbDto;
                    return idFilterConsumingStakeholder.copy(create.elem, idFilterConsumingStakeholder.copy$default$2(), idFilterConsumingStakeholder.copy$default$3());
                }
                if (dbDto instanceof DbDto.IdFilterConsumingNonStakeholderInformee) {
                    DbDto.IdFilterConsumingNonStakeholderInformee idFilterConsumingNonStakeholderInformee = (DbDto.IdFilterConsumingNonStakeholderInformee) dbDto;
                    return idFilterConsumingNonStakeholderInformee.copy(create.elem, idFilterConsumingNonStakeholderInformee.copy$default$2());
                }
                if (dbDto instanceof DbDto.IdFilterNonConsumingInformee) {
                    DbDto.IdFilterNonConsumingInformee idFilterNonConsumingInformee = (DbDto.IdFilterNonConsumingInformee) dbDto;
                    return idFilterNonConsumingInformee.copy(create.elem, idFilterNonConsumingInformee.copy$default$2());
                }
                // Any other DTO kind except TransactionMeta passes through unchanged.
                if (!(dbDto instanceof DbDto.TransactionMeta)) {
                    return dbDto;
                }
                // TransactionMeta: third/fourth fields become (previous marker + 1, current counter)
                // — presumably the first and last event ids of the transaction (TODO confirm).
                // The tap callback then moves the marker up to the current counter.
                DbDto.TransactionMeta transactionMeta = (DbDto.TransactionMeta) dbDto;
                return (DbDto) ChainingOps$.MODULE$.tap$extension(package$chaining$.MODULE$.scalaUtilChainingOps(transactionMeta.copy(transactionMeta.copy$default$1(), transactionMeta.copy$default$2(), create2.elem + 1, create.elem)), transactionMeta2 -> {
                    $anonfun$seqMapper$3(create2, create, transactionMeta2);
                    return BoxedUnit.UNIT;
                });
            });
            // Ask the interning function for new entries and turn each pair into a StringInterningDto.
            Iterable iterable = (Iterable) ((IterableOps) function1.apply(vector)).map(tuple2 -> {
                return DbDto$StringInterningDto$.MODULE$.from(tuple2);
            });
            // No new entries: keep the previous batch's interning id and the rewritten DTOs as-is.
            // Otherwise: use the last new entry's internal id and append the interning DTOs.
            Tuple2 $minus$greater$extension = iterable.isEmpty() ? Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToInteger(batch.lastStringInterningId())), vector) : Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToInteger(((DbDto.StringInterningDto) iterable.last()).internalId())), vector.$plus$plus(iterable));
            if ($minus$greater$extension == null) {
                throw new MatchError($minus$greater$extension);
            }
            Tuple2 tuple22 = new Tuple2(BoxesRunTime.boxToInteger($minus$greater$extension._1$mcI$sp()), (Vector) $minus$greater$extension._2());
            // Result: batch2 with updated counter (slot 2), interning id (slot 3) and payload (slot 5).
            return batch2.copy(batch2.copy$default$1(), create.elem, tuple22._1$mcI$sp(), batch2.copy$default$4(), (Vector) tuple22._2(), batch2.copy$default$6(), batch2.copy$default$7());
        });
    }

    /**
     * Builds the batching function: it replaces a batch's {@code Vector<DbDto>} payload with the
     * value produced by {@code function1}, leaving every other batch field untouched.
     *
     * @param function1 converts the DTO vector into the database-specific batch representation
     * @param <DB_BATCH> type of the database-specific batch
     * @return function mapping a DTO batch to a {@code DB_BATCH} batch
     */
    public <DB_BATCH> Function1<ParallelIndexerSubscription.Batch<Vector<DbDto>>, ParallelIndexerSubscription.Batch<DB_BATCH>> batcher(Function1<Vector<DbDto>, DB_BATCH> function1) {
        return dtoBatch -> dtoBatch.copy(
                dtoBatch.copy$default$1(),
                dtoBatch.copy$default$2(),
                dtoBatch.copy$default$3(),
                dtoBatch.copy$default$4(),
                // Only the payload slot changes.
                function1.apply(dtoBatch.batch()),
                dtoBatch.copy$default$6(),
                dtoBatch.copy$default$7());
    }

    /**
     * Builds the ingestion function: for each batch it runs {@code function2} against a database
     * connection (through {@code dbDispatcher.executeSql}, tracked by the {@code ingestion}
     * metric) and completes with the batch stripped of its payload via {@code cleanUnusedBatch}.
     *
     * <p>The logging context used for the SQL execution is enriched with an
     * {@code "updateOffsets"} entry holding the offsets of the batch's offset/update pairs.
     * The {@code updates} counter is incremented by the batch size inside the SQL callback,
     * before {@code function2} is invoked.
     *
     * @param function2      writes a {@code DB_BATCH} using the given connection
     * @param db_batch       placeholder payload substituted into the batch after ingestion
     * @param dbDispatcher   executes the SQL callback
     * @param metrics        metrics registry
     * @param loggingContext base logging context
     * @param <DB_BATCH>     type of the database-specific batch
     * @return function mapping a batch to a future of the ingested (and cleaned) batch
     */
    public <DB_BATCH> Function1<ParallelIndexerSubscription.Batch<DB_BATCH>, Future<ParallelIndexerSubscription.Batch<DB_BATCH>>> ingester(Function2<Connection, DB_BATCH, BoxedUnit> function2, DB_BATCH db_batch, DbDispatcher dbDispatcher, Metrics metrics, LoggingContext loggingContext) {
        return batch -> {
            // Work performed on the database connection for this batch.
            Function1<Connection, ParallelIndexerSubscription.Batch<DB_BATCH>> writeBatch = connection -> {
                metrics.daml().parallelIndexer().updates().inc(batch.batchSize(), MetricsContext$.MODULE$.Empty());
                function2.apply(connection, batch.batch());
                return MODULE$.cleanUnusedBatch(db_batch).apply(batch);
            };
            return (Future) LoggingContext$.MODULE$.withEnrichedLoggingContext(
                    Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                            Predef$.MODULE$.ArrowAssoc("updateOffsets"),
                            LoggingValue$.MODULE$.from(
                                    batch.offsetsUpdates().map(offsetAndUpdate -> (Offset) offsetAndUpdate._1()),
                                    ToLoggingValue$.MODULE$.Iterable$u005BT$u005D$u0020to$u0020LoggingValue(Offset$.MODULE$.Offset$u0020to$u0020LoggingValue()))),
                    Nil$.MODULE$,
                    enrichedContext -> dbDispatcher.executeSql(metrics.daml().parallelIndexer().ingestion(), writeBatch, enrichedContext),
                    loggingContext);
        };
    }

    /**
     * Derives a {@code LedgerEnd} from a batch's last offset, last sequential event id and last
     * string-interning id.
     *
     * @param batch batch whose "last" markers are read
     * @return the corresponding ledger end
     */
    public ParameterStorageBackend.LedgerEnd ledgerEndFrom(ParallelIndexerSubscription.Batch<?> batch) {
        Offset lastOffset = batch.lastOffset();
        long lastSeqEventId = batch.lastSeqEventId();
        int lastStringInterningId = batch.lastStringInterningId();
        return new ParameterStorageBackend.LedgerEnd(lastOffset, lastSeqEventId, lastStringInterningId);
    }

    /**
     * Builds the tail-ingestion function: given a non-empty vector of already ingested batches,
     * it persists the ledger end derived from the LAST batch and completes with the unchanged
     * input vector.
     *
     * <p>On a non-empty input the returned function, inside {@code dbDispatcher.executeSql}
     * (tracked by the {@code tailIngestion} metric) and with the logging context enriched by an
     * {@code "updateOffset"} entry:
     * <ol>
     *   <li>applies {@code function1} to the ledger end and runs the result on the connection,</li>
     *   <li>updates the {@code ledgerEndSequentialId} and {@code lastReceivedRecordTime} gauges,</li>
     *   <li>logs {@code "Ledger end updated in IndexDB"} at info level.</li>
     * </ol>
     *
     * <p>On an empty input it logs an error and returns a future failed with
     * {@link IllegalStateException}.
     *
     * @param function1      curried writer: ledger end, then connection
     * @param dbDispatcher   executes the SQL callback
     * @param metrics        metrics registry
     * @param loggingContext base logging context
     * @param <DB_BATCH>     type of the database-specific batch
     * @return function mapping a vector of batches to a future of the same vector
     */
    public <DB_BATCH> Function1<Vector<ParallelIndexerSubscription.Batch<DB_BATCH>>, Future<Vector<ParallelIndexerSubscription.Batch<DB_BATCH>>>> ingestTail(Function1<ParameterStorageBackend.LedgerEnd, Function1<Connection, BoxedUnit>> function1, DbDispatcher dbDispatcher, Metrics metrics, LoggingContext loggingContext) {
        return vector -> {
            // lastOption() yields an Option, not a Some; declaring it as Option keeps the
            // assignment type-correct and lets the element type flow through without raw casts.
            Option<ParallelIndexerSubscription.Batch<DB_BATCH>> lastOption = vector.lastOption();
            if (lastOption instanceof Some) {
                ParallelIndexerSubscription.Batch<DB_BATCH> batch = lastOption.get();
                return (Future) LoggingContext$.MODULE$.withEnrichedLoggingContext(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("updateOffset"), LoggingValue$.MODULE$.from(batch.lastOffset(), Offset$.MODULE$.Offset$u0020to$u0020LoggingValue())), Nil$.MODULE$, loggingContext2 -> {
                    return dbDispatcher.executeSql(metrics.daml().parallelIndexer().tailIngestion(), connection -> {
                        // Persist the ledger end first; metrics and the log line follow the write.
                        function1.apply(MODULE$.ledgerEndFrom(batch)).apply(connection);
                        metrics.daml().indexer().ledgerEndSequentialId().updateValue(BoxesRunTime.boxToLong(batch.lastSeqEventId()));
                        metrics.daml().indexer().lastReceivedRecordTime().updateValue(BoxesRunTime.boxToLong(batch.lastRecordTime()));
                        MODULE$.logger().info().apply(() -> {
                            return "Ledger end updated in IndexDB";
                        }, loggingContext2);
                        return vector;
                    }, loggingContext2);
                }, loggingContext);
            }
            // Anything that is neither Some nor None (e.g. null) keeps failing as a MatchError.
            if (!None$.MODULE$.equals(lastOption)) {
                throw new MatchError(lastOption);
            }
            // Single source of truth for the message used by both the log line and the exception.
            String str = "Unexpectedly encountered a zero-sized batch in ingestTail";
            MODULE$.logger().error().apply(() -> {
                return str;
            }, loggingContext);
            return Future$.MODULE$.failed(new IllegalStateException(str));
        };
    }

    /**
     * Builds a function that swaps a batch's payload for {@code db_batch} and resets its batch
     * size to {@code 0}; all other fields are carried over unchanged.
     *
     * @param db_batch   replacement payload
     * @param <DB_BATCH> type of the database-specific batch
     * @return the payload-replacing function
     */
    private <DB_BATCH> Function1<ParallelIndexerSubscription.Batch<DB_BATCH>, ParallelIndexerSubscription.Batch<DB_BATCH>> cleanUnusedBatch(DB_BATCH db_batch) {
        return ingested -> ingested.copy(
                ingested.copy$default$1(),
                ingested.copy$default$2(),
                ingested.copy$default$3(),
                ingested.copy$default$4(),
                db_batch,
                0,
                ingested.copy$default$7());
    }

    /**
     * Factory for {@code ParallelIndexerSubscription}: forwards all fifteen arguments, in order,
     * to the constructor.
     *
     * <p>NOTE(review): the positional meaning of the arguments presumably mirrors the accessor
     * order used by {@code unapply} (ingestionStorageBackend, parameterStorageBackend,
     * participantId, translation, compressionStrategy, maxInputBufferSize,
     * inputMappingParallelism, batchingParallelism, ingestionParallelism, submissionBatchSize,
     * maxOutputBatchedBufferSize, maxTailerBatchSize, metrics, inMemoryStateUpdaterFlow,
     * stringInterningView) — verify against the constructor.
     *
     * @param <DB_BATCH> type of the database-specific batch
     * @return a new subscription built from the given arguments
     */
    public <DB_BATCH> ParallelIndexerSubscription<DB_BATCH> apply(IngestionStorageBackend<DB_BATCH> ingestionStorageBackend, ParameterStorageBackend parameterStorageBackend, String str, LfValueTranslation lfValueTranslation, CompressionStrategy compressionStrategy, int i, int i2, int i3, int i4, long j, int i5, int i6, Metrics metrics, Flow<Tuple2<Vector<Tuple2<Offset, Update>>, Object>, BoxedUnit, NotUsed> flow, StringInterning stringInterning) {
        return new ParallelIndexerSubscription<>(ingestionStorageBackend, parameterStorageBackend, str, lfValueTranslation, compressionStrategy, i, i2, i3, i4, j, i5, i6, metrics, flow, stringInterning);
    }

    /**
     * Extractor counterpart of {@code apply}: exposes the fifteen fields of a subscription as a
     * tuple (primitive fields are boxed).
     *
     * @param parallelIndexerSubscription subscription to take apart; may be {@code null}
     * @param <DB_BATCH> type of the database-specific batch
     * @return {@code None} for a {@code null} argument, otherwise {@code Some} of the field tuple
     */
    public <DB_BATCH> Option<Tuple15<IngestionStorageBackend<DB_BATCH>, ParameterStorageBackend, String, LfValueTranslation, CompressionStrategy, Object, Object, Object, Object, Object, Object, Object, Metrics, Flow<Tuple2<Vector<Tuple2<Offset, Update>>, Object>, BoxedUnit, NotUsed>, StringInterning>> unapply(ParallelIndexerSubscription<DB_BATCH> parallelIndexerSubscription) {
        if (parallelIndexerSubscription == null) {
            return (Option) None$.MODULE$;
        }
        Tuple15 fields = new Tuple15(
                parallelIndexerSubscription.ingestionStorageBackend(),
                parallelIndexerSubscription.parameterStorageBackend(),
                parallelIndexerSubscription.participantId(),
                parallelIndexerSubscription.translation(),
                parallelIndexerSubscription.compressionStrategy(),
                BoxesRunTime.boxToInteger(parallelIndexerSubscription.maxInputBufferSize()),
                BoxesRunTime.boxToInteger(parallelIndexerSubscription.inputMappingParallelism()),
                BoxesRunTime.boxToInteger(parallelIndexerSubscription.batchingParallelism()),
                BoxesRunTime.boxToInteger(parallelIndexerSubscription.ingestionParallelism()),
                BoxesRunTime.boxToLong(parallelIndexerSubscription.submissionBatchSize()),
                BoxesRunTime.boxToInteger(parallelIndexerSubscription.maxOutputBatchedBufferSize()),
                BoxesRunTime.boxToInteger(parallelIndexerSubscription.maxTailerBatchSize()),
                parallelIndexerSubscription.metrics(),
                parallelIndexerSubscription.inMemoryStateUpdaterFlow(),
                parallelIndexerSubscription.stringInterningView());
        return new Some(fields);
    }

    /**
     * Java serialization hook: substitutes a {@code ModuleSerializationProxy} for this instance
     * when it is written to an object stream.
     *
     * @return the proxy object to serialize in place of this module
     */
    private Object writeReplace() {
        return new ModuleSerializationProxy(ParallelIndexerSubscription$.class);
    }

    /**
     * Logs {@code "Storing <description>"} for the given update at info level.
     * The message is supplied lazily, so it is only built if the logger evaluates it.
     *
     * @param update         update whose description is logged
     * @param loggingContext logging context passed to the logger
     */
    public static final /* synthetic */ void $anonfun$inputMapper$3(Update update, LoggingContext loggingContext) {
        MODULE$.logger().info().apply(() -> "Storing " + update.description(), loggingContext);
    }

    /**
     * Per-pair callback used by {@code inputMapper}: logs the update of one
     * {@code (Offset, Update)} pair through {@code $anonfun$inputMapper$3}.
     *
     * <p>Previously the pair was destructured into locals that were never used, which left the
     * logging helper unreachable and the per-update "Storing ..." line unlogged.
     *
     * @param loggingContext logging context forwarded to the logging helper
     * @param tuple2         the {@code (Offset, Update)} pair; must not be {@code null}
     * @throws MatchError if {@code tuple2} is {@code null}
     */
    public static final /* synthetic */ void $anonfun$inputMapper$2(LoggingContext loggingContext, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        // Only the update is needed for the log message; the offset component is not read here.
        Update update = (Update) tuple2._2();
        $anonfun$inputMapper$3(update, loggingContext);
    }

    /**
     * Tap callback used by {@code seqMapper} after rewriting a {@code TransactionMeta}: copies
     * the current value of {@code longRef2} into {@code longRef}. The {@code transactionMeta}
     * argument is not read.
     *
     * @param longRef         reference that is overwritten
     * @param longRef2        reference whose current value is copied
     * @param transactionMeta the tapped value (unused)
     */
    public static final /* synthetic */ void $anonfun$seqMapper$3(LongRef longRef, LongRef longRef2, DbDto.TransactionMeta transactionMeta) {
        longRef.elem = longRef2.elem;
    }

    /** Private so that the only instance is the one held in {@code MODULE$}. */
    private ParallelIndexerSubscription$() {
    }
}
