/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.server.worker;

import io.mantisrx.runtime.Job;
import io.mantisrx.runtime.MantisJobProvider;
import io.mantisrx.server.core.BaseService;
import io.mantisrx.server.core.ExecuteStageRequest;
import io.mantisrx.server.core.Status;
import io.mantisrx.server.core.WrappedExecuteStageRequest;
import io.mantisrx.server.worker.ExecutionDetails;
import io.mantisrx.server.worker.InstantiationUtil;
import io.mantisrx.server.worker.TrackedExecuteStageRequest;
import io.mantisrx.server.worker.WorkerExecutionOperations;
import java.io.IOException;
import java.util.Optional;
import java.util.ServiceLoader;
import org.apache.flink.util.UserCodeClassLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.functions.Func1;
import rx.subjects.PublishSubject;

/**
 * Service that consumes a stream of execute-stage requests and launches each one on its own
 * daemon thread.
 *
 * <p>For every incoming {@link WrappedExecuteStageRequest} the service:
 * <ol>
 *   <li>creates a status subject and publishes it to the tasks-status observer;</li>
 *   <li>resolves the {@link Job} to run: the pre-built job if one was supplied, otherwise a
 *       {@link MantisJobProvider} instantiated by class name, otherwise the first provider
 *       found through {@link ServiceLoader};</li>
 *   <li>hands the resulting {@link ExecutionDetails} to
 *       {@link WorkerExecutionOperations#executeStage} on a new daemon thread whose context
 *       class loader is the user-code class loader.</li>
 * </ol>
 */
public class ExecuteStageRequestService extends BaseService {
    private static final Logger logger = LoggerFactory.getLogger(ExecuteStageRequestService.class);
    private final Observable<WrappedExecuteStageRequest> executeStageRequestObservable;
    private final Observer<Observable<Status>> tasksStatusObserver;
    private final WorkerExecutionOperations executionOperations;
    private final Optional<String> jobProviderClass;
    private final Optional<Job> mantisJob;
    private final UserCodeClassLoader userCodeClassLoader;
    // Set by start(); remains null until then, so shutdown() must tolerate null.
    private Subscription subscription;

    /**
     * Creates the service. Nothing is subscribed until {@link #start()} is called.
     *
     * @param executeStageRequestObservable source of stage execution requests
     * @param tasksStatusObserver receives one status stream per incoming request
     * @param executionOperations executes and shuts down stages
     * @param jobProviderClass class name of the {@link MantisJobProvider} to instantiate; when
     *     empty (and no {@code mantisJob} is given) the provider is discovered via
     *     {@link ServiceLoader}
     * @param userCodeClassLoader supplies the class loader used to load the job and set as the
     *     worker thread's context class loader
     * @param mantisJob pre-built job instance; when present, no provider lookup is performed
     */
    public ExecuteStageRequestService(Observable<WrappedExecuteStageRequest> executeStageRequestObservable, Observer<Observable<Status>> tasksStatusObserver, WorkerExecutionOperations executionOperations, Optional<String> jobProviderClass, UserCodeClassLoader userCodeClassLoader, Optional<Job> mantisJob) {
        this.executeStageRequestObservable = executeStageRequestObservable;
        this.tasksStatusObserver = tasksStatusObserver;
        this.executionOperations = executionOperations;
        this.jobProviderClass = jobProviderClass;
        this.userCodeClassLoader = userCodeClassLoader;
        this.mantisJob = mantisJob;
    }

    /**
     * Subscribes to the request stream and stores the resulting subscription so that
     * {@link #shutdown()} can cancel it.
     */
    public void start() {
        this.subscription = this.executeStageRequestObservable
                // Pair each request with a fresh status subject and announce that subject.
                .map(new Func1<WrappedExecuteStageRequest, TrackedExecuteStageRequest>() {

                    public TrackedExecuteStageRequest call(WrappedExecuteStageRequest executeRequest) {
                        PublishSubject<Status> statusSubject = PublishSubject.create();
                        ExecuteStageRequestService.this.tasksStatusObserver.onNext(statusSubject);
                        return new TrackedExecuteStageRequest(executeRequest, statusSubject);
                    }
                })
                // Resolve the job to run; a load failure is reported on the request's status
                // observer and the request is dropped (empty observable) instead of failing
                // the whole stream.
                .flatMap(new Func1<TrackedExecuteStageRequest, Observable<ExecutionDetails>>() {

                    public Observable<ExecutionDetails> call(TrackedExecuteStageRequest executeRequest) {
                        Job job;
                        ExecuteStageRequest executeStageRequest = executeRequest.getExecuteRequest().getRequest();
                        ClassLoader cl = null;
                        try {
                            if (!ExecuteStageRequestService.this.mantisJob.isPresent()) {
                                // NOTE(review): `this` is the anonymous Func1 here, so the log shows
                                // its identity rather than a task identifier.
                                logger.info("Loading JAR files for task {}.", this);
                                cl = ExecuteStageRequestService.this.userCodeClassLoader.asClassLoader();
                                if (ExecuteStageRequestService.this.jobProviderClass.isPresent()) {
                                    String providerClassName = ExecuteStageRequestService.this.jobProviderClass.get();
                                    logger.info("loading job main class {}", providerClassName);
                                    MantisJobProvider jobProvider = InstantiationUtil.instantiate(providerClassName, MantisJobProvider.class, cl);
                                    job = jobProvider.getJobInstance();
                                } else {
                                    logger.info("using serviceLoader to get job instance");
                                    ServiceLoader<MantisJobProvider> provider = ServiceLoader.load(MantisJobProvider.class, cl);
                                    java.util.Iterator<MantisJobProvider> providers = provider.iterator();
                                    if (!providers.hasNext()) {
                                        // Same exception type iterator.next() would throw, but with
                                        // a message that explains what is missing.
                                        throw new java.util.NoSuchElementException("No MantisJobProvider implementation found via ServiceLoader");
                                    }
                                    job = providers.next().getJobInstance();
                                }
                            } else {
                                cl = ExecuteStageRequestService.this.userCodeClassLoader.asClassLoader();
                                job = ExecuteStageRequestService.this.mantisJob.get();
                            }
                        }
                        catch (Throwable e) {
                            logger.error("Failed to load job class", e);
                            executeRequest.getStatus().onError(e);
                            return Observable.empty();
                        }
                        logger.info("Executing job {}", job);
                        return Observable.just(new ExecutionDetails(executeRequest.getExecuteRequest(), executeRequest.getStatus(), job, cl, executeStageRequest.getParameters()));
                    }
                })
                .subscribe(new Observer<ExecutionDetails>() {

                    public void onCompleted() {
                        logger.error("Execute stage observable completed");
                        try {
                            ExecuteStageRequestService.this.executionOperations.shutdownStage();
                        }
                        catch (IOException e) {
                            logger.error("Failed to close stage cleanly", e);
                        }
                    }

                    public void onError(Throwable e) {
                        logger.error("Execute stage observable threw exception", e);
                    }

                    public void onNext(final ExecutionDetails executionDetails) {
                        logger.info("Executing stage for job ID: {}", executionDetails.getExecuteStageRequest().getRequest().getJobId());
                        Thread worker = new Thread("mantis-worker-thread-" + executionDetails.getExecuteStageRequest().getRequest().getJobId()) {

                            @Override
                            public void run() {
                                try {
                                    ExecuteStageRequestService.this.executionOperations.executeStage(executionDetails);
                                }
                                catch (Throwable failure) {
                                    // Log here; nothing else observes exceptions on this thread.
                                    logger.error("Failed to execute job stage", failure);
                                }
                            }
                        };
                        // The stage runs with the class loader that was used to resolve the job.
                        worker.setContextClassLoader(executionDetails.getClassLoader());
                        worker.setDaemon(true);
                        worker.start();
                    }
                });
    }

    /**
     * Cancels the request subscription (if {@link #start()} created one) and shuts down the
     * stage. An {@link IOException} from the stage shutdown is logged, not propagated.
     */
    public void shutdown() {
        Subscription active = this.subscription;
        // start() may never have been called; still attempt the stage shutdown below.
        if (active != null) {
            active.unsubscribe();
        }
        try {
            logger.info("Shutting down execution operations");
            this.executionOperations.shutdownStage();
        }
        catch (IOException e) {
            logger.error("Failed to close cleanly", e);
        }
    }

    /** No-op: this service does nothing when entering active mode. */
    public void enterActiveMode() {
    }
}

