package com.daml.projection.scaladsl;

import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.actor.ClassicActorSystemProvider;
import akka.grpc.GrpcClientSettings;
import akka.grpc.scaladsl.AkkaGrpcClient;
import akka.stream.KillSwitch;
import akka.stream.KillSwitches$;
import akka.stream.scaladsl.Flow;
import akka.stream.scaladsl.Flow$;
import akka.stream.scaladsl.Keep$;
import akka.stream.scaladsl.Sink$;
import akka.stream.scaladsl.Source;
import akka.stream.scaladsl.Source$;
import com.daml.ledger.api.v1.active_contracts_service.ActiveContractsServiceClient;
import com.daml.ledger.api.v1.active_contracts_service.ActiveContractsServiceClient$;
import com.daml.ledger.api.v1.active_contracts_service.GetActiveContractsRequest;
import com.daml.ledger.api.v1.active_contracts_service.GetActiveContractsRequest$;
import com.daml.ledger.api.v1.active_contracts_service.GetActiveContractsResponse;
import com.daml.ledger.api.v1.event.Event;
import com.daml.ledger.api.v1.event.ExercisedEvent;
import com.daml.ledger.api.v1.transaction.TreeEvent;
import com.daml.ledger.api.v1.transaction_filter.TransactionFilter;
import com.daml.ledger.api.v1.transaction_service.GetTransactionTreesResponse;
import com.daml.ledger.api.v1.transaction_service.GetTransactionsRequest;
import com.daml.ledger.api.v1.transaction_service.GetTransactionsRequest$;
import com.daml.ledger.api.v1.transaction_service.GetTransactionsResponse;
import com.daml.ledger.api.v1.transaction_service.TransactionServiceClient;
import com.daml.ledger.api.v1.transaction_service.TransactionServiceClient$;
import com.daml.projection.Batch;
import com.daml.projection.Batcher;
import com.daml.projection.ConsumerRecord;
import com.daml.projection.Envelope;
import com.daml.projection.Offset;
import com.daml.projection.Offset$;
import com.daml.projection.Projection;
import com.daml.projection.ProjectionId;
import com.daml.projection.TxBoundary;
import com.typesafe.scalalogging.Logger;
import com.typesafe.scalalogging.StrictLogging;
import java.time.Instant;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Product;
import scala.Some;
import scala.collection.IterableOps;
import scala.collection.immutable.Iterable;
import scala.collection.immutable.Seq;
import scala.collection.immutable.Vector;
import scala.concurrent.ExecutionContext;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.FiniteDuration$;
import scala.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;

/* compiled from: Consumer.scala */
/* loaded from: input_file:com/daml/projection/scaladsl/Consumer$.class */
public final class Consumer$ implements StrictLogging {
    /** The single instance of this class; the constructor is private, so no other instance is created here. */
    public static final Consumer$ MODULE$ = new Consumer$();
    // Written only through the generated StrictLogging setter below and read through logger().
    private static Logger logger;

    static {
        // Runs the StrictLogging trait initializer against the singleton.
        // NOTE(review): presumably this is what calls the generated setter and populates `logger` — confirm against scala-logging.
        StrictLogging.$init$(MODULE$);
    }

    /**
     * Returns the logger stored in the static {@code logger} field.
     *
     * @return the logger last assigned through the generated setter, or {@code null} if none was assigned yet
     */
    public Logger logger() {
        return logger;
    }

    /**
     * Compiler-generated setter for the {@code StrictLogging} logger; stores the given logger in the static field.
     *
     * @param logger2 the logger instance to store
     */
    public void com$typesafe$scalalogging$StrictLogging$_setter_$logger_$eq(Logger logger2) {
        logger = logger2;
    }

    /**
     * Builds the batched {@link Event} source of a projection, materializing a {@link Control}.
     *
     * <p>An active-contracts client and a transaction client are always created from the given settings.
     * When the projection has no offset, the active contracts are requested first and the follow-up
     * transactions are obtained through {@code getTransactions}; otherwise transactions are requested
     * directly starting from the projection's offset. Either way the records are batched using
     * {@code getBatchSize} / {@code getDefaultBatcherInterval} and passed through the completion flow
     * that is handed both clients.
     *
     * @param grpcClientSettings settings used to create both gRPC clients
     * @param projection         projection supplying id, offsets, transaction filter, predicate and batch size
     * @param actorSystem        actor system providing the dispatcher and the configuration
     * @return a source of event batches whose materialized value is the stream's {@link Control}
     */
    public Source<Batch<Event>, Control> eventSource(GrpcClientSettings grpcClientSettings, Projection<Event> projection, ActorSystem actorSystem) {
        ExecutionContextExecutor ec = actorSystem.dispatcher();
        Function1<Envelope<Event>, Object> eventFilter = projection.predicate();
        ActiveContractsServiceClient acsClient = ActiveContractsServiceClient$.MODULE$.apply(grpcClientSettings, (ClassicActorSystemProvider) actorSystem);
        TransactionServiceClient txClient = TransactionServiceClient$.MODULE$.apply(grpcClientSettings, (ClassicActorSystemProvider) actorSystem);

        // Resume path: an offset is present, so skip the ACS and read transactions from that offset.
        if (!projection.offset().isEmpty()) {
            if (logger().underlying().isTraceEnabled()) {
                logger().underlying().trace("Getting transactions for projection: '{}' from offset '{}'.", new Object[]{projection.id(), projection.offset()});
            }
            return getTransactions(txClient, projection.endOffset(), projection.offset(), projection.transactionFilter())
                    .via(toConsumerRecord(projection.id(), eventFilter))
                    .via(new Batcher(getBatchSize(projection, actorSystem), getDefaultBatcherInterval(actorSystem)))
                    .viaMat(completeActiveContractSource$1(projection, acsClient, txClient, ec), Keep$.MODULE$.right());
        }

        // Initial path: no offset yet, so start from the active contract set.
        if (logger().underlying().isTraceEnabled()) {
            logger().underlying().trace("Getting the ACS for projection: '{}'", projection.id());
        }
        GetActiveContractsRequest acsRequest = new GetActiveContractsRequest(
                GetActiveContractsRequest$.MODULE$.apply$default$1(),
                new Some(projection.transactionFilter()),
                GetActiveContractsRequest$.MODULE$.apply$default$3());
        Source<GetActiveContractsResponse, NotUsed> acsResponses = acsClient.getActiveContracts(acsRequest);
        Function2<Option<Offset>, TransactionFilter, Source<GetTransactionsResponse, NotUsed>> transactionsFrom = (fromOffset, filter) -> {
            return MODULE$.getTransactions(txClient, projection.endOffset(), fromOffset, filter);
        };
        int batchSize = getBatchSize(projection, actorSystem);
        FiniteDuration batchInterval = getDefaultBatcherInterval(actorSystem);
        Source<Batch<Event>, NotUsed> batches = activeContractSourceFromTxs(acsResponses, transactionsFrom, projection, eventFilter, batchSize, batchInterval);
        return batches.viaMat(completeActiveContractSource$1(projection, acsClient, txClient, ec), Keep$.MODULE$.right());
    }

    /**
     * Resolves the batch size for a projection.
     *
     * @param projection  projection whose optional {@code batchSize} takes precedence
     * @param actorSystem actor system whose configuration supplies the fallback
     * @return the projection's batch size if defined, otherwise the integer configured at {@code projection.batch-size}
     */
    public int getBatchSize(Projection<?> projection, ActorSystem actorSystem) {
        // Evaluated only when the projection does not define a batch size.
        Function0<Object> configuredDefault = () -> {
            return actorSystem.settings().config().getInt("projection.batch-size");
        };
        Object resolved = projection.batchSize().getOrElse(configuredDefault);
        return BoxesRunTime.unboxToInt(resolved);
    }

    /**
     * Reads the batcher interval from the actor system configuration.
     *
     * @param actorSystem actor system whose configuration holds {@code projection.batch-interval}
     * @return the configured duration, converted to a {@link FiniteDuration} expressed in nanoseconds
     */
    public FiniteDuration getDefaultBatcherInterval(ActorSystem actorSystem) {
        long intervalNanos = actorSystem.settings().config().getDuration("projection.batch-interval").toNanos();
        return FiniteDuration$.MODULE$.apply(intervalNanos, TimeUnit.NANOSECONDS);
    }

    /**
     * Builds the batched {@link ExercisedEvent} source of a projection, materializing a {@link Control}.
     *
     * <p>Transaction trees are requested between the projection's offset and end offset, reduced to
     * exercised events via {@code exercisedEventSourceFromTrees}, and passed through the completion flow
     * that is handed the transaction client.
     *
     * @param grpcClientSettings settings used to create the transaction service client
     * @param projection         projection supplying offsets, transaction filter, predicate and batch size
     * @param actorSystem        actor system providing the dispatcher and the configuration
     * @return a source of exercised-event batches whose materialized value is the stream's {@link Control}
     */
    public Source<Batch<ExercisedEvent>, Control> exercisedEventSource(GrpcClientSettings grpcClientSettings, Projection<ExercisedEvent> projection, ActorSystem actorSystem) {
        ExecutionContextExecutor ec = actorSystem.dispatcher();
        Function1<Envelope<ExercisedEvent>, Object> exercisedFilter = projection.predicate();
        TransactionServiceClient txClient = TransactionServiceClient$.MODULE$.apply(grpcClientSettings, (ClassicActorSystemProvider) actorSystem);

        Source<GetTransactionTreesResponse, NotUsed> trees = getTransactionTrees(txClient, projection.endOffset(), projection.offset(), projection.transactionFilter());
        Source<Batch<ExercisedEvent>, NotUsed> batches = exercisedEventSourceFromTrees(trees, projection, exercisedFilter, getBatchSize(projection, actorSystem), getDefaultBatcherInterval(actorSystem));

        Function1<KillSwitch, Control> controlFactory = killSwitch -> {
            return createControl$2(killSwitch, txClient, ec);
        };
        Flow<Batch<ExercisedEvent>, Batch<ExercisedEvent>, Control> completion = completeTreeEventSource(projection, txClient, controlFactory, ec);
        return batches.viaMat(completion, Keep$.MODULE$.right());
    }

    /**
     * Builds the batched {@link TreeEvent} source of a projection, materializing a {@link Control}.
     *
     * <p>Transaction trees are requested between the projection's offset and end offset, converted to
     * batches via {@code treeEventSourceFromTrees}, and passed through the completion flow that is
     * handed the transaction client.
     *
     * @param grpcClientSettings settings used to create the transaction service client
     * @param projection         projection supplying id, offsets, transaction filter, predicate and batch size
     * @param actorSystem        actor system providing the dispatcher and the configuration
     * @return a source of tree-event batches whose materialized value is the stream's {@link Control}
     */
    public Source<Batch<TreeEvent>, Control> treeEventSource(GrpcClientSettings grpcClientSettings, Projection<TreeEvent> projection, ActorSystem actorSystem) {
        ExecutionContextExecutor ec = actorSystem.dispatcher();
        Function1<Envelope<TreeEvent>, Object> treeFilter = projection.predicate();
        TransactionServiceClient txClient = TransactionServiceClient$.MODULE$.apply(grpcClientSettings, (ClassicActorSystemProvider) actorSystem);

        Source<GetTransactionTreesResponse, NotUsed> trees = getTransactionTrees(txClient, projection.endOffset(), projection.offset(), projection.transactionFilter());
        Source<Batch<TreeEvent>, NotUsed> batches = treeEventSourceFromTrees(trees, projection.id(), treeFilter, getBatchSize(projection, actorSystem), getDefaultBatcherInterval(actorSystem));

        Function1<KillSwitch, Control> controlFactory = killSwitch -> {
            return createControl$3(killSwitch, txClient, ec);
        };
        Flow<Batch<TreeEvent>, Batch<TreeEvent>, Control> completion = completeTreeEventSource(projection, txClient, controlFactory, ec);
        return batches.viaMat(completion, Keep$.MODULE$.right());
    }

    /**
     * Creates the completion flow for tree-based sources, whose cleanup closes the transaction client.
     *
     * @param projection               projection the flow belongs to
     * @param transactionServiceClient client that is closed, then awaited via {@code closed()}, during cleanup
     * @param function1                factory turning the kill switch into the materialized {@link Control}
     * @param executionContext         context on which the close-then-closed chaining runs
     * @return the flow produced by {@code completeWithControl} with that cleanup action
     */
    private <T> Flow<Batch<T>, Batch<T>, Control> completeTreeEventSource(Projection<T> projection, TransactionServiceClient transactionServiceClient, Function1<KillSwitch, Control> function1, ExecutionContext executionContext) {
        // Request the close first, then chain onto the client's closed() future.
        Function0<Future<Done>> closeClient = () -> {
            return transactionServiceClient.close().flatMap(closeRequested -> {
                return transactionServiceClient.closed();
            }, executionContext);
        };
        return completeWithControl(projection, closeClient, function1);
    }

    /**
     * Builds a pass-through flow that materializes a {@link Control} and runs a cleanup action via a side branch.
     *
     * <p>A single kill switch is inserted into the flow and {@code function1} converts it into the
     * {@link Control} that becomes the materialized value. A side branch attached with {@code alsoTo}
     * folds every element into a single unit value, routes failures through the {@code recover} handler
     * (a class defined outside this view), then calls {@code tryComplete()} on the control and runs
     * {@code function0}. NOTE(review): per Akka Streams {@code fold} semantics the side branch should
     * fire once, when upstream completes — confirm against the Akka documentation.
     *
     * @param projection projection whose id is logged and which is passed to the recover handler
     * @param function0  cleanup action run by the side branch; its future is awaited by {@code mapAsync}
     * @param function1  factory turning the kill switch into the materialized {@link Control}
     * @return the pass-through flow materializing the {@link Control}
     */
    private <T> Flow<Batch<T>, Batch<T>, Control> completeWithControl(Projection<T> projection, Function0<Future<Done>> function0, Function1<KillSwitch, Control> function1) {
        // Hands the Control created at materialization time to the side branch and the recover handler.
        AtomicReference<Control> controlRef = new AtomicReference<>();
        return Flow$.MODULE$.apply().viaMat(KillSwitches$.MODULE$.single(), Keep$.MODULE$.right()).mapMaterializedValue(uniqueKillSwitch -> {
            Control control = function1.apply(uniqueKillSwitch);
            controlRef.set(control);
            return control;
        }).alsoTo(Flow$.MODULE$.apply().fold(BoxedUnit.UNIT, Keep$.MODULE$.left()).recover(new Consumer$$anonfun$completeWithControl$2(projection, controlRef)).mapAsync(1, streamFinished -> {
            // The lambda parameter has its own name: redeclaring it as a local inside the body is rejected by Java.
            if (MODULE$.logger().underlying().isTraceEnabled()) {
                MODULE$.logger().underlying().trace("Closing resources for projection {}", projection.id());
            }
            controlRef.get().tryComplete();
            return function0.apply();
        }).to(Sink$.MODULE$.ignore()));
    }

    /**
     * Creates a flow that flattens transaction responses into consumer records.
     *
     * <p>Each event of each transaction is wrapped in an {@link Envelope} carrying the transaction's
     * workflow id, effective-at instant (when present), transaction id and offset. Envelopes rejected
     * by {@code function1} are dropped. A transaction that keeps at least one envelope is followed by a
     * {@link TxBoundary} at its offset; a transaction that keeps none contributes nothing.
     *
     * @param projectionId id recorded in every emitted {@link TxBoundary}
     * @param function1    predicate selecting the envelopes to keep
     * @return the flattening flow
     */
    private Flow<GetTransactionsResponse, Product, NotUsed> toConsumerRecord(ProjectionId projectionId, Function1<Envelope<Event>, Object> function1) {
        return Flow$.MODULE$.apply().mapConcat(response -> {
            return (Seq) response.transactions().flatMap(tx -> {
                String txId = tx.transactionId();
                Option effectiveAt = tx.effectiveAt().map(ts -> {
                    return Instant.ofEpochSecond(ts.seconds(), ts.nanos());
                });
                String workflow = tx.workflowId();
                Offset txOffset = new Offset(tx.offset());
                Seq kept = (Seq) ((IterableOps) tx.events().map(ev -> {
                    return new Envelope(ev, new Some(workflow), effectiveAt, new Some(txId), new Some(txOffset));
                })).filter(function1);
                // No boundary is emitted for a transaction with no accepted events.
                if (kept.isEmpty()) {
                    return kept;
                }
                return (Seq) kept.$colon$plus(new TxBoundary(projectionId, txOffset));
            });
        });
    }

    /**
     * Requests a stream of transaction trees from the transaction service.
     *
     * @param transactionServiceClient client used to issue the request
     * @param option                   optional offset converted with {@code toProto()} for the request (callers pass the projection's end offset)
     * @param option2                  optional offset converted with {@code Offset.protoOffset} for the request (callers pass the projection's current offset)
     * @param transactionFilter        filter sent with the request
     * @return the response stream returned by the client
     */
    private Source<GetTransactionTreesResponse, NotUsed> getTransactionTrees(TransactionServiceClient transactionServiceClient, Option<Offset> option, Option<Offset> option2, TransactionFilter transactionFilter) {
        GetTransactionsRequest request = new GetTransactionsRequest(
                GetTransactionsRequest$.MODULE$.apply$default$1(),
                new Some(Offset$.MODULE$.protoOffset(option2)),
                option.map(end -> {
                    return end.toProto();
                }),
                new Some(transactionFilter),
                GetTransactionsRequest$.MODULE$.apply$default$5());
        return transactionServiceClient.getTransactionTrees(request);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Requests a stream of flat transactions from the transaction service.
     *
     * @param transactionServiceClient client used to issue the request
     * @param option                   optional offset converted with {@code toProto()} for the request (callers pass the projection's end offset)
     * @param option2                  optional offset converted with {@code Offset.protoOffset} for the request (the offset to read from)
     * @param transactionFilter        filter sent with the request
     * @return the response stream returned by the client
     */
    public Source<GetTransactionsResponse, NotUsed> getTransactions(TransactionServiceClient transactionServiceClient, Option<Offset> option, Option<Offset> option2, TransactionFilter transactionFilter) {
        GetTransactionsRequest request = new GetTransactionsRequest(
                GetTransactionsRequest$.MODULE$.apply$default$1(),
                new Some(Offset$.MODULE$.protoOffset(option2)),
                option.map(end -> {
                    return end.toProto();
                }),
                new Some(transactionFilter),
                GetTransactionsRequest$.MODULE$.apply$default$5());
        return transactionServiceClient.getTransactions(request);
    }

    /**
     * Converts active-contract responses into consumer records and continues with transactions after the offset response.
     *
     * <p>A response whose offset is the empty string is expanded into one {@link Envelope} per active
     * contract (wrapped as a created event, carrying {@code projection.offset()} and no workflow id,
     * effective-at or transaction id), filtered by {@code function1}. Any other response yields a
     * {@link TxBoundary} at the response's offset, followed by the records obtained from
     * {@code function2} invoked with that offset and the projection's transaction filter. The trace
     * message in the code describes that offset-carrying response as the last response of the ACS.
     *
     * @param source     stream of active-contract responses
     * @param function2  factory for the transaction stream, given a start offset and a transaction filter
     * @param projection projection supplying id, offset and transaction filter
     * @param function1  predicate selecting the envelopes to keep
     * @return the concatenated stream of consumer records
     */
    public Source<ConsumerRecord<Event>, NotUsed> activeContractConsumerRecordSource(Source<GetActiveContractsResponse, NotUsed> source, Function2<Option<Offset>, TransactionFilter, Source<GetTransactionsResponse, NotUsed>> function2, Projection<Event> projection, Function1<Envelope<Event>, Object> function1) {
        return source.flatMapConcat(response -> {
            // Null-safe comparison: a null offset is not "empty" and falls through to the offset branch,
            // which is what the decompiled null-guarded equality check intended.
            if ("".equals(response.offset())) {
                if (MODULE$.logger().underlying().isTraceEnabled()) {
                    MODULE$.logger().underlying().trace("Received ACS response: {}", response);
                }
                return Source$.MODULE$.apply((Iterable) response.activeContracts().map(createdEvent -> {
                    return new Envelope(new Event(new Event.InterfaceC0001Event.Created(createdEvent)), None$.MODULE$, None$.MODULE$, None$.MODULE$, projection.offset());
                })).filter(function1);
            }
            if (MODULE$.logger().underlying().isTraceEnabled()) {
                MODULE$.logger().underlying().trace("Received Offset {} as last response of the ACS.", response.offset());
            }
            Offset acsOffset = new Offset(response.offset());
            // Emit the boundary first, then the transactions that start from the same offset.
            return Source$.MODULE$.apply((Iterable) package$.MODULE$.List().apply(ScalaRunTime$.MODULE$.wrapRefArray(new TxBoundary[]{new TxBoundary(projection.id(), acsOffset)}))).concat(((Source) function2.apply(new Some(acsOffset), projection.transactionFilter())).via(MODULE$.toConsumerRecord(projection.id(), function1)));
        });
    }

    /**
     * Produces batches of events from the active-contract responses and the follow-up transactions.
     *
     * @param source         stream of active-contract responses
     * @param function2      factory for the transaction stream, given a start offset and a transaction filter
     * @param projection     projection supplying id, offset and transaction filter
     * @param function1      predicate selecting the envelopes to keep
     * @param i              first argument passed to the {@link Batcher} (callers pass the batch size)
     * @param finiteDuration second argument passed to the {@link Batcher} (callers pass the batch interval)
     * @return the consumer records from {@code activeContractConsumerRecordSource}, run through a {@link Batcher}
     */
    public Source<Batch<Event>, NotUsed> activeContractSourceFromTxs(Source<GetActiveContractsResponse, NotUsed> source, Function2<Option<Offset>, TransactionFilter, Source<GetTransactionsResponse, NotUsed>> function2, Projection<Event> projection, Function1<Envelope<Event>, Object> function1, int i, FiniteDuration finiteDuration) {
        Source<ConsumerRecord<Event>, NotUsed> records = activeContractConsumerRecordSource(source, function2, projection, function1);
        return records.via(new Batcher(i, finiteDuration));
    }

    /**
     * Produces batches of exercised events from a stream of transaction trees.
     *
     * <p>Tree events are first batched via {@code treeEventSourceFromTrees}, using a predicate that
     * accepts only exercised events for which {@code function1} holds. Each batch is then rebuilt:
     * its envelopes are mapped through {@code traverseOption} with a partial function defined outside
     * this view, its boundary is coerced, and its boundary index is kept.
     *
     * @param source         stream of transaction-tree responses
     * @param projection     projection supplying the id
     * @param function1      predicate applied to exercised-event envelopes
     * @param i              batch size passed on to {@code treeEventSourceFromTrees}
     * @param finiteDuration batch interval passed on to {@code treeEventSourceFromTrees}
     * @return the stream of exercised-event batches
     * @throws MatchError from inside the stream if a {@code null} batch is encountered
     */
    public Source<Batch<ExercisedEvent>, NotUsed> exercisedEventSourceFromTrees(Source<GetTransactionTreesResponse, NotUsed> source, Projection<ExercisedEvent> projection, Function1<Envelope<ExercisedEvent>, Object> function1, int i, FiniteDuration finiteDuration) {
        Function1<Envelope<TreeEvent>, Object> exercisedOnly = treeEnvelope -> {
            return BoxesRunTime.boxToBoolean($anonfun$exercisedEventSourceFromTrees$1(function1, treeEnvelope));
        };
        return treeEventSourceFromTrees(source, projection.id(), exercisedOnly, i, finiteDuration).map(treeBatch -> {
            if (treeBatch == null) {
                throw new MatchError(treeBatch);
            }
            Vector exercisedEnvelopes = (Vector) treeBatch.envelopes().flatMap(treeEnvelope -> {
                return treeEnvelope.traverseOption(new Consumer$$anonfun$$nestedInanonfun$exercisedEventSourceFromTrees$4$1());
            });
            Option coercedBoundary = treeBatch.boundary().map(boundary -> {
                return boundary.coerce();
            });
            return new Batch(exercisedEnvelopes, coercedBoundary, treeBatch.boundaryIndex());
        });
    }

    /**
     * Produces batches of tree events from a stream of transaction trees.
     *
     * @param source         stream of transaction-tree responses
     * @param projectionId   id recorded in emitted transaction boundaries
     * @param function1      predicate selecting the envelopes to keep
     * @param i              first argument passed to the {@link Batcher} (callers pass the batch size)
     * @param finiteDuration second argument passed to the {@link Batcher} (callers pass the batch interval)
     * @return the consumer records from {@code treeEventConsumerRecordSource}, run through a {@link Batcher}
     */
    public Source<Batch<TreeEvent>, NotUsed> treeEventSourceFromTrees(Source<GetTransactionTreesResponse, NotUsed> source, ProjectionId projectionId, Function1<Envelope<TreeEvent>, Object> function1, int i, FiniteDuration finiteDuration) {
        Source<ConsumerRecord<TreeEvent>, NotUsed> records = treeEventConsumerRecordSource(source, projectionId, function1);
        return records.via(new Batcher(i, finiteDuration));
    }

    /**
     * Flattens transaction-tree responses into consumer records.
     *
     * <p>For every transaction tree, each root event id is looked up in {@code eventsById} and wrapped
     * in an {@link Envelope} carrying the tree's workflow id, effective-at instant (when present),
     * transaction id and offset. Envelopes rejected by {@code function1} are dropped. A tree that keeps
     * at least one envelope is followed by a {@link TxBoundary} at its offset; a tree that keeps none
     * contributes nothing.
     *
     * @param source       stream of transaction-tree responses
     * @param projectionId id recorded in every emitted {@link TxBoundary}
     * @param function1    predicate selecting the envelopes to keep
     * @return the flattened stream of consumer records
     */
    public Source<ConsumerRecord<TreeEvent>, NotUsed> treeEventConsumerRecordSource(Source<GetTransactionTreesResponse, NotUsed> source, ProjectionId projectionId, Function1<Envelope<TreeEvent>, Object> function1) {
        return source.mapConcat(response -> {
            return (Seq) response.transactions().flatMap(tree -> {
                String txId = tree.transactionId();
                Option effectiveAt = tree.effectiveAt().map(ts -> {
                    return Instant.ofEpochSecond(ts.seconds(), ts.nanos());
                });
                String workflow = tree.workflowId();
                Offset treeOffset = new Offset(tree.offset());
                Seq kept = (Seq) tree.rootEventIds().flatMap(rootId -> {
                    Envelope candidate = new Envelope((TreeEvent) tree.eventsById().apply(rootId), new Some(workflow), effectiveAt, new Some(txId), new Some(treeOffset));
                    boolean accepted = BoxesRunTime.unboxToBoolean(function1.apply(candidate));
                    return accepted ? new Some(candidate) : None$.MODULE$;
                });
                // No boundary is emitted for a tree with no accepted root events.
                if (kept.isEmpty()) {
                    return kept;
                }
                return (Seq) kept.$colon$plus(new TxBoundary(projectionId, treeOffset));
            });
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Creates the {@link GrpcControl} for the event source, covering both gRPC clients.
     *
     * @param killSwitch                   kill switch handed to the control
     * @param activeContractsServiceClient first client placed in the control's client list
     * @param transactionServiceClient     second client placed in the control's client list
     * @param executionContextExecutor     execution context handed to the control
     * @return a new control over the two clients
     */
    public static final GrpcControl createControl$1(KillSwitch killSwitch, ActiveContractsServiceClient activeContractsServiceClient, TransactionServiceClient transactionServiceClient, ExecutionContextExecutor executionContextExecutor) {
        AkkaGrpcClient[] clients = new AkkaGrpcClient[]{activeContractsServiceClient, transactionServiceClient};
        Seq clientList = (Seq) package$.MODULE$.List().apply(ScalaRunTime$.MODULE$.wrapRefArray(clients));
        return new GrpcControl(clientList, killSwitch, executionContextExecutor);
    }

    /**
     * Creates the completion flow for the event source.
     *
     * <p>The cleanup action closes both clients through {@code GrpcControl.closeClients}, and the
     * materialized control is built by {@code createControl$1} over the same two clients.
     *
     * @param projection                   projection the flow belongs to
     * @param activeContractsServiceClient active-contracts client to close and to control
     * @param transactionServiceClient     transaction client to close and to control
     * @param executionContextExecutor     execution context used for closing and handed to the control
     * @return a flow wrapping {@code completeWithControl}, keeping its materialized value
     */
    private final Flow completeActiveContractSource$1(Projection projection, ActiveContractsServiceClient activeContractsServiceClient, TransactionServiceClient transactionServiceClient, ExecutionContextExecutor executionContextExecutor) {
        Function0<Future<Done>> closeBothClients = () -> {
            return GrpcControl$.MODULE$.closeClients((Seq) package$.MODULE$.List().apply(ScalaRunTime$.MODULE$.wrapRefArray(new AkkaGrpcClient[]{activeContractsServiceClient, transactionServiceClient})), executionContextExecutor);
        };
        Function1<KillSwitch, Control> controlFactory = killSwitch -> {
            return createControl$1(killSwitch, activeContractsServiceClient, transactionServiceClient, executionContextExecutor);
        };
        return Flow$.MODULE$.apply().viaMat(completeWithControl(projection, closeBothClients, controlFactory), Keep$.MODULE$.right());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Creates the {@link GrpcControl} used by {@code exercisedEventSource}, covering the transaction client only.
     *
     * @param killSwitch               kill switch handed to the control
     * @param transactionServiceClient the single client placed in the control's client list
     * @param executionContextExecutor execution context handed to the control
     * @return a new control over the transaction client
     */
    public static final GrpcControl createControl$2(KillSwitch killSwitch, TransactionServiceClient transactionServiceClient, ExecutionContextExecutor executionContextExecutor) {
        TransactionServiceClient[] clients = new TransactionServiceClient[]{transactionServiceClient};
        Seq clientList = (Seq) package$.MODULE$.List().apply(ScalaRunTime$.MODULE$.wrapRefArray(clients));
        return new GrpcControl(clientList, killSwitch, executionContextExecutor);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Creates the {@link GrpcControl} used by {@code treeEventSource}, covering the transaction client only.
     *
     * @param killSwitch               kill switch handed to the control
     * @param transactionServiceClient the single client placed in the control's client list
     * @param executionContextExecutor execution context handed to the control
     * @return a new control over the transaction client
     */
    public static final GrpcControl createControl$3(KillSwitch killSwitch, TransactionServiceClient transactionServiceClient, ExecutionContextExecutor executionContextExecutor) {
        TransactionServiceClient[] singleClient = new TransactionServiceClient[]{transactionServiceClient};
        Seq clientList = (Seq) package$.MODULE$.List().apply(ScalaRunTime$.MODULE$.wrapRefArray(singleClient));
        return new GrpcControl(clientList, killSwitch, executionContextExecutor);
    }

    /**
     * Predicate body used by {@code exercisedEventSourceFromTrees}: accepts only exercised tree events that also satisfy the caller's predicate.
     *
     * @param function1 predicate over exercised-event envelopes
     * @param envelope  envelope holding a {@link TreeEvent}
     * @return {@code false} when the tree event's kind is not an exercised event; otherwise the result
     *         of {@code function1} applied to the envelope remapped to the contained {@link ExercisedEvent}
     */
    public static final /* synthetic */ boolean $anonfun$exercisedEventSourceFromTrees$1(Function1 function1, Envelope envelope) {
        TreeEvent.Kind kind = ((TreeEvent) envelope.event()).kind();
        if (!(kind instanceof TreeEvent.Kind.Exercised)) {
            return false;
        }
        ExercisedEvent exercised = ((TreeEvent.Kind.Exercised) kind).m6829value();
        // Swap the envelope's payload for the exercised event before consulting the caller's predicate.
        Object verdict = function1.apply(envelope.map(ignoredTreeEvent -> {
            return exercised;
        }));
        return BoxesRunTime.unboxToBoolean(verdict);
    }

    /** Private so that the only instance is the one created for the {@code MODULE$} field. */
    private Consumer$() {
    }
}
