package io.mantisrx.server.agent;

import com.mantisrx.common.utils.Services;
import io.mantisrx.runtime.loader.ClassLoaderHandle;
import io.mantisrx.runtime.loader.TaskFactory;
import io.mantisrx.runtime.loader.config.WorkerConfiguration;
import io.mantisrx.server.agent.TaskExecutor;
import io.mantisrx.server.core.MantisAkkaRpcSystemLoader;
import io.mantisrx.server.master.client.HighAvailabilityServices;
import io.mantisrx.server.master.client.HighAvailabilityServicesUtil;
import io.mantisrx.shaded.com.google.common.base.Preconditions;
import io.mantisrx.shaded.com.google.common.util.concurrent.AbstractIdleService;
import io.mantisrx.shaded.com.google.common.util.concurrent.MoreExecutors;
import io.vavr.Tuple;
import io.vavr.Tuple2;
import java.beans.ConstructorProperties;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/mantisrx/server/agent/TaskExecutorStarter.class */
public class TaskExecutorStarter extends AbstractIdleService {
    private static final Logger log = LoggerFactory.getLogger(TaskExecutorStarter.class);
    private final TaskExecutor taskExecutor;
    private final HighAvailabilityServices highAvailabilityServices;
    private final RpcSystem rpcSystem;

    /* loaded from: input_file:io/mantisrx/server/agent/TaskExecutorStarter$TaskExecutorStarterBuilder.class */
    public static class TaskExecutorStarterBuilder {
        private final WorkerConfiguration workerConfiguration;
        private Configuration configuration;

        @Nullable
        private RpcSystem rpcSystem;

        @Nullable
        private RpcService rpcService;

        @Nullable
        private ClassLoaderHandle classLoaderHandle;
        private final HighAvailabilityServices highAvailabilityServices;

        @Nullable
        private TaskFactory taskFactory;
        private final List<Tuple2<TaskExecutor.Listener, Executor>> listeners;

        private TaskExecutorStarterBuilder(WorkerConfiguration workerConfiguration) {
            this.listeners = new ArrayList();
            this.workerConfiguration = workerConfiguration;
            this.configuration = new Configuration();
            this.highAvailabilityServices = HighAvailabilityServicesUtil.createHAServices(workerConfiguration);
        }

        public TaskExecutorStarterBuilder configuration(Configuration configuration) {
            this.configuration = configuration;
            return this;
        }

        public TaskExecutorStarterBuilder rpcSystem(RpcSystem rpcSystem) {
            Preconditions.checkNotNull(rpcSystem);
            this.rpcSystem = rpcSystem;
            return this;
        }

        private RpcSystem getRpcSystem() {
            return this.rpcSystem == null ? MantisAkkaRpcSystemLoader.getInstance() : this.rpcSystem;
        }

        public TaskExecutorStarterBuilder rpcService(RpcService rpcService) {
            Preconditions.checkNotNull(rpcService);
            this.rpcService = rpcService;
            return this;
        }

        private RpcService getRpcService() throws Exception {
            return this.rpcService == null ? RpcUtils.createRemoteRpcService(getRpcSystem(), this.configuration, this.workerConfiguration.getExternalAddress(), this.workerConfiguration.getExternalPortRange(), this.workerConfiguration.getBindAddress(), Optional.ofNullable(this.workerConfiguration.getBindPort())) : this.rpcService;
        }

        public TaskExecutorStarterBuilder taskFactory(TaskFactory taskFactory) {
            this.taskFactory = taskFactory;
            return this;
        }

        public TaskExecutorStarterBuilder classLoaderHandle(ClassLoaderHandle classLoaderHandle) {
            this.classLoaderHandle = classLoaderHandle;
            return this;
        }

        private ClassLoaderHandle getClassLoaderHandle() throws Exception {
            return this.classLoaderHandle == null ? new BlobStoreAwareClassLoaderHandle(BlobStore.forHadoopFileSystem(this.workerConfiguration.getBlobStoreArtifactDir(), this.workerConfiguration.getLocalStorageDir())) : this.classLoaderHandle;
        }

        public TaskExecutorStarterBuilder addListener(TaskExecutor.Listener listener, Executor executor) {
            this.listeners.add(Tuple.of(listener, executor));
            return this;
        }

        public TaskExecutorStarter build() throws Exception {
            TaskExecutor taskExecutor = new TaskExecutor(getRpcService(), this.workerConfiguration, this.highAvailabilityServices, getClassLoaderHandle(), this.taskFactory);
            for (Tuple2<TaskExecutor.Listener, Executor> tuple2 : this.listeners) {
                taskExecutor.addListener((TaskExecutor.Listener) tuple2._1(), (Executor) tuple2._2());
            }
            return new TaskExecutorStarter(taskExecutor, this.highAvailabilityServices, getRpcSystem());
        }
    }

    protected void startUp() {
        System.setProperty("rx.ring-buffer.size", "1024");
        this.highAvailabilityServices.startAsync().awaitRunning();
        this.taskExecutor.start();
        try {
            this.taskExecutor.awaitRunning().get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    protected void shutDown() throws Exception {
        CompletableFuture thenCompose = this.taskExecutor.closeAsync().exceptionally(th -> {
            return null;
        }).thenCompose(r4 -> {
            return Services.stopAsync(this.highAvailabilityServices, MoreExecutors.directExecutor());
        });
        RpcSystem rpcSystem = this.rpcSystem;
        rpcSystem.getClass();
        thenCompose.thenRunAsync(rpcSystem::close).get();
    }

    public TaskExecutor getTaskExecutor() {
        return this.taskExecutor;
    }

    public static TaskExecutorStarterBuilder builder(WorkerConfiguration workerConfiguration) {
        return new TaskExecutorStarterBuilder(workerConfiguration);
    }

    @ConstructorProperties({"taskExecutor", "highAvailabilityServices", "rpcSystem"})
    private TaskExecutorStarter(TaskExecutor taskExecutor, HighAvailabilityServices highAvailabilityServices, RpcSystem rpcSystem) {
        this.taskExecutor = taskExecutor;
        this.highAvailabilityServices = highAvailabilityServices;
        this.rpcSystem = rpcSystem;
    }
}
