/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.server.worker;

import io.mantisrx.common.WorkerPorts;
import io.mantisrx.runtime.MachineDefinition;
import io.mantisrx.runtime.MachineDefinitions;
import io.mantisrx.runtime.MantisJobDurationType;
import io.mantisrx.runtime.descriptor.SchedulingInfo;
import io.mantisrx.runtime.descriptor.StageScalingPolicy;
import io.mantisrx.runtime.descriptor.StageSchedulingInfo;
import io.mantisrx.runtime.parameter.Parameter;
import io.mantisrx.server.core.BaseService;
import io.mantisrx.server.core.ExecuteStageRequest;
import io.mantisrx.server.core.WorkerTopologyInfo;
import io.mantisrx.server.core.WrappedExecuteStageRequest;
import io.mantisrx.server.worker.VirtualMachineWorkerService;
import io.mantisrx.server.worker.mesos.VirtualMachineTaskStatus;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.mesos.MesosExecutorDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;

public class VirtualMachineWorkerServiceLocalImpl
extends BaseService
implements VirtualMachineWorkerService {
    private static final Logger logger = LoggerFactory.getLogger(VirtualMachineWorkerServiceLocalImpl.class);
    private final WorkerTopologyInfo.Data workerInfo;
    private MesosExecutorDriver mesosDriver;
    private ExecutorService executor;
    private Observer<WrappedExecuteStageRequest> executeStageRequestObserver;
    private Observable<VirtualMachineTaskStatus> vmTaskStatusObservable;

    public VirtualMachineWorkerServiceLocalImpl(WorkerTopologyInfo.Data workerInfo, Observer<WrappedExecuteStageRequest> executeStageRequestObserver, Observable<VirtualMachineTaskStatus> vmTaskStatusObservable) {
        this.workerInfo = workerInfo;
        this.executeStageRequestObserver = executeStageRequestObserver;
        this.vmTaskStatusObservable = vmTaskStatusObservable;
        this.executor = Executors.newSingleThreadExecutor(new ThreadFactory(){

            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, "vm_worker_mesos_executor_thread");
                t.setDaemon(true);
                return t;
            }
        });
    }

    private WrappedExecuteStageRequest createExecuteStageRequest() throws MalformedURLException {
        long timeoutToReportStartSec = 5L;
        URL jobJarUrl = new URL("file:/Users/nmahilani/Projects/Mantis/mantis-sdk/examples/sine-function/build/distributions/sine-function-1.0.zip");
        List<Integer> ports = Arrays.asList(31015, 31013, 31014);
        List<Parameter> params = Collections.singletonList(new Parameter("useRandom", "true"));
        boolean numInstances = true;
        HashMap<Integer, StageSchedulingInfo> schedulingInfoMap = new HashMap<Integer, StageSchedulingInfo>();
        StageSchedulingInfo stage0SchedInfo = StageSchedulingInfo.builder().numberOfInstances(1).machineDefinition(MachineDefinitions.micro()).build();
        StageSchedulingInfo stage1SchedInfo = StageSchedulingInfo.builder().numberOfInstances(1).machineDefinition(new MachineDefinition(2.0, 300.0, 200.0, 1024.0, 2)).scalingPolicy(new StageScalingPolicy(1, 1, 5, 1, 1, 30L, Collections.singletonMap(StageScalingPolicy.ScalingReason.Memory, new StageScalingPolicy.Strategy(StageScalingPolicy.ScalingReason.Memory, 15.0, 25.0, new StageScalingPolicy.RollingCount(1, 2))))).scalable(true).build();
        schedulingInfoMap.put(1, stage1SchedInfo);
        SchedulingInfo schedInfo = new SchedulingInfo(schedulingInfoMap);
        ExecuteStageRequest executeStageRequest = new ExecuteStageRequest(this.workerInfo.getJobName(), this.workerInfo.getJobId(), this.workerInfo.getWorkerIndex(), this.workerInfo.getWorkerNumber(), jobJarUrl, this.workerInfo.getStageNumber(), this.workerInfo.getNumStages(), ports, 5L, this.workerInfo.getMetricsPort(), params, schedInfo, MantisJobDurationType.Transient, 0L, 0L, new WorkerPorts(Arrays.asList(7151, 7152, 7153, 7154, 7155)), Optional.empty());
        return new WrappedExecuteStageRequest(PublishSubject.create(), executeStageRequest);
    }

    private void setupRequestFailureHandler(long waitSeconds, Observable<Boolean> requestObservable, final Action0 errorHandler) {
        requestObservable.buffer(waitSeconds, TimeUnit.SECONDS, 1).take(1).subscribe((Observer)new Observer<List<Boolean>>(){

            public void onCompleted() {
            }

            public void onError(Throwable e) {
                logger.error("onError called for request failure handler");
                errorHandler.call();
            }

            public void onNext(List<Boolean> booleans) {
                logger.info("onNext called for request failure handler with items: " + (booleans == null ? "-1" : Integer.valueOf(booleans.size())));
                if (booleans == null || booleans.isEmpty()) {
                    errorHandler.call();
                }
            }
        });
    }

    public void start() {
        logger.info("Starting VirtualMachineWorkerServiceLocalImpl");
        Schedulers.newThread().createWorker().schedule(new Action0(){

            public void call() {
                try {
                    WrappedExecuteStageRequest request = null;
                    request = VirtualMachineWorkerServiceLocalImpl.this.createExecuteStageRequest();
                    VirtualMachineWorkerServiceLocalImpl.this.setupRequestFailureHandler(request.getRequest().getTimeoutToReportStart(), (Observable<Boolean>)((Observable)request.getRequestSubject()), new Action0(){

                        public void call() {
                            logger.error("launch error");
                        }
                    });
                    logger.info("onNext'ing WrappedExecuteStageRequest: {}", (Object)request.toString());
                    VirtualMachineWorkerServiceLocalImpl.this.executeStageRequestObserver.onNext((Object)request);
                }
                catch (MalformedURLException e) {
                    e.printStackTrace();
                }
            }
        }, 2L, TimeUnit.SECONDS);
        this.vmTaskStatusObservable.subscribe((Action1)new Action1<VirtualMachineTaskStatus>(){

            public void call(VirtualMachineTaskStatus vmTaskStatus) {
                VirtualMachineTaskStatus.TYPE type = vmTaskStatus.getType();
                if (type == VirtualMachineTaskStatus.TYPE.COMPLETED) {
                    logger.info("Got COMPLETED state for " + vmTaskStatus.getTaskId());
                } else if (type == VirtualMachineTaskStatus.TYPE.STARTED) {
                    logger.info("Would send RUNNING state to mesos, worker started for " + vmTaskStatus.getTaskId());
                }
            }
        });
    }

    public void shutdown() {
        logger.info("Unregistering Mantis Worker with Mesos executor callbacks");
        this.mesosDriver.stop();
        this.executor.shutdown();
    }

    public void enterActiveMode() {
    }
}

