package io.mantisrx.server.worker;

import io.mantisrx.runtime.Job;
import io.mantisrx.runtime.MantisJobProvider;
import io.mantisrx.server.core.BaseService;
import io.mantisrx.server.core.ExecuteStageRequest;
import io.mantisrx.server.core.Status;
import io.mantisrx.server.core.WrappedExecuteStageRequest;
import java.io.IOException;
import java.util.Optional;
import java.util.ServiceLoader;
import org.apache.flink.util.UserCodeClassLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.functions.Func1;
import rx.subjects.PublishSubject;

/* loaded from: input_file:io/mantisrx/server/worker/ExecuteStageRequestService.class */
public class ExecuteStageRequestService extends BaseService {
    private static final Logger logger = LoggerFactory.getLogger(ExecuteStageRequestService.class);
    private final Observable<WrappedExecuteStageRequest> executeStageRequestObservable;
    private final Observer<Observable<Status>> tasksStatusObserver;
    private final WorkerExecutionOperations executionOperations;
    private final Optional<String> jobProviderClass;
    private final Optional<Job> mantisJob;
    private final UserCodeClassLoader userCodeClassLoader;
    private Subscription subscription;

    public ExecuteStageRequestService(Observable<WrappedExecuteStageRequest> observable, Observer<Observable<Status>> observer, WorkerExecutionOperations workerExecutionOperations, Optional<String> optional, UserCodeClassLoader userCodeClassLoader, Optional<Job> optional2) {
        this.executeStageRequestObservable = observable;
        this.tasksStatusObserver = observer;
        this.executionOperations = workerExecutionOperations;
        this.jobProviderClass = optional;
        this.userCodeClassLoader = userCodeClassLoader;
        this.mantisJob = optional2;
    }

    public void start() {
        this.subscription = this.executeStageRequestObservable.map(new Func1<WrappedExecuteStageRequest, TrackedExecuteStageRequest>() { // from class: io.mantisrx.server.worker.ExecuteStageRequestService.3
            public TrackedExecuteStageRequest call(WrappedExecuteStageRequest wrappedExecuteStageRequest) {
                PublishSubject create = PublishSubject.create();
                ExecuteStageRequestService.this.tasksStatusObserver.onNext(create);
                return new TrackedExecuteStageRequest(wrappedExecuteStageRequest, create);
            }
        }).flatMap(new Func1<TrackedExecuteStageRequest, Observable<ExecutionDetails>>() { // from class: io.mantisrx.server.worker.ExecuteStageRequestService.2
            public Observable<ExecutionDetails> call(TrackedExecuteStageRequest trackedExecuteStageRequest) {
                ClassLoader asClassLoader;
                Job job;
                ExecuteStageRequest request = trackedExecuteStageRequest.getExecuteRequest().getRequest();
                try {
                    if (ExecuteStageRequestService.this.mantisJob.isPresent()) {
                        asClassLoader = ExecuteStageRequestService.this.userCodeClassLoader.asClassLoader();
                        job = (Job) ExecuteStageRequestService.this.mantisJob.get();
                    } else {
                        ExecuteStageRequestService.logger.info("Loading JAR files for task {}.", this);
                        asClassLoader = ExecuteStageRequestService.this.userCodeClassLoader.asClassLoader();
                        if (ExecuteStageRequestService.this.jobProviderClass.isPresent()) {
                            ExecuteStageRequestService.logger.info("loading job main class " + ((String) ExecuteStageRequestService.this.jobProviderClass.get()));
                            job = ((MantisJobProvider) InstantiationUtil.instantiate((String) ExecuteStageRequestService.this.jobProviderClass.get(), MantisJobProvider.class, asClassLoader)).getJobInstance();
                        } else {
                            ExecuteStageRequestService.logger.info("using serviceLoader to get job instance");
                            job = ((MantisJobProvider) ServiceLoader.load(MantisJobProvider.class, asClassLoader).iterator().next()).getJobInstance();
                        }
                    }
                    ExecuteStageRequestService.logger.info("Executing job {}", job);
                    return Observable.just(new ExecutionDetails(trackedExecuteStageRequest.getExecuteRequest(), trackedExecuteStageRequest.getStatus(), job, asClassLoader, request.getParameters()));
                } catch (Throwable th) {
                    ExecuteStageRequestService.logger.error("Failed to load job class", th);
                    trackedExecuteStageRequest.getStatus().onError(th);
                    return Observable.empty();
                }
            }
        }).subscribe(new Observer<ExecutionDetails>() { // from class: io.mantisrx.server.worker.ExecuteStageRequestService.1
            public void onCompleted() {
                ExecuteStageRequestService.logger.error("Execute stage observable completed");
                try {
                    ExecuteStageRequestService.this.executionOperations.shutdownStage();
                } catch (IOException e) {
                    ExecuteStageRequestService.logger.error("Failed to close stage cleanly", e);
                }
            }

            public void onError(Throwable th) {
                ExecuteStageRequestService.logger.error("Execute stage observable threw exception", th);
            }

            public void onNext(final ExecutionDetails executionDetails) {
                ExecuteStageRequestService.logger.info("Executing stage for job ID: " + executionDetails.getExecuteStageRequest().getRequest().getJobId());
                Thread thread = new Thread("mantis-worker-thread-" + executionDetails.getExecuteStageRequest().getRequest().getJobId()) { // from class: io.mantisrx.server.worker.ExecuteStageRequestService.1.1
                    @Override // java.lang.Thread, java.lang.Runnable
                    public void run() {
                        try {
                            ExecuteStageRequestService.this.executionOperations.executeStage(executionDetails);
                        } catch (Throwable th) {
                            ExecuteStageRequestService.logger.error("Failed to execute job stage", th);
                        }
                    }
                };
                thread.setContextClassLoader(executionDetails.getClassLoader());
                thread.setDaemon(true);
                thread.start();
            }
        });
    }

    public void shutdown() {
        this.subscription.unsubscribe();
        try {
            logger.info("Shutting down execution operations");
            this.executionOperations.shutdownStage();
        } catch (IOException e) {
            logger.error("Failed to close cleanly", e);
        }
    }

    public void enterActiveMode() {
    }
}
