/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.server.worker.mesos;

import io.mantisrx.common.JsonSerializer;
import io.mantisrx.server.core.ExecuteStageRequest;
import io.mantisrx.server.core.WrappedExecuteStageRequest;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.mesos.Executor;
import org.apache.mesos.ExecutorDriver;
import org.apache.mesos.Protos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.functions.Action0;
import rx.subjects.PublishSubject;

public class MesosExecutorCallbackHandler
implements Executor {
    private static final Logger logger = LoggerFactory.getLogger(MesosExecutorCallbackHandler.class);
    private Observer<WrappedExecuteStageRequest> executeStageRequestObserver;
    private final JsonSerializer serializer = new JsonSerializer();

    public MesosExecutorCallbackHandler(Observer<WrappedExecuteStageRequest> executeStageRequestObserver) {
        this.executeStageRequestObserver = executeStageRequestObserver;
    }

    public void disconnected(ExecutorDriver arg0) {
    }

    public void error(ExecutorDriver arg0, String msg) {
        logger.error(msg);
    }

    public void frameworkMessage(ExecutorDriver arg0, byte[] arg1) {
    }

    public void killTask(ExecutorDriver arg0, Protos.TaskID task) {
        logger.info("Executor going to kill task " + task.getValue());
        this.executeStageRequestObserver.onCompleted();
        this.waitAndExit();
    }

    private void waitAndExit() {
        Thread t = new Thread(){

            @Override
            public void run() {
                try {
                    1.sleep(2000L);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
                System.exit(0);
            }
        };
        t.setDaemon(true);
        t.start();
    }

    private WrappedExecuteStageRequest createExecuteStageRequest(Protos.TaskInfo task) {
        try {
            byte[] jsonBytes = task.getData().toByteArray();
            logger.info("Received request {}", (Object)new String(jsonBytes));
            return new WrappedExecuteStageRequest(PublishSubject.create(), (ExecuteStageRequest)this.serializer.fromJson(jsonBytes, ExecuteStageRequest.class));
        }
        catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    private void sendLaunchError(ExecutorDriver driver, Protos.TaskInfo task) {
        driver.sendStatusUpdate(Protos.TaskStatus.newBuilder().setTaskId(task.getTaskId()).setState(Protos.TaskState.TASK_FAILED).build());
        this.waitAndExit();
    }

    private void setupRequestFailureHandler(long waitSeconds, Observable<Boolean> requestObservable, final Action0 errorHandler) {
        requestObservable.buffer(waitSeconds, TimeUnit.SECONDS, 1).take(1).subscribe((Observer)new Observer<List<Boolean>>(){

            public void onCompleted() {
            }

            public void onError(Throwable e) {
                logger.error("onError called for request failure handler");
                errorHandler.call();
            }

            public void onNext(List<Boolean> booleans) {
                logger.info("onNext called for request failure handler with items: " + (booleans == null ? "-1" : Integer.valueOf(booleans.size())));
                if (booleans == null || booleans.isEmpty()) {
                    errorHandler.call();
                }
            }
        });
    }

    public void launchTask(final ExecutorDriver driver, final Protos.TaskInfo task) {
        WrappedExecuteStageRequest request = this.createExecuteStageRequest(task);
        logger.info("Worker for task [" + task.getTaskId().getValue() + "] with startTimeout=" + request.getRequest().getTimeoutToReportStart());
        this.setupRequestFailureHandler(request.getRequest().getTimeoutToReportStart(), (Observable<Boolean>)request.getRequestSubject(), new Action0(){

            public void call() {
                MesosExecutorCallbackHandler.this.sendLaunchError(driver, task);
            }
        });
        this.executeStageRequestObserver.onNext((Object)request);
    }

    public void registered(ExecutorDriver arg0, Protos.ExecutorInfo arg1, Protos.FrameworkInfo arg2, Protos.SlaveInfo arg3) {
    }

    public void reregistered(ExecutorDriver arg0, Protos.SlaveInfo arg1) {
    }

    public void shutdown(ExecutorDriver arg0) {
    }
}

