package io.mantisrx.server.worker;

import com.mantisrx.common.utils.Closeables;
import com.netflix.spectator.api.Registry;
import io.mantisrx.common.WorkerPorts;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.common.metrics.spectator.SpectatorRegistryFactory;
import io.mantisrx.common.network.Endpoint;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.MantisJobDurationType;
import io.mantisrx.runtime.MantisJobState;
import io.mantisrx.runtime.StageConfig;
import io.mantisrx.runtime.WorkerInfo;
import io.mantisrx.runtime.WorkerMap;
import io.mantisrx.runtime.executor.PortSelector;
import io.mantisrx.runtime.executor.StageExecutors;
import io.mantisrx.runtime.executor.WorkerConsumer;
import io.mantisrx.runtime.executor.WorkerConsumerRemoteObservable;
import io.mantisrx.runtime.executor.WorkerPublisherRemoteObservable;
import io.mantisrx.runtime.lifecycle.Lifecycle;
import io.mantisrx.runtime.lifecycle.ServiceLocator;
import io.mantisrx.runtime.loader.SinkSubscriptionStateHandler;
import io.mantisrx.runtime.loader.config.WorkerConfiguration;
import io.mantisrx.runtime.parameter.ParameterUtils;
import io.mantisrx.runtime.parameter.Parameters;
import io.mantisrx.server.core.ExecuteStageRequest;
import io.mantisrx.server.core.JobSchedulingInfo;
import io.mantisrx.server.core.ServiceRegistry;
import io.mantisrx.server.core.Status;
import io.mantisrx.server.core.StatusPayloads;
import io.mantisrx.server.core.WorkerAssignments;
import io.mantisrx.server.core.WorkerHost;
import io.mantisrx.server.master.client.MantisMasterGateway;
import io.mantisrx.server.worker.RunningWorker;
import io.mantisrx.server.worker.client.WorkerMetricsClient;
import io.mantisrx.server.worker.jobmaster.AutoScaleMetricsConfig;
import io.mantisrx.server.worker.jobmaster.JobMasterService;
import io.mantisrx.server.worker.jobmaster.JobMasterStageConfig;
import io.mantisrx.server.worker.mesos.VirtualMachineTaskStatus;
import io.mantisrx.shaded.com.google.common.base.Splitter;
import io.mantisrx.shaded.com.google.common.base.Strings;
import io.reactivex.mantis.remote.observable.RemoteRxServer;
import io.reactivex.mantis.remote.observable.RxMetrics;
import io.reactivex.mantis.remote.observable.ToDeltaEndpointInjector;
import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.functions.Action0;
import rx.schedulers.Schedulers;

/* loaded from: input_file:io/mantisrx/server/worker/WorkerExecutionOperationsNetworkStage.class */
public class WorkerExecutionOperationsNetworkStage implements WorkerExecutionOperations {
    private static final Logger logger = LoggerFactory.getLogger(WorkerExecutionOperationsNetworkStage.class);

    /** Property that overrides how many connections are opened per upstream endpoint. */
    private static final String CONNECTIONS_PER_ENDPOINT_PROPERTY = "mantis.worker.connectionsPerEndpoint";
    /** Value used for {@link #CONNECTIONS_PER_ENDPOINT_PROPERTY} when the property is not set. */
    private static final String DEFAULT_CONNECTIONS_PER_ENDPOINT = "2";
    /** Property that toggles looking up the spectator {@code Registry} through the job's service locator. */
    private static final String LOCATE_SPECTATOR_REGISTRY_PROPERTY = "mantis.worker.locate.spectator.registry";
    /** Value used for {@link #LOCATE_SPECTATOR_REGISTRY_PROPERTY} when the property is not set. */
    private static final String DEFAULT_LOCATE_SPECTATOR_REGISTRY = "true";

    private final WorkerConfiguration config;
    private final WorkerMetricsClient workerMetricsClient;
    private final SinkSubscriptionStateHandler.Factory sinkSubscriptionStateHandlerFactory;
    private final Observer<VirtualMachineTaskStatus> vmTaskStatusObserver;
    private final MantisMasterGateway mantisMasterApi;
    /** Multiplier applied to worker counts and number of endpoints created per started upstream host. */
    private final int connectionsPerEndpoint;
    /** When true, executeStage tries to install the Registry obtained from the job's service locator. */
    private final boolean lookupSpectatorRegistry;
    /** Set by setupSubscriptionStateHandler; cleared again in shutdownStage. */
    private SinkSubscriptionStateHandler subscriptionStateHandler;
    /** Single-threaded scheduler used to emit periodic heartbeats. */
    private final ScheduledExecutorService scheduledExecutorService;
    private final ClassLoader classLoader;
    /** Status observer of the stage being executed; assigned at the start of executeStage. */
    private Observer<Status> jobStatusObserver;
    private final AtomicReference<Heartbeat> heartbeatRef = new AtomicReference<>();
    private Action0 onSinkSubscribe = null;
    private Action0 onSinkUnsubscribe = null;
    /** Resources registered while a stage runs; closed together in shutdownStage. */
    private final List<Closeable> closeables = new ArrayList<>();

    /**
     * Creates the operations object and reads its two tunables from the properties service.
     *
     * @param vmTaskStatusObserver observer handed to every {@code RunningWorker} built by this instance
     * @param mantisMasterGateway gateway used to subscribe to scheduling changes
     * @param workerConfiguration worker configuration (used for the host name and resource-usage reporting)
     * @param workerMetricsClient metrics client passed to the job master service
     * @param factory factory producing the sink subscription state handler
     * @param classLoader class loader passed into the job {@code Context}
     * @throws NumberFormatException if the connections-per-endpoint property is set to a non-integer value
     */
    public WorkerExecutionOperationsNetworkStage(Observer<VirtualMachineTaskStatus> vmTaskStatusObserver, MantisMasterGateway mantisMasterGateway, WorkerConfiguration workerConfiguration, WorkerMetricsClient workerMetricsClient, SinkSubscriptionStateHandler.Factory factory, ClassLoader classLoader) {
        this.vmTaskStatusObserver = vmTaskStatusObserver;
        this.mantisMasterApi = mantisMasterGateway;
        this.config = workerConfiguration;
        this.workerMetricsClient = workerMetricsClient;
        this.sinkSubscriptionStateHandlerFactory = factory;
        this.classLoader = classLoader;

        String connectionsValue = ServiceRegistry.INSTANCE.getPropertiesService().getStringValue(CONNECTIONS_PER_ENDPOINT_PROPERTY, DEFAULT_CONNECTIONS_PER_ENDPOINT);
        // A null value keeps the default; anything else (including the default string itself) is parsed.
        this.connectionsPerEndpoint = Integer.parseInt(connectionsValue != null ? connectionsValue : DEFAULT_CONNECTIONS_PER_ENDPOINT);

        this.lookupSpectatorRegistry = Boolean.parseBoolean(ServiceRegistry.INSTANCE.getPropertiesService().getStringValue(LOCATE_SPECTATOR_REGISTRY_PROPERTY, DEFAULT_LOCATE_SPECTATOR_REGISTRY));
        this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
    }

    /**
     * Builds a {@link WorkerMap} (stage number to list of {@link WorkerInfo}) from the worker
     * assignments carried by a {@link JobSchedulingInfo}.
     *
     * <p>Stages whose host map is {@code null} are skipped. Invalid arguments and conversion
     * failures are logged and never thrown; in those cases the {@code WorkerMap} created up front
     * over the working map is returned.
     *
     * @param jobName job name; must be non-null and non-empty
     * @param jobId job id; must be non-null and non-empty
     * @param durationType duration type copied into every generated {@code WorkerInfo}
     * @param schedulingInfo scheduling info to convert; may be {@code null}
     * @return the converted worker map, never {@code null}
     */
    public static WorkerMap convertJobSchedulingInfoToWorkerMap(String jobName, String jobId, MantisJobDurationType durationType, JobSchedulingInfo schedulingInfo) {
        Map<Integer, List<WorkerInfo>> workersByStage = new HashMap<>();
        // Created before any population so that every early-exit path returns the same object
        // the original implementation returned.
        WorkerMap fallback = new WorkerMap(workersByStage);

        if (jobName == null || jobName.isEmpty() || jobId == null || jobId.isEmpty()) {
            logger.warn("Job name/jobId cannot be null in convertJobSchedulingInfoToWorkerMap");
            return fallback;
        }
        if (schedulingInfo == null || schedulingInfo.getWorkerAssignments() == null) {
            logger.warn("JobSchedulingInfo or workerAssignments cannot be null in convertJobSchedulingInfoToWorkerMap");
            return fallback;
        }

        try {
            for (Map.Entry<Integer, WorkerAssignments> entry : schedulingInfo.getWorkerAssignments().entrySet()) {
                int stageNum = entry.getKey();
                Map<Integer, WorkerHost> hosts = entry.getValue().getHosts();
                if (hosts == null) {
                    continue;
                }
                List<WorkerInfo> workers = hosts.values().stream()
                    .map(host -> generateWorkerInfo(jobName, jobId, stageNum, host.getWorkerIndex(), host.getWorkerNumber(), durationType, host.getHost(), host))
                    .collect(Collectors.toList());
                workersByStage.put(stageNum, workers);
            }
            return new WorkerMap(workersByStage);
        } catch (Exception e) {
            // Pass the exception itself so the stack trace is not lost.
            logger.warn("Exception converting JobSchedulingInfo {} to worker Map", schedulingInfo, e);
            return fallback;
        }
    }

    /**
     * Creates a {@link WorkerInfo} for a worker described by a {@link WorkerHost}.
     *
     * <p>The {@link WorkerPorts} are assembled from the host's metrics port, the fixed values
     * 65534 and 65535, the host's custom port, and the first entry of the host's port list.
     * When the port list is {@code null}, empty, or its first entry is {@code null}, -1 is used.
     * NOTE(review): 65534/65535 look like placeholder ports; confirm against WorkerPorts.
     *
     * @param jobName job name
     * @param jobId job id
     * @param stageNumber stage the worker belongs to
     * @param workerIndex worker index
     * @param workerNumber worker number
     * @param durationType job duration type
     * @param host host name recorded in the worker info
     * @param workerHost source of the port information
     * @return the assembled worker info
     */
    public static WorkerInfo generateWorkerInfo(String jobName, String jobId, int stageNumber, int workerIndex, int workerNumber, MantisJobDurationType durationType, String host, WorkerHost workerHost) {
        int metricsPort = workerHost.getMetricsPort();
        int customPort = workerHost.getCustomPort();

        List<Integer> ports = workerHost.getPort();
        Integer firstPort = null;
        if (ports != null && ports.size() >= 1) {
            firstPort = ports.get(0);
        }
        int firstPortOrDefault = (firstPort != null) ? firstPort : -1;

        WorkerPorts workerPorts = new WorkerPorts(metricsPort, 65534, 65535, customPort, firstPortOrDefault);
        return generateWorkerInfo(jobName, jobId, stageNumber, workerIndex, workerNumber, durationType, host, workerPorts);
    }

    /**
     * Creates a {@link WorkerInfo} from already-resolved {@link WorkerPorts}.
     *
     * @return a new worker info carrying exactly the given values
     */
    private static WorkerInfo generateWorkerInfo(String jobName, String jobId, int stageNumber, int workerIndex, int workerNumber, MantisJobDurationType durationType, String host, WorkerPorts workerPorts) {
        WorkerInfo workerInfo = new WorkerInfo(jobName, jobId, stageNumber, workerIndex, workerNumber, durationType, host, workerPorts);
        return workerInfo;
    }

    /**
     * Creates the job {@link Context} from its constituent parts.
     *
     * @return a new context wrapping the given arguments unchanged
     */
    private static Context generateContext(Parameters contextParameters, ServiceLocator serviceLocator, WorkerInfo workerInfo, MetricsRegistry metricsRegistry, Action0 completeAndExitAction, Observable<WorkerMap> workerMapObservable, ClassLoader classLoader) {
        Context context = new Context(contextParameters, serviceLocator, workerInfo, metricsRegistry, completeAndExitAction, workerMapObservable, classLoader);
        return context;
    }

    /**
     * Starts periodic heartbeat emission plus the data-dropped and resource-usage payload setters.
     *
     * <p>The subscription-state payload of the current heartbeat is initialised to "false" first.
     * The heartbeat status is then pushed to {@code jobStatusObserver} with a fixed delay of
     * {@code heartbeatIntervalSecs} seconds (also used as the initial delay).
     *
     * @param jobStatusObserver receiver of the heartbeat statuses
     * @param networkMbps value forwarded to the resource-usage payload setter
     * @param heartbeatIntervalSecs heartbeat period in seconds; also passed to both payload setters
     * @return a closeable that cancels the scheduled heartbeat and closes both payload setters
     */
    private Closeable startSendingHeartbeats(Observer<Status> jobStatusObserver, double networkMbps, long heartbeatIntervalSecs) {
        Heartbeat heartbeat = this.heartbeatRef.get();
        heartbeat.setPayload(String.valueOf(StatusPayloads.Type.SubscriptionState), "false");

        // The task re-reads heartbeatRef on every tick rather than capturing the local above.
        Runnable emitHeartbeat = () -> jobStatusObserver.onNext(this.heartbeatRef.get().getCurrentHeartbeatStatus());
        ScheduledFuture<?> heartbeatTask = this.scheduledExecutorService.scheduleWithFixedDelay(emitHeartbeat, heartbeatIntervalSecs, heartbeatIntervalSecs, TimeUnit.SECONDS);

        DataDroppedPayloadSetter droppedSetter = new DataDroppedPayloadSetter(heartbeat);
        droppedSetter.start(heartbeatIntervalSecs);

        ResourceUsagePayloadSetter usageSetter = new ResourceUsagePayloadSetter(heartbeat, this.config, networkMbps);
        usageSetter.start(heartbeatIntervalSecs);

        Closeable cancelHeartbeatTask = () -> {
            heartbeatTask.cancel(false);
        };
        return Closeables.combine(new Closeable[]{cancelHeartbeatTask, droppedSetter, usageSetter});
    }

    /**
     * Derives a stream of {@link WorkerMap}s from a stream of scheduling changes.
     *
     * <p>Items that are {@code null} or have no (or empty) worker assignments are dropped; every
     * remaining item is converted with {@code convertJobSchedulingInfoToWorkerMap}.
     */
    private Observable<WorkerMap> createWorkerMapObservable(Observable<JobSchedulingInfo> schedulingChanges, String jobName, String jobId, MantisJobDurationType durationType) {
        Observable<JobSchedulingInfo> withAssignments = schedulingChanges.filter(info -> {
            if (info == null) {
                return false;
            }
            Map<Integer, WorkerAssignments> assignments = info.getWorkerAssignments();
            return assignments != null && !assignments.isEmpty();
        });
        return withAssignments.map(info -> convertJobSchedulingInfoToWorkerMap(jobName, jobId, durationType, info));
    }

    /**
     * Derives a stream with the number of workers assigned to stage 1 from scheduling changes.
     *
     * <p>Items that are {@code null} or have no (or empty) worker assignments are dropped.
     * NOTE(review): assumes every remaining item contains an assignment for stage 1; a missing
     * entry would surface as a NullPointerException inside the stream.
     */
    private Observable<Integer> createSourceStageTotalWorkersObservable(Observable<JobSchedulingInfo> schedulingChanges) {
        Observable<JobSchedulingInfo> withAssignments = schedulingChanges.filter(info -> {
            if (info == null) {
                return false;
            }
            Map<Integer, WorkerAssignments> assignments = info.getWorkerAssignments();
            return assignments != null && !assignments.isEmpty();
        });
        return withAssignments.map(info -> {
            WorkerAssignments stageOne = info.getWorkerAssignments().get(1);
            return stageOne.getNumWorkers();
        });
    }

    /** Marks the given worker as started by delegating to {@code RunningWorker.signalStarted()}. */
    private void signalStarted(RunningWorker worker) {
        worker.signalStarted();
    }

    /**
     * Executes the stage described by {@code executionDetails} and blocks until it ends.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>subscribe (lazily, replaying the latest item) to scheduling changes for the job;</li>
     *   <li>build the {@link RunningWorker}; stage 0 gets a {@link JobMasterStageConfig}, any
     *       other stage N uses the job's stage config at index N-1;</li>
     *   <li>set up the sink subscription state handler when this is the last stage;</li>
     *   <li>start the job lifecycle, build the context and start heartbeats;</li>
     *   <li>dispatch: stage 0 runs the job master, a one-stage job runs source+sink in-process,
     *       stage 1 of a multi-stage job runs the source, everything else goes to
     *       {@code executeNonSourceStage};</li>
     *   <li>call {@code lifecycle.shutdown()} once the stage returns normally.</li>
     * </ol>
     *
     * <p>Any {@link Throwable} raised after the worker was built is logged, reported through
     * {@code RunningWorker.signalFailed} and followed by {@link #shutdownStage()}.
     *
     * @param executionDetails request, job definition, parameters and status observer for the stage
     * @throws IOException if {@link #shutdownStage()} fails while handling an error
     */
    @Override // io.mantisrx.server.worker.WorkerExecutionOperations
    public void executeStage(ExecutionDetails executionDetails) throws IOException {
        ExecuteStageRequest request = executionDetails.getExecuteStageRequest().getRequest();
        this.jobStatusObserver = executionDetails.getStatus();

        Observable<JobSchedulingInfo> schedulingChanges = this.mantisMasterApi.schedulingChanges(request.getJobId())
            .subscribeOn(Schedulers.io())
            .replay(1)
            .refCount()
            .doOnSubscribe(() -> logger.info("mantisApi schedulingChanges subscribe"))
            .doOnUnsubscribe(() -> logger.info("mantisApi schedulingChanges stream unsub."))
            .doOnError(error -> logger.warn("mantisApi schedulingChanges stream error:", error))
            .doOnCompleted(() -> logger.info("mantisApi schedulingChanges stream completed."));

        WorkerInfo workerInfo = generateWorkerInfo(request.getJobName(), request.getJobId(), request.getStage(), request.getWorkerIndex(), request.getWorkerNumber(), request.getDurationType(), "host", request.getWorkerPorts());

        RunningWorker.Builder builder = new RunningWorker.Builder()
            .job(executionDetails.getMantisJob())
            .schedulingInfo(request.getSchedulingInfo())
            .stageTotalWorkersObservable(createSourceStageTotalWorkersObservable(schedulingChanges))
            .jobName(request.getJobName())
            .stageNum(request.getStage())
            .workerIndex(request.getWorkerIndex())
            .workerNum(request.getWorkerNumber())
            .totalStages(request.getTotalNumStages())
            .metricsPort(request.getMetricsPort())
            .ports(request.getPorts().iterator())
            .jobStatusObserver(executionDetails.getStatus())
            .requestSubject(executionDetails.getExecuteStageRequest().getRequestSubject())
            .workerInfo(workerInfo)
            .vmTaskStatusObservable(this.vmTaskStatusObserver)
            .hasJobMaster(request.getHasJobMaster())
            .jobId(request.getJobId());

        // Stage 0 is the job master and has no user-defined stage config.
        final RunningWorker rw = (request.getStage() == 0
            ? builder.stage(new JobMasterStageConfig("jobmasterconfig"))
            : builder.stage((StageConfig) executionDetails.getMantisJob().getStages().get(request.getStage() - 1))).build();

        if (rw.getStageNum() == rw.getTotalStagesNet()) {
            // Last stage hosts the sink, so it needs the subscribe/unsubscribe callbacks.
            setupSubscriptionStateHandler(executionDetails.getExecuteStageRequest().getRequest());
        }
        logger.info("Running worker info: {}", rw);
        rw.signalStartedInitiated();

        try {
            logger.info(">>>>>>>>>>>>>>>>Calling lifecycle.startup()");
            Lifecycle lifecycle = rw.getJob().getLifecycle();
            lifecycle.startup();
            ServiceLocator serviceLocator = lifecycle.getServiceLocator();
            if (this.lookupSpectatorRegistry) {
                try {
                    SpectatorRegistryFactory.setRegistry((Registry) serviceLocator.service(Registry.class));
                } catch (Throwable registryError) {
                    // Best effort: keep the existing registry, but record why the lookup failed.
                    logger.error("failed to init spectator registry using service locator, falling back to {}", SpectatorRegistryFactory.getRegistry().getClass().getCanonicalName(), registryError);
                }
            }

            Parameters contextParameters = ParameterUtils.createContextParameters(rw.getJob().getParameterDefinitions(), executionDetails.getParameters());
            Action0 completeAndExit = () -> {
                rw.signalCompleted();
                try {
                    Thread.sleep(60000L);
                } catch (InterruptedException e) {
                    logger.warn("Unexpected exception sleeping: " + e.getMessage());
                    Thread.currentThread().interrupt();
                }
                System.exit(0);
            };
            rw.setContext(generateContext(contextParameters, serviceLocator, workerInfo, MetricsRegistry.getInstance(), completeAndExit, createWorkerMapObservable(schedulingChanges, request.getJobName(), request.getJobId(), request.getDurationType()), this.classLoader));

            this.heartbeatRef.set(new Heartbeat(rw.getJobId(), rw.getStageNum(), rw.getWorkerIndex(), rw.getWorkerNum(), this.config.getTaskExecutorHostName()));
            this.closeables.add(startSendingHeartbeats(rw.getJobStatus(), request.getSchedulingInfo().forStage(rw.getStageNum()).getMachineDefinition().getNetworkMbps(), request.getHeartbeatIntervalSecs()));

            if (rw.getStageNum() == 0) {
                logger.info("JobId: {}, executing Job Master", rw.getJobId());
                AutoScaleMetricsConfig autoScaleMetricsConfig = createAutoScaleMetricsConfig(contextParameters);
                JobMasterService jobMasterService = new JobMasterService(rw.getJobId(), rw.getSchedulingInfo(), this.workerMetricsClient, autoScaleMetricsConfig, this.mantisMasterApi, rw.getContext(), rw.getOnCompleteCallback(), rw.getOnErrorCallback(), rw.getOnTerminateCallback());
                jobMasterService.start();
                this.closeables.add(jobMasterService::shutdown);
                signalStarted(rw);
                rw.waitUntilTerminate();
            } else if (rw.getStageNum() == 1 && rw.getTotalStagesNet() == 1) {
                logger.info("JobId: {}, single stage job, executing entire job", rw.getJobId());
                PortSelector portSelector = new PortSelector() {
                    @Override
                    public int acquirePort() {
                        return rw.getPorts().next().intValue();
                    }
                };
                this.closeables.add(StageExecutors.executeSingleStageJob(rw.getJob().getSource(), rw.getStage(), rw.getJob().getSink(), portSelector, new RxMetrics(), rw.getContext(), rw.getOnTerminateCallback(), rw.getWorkerIndex(), rw.getSourceStageTotalWorkersObservable(), this.onSinkSubscribe, this.onSinkUnsubscribe, rw.getOnCompleteCallback(), rw.getOnErrorCallback()));
                signalStarted(rw);
                rw.waitUntilTerminate();
            } else {
                logger.info("JobId: {}, executing a multi-stage job, stage: {}", rw.getJobId(), rw.getStageNum());
                if (rw.getStageNum() == 1) {
                    String remoteObservableName = rw.getJobId() + "_" + rw.getStageNum();
                    // NOTE(review): result is unused; call kept from the original in case
                    // forStage has side effects. Confirm and drop.
                    rw.getSchedulingInfo().forStage(1);
                    WorkerPublisherRemoteObservable publisher = new WorkerPublisherRemoteObservable(rw.getPorts().next().intValue(), remoteObservableName, numWorkersAtStage(schedulingChanges, rw.getJobId(), rw.getStageNum() + 1), rw.getJobName());
                    this.closeables.add(StageExecutors.executeSource(rw.getWorkerIndex(), rw.getJob().getSource(), rw.getStage(), publisher, rw.getContext(), rw.getSourceStageTotalWorkersObservable()));
                    logger.info("JobId: {} stage: {}, serving remote observable for source with name: {}", rw.getJobId(), rw.getStageNum(), remoteObservableName);
                    RemoteRxServer server = publisher.getServer();
                    MetricsRegistry.getInstance().registerAndGet(server.getMetrics().getCountersAndGauges());
                    signalStarted(rw);
                    logger.info("JobId: {} stage: {}, blocking until source observable completes", rw.getJobId(), rw.getStageNum());
                    server.blockUntilServerShutdown();
                } else {
                    executeNonSourceStage(schedulingChanges, rw);
                }
            }
            logger.info("Calling lifecycle.shutdown()");
            lifecycle.shutdown();
        } catch (Throwable t) {
            logger.warn("Error during executing stage; shutting down.", t);
            rw.signalFailed(t);
            shutdownStage();
        }
    }

    /**
     * Builds the job master's autoscale metric configuration from the context parameters.
     *
     * <p>Reads the parameter {@code mantis.jobmaster.autoscale.metric}. When it is null or empty
     * an empty configuration is returned. Otherwise the value must consist of exactly three
     * non-empty {@code ::}-separated tokens, the third naming an
     * {@link AutoScaleMetricsConfig.AggregationAlgo}; the tokens are registered as a user-defined
     * metric.
     *
     * @param contextParameters parameters of the job context
     * @return the configuration to hand to the job master service
     * @throws RuntimeException if the value does not have three tokens or names an unknown algorithm
     */
    private static AutoScaleMetricsConfig createAutoScaleMetricsConfig(Parameters contextParameters) {
        final String paramName = "mantis.jobmaster.autoscale.metric";
        AutoScaleMetricsConfig autoScaleMetricsConfig = new AutoScaleMetricsConfig();

        String userDefinedMetric = (String) contextParameters.get(paramName, "");
        if (Strings.isNullOrEmpty(userDefinedMetric)) {
            logger.info("param {} is null or empty", paramName);
            return autoScaleMetricsConfig;
        }

        List<String> tokens = Splitter.on("::").omitEmptyStrings().trimResults().splitToList(userDefinedMetric);
        if (tokens.size() != 3) {
            String errorMsg = String.format("ERROR: Invalid value %s for param %s", userDefinedMetric, paramName);
            logger.error(errorMsg);
            throw new RuntimeException(errorMsg);
        }

        String firstToken = tokens.get(0);
        String secondToken = tokens.get(1);
        String algoToken = tokens.get(2);
        try {
            AutoScaleMetricsConfig.AggregationAlgo algo = AutoScaleMetricsConfig.AggregationAlgo.valueOf(algoToken);
            logger.info("registered UserDefined auto scale metric {}:{} algo {}", firstToken, secondToken, algo);
            autoScaleMetricsConfig.addUserDefinedMetric(firstToken, secondToken, algo);
        } catch (IllegalArgumentException e) {
            // Report the rejected algorithm token (not the config object) and keep the cause.
            String errorMsg = String.format("ERROR: Invalid algorithm value %s for param %s (algo should be one of %s)", algoToken, paramName, Arrays.stream(AutoScaleMetricsConfig.AggregationAlgo.values()).map(Enum::name).collect(Collectors.toList()));
            logger.error(errorMsg);
            throw new RuntimeException(errorMsg, e);
        }
        return autoScaleMetricsConfig;
    }

    /**
     * Creates the sink subscription state handler for the request, wires the sink
     * subscribe/unsubscribe callbacks to it, and starts it.
     *
     * <p>Each callback first records the new subscription state ("true"/"false") as a payload on
     * the current heartbeat and then notifies the handler. The handler is started asynchronously
     * and given five seconds to reach the running state.
     *
     * @param executeStageRequest request passed to the handler factory
     * @throws RuntimeException wrapping the {@link TimeoutException} if the handler does not start in time
     */
    private void setupSubscriptionStateHandler(ExecuteStageRequest executeStageRequest) {
        final SinkSubscriptionStateHandler handler = (SinkSubscriptionStateHandler) this.sinkSubscriptionStateHandlerFactory.apply(executeStageRequest);
        final String payloadKey = StatusPayloads.Type.SubscriptionState.toString();

        this.onSinkSubscribe = () -> {
            this.heartbeatRef.get().setPayload(StatusPayloads.Type.SubscriptionState.toString(), "true");
            handler.onSinkSubscribed();
        };
        this.onSinkUnsubscribe = () -> {
            this.heartbeatRef.get().setPayload(payloadKey, "false");
            handler.onSinkUnsubscribed();
        };
        this.subscriptionStateHandler = handler;

        Duration startupTimeout = Duration.of(5L, ChronoUnit.SECONDS);
        try {
            this.subscriptionStateHandler.startAsync().awaitRunning(startupTimeout);
        } catch (TimeoutException timeout) {
            logger.error("Failed to start subscriptionStateHandler: ", timeout);
            throw new RuntimeException(timeout);
        }
    }

    /**
     * Runs an intermediate or sink stage of a multi-stage job and blocks until it ends.
     *
     * <p>The stage first connects to the output of the previous stage and takes the next port
     * from the worker's port iterator. If this is the last stage it emits a "running"/Started
     * status, executes the sink and waits on a latch that the sink's terminate action releases.
     * Otherwise it publishes its own output through a remote observable, signals started and
     * blocks until the serving server shuts down. In both cases the flag gating further
     * scheduling updates for the upstream connection is cleared afterwards.
     *
     * @param schedulingChanges stream of scheduling changes for the job
     * @param worker the running worker describing this stage
     */
    private void executeNonSourceStage(Observable<JobSchedulingInfo> schedulingChanges, RunningWorker worker) {
        // Stage configs are zero-indexed, so stage N-1 lives at index N-2.
        StageConfig previousStageConfig = (StageConfig) worker.getJob().getStages().get(worker.getStageNum() - 2);
        int previousStageInstances = worker.getSchedulingInfo().forStage(worker.getStageNum() - 1).getNumberOfInstances();
        AtomicBoolean acceptSchedulingChanges = new AtomicBoolean(true);
        WorkerConsumer upstream = connectToObservableAtPreviousStages(schedulingChanges, worker.getJobId(), worker.getStageNum() - 1, previousStageInstances, previousStageConfig, acceptSchedulingChanges, worker.getJobStatus(), worker.getStageNum(), worker.getWorkerIndex(), worker.getWorkerNum());
        final int port = worker.getPorts().next().intValue();

        boolean isSinkStage = worker.getStageNum() == worker.getTotalStagesNet();
        if (!isSinkStage) {
            logger.info("JobId: " + worker.getJobId() + ", executing intermediate stage: " + worker.getStageNum());
            int stageNum = worker.getStageNum();
            String jobId = worker.getJobId();
            String remoteObservableName = jobId + "_" + stageNum;
            Observable<Integer> nextStageConnections = numWorkersAtStage(schedulingChanges, worker.getJobId(), worker.getStageNum() + 1);
            WorkerPublisherRemoteObservable publisher = new WorkerPublisherRemoteObservable(port, remoteObservableName, nextStageConnections, worker.getJobName());
            this.closeables.add(StageExecutors.executeIntermediate(upstream, worker.getStage(), publisher, worker.getContext()));
            RemoteRxServer server = publisher.getServer();
            logger.info("JobId: " + jobId + " stage: " + stageNum + ", serving intermediate remote observable with name: " + remoteObservableName);
            MetricsRegistry.getInstance().registerAndGet(server.getMetrics().getCountersAndGauges());
            signalStarted(worker);
            logger.info("JobId: " + jobId + " stage: " + stageNum + ", blocking until intermediate observable completes");
            server.blockUntilServerShutdown();
        } else {
            logger.info("JobId: {}, executing sink stage: {}, signaling started", worker.getJobId(), worker.getStageNum());
            String runningMessage = String.format("stage %s worker index=%s number=%s %s", worker.getStageNum(), worker.getWorkerIndex(), worker.getWorkerNum(), "running");
            Status startedStatus = new Status(worker.getJobId(), worker.getStageNum(), worker.getWorkerIndex(), worker.getWorkerNum(), Status.TYPE.INFO, runningMessage, MantisJobState.Started);
            worker.getJobStatus().onNext(startedStatus);

            // The sink always binds to the single port acquired above.
            PortSelector fixedPortSelector = new PortSelector() {
                public int acquirePort() {
                    return port;
                }
            };
            RxMetrics rxMetrics = new RxMetrics();
            MetricsRegistry.getInstance().registerAndGet(rxMetrics.getCountersAndGauges());

            final CountDownLatch sinkTerminated = new CountDownLatch(1);
            Action0 releaseLatch = sinkTerminated::countDown;
            this.closeables.add(StageExecutors.executeSink(upstream, worker.getStage(), worker.getJob().getSink(), fixedPortSelector, rxMetrics, worker.getContext(), releaseLatch, this.onSinkSubscribe, this.onSinkUnsubscribe, worker.getOnCompleteCallback(), worker.getOnErrorCallback()));
            try {
                sinkTerminated.await();
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        acceptSchedulingChanges.set(false);
    }

    /**
     * Emits, for every distinct scheduling change, the number of workers assigned to the given
     * stage multiplied by {@code connectionsPerEndpoint}.
     *
     * <p>Scheduling infos without worker assignments contribute nothing. The resulting stream is
     * shared among its subscribers.
     *
     * @param schedulingChanges stream of scheduling changes for the job
     * @param jobId job id (currently unused)
     * @param stageNum stage whose worker count is reported
     */
    private Observable<Integer> numWorkersAtStage(Observable<JobSchedulingInfo> schedulingChanges, String jobId, int stageNum) {
        Observable<WorkerAssignments> assignmentsForStage = schedulingChanges
            .distinctUntilChanged((previous, current) -> previous.equals(current))
            .flatMap(info -> {
                Map<Integer, WorkerAssignments> assignments = info.getWorkerAssignments();
                if (assignments == null || assignments.isEmpty()) {
                    return Observable.<WorkerAssignments>empty();
                }
                return Observable.from(assignments.values());
            })
            .filter(assignment -> assignment.getStage() == stageNum);

        return assignmentsForStage
            .map(assignment -> assignment.getNumWorkers() * this.connectionsPerEndpoint)
            .share();
    }

    /**
     * Creates the consumer that connects this worker to the workers of the previous stage.
     *
     * <p>Each scheduling change is reduced to the assignments of {@code previousStageNum} (only
     * while {@code acceptSchedulingChanges} is true). For every host in state Started,
     * {@code connectionsPerEndpoint} endpoints are created on the host's first port, each with
     * the id {@code stage_<stage>_index_<workerIndex>_partition_<n>}. Empty endpoint lists are
     * dropped before reaching the endpoint injector.
     *
     * @param schedulingChanges stream of scheduling changes for the job
     * @param jobId job id, used in the name of the consumed remote observable
     * @param previousStageNum stage to connect to
     * @param previousStageInstances instance count of the previous stage (currently unused)
     * @param previousStageConfig config of the previous stage (currently unused)
     * @param acceptSchedulingChanges gate; once false, further scheduling updates are ignored
     * @param jobStatusObserver job status observer (currently unused)
     * @param stageNum this worker's stage number, used in endpoint ids
     * @param workerIndex this worker's index, used in endpoint ids
     * @param workerNumber this worker's number (currently unused)
     * @return consumer reading from the previous stage's remote observable
     */
    private WorkerConsumer connectToObservableAtPreviousStages(Observable<JobSchedulingInfo> schedulingChanges, String jobId, int previousStageNum, int previousStageInstances, StageConfig previousStageConfig, AtomicBoolean acceptSchedulingChanges, Observer<Status> jobStatusObserver, int stageNum, int workerIndex, int workerNumber) {
        logger.info("Watching for scheduling changes");

        Observable<WorkerAssignments> previousStageAssignments = schedulingChanges
            .flatMap(info -> {
                Map<Integer, WorkerAssignments> assignments = info.getWorkerAssignments();
                if (assignments == null || assignments.isEmpty()) {
                    return Observable.<WorkerAssignments>empty();
                }
                return Observable.from(assignments.values());
            })
            .filter(assignment -> assignment.getStage() == previousStageNum && acceptSchedulingChanges.get());

        Observable<List<Endpoint>> endpointUpdates = previousStageAssignments
            .map(assignment -> {
                List<Endpoint> endpoints = new LinkedList<>();
                for (WorkerHost workerHost : assignment.getHosts().values()) {
                    if (workerHost.getState() != MantisJobState.Started) {
                        continue;
                    }
                    logger.info("Received scheduling update from master, connect request for host: " + workerHost.getHost() + " port: " + workerHost.getPort() + " state: " + workerHost.getState() + " adding: " + this.connectionsPerEndpoint + " connections to host");
                    int partition = 1;
                    while (partition <= this.connectionsPerEndpoint) {
                        String endpointId = "stage_" + stageNum + "_index_" + Integer.toString(workerIndex) + "_partition_" + partition;
                        logger.info("Adding endpoint to endpoint injector to be considered for add, with id: " + endpointId);
                        int port = workerHost.getPort().get(0);
                        endpoints.add(new Endpoint(workerHost.getHost(), port, endpointId));
                        partition++;
                    }
                }
                return endpoints;
            })
            .filter(endpoints -> !endpoints.isEmpty());

        String remoteObservableName = jobId + "_" + previousStageNum;
        return new WorkerConsumerRemoteObservable(remoteObservableName, new ToDeltaEndpointInjector(endpointUpdates));
    }

    /**
     * Shuts down the currently executing stage and releases its resources.
     *
     * <p>In order: emits a final "shutdown" status with state Failed (when both a status
     * observer and a heartbeat exist), stops the sink subscription state handler if one was set
     * up, closes every registered closeable, and shuts down the heartbeat scheduler. The
     * scheduler is shut down even if closing a resource throws.
     *
     * @throws IOException if closing the registered resources fails
     */
    @Override // io.mantisrx.server.worker.WorkerExecutionOperations
    public void shutdownStage() throws IOException {
        if (this.jobStatusObserver != null) {
            Heartbeat heartbeat = this.heartbeatRef.get();
            if (heartbeat != null) {
                String message = String.format("stage %s worker index=%s number=%s %s", heartbeat.getStageNumber(), heartbeat.getWorkerIndex(), heartbeat.getWorkerNumber(), "shutdown");
                this.jobStatusObserver.onNext(new Status(heartbeat.getJobId(), heartbeat.getStageNumber(), heartbeat.getWorkerIndex(), heartbeat.getWorkerNumber(), Status.TYPE.INFO, message, MantisJobState.Failed));
            } else {
                // executeStage failed before the heartbeat was created; there is no worker
                // identity to report, but the cleanup below must still run.
                logger.warn("No heartbeat available during shutdown; skipping final status update");
            }
        }

        if (this.subscriptionStateHandler != null) {
            try {
                this.subscriptionStateHandler.stopAsync();
            } catch (Exception e) {
                logger.error("Failed to stop subscription state handler successfully", e);
            } finally {
                this.subscriptionStateHandler = null;
            }
        }

        try {
            Closeables.combine(this.closeables).close();
        } finally {
            this.scheduledExecutorService.shutdownNow();
        }
        logger.info("Shutdown completed");
    }
}
