package com.daml.platform.indexer.parallel;

import akka.stream.Materializer;
import com.daml.ledger.participant.state.v2.ReadService;
import com.daml.ledger.resources.ResourceContext;
import com.daml.ledger.resources.ResourceContext$Context$u0020has$u0020ExecutionContext$;
import com.daml.ledger.resources.ResourceOwner$;
import com.daml.ledger.resources.package$;
import com.daml.logging.ContextualizedLogger$;
import com.daml.logging.LoggingContext;
import com.daml.metrics.Metrics;
import com.daml.metrics.api.MetricName;
import com.daml.platform.configuration.ServerRole;
import com.daml.platform.indexer.ha.HaConfig;
import com.daml.platform.indexer.ha.HaCoordinator$;
import com.daml.platform.indexer.ha.Handle;
import com.daml.platform.indexer.ha.NoopHaCoordinator$;
import com.daml.platform.store.DbSupport;
import com.daml.platform.store.backend.DBLockStorageBackend;
import com.daml.platform.store.backend.DataSourceStorageBackend;
import com.daml.platform.store.backend.ParameterStorageBackend;
import com.daml.platform.store.dao.DbDispatcher;
import com.daml.platform.store.dao.DbDispatcher$;
import com.daml.resources.AbstractResourceOwner;
import com.daml.resources.Resource;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.sql.Connection;
import java.util.Timer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import javax.sql.DataSource;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.concurrent.ExecutionContext;
import scala.concurrent.ExecutionContext$;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
import scala.runtime.BoxedUnit;

/* compiled from: ParallelIndexerFactory.scala */
/* loaded from: input_file:com/daml/platform/indexer/parallel/ParallelIndexerFactory$.class */
/**
 * Decompiled JVM view of the Scala singleton object {@code ParallelIndexerFactory}
 * (see the "compiled from" header comment of this file).
 *
 * <p>The class wires together the resources the parallel indexer needs: two executor pools,
 * an HA coordinator (database-lock based or no-op), a {@link DbDispatcher}, and finally the
 * indexer subscription itself. All of this is expressed as nested resource owners so that
 * acquisition and release are handled by the resource framework.
 *
 * <p>NOTE(review): this is decompiler output. Names such as {@code i}, {@code i2},
 * {@code function1} and {@code function12} are synthetic; the descriptions below are derived
 * only from how the values are used in this file.
 */
public final class ParallelIndexerFactory$ {
    /** The single instance, mirroring Scala's {@code object} encoding. */
    public static final ParallelIndexerFactory$ MODULE$ = new ParallelIndexerFactory$();

    /**
     * Builds the resource owner for the parallel indexer.
     *
     * <p>Acquisition order, as written below:
     * <ol>
     *   <li>an async pool named {@code "input-mapping-pool"} sized by {@code i};</li>
     *   <li>an async pool named {@code "batching-pool"} sized by {@code i2};</li>
     *   <li>an HA coordinator: when {@code dBLockStorageBackend.dbLockSupported()} is true, a
     *       single-threaded {@code "ha-coordinator-%d"} execution context, a {@link Timer} and a
     *       dedicated {@link DataSource} are created and handed to
     *       {@code HaCoordinator.databaseLockBasedHaCoordinator}; otherwise the no-op
     *       coordinator is used;</li>
     *   <li>the indexer owner produced by {@link #toIndexer(Function1)}, whose subscription runs
     *       inside {@code haCoordinator.protectedExecution}.</li>
     * </ol>
     *
     * @param i                           size argument passed to the "input-mapping-pool" async pool
     * @param i2                          size argument passed to the "batching-pool" async pool
     * @param dbConfig                    supplies the data source config and connection pool settings
     * @param haConfig                    HA settings; also supplies the JDBC network timeout for the lock connection
     * @param metrics                     metrics registry and executor metric names
     * @param dBLockStorageBackend        decides whether DB locks are supported and backs the HA coordinator
     * @param dataSourceStorageBackend    creates the {@link DataSource} instances used here
     * @param initializeParallelIngestion invoked with the dispatcher before the subscription is started
     * @param parallelIndexerSubscription produces the function that turns the initialization result into a handle
     * @param function1                   given the dispatcher, returns an owner that is acquired before ingestion starts
     * @param materializer                Akka stream materializer forwarded to initialization and subscription
     * @param readService                 forwarded to {@code initializeParallelIngestion}
     * @param function12                  given the dispatcher, returns the function forwarded to {@code initializeParallelIngestion}
     * @param loggingContext              logging context threaded through all calls
     * @return an owner that, once acquired, yields the indexer's own resource owner
     */
    public AbstractResourceOwner<ResourceContext, AbstractResourceOwner<ResourceContext, Future<BoxedUnit>>> apply(int i, int i2, DbSupport.DbConfig dbConfig, HaConfig haConfig, Metrics metrics, DBLockStorageBackend dBLockStorageBackend, DataSourceStorageBackend dataSourceStorageBackend, InitializeParallelIngestion initializeParallelIngestion, ParallelIndexerSubscription<?> parallelIndexerSubscription, Function1<DbDispatcher, AbstractResourceOwner<ResourceContext, BoxedUnit>> function1, Materializer materializer, ReadService readService, Function1<DbDispatcher, Function1<ParameterStorageBackend.LedgerEnd, Future<BoxedUnit>>> function12, LoggingContext loggingContext) {
        // The two pool lambdas must use distinct parameter names: Java forbids a nested lambda
        // parameter from redeclaring an enclosing one, and both executors are needed further down.
        return AsyncSupport$.MODULE$.asyncPool(i, "input-mapping-pool", new Some(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(new MetricName(metrics.daml().parallelIndexer().inputMapping().executor())), metrics.registry())), loggingContext).flatMap(inputMapperExecutor -> {
            return AsyncSupport$.MODULE$.asyncPool(i2, "batching-pool", new Some(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(new MetricName(metrics.daml().parallelIndexer().batching().executor())), metrics.registry())), loggingContext).flatMap(batcherExecutor -> {
                return (dBLockStorageBackend.dbLockSupported() ? ResourceOwner$.MODULE$.forExecutorService(() -> {
                    // Single-threaded execution context for the HA coordinator; failures are logged.
                    return ExecutionContext$.MODULE$.fromExecutorService(Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setNameFormat("ha-coordinator-%d").build()), th -> {
                        $anonfun$apply$4(loggingContext, th);
                        return BoxedUnit.UNIT;
                    });
                }).flatMap(executionContextExecutorService -> {
                    return ResourceOwner$.MODULE$.forTimer(() -> {
                        return new Timer();
                    }).map(timer -> {
                        // Pair the timer with a data source created with the default second argument.
                        return new Tuple2(timer, dataSourceStorageBackend.createDataSource(dbConfig.dataSourceConfig(), dataSourceStorageBackend.createDataSource$default$2(), loggingContext));
                    }).map(tuple2 -> {
                        if (tuple2 == null) {
                            throw new MatchError(tuple2);
                        }
                        Timer timer2 = (Timer) tuple2._1();
                        DataSource dataSource = (DataSource) tuple2._2();
                        return HaCoordinator$.MODULE$.databaseLockBasedHaCoordinator(() -> {
                            // Connection factory for the coordinator: each connection gets a network
                            // timeout, with the timeout task executed synchronously on the calling thread.
                            Connection connection = dataSource.getConnection();
                            connection.setNetworkTimeout(new Executor() { // from class: com.daml.platform.indexer.parallel.ParallelIndexerFactory$$anon$1
                                @Override // java.util.concurrent.Executor
                                public void execute(Runnable runnable) {
                                    runnable.run();
                                }
                            }, haConfig.mainLockCheckerJdbcNetworkTimeoutMillis());
                            return connection;
                        }, dBLockStorageBackend, executionContextExecutorService, timer2, haConfig, loggingContext);
                    });
                }) : ResourceOwner$.MODULE$.successful(NoopHaCoordinator$.MODULE$)).map(haCoordinator -> {
                    return MODULE$.toIndexer(resourceContext -> {
                        ExecutionContext executionContext = resourceContext.executionContext();
                        return haCoordinator.protectedExecution(connectionInitializer -> {
                            // The dispatcher's data source runs the coordinator-supplied initializer
                            // on every connection it creates.
                            return MODULE$.initializeHandle(DbDispatcher$.MODULE$.owner(dataSourceStorageBackend.createDataSource(dbConfig.dataSourceConfig(), new Some(connection -> {
                                connectionInitializer.initialize(connection);
                                return BoxedUnit.UNIT;
                            }), loggingContext), new ServerRole() { // from class: com.daml.platform.configuration.ServerRole$Indexer$
                                private static final String threadPoolSuffix = "indexer";

                                @Override // com.daml.platform.configuration.ServerRole
                                public String threadPoolSuffix() {
                                    return threadPoolSuffix;
                                }
                            }, dbConfig.connectionPool().connectionPoolSize(), dbConfig.connectionPool().connectionTimeout(), metrics, loggingContext).flatMap(dbDispatcher -> {
                                // Acquire the caller-supplied owner, but keep exposing the dispatcher itself.
                                return ((AbstractResourceOwner) function1.apply(dbDispatcher)).map(boxedUnit -> {
                                    return dbDispatcher;
                                });
                            }), dbDispatcher2 -> {
                                // NOTE(review): argument order (input-mapping executor first, batching
                                // executor second) follows the pool acquisition order above; confirm
                                // against ParallelIndexerSubscription.apply.
                                return initializeParallelIngestion.apply(dbDispatcher2, (Function1) function12.apply(dbDispatcher2), readService, executionContext, materializer, loggingContext).map(parallelIndexerSubscription.apply(inputMapperExecutor, batcherExecutor, dbDispatcher2, materializer, loggingContext), executionContext);
                            }, resourceContext);
                        });
                    });
                });
            });
        });
    }

    /**
     * Runs {@code function1} against the resource of {@code abstractResourceOwner} and exposes the
     * outcome as a {@link Handle}.
     *
     * <p>The owner is held (via {@code use}) until the handle produced by {@code function1}
     * reports completion. A promise is handed to two generated callback classes that are not
     * visible in this file; presumably they complete it with the handle's kill switch on success
     * or fail it on error (TODO confirm). The returned handle combines the future of the whole
     * {@code use} call with the kill switch obtained from that promise.
     *
     * @param abstractResourceOwner owner of the resource the handle depends on
     * @param function1             starts the work and returns a future handle
     * @param resourceContext       supplies the execution context and is passed to {@code use}
     * @param <T>                   type of the acquired resource
     * @return a future handle whose completion covers both the work and the resource release
     */
    public <T> Future<Handle> initializeHandle(AbstractResourceOwner<ResourceContext, T> abstractResourceOwner, Function1<T, Future<Handle>> function1, ResourceContext resourceContext) {
        ExecutionContext executionContext = resourceContext.executionContext();
        Promise apply = Promise$.MODULE$.apply();
        Future andThen = abstractResourceOwner.use(obj -> {
            return ((Future) function1.apply(obj)).andThen(new ParallelIndexerFactory$$anonfun$$nestedInanonfun$initializeHandle$1$1(apply), executionContext).flatMap(handle -> {
                return handle.completed();
            }, executionContext);
        }, resourceContext).andThen(new ParallelIndexerFactory$$anonfun$1(apply), executionContext);
        return apply.future().map(killSwitch -> {
            return new Handle(andThen, killSwitch);
        }, executionContext);
    }

    /**
     * Wraps a subscription function into a resource owner.
     *
     * <p>On acquire, the function is applied asynchronously to the resource context to obtain a
     * {@link Handle}. On release, the handle's kill switch is shut down and the release waits for
     * {@code handle.completed()}, passed through a generated recovery function that is not visible
     * in this file. The value exposed by the resource is the handle's completion future.
     *
     * @param function1 produces the handle for a given resource context
     * @return an owner whose resource value is the completion future of the handle
     */
    public AbstractResourceOwner<ResourceContext, Future<BoxedUnit>> toIndexer(final Function1<ResourceContext, Handle> function1) {
        return new AbstractResourceOwner<ResourceContext, Future<BoxedUnit>>(function1) { // from class: com.daml.platform.indexer.parallel.ParallelIndexerFactory$$anon$2
            private final Function1 subscription$1;

            public Resource<ResourceContext, Future<BoxedUnit>> acquire(ResourceContext resourceContext) {
                return package$.MODULE$.Resource().apply(Future$.MODULE$.apply(() -> {
                    return (Handle) this.subscription$1.apply(resourceContext);
                }, executionContext(resourceContext)), handle -> {
                    // Release: request shutdown first, then wait for the handle to finish.
                    handle.killSwitch().shutdown();
                    return handle.completed().recover(new ParallelIndexerFactory$$anon$2$$anonfun$$nestedInanonfun$acquire$2$1(null), this.executionContext(resourceContext));
                }, resourceContext).map(handle2 -> {
                    return handle2.completed();
                }, resourceContext);
            }

            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super(ResourceContext$Context$u0020has$u0020ExecutionContext$.MODULE$);
                this.subscription$1 = function1;
            }
        };
    }

    /**
     * Failure reporter for the HA coordinator execution context created in {@code apply}:
     * logs the throwable at error level with a fixed message.
     *
     * @param loggingContext logging context to log under
     * @param th             the failure reported by the execution context
     */
    public static final /* synthetic */ void $anonfun$apply$4(LoggingContext loggingContext, Throwable th) {
        ContextualizedLogger$.MODULE$.get(MODULE$.getClass()).error().apply(() -> {
            return "ExecutionContext has failed with an exception";
        }, th, loggingContext);
    }

    /** Private: the only instance is {@link #MODULE$}. */
    private ParallelIndexerFactory$() {
    }
}
