/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl;

import com.hazelcast.cluster.Address;
import com.hazelcast.core.IndeterminateOperationStateException;
import com.hazelcast.core.LocalMemberResetException;
import com.hazelcast.function.Functions;
import com.hazelcast.internal.cluster.MemberInfo;
import com.hazelcast.internal.cluster.impl.MembersView;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.internal.util.Clock;
import com.hazelcast.internal.util.ConcurrencyUtil;
import com.hazelcast.internal.util.executor.ManagedExecutorService;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.TopologyChangedException;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.core.metrics.JobMetrics;
import com.hazelcast.jet.core.metrics.Measurement;
import com.hazelcast.jet.core.processor.SourceProcessors;
import com.hazelcast.jet.datamodel.Tuple2;
import com.hazelcast.jet.function.RunnableEx;
import com.hazelcast.jet.impl.ExplodeSnapshotP;
import com.hazelcast.jet.impl.JobClassLoaderService;
import com.hazelcast.jet.impl.JobCoordinationService;
import com.hazelcast.jet.impl.JobExecutionRecord;
import com.hazelcast.jet.impl.JobMetricsUtil;
import com.hazelcast.jet.impl.JobRepository;
import com.hazelcast.jet.impl.MasterContext;
import com.hazelcast.jet.impl.SnapshotValidator;
import com.hazelcast.jet.impl.TerminationMode;
import com.hazelcast.jet.impl.exception.CancellationByUserException;
import com.hazelcast.jet.impl.exception.ExecutionNotFoundException;
import com.hazelcast.jet.impl.exception.JetDisabledException;
import com.hazelcast.jet.impl.exception.JobTerminateRequestedException;
import com.hazelcast.jet.impl.exception.TerminatedWithSnapshotException;
import com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject;
import com.hazelcast.jet.impl.execution.init.ExecutionPlan;
import com.hazelcast.jet.impl.execution.init.ExecutionPlanBuilder;
import com.hazelcast.jet.impl.metrics.RawJobMetrics;
import com.hazelcast.jet.impl.operation.GetLocalJobMetricsOperation;
import com.hazelcast.jet.impl.operation.InitExecutionOperation;
import com.hazelcast.jet.impl.operation.StartExecutionOperation;
import com.hazelcast.jet.impl.operation.TerminateExecutionOperation;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.jet.impl.util.NonCompletableFuture;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.operationservice.Operation;
import com.hazelcast.version.Version;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public class MasterJobContext {
    /**
     * Edge priority constant for snapshot restore; equal to the smallest {@code int}.
     * NOTE(review): the consumer of this constant is not visible in this part of
     * the file - presumably the snapshot-restore edge class; confirm.
     */
    public static final int SNAPSHOT_RESTORE_EDGE_PRIORITY = Integer.MIN_VALUE;
    /**
     * Common name prefix of the vertices added by {@code rewriteDagWithSnapshotRestore}
     * ({@code "__snapshot_read"} and {@code "__snapshot_explode"}).
     */
    public static final String SNAPSHOT_VERTEX_PREFIX = "__snapshot_";
    /**
     * NOTE(review): not referenced in the visible part of the file; by its name a
     * retry delay (in milliseconds) for metrics collection - confirm against its use.
     */
    private static final int COLLECT_METRICS_RETRY_DELAY_MILLIS = 100;
    /** Action that does nothing; used by {@code finalizeExecution} when no follow-up work is needed after unlocking. */
    private static final Runnable NO_OP = () -> {};
    /** The master context this job context delegates to (status, lock, records, services). */
    private final MasterContext mc;
    private final ILogger logger;
    /** Cooperative thread count read from the Jet config in the constructor; passed to {@code DAG.toDotString}. */
    private final int defaultParallelism;
    /** Queue size of the default edge config, read in the constructor; passed to {@code DAG.toDotString}. */
    private final int defaultQueueSize;
    /** Wall-clock time in milliseconds; reset at the start of the task submitted by {@code tryStartJob}. */
    private volatile long executionStartTime = System.currentTimeMillis();
    /** Set by {@code invokeStartExecution}, cleared by {@code finalizeExecution}; may be {@code null}. */
    private volatile ExecutionFailureCallback executionFailureCallback;
    /** Vertices of the DAG deserialized by {@code resolveDag}; {@code null} until the first successful resolve. */
    private volatile Set<Vertex> vertices;
    /** Reset to {@code false} by {@code resolveDag}. NOTE(review): the code that sets it to true is outside this view. */
    private volatile boolean verticesCompleted;
    /** Raw metrics of the last execution; read when formatting the execution summary. */
    @Nonnull
    private volatile List<RawJobMetrics> jobMetrics = Collections.emptyList();
    /**
     * Future of the current execution: replaced with a fresh instance in {@code resolveDag}
     * and completed at the end of {@code finalizeExecution}. Starts out already completed.
     */
    @Nonnull
    private volatile CompletableFuture<Void> executionCompletionFuture = CompletableFuture.completedFuture(null);
    /** Future of the whole job; completed only through {@code setFinalResult}. */
    private final NonCompletableFuture jobCompletionFuture = new NonCompletableFuture();
    /** Pending termination request; set by {@code requestTermination}, cleared by {@code resolveDag} and {@code finalizeExecution}. */
    private volatile TerminationRequest terminationRequest;

    /**
     * Creates the job-level context bound to the given master context.
     * <p>
     * The cooperative thread count and the queue size of the default edge
     * configuration are read from the Jet configuration once, here.
     *
     * @param masterContext the master context this object delegates to
     * @param logger2       the logger to use
     */
    MasterJobContext(MasterContext masterContext, ILogger logger2) {
        mc = masterContext;
        logger = logger2;
        defaultParallelism = masterContext.getJetServiceBackend().getJetConfig().getCooperativeThreadCount();
        defaultQueueSize = masterContext.getJetServiceBackend().getJetConfig().getDefaultEdgeConfig().getQueueSize();
    }

    /**
     * Returns the future that represents completion of the whole job.
     * <p>
     * This is the {@link NonCompletableFuture} owned by this context; within
     * this class it is completed only by {@code setFinalResult}.
     *
     * @return the job completion future
     */
    public CompletableFuture<Void> jobCompletionFuture() {
        return jobCompletionFuture;
    }

    /**
     * Returns the currently pending termination request, if any.
     *
     * @return the request, or an empty {@code Optional} when none is pending
     */
    Optional<TerminationRequest> getTerminationRequest() {
        TerminationRequest pending = terminationRequest;
        return Optional.ofNullable(pending);
    }

    /**
     * Returns the mode of the pending termination request, if any.
     *
     * @return the requested mode, or an empty {@code Optional} when no
     *         termination was requested
     */
    Optional<TerminationMode> requestedTerminationMode() {
        return getTerminationRequest().map(request -> request.getMode());
    }

    /**
     * Tells whether the pending termination request was initiated by a user.
     *
     * @return the request's user-initiated flag, or {@code false} when no
     *         termination request is pending
     */
    boolean isUserInitiatedTermination() {
        Optional<Boolean> userInitiated = getTerminationRequest().map(request -> request.isUserInitiated());
        return userInitiated.orElse(false);
    }

    /**
     * Creates the exception used to report cancellation of the job.
     *
     * @return a {@link CancellationByUserException} when the pending termination
     *         was user-initiated, otherwise a plain {@link CancellationException}
     */
    private Throwable createCancellationException() {
        if (isUserInitiatedTermination()) {
            return new CancellationByUserException();
        }
        return new CancellationException();
    }

    /**
     * Tells whether a forceful cancellation is pending.
     *
     * @return {@code true} iff a termination request exists and its mode is
     *         {@code CANCEL_FORCEFUL}
     */
    private boolean isCancelled() {
        return requestedTerminationMode()
                .filter(mode -> mode == TerminationMode.CANCEL_FORCEFUL)
                .isPresent();
    }

    /**
     * Tells whether a graceful cancellation is pending.
     *
     * @return {@code true} iff a termination request exists and its mode is
     *         {@code CANCEL_GRACEFUL}
     */
    private boolean isCancelledGracefully() {
        return requestedTerminationMode()
                .filter(mode -> mode == TerminationMode.CANCEL_GRACEFUL)
                .isPresent();
    }

    /**
     * Submits a task to the coordinator thread that attempts to start the job
     * and, if the job has a timeout, schedules that timeout.
     * <p>
     * The submitted task:
     * <ol>
     *   <li>records the execution start time and marks the job execution
     *       record as executed;</li>
     *   <li>resolves the DAG via {@code resolveDag()} and stops quietly when it
     *       returns {@code null};</li>
     *   <li>picks the snapshot to restore from: the job's own snapshot when
     *       the record has a non-negative snapshot id, otherwise the initial
     *       snapshot named in the job config (if any), and rewrites the DAG
     *       accordingly;</li>
     *   <li>builds the execution plans and then runs {@code initExecution} on
     *       the coordinator thread.</li>
     * </ol>
     * Any failure, synchronous or from the asynchronous chain, is passed to
     * {@code finalizeExecution}.
     */
    void tryStartJob() {
        JobCoordinationService coordinator = this.mc.coordinationService();
        MembersView membersView = Util.getMembersView(this.mc.nodeEngine());
        this.mc.coordinationService().submitToCoordinatorThread(() -> {
            try {
                this.executionStartTime = System.currentTimeMillis();
                JobExecutionRecord jobExecRec = this.mc.jobExecutionRecord();
                jobExecRec.markExecuted();
                DAG dag = this.resolveDag();
                if (dag == null) {
                    return;
                }
                // Capture the DOT text before the DAG is rewritten with the restore vertices,
                // so the log shows the graph without them.
                String dotRepresentation = dag.toDotString(this.defaultParallelism, this.defaultQueueSize);
                long snapshotId = jobExecRec.snapshotId();
                String snapshotName;
                String snapshotMapName;
                if (snapshotId >= 0L) {
                    snapshotName = jobExecRec.exportedSnapshotName();
                    snapshotMapName = jobExecRec.successfulSnapshotDataMapName();
                } else {
                    // No snapshot of this job yet: fall back to the configured initial snapshot, if any.
                    snapshotName = this.mc.jobConfig().getInitialSnapshotName();
                    snapshotMapName = snapshotName != null ? JobRepository.exportedSnapshotMapName(snapshotName) : null;
                }
                if (snapshotMapName != null) {
                    this.rewriteDagWithSnapshotRestore(dag, snapshotId, snapshotMapName, snapshotName);
                } else {
                    this.logger.info("Didn't find any snapshot to restore for " + this.mc.jobIdString());
                }
                this.logger.info("Start executing " + this.mc.jobIdString() + ", execution graph in DOT format:\n" + dotRepresentation + "\nHINT: You can use graphviz or http://viz-js.com to visualize the printed graph.");
                this.logger.fine("Building execution plan for " + this.mc.jobIdString());
                // Fully typed chain (no raw CompletableFuture cast), so that `e` is a Throwable.
                this.createExecutionPlans(dag, membersView)
                        .thenCompose(plans -> coordinator.submitToCoordinatorThread(() -> this.initExecution(membersView, plans)))
                        .whenComplete((r, e) -> {
                            if (e != null) {
                                this.finalizeExecution(ExceptionUtil.peel(e));
                            }
                        });
            }
            catch (Throwable e2) {
                this.finalizeExecution(e2);
            }
        });
        if (this.mc.hasTimeout()) {
            long remainingTime = this.mc.remainingTime(Clock.currentTimeMillis());
            // Schedule with a delay of at least 1 even if the remaining time is zero or negative.
            this.mc.coordinationService().scheduleJobTimeout(this.mc.jobId(), Math.max(1L, remainingTime));
        }
    }

    /**
     * Delegates to {@code ExecutionPlanBuilder.createExecutionPlans} using the
     * current job id, execution id, job config, ongoing snapshot id and
     * subject taken from the master context.
     *
     * @param dag         the DAG to plan
     * @param membersView the members view whose members are passed to the builder
     * @return the future returned by the builder
     */
    private CompletableFuture<Map<MemberInfo, ExecutionPlan>> createExecutionPlans(DAG dag, MembersView membersView) {
        return ExecutionPlanBuilder.createExecutionPlans(
                mc.nodeEngine(),
                membersView.getMembers(),
                dag,
                mc.jobId(),
                mc.executionId(),
                mc.jobConfig(),
                mc.jobExecutionRecord().ongoingSnapshotId(),
                false,
                mc.jobRecord().getSubject());
    }

    /**
     * Stores the execution plans in the master context and invokes an
     * {@link InitExecutionOperation} on the participants; the responses are
     * handed to {@code onInitStepCompleted}.
     *
     * @param membersView      the members view whose version is sent with the operation
     * @param executionPlanMap the execution plan for each participating member
     */
    private void initExecution(MembersView membersView, Map<MemberInfo, ExecutionPlan> executionPlanMap) {
        mc.setExecutionPlanMap(executionPlanMap);
        logger.fine("Built execution plans for " + mc.jobIdString());

        Set<MemberInfo> participants = mc.executionPlanMap().keySet();
        Version coordinatorVersion = mc.nodeEngine().getLocalMember().getVersion().asVersion();
        // The plan is serialized when the function is applied, once per plan.
        Function<ExecutionPlan, Operation> operationCtor = plan -> {
            return new InitExecutionOperation(
                    mc.jobId(),
                    mc.executionId(),
                    membersView.getVersion(),
                    coordinatorVersion,
                    participants,
                    (Data) mc.nodeEngine().getSerializationService().toData(plan),
                    false);
        };
        mc.invokeOnParticipants(operationCtor, this::onInitStepCompleted, null, false);
    }

    /**
     * Under the master-context lock, checks whether the job can start now and,
     * if so, moves it to {@code STARTING} and deserializes its DAG.
     * <p>
     * NOTE: this source was produced by a decompiler, which flagged this method
     * with "Removed try catching itself - possible behaviour change".
     *
     * @return the deserialized DAG, or {@code null} when the job is not in
     *         {@code NOT_RUNNING} status or a restart was scheduled because
     *         the quorum is absent or the cluster is not safe
     * @throws CancellationException          if a forceful cancellation is pending
     * @throws JetException                   if the cluster version differs from the
     *                                        one the job was submitted to, or the DAG
     *                                        cannot be deserialized
     * @throws JobTerminateRequestedException if writing the execution record was
     *                                        indeterminate, or a non-restart
     *                                        termination is pending
     */
    @Nullable
    private DAG resolveDag() {
        this.mc.lock();
        try {
            if (this.isCancelled()) {
                this.logger.fine("Skipping init job '" + this.mc.jobName() + "': is already cancelled.");
                throw new CancellationException();
            }
            if (this.mc.jobStatus() != JobStatus.NOT_RUNNING) {
                this.logger.fine("Not starting job '" + this.mc.jobName() + "': status is " + this.mc.jobStatus());
                return null;
            }
            if (this.mc.jobExecutionRecord().isSuspended()) {
                this.mc.jobExecutionRecord().clearSuspended();
                this.mc.writeJobExecutionRecord(false);
            }
            if (this.scheduleRestartIfQuorumAbsent() || this.scheduleRestartIfClusterIsNotSafe()) {
                return null;
            }

            Version jobClusterVersion = this.mc.jobRecord().getClusterVersion();
            Version currentClusterVersion = this.mc.nodeEngine().getClusterService().getClusterVersion();
            if (!jobClusterVersion.equals(currentClusterVersion)) {
                throw new JetException("Cancelling job " + this.mc.jobName() + ": the cluster was upgraded since the job was submitted. Submitted to version: " + jobClusterVersion + ", current cluster version: " + currentClusterVersion);
            }

            this.mc.setJobStatus(JobStatus.STARTING);
            try {
                this.mc.writeJobExecutionRecordSafe(true);
            } catch (IndeterminateOperationStateException indeterminate) {
                this.logger.warning("Job " + this.mc.jobName() + " initial update of JobExecutionRecord was indeterminate. Failed to start job. Will retry.");
                throw new JobTerminateRequestedException(TerminationMode.RESTART_FORCEFUL);
            }

            if (this.terminationRequest != null) {
                // A pending restart request is satisfied by this start; anything else aborts it.
                if (this.terminationRequest.mode.actionAfterTerminate() != TerminationMode.ActionAfterTerminate.RESTART) {
                    throw new JobTerminateRequestedException(this.terminationRequest.mode);
                }
                this.terminationRequest = null;
            }

            ClassLoader classLoader = this.mc.getJetServiceBackend().getJobClassLoaderService().getOrCreateClassLoader(this.mc.jobConfig(), this.mc.jobId(), JobClassLoaderService.JobPhase.COORDINATOR);
            JobClassLoaderService jobClassLoaderService = this.mc.getJetServiceBackend().getJobClassLoaderService();
            DAG dag;
            try {
                jobClassLoaderService.prepareProcessorClassLoaders(this.mc.jobId());
                dag = (DAG) CustomClassLoadedObject.deserializeWithCustomClassLoader(this.mc.nodeEngine().getSerializationService(), classLoader, this.mc.jobRecord().getDag());
            } catch (Exception deserializationFailure) {
                throw new JetException("DAG deserialization failed", deserializationFailure);
            } finally {
                jobClassLoaderService.clearProcessorClassLoaders();
            }

            this.vertices = new HashSet<>();
            this.verticesCompleted = false;
            dag.iterator().forEachRemaining(this.vertices::add);
            this.mc.setExecutionId(this.mc.jobRepository().newExecutionId());
            this.mc.snapshotContext().onExecutionStarted();
            // Fresh future for the execution that is about to start.
            this.executionCompletionFuture = new CompletableFuture<>();
            return dag;
        } finally {
            this.mc.unlock();
        }
    }

    /**
     * Registers a termination request for the job. Must be called on the
     * coordinator thread (asserted at entry).
     * <p>
     * When the job has processing guarantee {@code NONE} and the mode is not
     * {@code CANCEL_GRACEFUL}, the mode is first replaced with its variant
     * without a terminal snapshot.
     * <p>
     * If the job was {@code SUSPENDED} or {@code SUSPENDED_EXPORTING_SNAPSHOT},
     * it is marked {@code FAILED}, the final result is set to a cancellation
     * exception and this method blocks until the coordination service has
     * completed the job. If the job was {@code RUNNING} or {@code STARTING},
     * {@code handleTermination} is called.
     * <p>
     * NOTE: this source was produced by a decompiler, which flagged this method
     * with "Removed try catching itself - possible behaviour change".
     *
     * @param mode                        the requested termination mode
     * @param allowWhileExportingSnapshot whether the request is accepted while the
     *                                    status is {@code SUSPENDED_EXPORTING_SNAPSHOT}
     * @param userInitiated               whether the request comes from a user
     * @return a tuple of the current execution completion future and a message:
     *         the message is {@code null} when the request was accepted (or a
     *         forceful cancel repeats an already pending forceful cancel),
     *         otherwise it says why the request was not accepted
     */
    @Nonnull
    Tuple2<CompletableFuture<Void>, String> requestTermination(TerminationMode mode, boolean allowWhileExportingSnapshot, boolean userInitiated) {
        this.mc.coordinationService().assertOnCoordinatorThread();
        if (this.mc.jobConfig().getProcessingGuarantee() == ProcessingGuarantee.NONE && mode != TerminationMode.CANCEL_GRACEFUL) {
            mode = mode.withoutTerminalSnapshot();
        }
        JobStatus localStatus;
        Tuple2<CompletableFuture<Void>, String> result;
        this.mc.lock();
        try {
            localStatus = this.mc.jobStatus();
            if (localStatus == JobStatus.SUSPENDED_EXPORTING_SNAPSHOT && !allowWhileExportingSnapshot) {
                return Tuple2.tuple2(this.executionCompletionFuture, "Cannot cancel when job status is " + JobStatus.SUSPENDED_EXPORTING_SNAPSHOT);
            }
            if (localStatus == JobStatus.SUSPENDED && mode != TerminationMode.CANCEL_FORCEFUL) {
                return Tuple2.tuple2(this.executionCompletionFuture, "Job is " + JobStatus.SUSPENDED);
            }
            if (this.terminationRequest != null) {
                // Repeating a forceful cancel is not reported as an error.
                String message = this.terminationRequest.mode == TerminationMode.CANCEL_FORCEFUL && mode == TerminationMode.CANCEL_FORCEFUL
                        ? null
                        : "Job is already terminating in mode: " + this.terminationRequest.mode.name();
                return Tuple2.tuple2(this.executionCompletionFuture, message);
            }
            this.terminationRequest = new TerminationRequest(mode, userInitiated);
            if (localStatus == JobStatus.SUSPENDED || localStatus == JobStatus.SUSPENDED_EXPORTING_SNAPSHOT) {
                this.mc.setJobStatus(JobStatus.FAILED, mode.actionAfterTerminate().description(), true);
                this.setFinalResult(this.createCancellationException());
            }
            if (mode.isWithTerminalSnapshot()) {
                this.mc.snapshotContext().enqueueSnapshot(null, true, null);
            }
            result = Tuple2.tuple2(this.executionCompletionFuture, null);
        }
        finally {
            this.mc.unlock();
        }
        // The follow-up actions run outside the lock.
        if (localStatus == JobStatus.SUSPENDED || localStatus == JobStatus.SUSPENDED_EXPORTING_SNAPSHOT) {
            try {
                this.mc.coordinationService().completeJob(this.mc, this.createCancellationException(), System.currentTimeMillis(), userInitiated).get();
            }
            catch (InterruptedException e) {
                // Preserve the interrupt status for code further up the stack.
                Thread.currentThread().interrupt();
                throw ExceptionUtil.rethrow(e);
            }
            catch (Exception e) {
                throw ExceptionUtil.rethrow(e);
            }
        } else if (localStatus == JobStatus.RUNNING || localStatus == JobStatus.STARTING) {
            this.handleTermination(mode);
        }
        return result;
    }

    /**
     * Adds snapshot-restore vertices and edges to the DAG.
     * <p>
     * The snapshot in the given map is validated first. Then two vertices are
     * added - one reading the snapshot map and one "exploding" its entries -
     * connected by an isolated edge, and a {@code SnapshotRestoreEdge} is added
     * from the explode vertex to every vertex that was in the DAG before the
     * rewrite. Each such vertex gets an ordinal (its iteration position),
     * recorded in the name-to-ordinal map handed to {@link ExplodeSnapshotP}.
     *
     * @param dag          the DAG to modify in place
     * @param snapshotId   the snapshot id to validate
     * @param mapName      name of the map holding the snapshot data
     * @param snapshotName name of the snapshot, passed to the validator
     */
    private void rewriteDagWithSnapshotRestore(DAG dag, long snapshotId, String mapName, String snapshotName) {
        IMap<Object, Object> snapshotMap = this.mc.nodeEngine().getHazelcastInstance().getMap(mapName);
        long resolvedSnapshotId = SnapshotValidator.validateSnapshot(snapshotId, snapshotMap, this.mc.jobIdString(), snapshotName);
        this.logger.info(String.format("About to restore the state of %s from snapshot %d, mapName = %s", this.mc.jobIdString(), resolvedSnapshotId, mapName));

        // Collect the user's vertices before adding the restore vertices.
        List<Vertex> originalVertices = new ArrayList<>();
        dag.iterator().forEachRemaining(originalVertices::add);

        // Captured by the supplier below and filled in by the loop that follows.
        HashMap<String, Integer> vertexToOrdinal = new HashMap<>();
        Vertex readSnapshotVertex = dag.newVertex(SNAPSHOT_VERTEX_PREFIX + "read", SourceProcessors.readMapP(mapName));
        Vertex explodeVertex = dag.newVertex(SNAPSHOT_VERTEX_PREFIX + "explode", () -> new ExplodeSnapshotP(vertexToOrdinal, resolvedSnapshotId));
        dag.edge(Edge.between(readSnapshotVertex, explodeVertex).isolated());

        int index = 0;
        for (Vertex userVertex : originalVertices) {
            vertexToOrdinal.put(userVertex.getName(), index);
            // The restore edge takes the next free inbound ordinal of the user vertex.
            int destOrdinal = dag.getInboundEdges(userVertex.getName()).size();
            dag.edge(new SnapshotRestoreEdge(explodeVertex, index, userVertex, destOrdinal));
            ++index;
        }
    }

    /**
     * Schedules a restart of the job when the quorum recorded in the job
     * execution record is not present.
     *
     * @return {@code true} if the quorum is absent and a restart was scheduled,
     *         {@code false} if the quorum is present
     */
    private boolean scheduleRestartIfQuorumAbsent() {
        int quorumSize = mc.jobExecutionRecord().getQuorumSize();
        boolean quorumAbsent = !mc.coordinationService().isQuorumPresent(quorumSize);
        if (quorumAbsent) {
            logger.fine("Rescheduling restart of '" + mc.jobName() + "': quorum size " + quorumSize + " is not met");
            scheduleRestart("Quorum is absent");
        }
        return quorumAbsent;
    }

    /**
     * Schedules a restart of the job when the coordination service reports
     * that jobs should not be started now.
     *
     * @return {@code true} if a restart was scheduled, {@code false} if jobs
     *         may be started
     */
    private boolean scheduleRestartIfClusterIsNotSafe() {
        boolean clusterNotSafe = !mc.coordinationService().shouldStartJobs();
        if (clusterNotSafe) {
            logger.fine("Rescheduling restart of '" + mc.jobName() + "': cluster is not safe");
            scheduleRestart("Cluster is not safe");
        }
        return clusterNotSafe;
    }

    /**
     * Puts the job into {@code NOT_RUNNING} status (if it is not there already)
     * and asks the coordination service to schedule a restart. The master
     * context lock must be held (asserted at entry).
     *
     * @param description the description recorded with the status change
     * @throws IllegalStateException if the current status is none of
     *         {@code NOT_RUNNING}, {@code STARTING}, {@code RUNNING}
     */
    private void scheduleRestart(String description) {
        mc.assertLockHeld();
        JobStatus current = mc.jobStatus();
        boolean restartable = current == JobStatus.NOT_RUNNING
                || current == JobStatus.STARTING
                || current == JobStatus.RUNNING;
        if (!restartable) {
            throw new IllegalStateException("Restart scheduled in an unexpected state: " + current);
        }
        if (current != JobStatus.NOT_RUNNING) {
            mc.setJobStatus(JobStatus.NOT_RUNNING, description, false);
        }
        mc.coordinationService().scheduleRestart(mc.jobId());
    }

    /**
     * Callback for the responses of the init operations. The handling is
     * submitted to the coordinator thread: if no error is derived from the
     * responses and the job is still {@code STARTING}, the start operations
     * are invoked; otherwise the execution is cancelled on the participants
     * and then reported as failed through {@code onStartExecutionComplete}.
     *
     * @param responses the per-member responses of the init operation
     */
    private void onInitStepCompleted(Collection<Map.Entry<MemberInfo, Object>> responses) {
        mc.coordinationService().submitToCoordinatorThread(() -> {
            Throwable error = getErrorFromResponses("Init", responses);
            JobStatus status = mc.jobStatus();
            if (error == null && status == JobStatus.STARTING) {
                invokeStartExecution();
                return;
            }
            // The failure to report is determined when the callback runs, after the cancellation.
            Runnable reportFailure = () -> {
                Throwable cause = error != null
                        ? error
                        : new IllegalStateException("Cannot execute " + mc.jobIdString() + ": status is " + status);
                onStartExecutionComplete(cause, Collections.emptyList());
            };
            cancelExecutionInvocations(mc.jobId(), mc.executionId(), null, reportFailure);
        });
    }

    /**
     * Moves the job to {@code RUNNING} and invokes a
     * {@link StartExecutionOperation} on the participants.
     * <p>
     * Before the invocation a new failure callback is installed and a
     * termination request that is already pending is handled. When the
     * processing guarantee is not {@code NONE}, a snapshot is scheduled too.
     */
    private void invokeStartExecution() {
        logger.fine("Executing " + mc.jobIdString());
        long executionId = mc.executionId();
        mc.resetStartOperationResponses();
        executionFailureCallback = new ExecutionFailureCallback(executionId, mc.startOperationResponses());
        // A termination may have been requested before the execution started.
        getTerminationRequest().ifPresent(request -> handleTermination(request.getMode()));

        boolean savingMetricsEnabled = mc.jobConfig().isStoreMetricsAfterJobCompletion();
        Function<ExecutionPlan, Operation> operationCtor = plan -> {
            return new StartExecutionOperation(mc.jobId(), executionId, savingMetricsEnabled);
        };
        Consumer<Collection<Map.Entry<MemberInfo, Object>>> completionCallback = responses -> {
            Throwable error = getErrorFromResponses("Execution", responses);
            onStartExecutionComplete(error, responses);
        };

        mc.setJobStatus(JobStatus.RUNNING);
        mc.snapshotContext().tryBeginSnapshot();
        mc.invokeOnParticipants(operationCtor, completionCallback, executionFailureCallback, false);

        boolean snapshotsEnabled = mc.jobConfig().getProcessingGuarantee() != ProcessingGuarantee.NONE;
        if (snapshotsEnabled) {
            mc.coordinationService().scheduleSnapshot(mc, executionId);
        }
    }

    /**
     * Acts on a termination request for the current execution.
     * <p>
     * For a mode with a terminal snapshot, tries to begin a snapshot.
     * Otherwise, if a failure callback is installed, cancels the invocations
     * through it; with no callback installed nothing happens.
     *
     * @param mode the termination mode to act on
     */
    void handleTermination(@Nonnull TerminationMode mode) {
        if (mode.isWithTerminalSnapshot()) {
            this.mc.snapshotContext().tryBeginSnapshot();
            return;
        }
        // Read the volatile field once: finalizeExecution() sets it to null, and a second
        // read after the null check could observe that and throw a NullPointerException.
        ExecutionFailureCallback callback = this.executionFailureCallback;
        if (callback != null) {
            callback.cancelInvocations(mode);
        }
    }

    /**
     * Completes the job completion future.
     *
     * @param failure the failure to complete the future exceptionally with,
     *                or {@code null} to complete it normally
     */
    void setFinalResult(Throwable failure) {
        if (failure != null) {
            jobCompletionFuture.internalCompleteExceptionally(failure);
            return;
        }
        jobCompletionFuture.internalComplete();
    }

    /**
     * Derives a single error from the per-member responses of an operation.
     * <p>
     * Resolution order:
     * <ol>
     *   <li>a forceful cancellation is pending: a cancellation exception;</li>
     *   <li>the number of non-{@code Throwable} responses equals the number of
     *       execution plans: {@code null} (success);</li>
     *   <li>all failures are {@link TerminatedWithSnapshotException}: a
     *       cancellation exception for {@code CANCEL_GRACEFUL}, otherwise a
     *       {@link JobTerminateRequestedException} with the requested mode;</li>
     *   <li>there is a failure that is neither a cancellation, a
     *       terminated-with-snapshot nor a topology exception: the first such
     *       failure;</li>
     *   <li>otherwise a {@link TopologyChangedException} listing the causes.</li>
     * </ol>
     *
     * @param opName    name of the operation, used in log messages
     * @param responses the per-member responses; a {@code Throwable} value
     *                  denotes a failure
     * @return the error, or {@code null} if the operation succeeded on all members
     */
    private Throwable getErrorFromResponses(String opName, Collection<Map.Entry<MemberInfo, Object>> responses) {
        if (this.isCancelled()) {
            this.logger.fine(this.mc.jobIdString() + " to be cancelled after " + opName);
            return this.createCancellationException();
        }
        // Key the responses by member address and split them into failures (true) and successes (false).
        Map<Boolean, List<Map.Entry<Address, Object>>> grouped = responses.stream()
                .map(en -> com.hazelcast.jet.Util.entry(en.getKey().getAddress(), en.getValue()))
                .collect(Collectors.partitioningBy(e1 -> e1.getValue() instanceof Throwable));
        int successfulMembersCount = grouped.getOrDefault(false, Collections.emptyList()).size();
        if (successfulMembersCount == this.mc.executionPlanMap().size()) {
            this.logger.fine(opName + " of " + this.mc.jobIdString() + " was successful");
            return null;
        }
        List<Map.Entry<Address, Object>> failures = grouped.getOrDefault(true, Collections.emptyList());
        if (!failures.isEmpty()) {
            this.logger.fine(opName + " of " + this.mc.jobIdString() + " has failures: " + failures);
        }
        // NOTE(review): allMatch is also true for an empty failure list, so this branch is
        // taken when there are fewer successes than plans but no failures - confirm intended.
        if (failures.stream().allMatch(entry -> entry.getValue() instanceof TerminatedWithSnapshotException)) {
            assert (opName.equals("Execution")) : "opName is '" + opName + "', expected 'Execution'";
            this.logger.fine(opName + " of " + this.mc.jobIdString() + " terminated after a terminal snapshot");
            TerminationMode mode = this.requestedTerminationMode().orElseThrow(() -> new AssertionError("mode is null"));
            assert (mode.isWithTerminalSnapshot()) : "mode=" + mode;
            return mode == TerminationMode.CANCEL_GRACEFUL ? this.createCancellationException() : new JobTerminateRequestedException(mode);
        }
        // true: cancellation / terminated-with-snapshot / topology failures; false: everything else.
        Map<Boolean, List<Map.Entry<Address, Object>>> splitFailures = failures.stream()
                .collect(Collectors.partitioningBy(e -> e.getValue() instanceof CancellationException
                        || e.getValue() instanceof TerminatedWithSnapshotException
                        || ExceptionUtil.isTopologyException((Throwable) e.getValue())));
        List<Map.Entry<Address, Object>> topologyFailures = splitFailures.getOrDefault(true, Collections.emptyList());
        List<Map.Entry<Address, Object>> otherFailures = splitFailures.getOrDefault(false, Collections.emptyList());
        if (!otherFailures.isEmpty()) {
            return (Throwable) otherFailures.get(0).getValue();
        }
        return new TopologyChangedException("Causes from members: " + topologyFailures);
    }

    /**
     * Logs, at severe level, that the job cannot be completed properly in its
     * current status.
     *
     * @param error the failure of the execution, or {@code null} if there was
     *              none; when present it is attached to the log record
     */
    private void logCannotComplete(Throwable error) {
        if (error == null) {
            logger.severe("Cannot properly complete " + mc.jobIdString() + ": status is " + mc.jobStatus());
            return;
        }
        logger.severe("Cannot properly complete failed " + mc.jobIdString() + ": status is " + mc.jobStatus(), error);
    }

    /**
     * Handles the end of the start-execution step: stores the metrics found in
     * the responses and finalizes the execution.
     * <p>
     * If the job is neither {@code STARTING} nor {@code RUNNING}, the problem is
     * logged and the error is replaced with an {@link IllegalStateException}.
     * If the error is a termination request with a terminal snapshot, the
     * finalization is deferred until the terminal snapshot future completes.
     * An {@link ExecutionNotFoundException} is wrapped as the cause of a
     * {@link JobTerminateRequestedException} when a termination request is pending.
     *
     * @param error     the error of the execution, or {@code null}
     * @param responses the per-member responses; values of type
     *                  {@link RawJobMetrics} are collected as job metrics
     */
    private void onStartExecutionComplete(Throwable error, Collection<Map.Entry<MemberInfo, Object>> responses) {
        JobStatus status = mc.jobStatus();
        boolean executing = status == JobStatus.STARTING || status == JobStatus.RUNNING;
        if (!executing) {
            logCannotComplete(error);
            error = new IllegalStateException("Job coordination failed");
        }

        List<RawJobMetrics> collectedMetrics = responses.stream()
                .map(Map.Entry::getValue)
                .filter(value -> value instanceof RawJobMetrics)
                .map(value -> (RawJobMetrics) value)
                .collect(Collectors.toList());
        setJobMetrics(collectedMetrics);

        boolean awaitTerminalSnapshot = error instanceof JobTerminateRequestedException
                && ((JobTerminateRequestedException) error).mode().isWithTerminalSnapshot();
        if (awaitTerminalSnapshot) {
            Throwable finalError = error;
            // Finalize only after the terminal snapshot has finished, whatever its outcome.
            mc.snapshotContext().terminalSnapshotFuture().whenCompleteAsync((BiConsumer)ExceptionUtil.withTryCatch(logger, (r, e) -> finalizeExecution(finalError)));
            return;
        }

        if (error instanceof ExecutionNotFoundException) {
            Throwable notFoundException = error;
            Optional<TerminationRequest> pending = getTerminationRequest();
            if (pending.isPresent()) {
                error = new JobTerminateRequestedException(pending.get().getMode()).initCause(notFoundException);
            }
        }
        finalizeExecution(error);
    }

    /**
     * Asynchronously (on the {@code "hz:async"} executor) invokes a
     * {@link TerminateExecutionOperation} on the participants.
     * <p>
     * When the responses arrive, a severe message is logged if any response is
     * non-null and not a {@link JetDisabledException}; then the callback, if
     * given, is run.
     *
     * @param jobId       id of the job
     * @param executionId id of the execution to terminate
     * @param mode        termination mode passed to the operation; callers in
     *                    this class also pass {@code null}
     * @param callback    action to run after the responses were processed, or
     *                    {@code null}
     */
    void cancelExecutionInvocations(long jobId, long executionId, TerminationMode mode, Runnable callback) {
        Function<ExecutionPlan, Operation> operationCtor =
                plan -> new TerminateExecutionOperation(jobId, executionId, mode);
        Consumer<Collection<Map.Entry<MemberInfo, Object>>> onResponses = responses -> {
            boolean anyFailed = responses.stream()
                    .map(Map.Entry::getValue)
                    .anyMatch(value -> value != null && !(value instanceof JetDisabledException));
            if (anyFailed) {
                logger.severe(mc.jobIdString() + ": some TerminateExecutionOperation invocations failed, execution might remain stuck: " + responses);
            }
            if (callback != null) {
                callback.run();
            }
        };
        mc.nodeEngine().getExecutionService().execute("hz:async",
                () -> mc.invokeOnParticipants(operationCtor, onResponses, null, true));
    }

    /**
     * Finalizes the current execution and decides what happens to the job next.
     * <p>
     * Runs as an asynchronous chain: a coordinator-thread task that logs an
     * ignored completion if the job is already {@code COMPLETED} or
     * {@code FAILED}; then {@code completeVertices(failure)}; then a second
     * coordinator-thread task which, under the master-context lock, picks one
     * of these outcomes:
     * <ul>
     *   <li>restart, when the failure is a termination request whose action is
     *       {@code RESTART};</li>
     *   <li>scheduled restart, for a restartable failure with auto-scaling on
     *       and no forceful cancellation pending;</li>
     *   <li>suspension, when the termination action is {@code SUSPEND}, or for
     *       a restartable failure with auto-scaling off and a processing
     *       guarantee other than {@code NONE};</li>
     *   <li>suspension with the failure recorded, when the job is configured
     *       to suspend on failure and is not being cancelled;</li>
     *   <li>otherwise completion as {@code COMPLETED} or {@code FAILED}.</li>
     * </ul>
     * Afterwards the termination request and the failure callback are cleared,
     * the execution completion future is completed and the selected follow-up
     * action is run outside the lock.
     *
     * @param failure the failure the execution ended with, or {@code null} if
     *                it ended without one
     */
    void finalizeExecution(@Nullable Throwable failure) {
        ((CompletableFuture)this.mc.coordinationService().submitToCoordinatorThread(() -> {
            this.mc.lock();
            try {
                JobStatus status = this.mc.jobStatus();
                if (status == JobStatus.COMPLETED || status == JobStatus.FAILED) {
                    // NOTE(review): the completion is logged as ignored, yet the chain below
                    // still runs - confirm that this is intended.
                    this.logIgnoredCompletion(failure, status);
                }
            }
            finally {
                // Release the lock even if reading the status or logging throws.
                this.mc.unlock();
            }
        }).thenComposeAsync(r -> this.completeVertices(failure))).thenCompose(ignored -> this.mc.coordinationService().submitToCoordinatorThread(() -> {
            Runnable nonSynchronizedAction;
            // Acquire before entering the try block, so unlock() only runs if lock() succeeded.
            this.mc.lock();
            try {
                this.mc.getJetServiceBackend().getJobClassLoaderService().tryRemoveClassloadersForJob(this.mc.jobId(), JobClassLoaderService.JobPhase.COORDINATOR);
                TerminationMode.ActionAfterTerminate terminationModeAction = failure instanceof JobTerminateRequestedException ? ((JobTerminateRequestedException)failure).mode().actionAfterTerminate() : null;
                this.mc.snapshotContext().onExecutionTerminated();
                // Prefer the description of the requested termination; fall back to the failure text.
                String description = this.requestedTerminationMode().map(mode -> mode.actionAfterTerminate().description()).orElse(failure != null ? failure.toString() : null);
                boolean userRequested = this.isUserInitiatedTermination();
                if (terminationModeAction == TerminationMode.ActionAfterTerminate.RESTART) {
                    this.mc.setJobStatus(JobStatus.NOT_RUNNING, description, userRequested);
                    nonSynchronizedAction = () -> this.mc.coordinationService().restartJob(this.mc.jobId());
                } else if (!this.isCancelled() && ExceptionUtil.isRestartableException(failure) && this.mc.jobConfig().isAutoScaling()) {
                    this.scheduleRestart(description);
                    nonSynchronizedAction = NO_OP;
                } else if (terminationModeAction == TerminationMode.ActionAfterTerminate.SUSPEND || ExceptionUtil.isRestartableException(failure) && !this.isCancelled() && !this.mc.jobConfig().isAutoScaling() && this.mc.jobConfig().getProcessingGuarantee() != ProcessingGuarantee.NONE) {
                    this.mc.setJobStatus(JobStatus.SUSPENDED, description, userRequested);
                    this.mc.jobExecutionRecord().setSuspended(null);
                    nonSynchronizedAction = () -> this.mc.writeJobExecutionRecord(false);
                } else if (failure != null && !this.isCancelled() && !this.isCancelledGracefully() && this.mc.jobConfig().isSuspendOnFailure()) {
                    this.mc.setJobStatus(JobStatus.SUSPENDED, description, userRequested);
                    this.mc.jobExecutionRecord().setSuspended("Execution failure:\n" + ExceptionUtil.stackTraceToString(failure));
                    nonSynchronizedAction = () -> this.mc.writeJobExecutionRecord(false);
                } else {
                    long completionTime = System.currentTimeMillis();
                    boolean isSuccess = this.logExecutionSummary(failure, completionTime);
                    this.mc.setJobStatus(isSuccess ? JobStatus.COMPLETED : JobStatus.FAILED, description, userRequested);
                    if (failure instanceof LocalMemberResetException) {
                        this.logger.fine("Cancelling job " + this.mc.jobIdString() + " locally: member (local or remote) reset. We don't delete job metadata: job will restart on majority cluster");
                        this.setFinalResult(new CancellationException());
                    } else {
                        this.mc.coordinationService().completeJob(this.mc, failure, completionTime, this.isUserInitiatedTermination()).whenComplete((BiConsumer)ExceptionUtil.withTryCatch(this.logger, (r, f) -> {
                            if (f != null) {
                                this.logger.warning("Completion of " + this.mc.jobIdString() + " failed", (Throwable)f);
                            } else {
                                this.setFinalResult(failure);
                            }
                        }));
                    }
                    nonSynchronizedAction = NO_OP;
                }
                this.terminationRequest = null;
                this.executionFailureCallback = null;
            }
            finally {
                this.mc.unlock();
            }
            this.executionCompletionFuture.complete(null);
            // Run the follow-up (restart / record write) without holding the lock.
            nonSynchronizedAction.run();
        }));
    }

    /**
     * Logs a summary of the finished execution.
     * <p>
     * A successful execution and a termination ({@link CancellationException}
     * or {@link JobTerminateRequestedException}) are logged at info level; any
     * other failure is logged at severe level with the failure attached, and a
     * {@link JetDisabledException} gets an extra hint in the message.
     *
     * @param failure        the failure of the execution, or {@code null} on success
     * @param completionTime completion time in milliseconds, used in the summary
     * @return {@code true} iff {@code failure} is {@code null}
     */
    private boolean logExecutionSummary(@Nullable Throwable failure, long completionTime) {
        boolean succeeded = failure == null;
        if (succeeded) {
            logger.info(formatExecutionSummary("completed successfully", completionTime));
        } else if (failure instanceof CancellationException || failure instanceof JobTerminateRequestedException) {
            logger.info(formatExecutionSummary("got terminated, reason=" + failure, completionTime));
        } else if (failure instanceof JetDisabledException) {
            String conclusion = "failed. This is probably because the Jet engine is not enabled on all cluster members. Please enable the Jet engine for ALL members in the cluster.";
            logger.severe(formatExecutionSummary(conclusion, completionTime), failure);
        } else {
            logger.severe(formatExecutionSummary("failed", completionTime), failure);
        }
        return succeeded;
    }

    /**
     * Builds the multi-line execution summary text used in the completion log messages.
     * <p>
     * The text always contains the job id string, the given conclusion, the start time and the
     * duration. If none of the stored raw metrics carries a blob, a hint about
     * {@code JobConfig.storeMetricsAfterJobCompletion} is appended; otherwise per-vertex totals of
     * {@code receivedCount}, {@code emittedCount}, {@code distributedBytesIn} and
     * {@code distributedBytesOut} are listed (a counter is omitted for a vertex that has no value).
     *
     * @param conclusion     text placed right after the job id string
     * @param completionTime completion timestamp; the duration is computed relative to
     *                       {@code executionStartTime}
     * @return the formatted summary
     */
    private String formatExecutionSummary(String conclusion, long completionTime) {
        StringBuilder summary = new StringBuilder("Execution of ")
                .append(this.mc.jobIdString()).append(' ').append(conclusion)
                .append("\n\tStart time: ").append(Util.toLocalDateTime(this.executionStartTime))
                .append("\n\tDuration: ").append(Util.formatJobDuration(completionTime - this.executionStartTime));

        boolean anyBlobPresent = this.jobMetrics.stream().anyMatch(rjm -> rjm.getBlob() != null);
        if (!anyBlobPresent) {
            return summary
                    .append("\n\tTo see additional job metrics enable JobConfig.storeMetricsAfterJobCompletion")
                    .toString();
        }

        JobMetrics metrics = JobMetricsUtil.toJobMetrics(this.jobMetrics);
        Map<String, Long> received = MasterJobContext.mergeByVertex(metrics.get("receivedCount"));
        Map<String, Long> emitted = MasterJobContext.mergeByVertex(metrics.get("emittedCount"));
        Map<String, Long> bytesIn = MasterJobContext.mergeByVertex(metrics.get("distributedBytesIn"));
        Map<String, Long> bytesOut = MasterJobContext.mergeByVertex(metrics.get("distributedBytesOut"));

        summary.append("\n\tVertices:");
        for (Vertex v : this.vertices) {
            summary.append("\n\t\t").append(v.getName())
                    .append(MasterJobContext.getValueForVertex("\n\t\t\treceivedCount", v, received))
                    .append(MasterJobContext.getValueForVertex("\n\t\t\temittedCount", v, emitted))
                    .append(MasterJobContext.getValueForVertex("\n\t\t\tdistributedBytesIn", v, bytesIn))
                    .append(MasterJobContext.getValueForVertex("\n\t\t\tdistributedBytesOut", v, bytesOut));
        }
        return summary.toString();
    }

    /**
     * Sums measurement values per value of the {@code "vertex"} tag.
     *
     * @param measurements the measurements to aggregate
     * @return a map from the {@code "vertex"} tag value to the sum of the values carrying that tag
     */
    private static Map<String, Long> mergeByVertex(List<Measurement> measurements) {
        return measurements.stream()
                .collect(Collectors.toMap(
                        measurement -> measurement.tag("vertex"),
                        Measurement::value,
                        Long::sum));
    }

    /**
     * Renders one {@code label: value} fragment for the given vertex.
     *
     * @param label   text placed before the value (callers include leading whitespace in it)
     * @param vertex  the vertex whose name is used as the lookup key
     * @param values2 totals keyed by vertex name
     * @return {@code label + ": " + value}, or an empty string if the map has no value for the vertex
     */
    private static String getValueForVertex(String label, Vertex vertex, Map<String, Long> values2) {
        Long total = values2.get(vertex.getName());
        if (total == null) {
            return "";
        }
        return label + ": " + total;
    }

    /**
     * Logs, at SEVERE level, that a completion is being ignored because of the given job status.
     *
     * @param failure the failure of the ignored completion, or {@code null}; when present it is
     *                attached to the log record and the message says "failure completion"
     * @param status  the status rendered into the message
     */
    private void logIgnoredCompletion(@Nullable Throwable failure, JobStatus status) {
        // Both messages share everything after the leading phrase.
        String jobAndStatus = com.hazelcast.jet.Util.idToString(this.mc.jobId()) + " because status is " + status;
        if (failure == null) {
            this.logger.severe("Ignoring completion of " + jobAndStatus);
            return;
        }
        this.logger.severe("Ignoring failure completion of " + jobAndStatus, failure);
    }

    /**
     * Calls {@code ProcessorMetaSupplier.close(failure)} for every vertex.
     * <p>
     * Nothing is done when {@code vertices} is {@code null} or {@code verticesCompleted} is already
     * set. Each close call runs with the processor class loader of its vertex; a throwable raised
     * by it is logged and otherwise ignored. A supplier reporting {@code closeIsCooperative()} is
     * run through {@code ConcurrencyUtil.CALLER_RUNS}, the others on the
     * {@code "hz:jet-job-offloadable"} executor.
     *
     * @param failure the throwable handed to each {@code close} call, may be {@code null}
     * @return a future completed once all close actions are done, after which
     *         {@code verticesCompleted} is set; an already completed future if there was nothing to do
     */
    private CompletableFuture<Void> completeVertices(@Nullable Throwable failure) {
        if (this.vertices == null || this.verticesCompleted) {
            return CompletableFuture.completedFuture(null);
        }

        ManagedExecutorService offloadExecutor =
                this.mc.nodeEngine().getExecutionService().getExecutor("hz:jet-job-offloadable");
        ArrayList<CompletableFuture<Void>> closeFutures = new ArrayList<>(this.vertices.size());
        JobClassLoaderService classLoaderService = this.mc.getJetServiceBackend().getJobClassLoaderService();

        for (Vertex vertex : this.vertices) {
            ProcessorMetaSupplier metaSupplier = vertex.getMetaSupplier();
            RunnableEx closeAction = () -> {
                try {
                    ClassLoader processorCl =
                            classLoaderService.getProcessorClassLoader(this.mc.jobId(), vertex.getName());
                    Util.doWithClassLoader(processorCl, () -> metaSupplier.close(failure));
                }
                catch (Throwable t) {
                    // A failing close() must not prevent the remaining suppliers from being closed.
                    this.logger.severe(this.mc.jobIdString() + " encountered an exception in ProcessorMetaSupplier.close(), ignoring it", t);
                }
            };
            Executor closeExecutor = metaSupplier.closeIsCooperative() ? ConcurrencyUtil.CALLER_RUNS : offloadExecutor;
            closeFutures.add(CompletableFuture.runAsync(closeAction, closeExecutor));
        }

        CompletableFuture<Void> allClosed = CompletableFuture.allOf(closeFutures.toArray(new CompletableFuture[0]));
        return allClosed.whenComplete((ignoredResult, ignoredError) -> this.verticesCompleted = true);
    }

    /**
     * Resumes the job if its status is {@code SUSPENDED}.
     * <p>
     * While holding the master-context lock the status is checked and, if it is {@code SUSPENDED},
     * changed to {@code NOT_RUNNING}; for any other status a message is logged and the method
     * returns. After the lock is released, {@code tryStartJob()} is called.
     */
    void resumeJob() {
        this.mc.lock();
        try {
            if (this.mc.jobStatus() == JobStatus.SUSPENDED) {
                this.mc.setJobStatus(JobStatus.NOT_RUNNING, "Resume", true);
            } else {
                this.logger.info("Not resuming " + this.mc.jobIdString() + ": not " + JobStatus.SUSPENDED + ", but " + this.mc.jobStatus());
                return;
            }
        }
        finally {
            this.mc.unlock();
        }
        // The start attempt happens outside the lock.
        this.logger.fine("Resuming job " + this.mc.jobName());
        this.tryStartJob();
    }

    /**
     * Tells whether the member with the given UUID takes part in this job.
     *
     * @param uuid the member UUID to look for
     * @return {@code true} if the UUID is that of the local member, or of any member that is a key
     *         of the current execution plan map; {@code false} otherwise (including when there is
     *         no execution plan map)
     */
    private boolean hasParticipant(UUID uuid) {
        Map<MemberInfo, ExecutionPlan> planMap = this.mc.executionPlanMap();
        if (this.mc.nodeEngine().getLocalMember().getUuid().equals(uuid)) {
            return true;
        }
        if (planMap == null) {
            return false;
        }
        for (MemberInfo participant : planMap.keySet()) {
            if (participant.getUuid().equals(uuid)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Reacts to the graceful shutdown of a member.
     *
     * @param uuid UUID of the member that is shutting down
     * @return the future of {@link #gracefullyTerminate()} if {@code hasParticipant(uuid)} holds,
     *         otherwise an already completed future
     */
    @Nonnull
    CompletableFuture<Void> onParticipantGracefulShutdown(UUID uuid) {
        if (this.hasParticipant(uuid)) {
            return this.gracefullyTerminate();
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Submits a {@code RESTART_GRACEFUL} termination request via
     * {@code submitToCoordinatorThread} and flattens the resulting nested future.
     *
     * @return a future composed from the first element ({@code f0()}) of the tuple returned by
     *         {@code requestTermination}
     */
    @Nonnull
    CompletableFuture<Void> gracefullyTerminate() {
        CompletableFuture<CompletableFuture> submitted = this.mc.coordinationService().submitToCoordinatorThread(() -> this.requestTermination(TerminationMode.RESTART_GRACEFUL, false, false).f0());
        // The submitted task itself yields a future, hence the identity composition to unwrap it.
        return submitted.thenCompose(Function.identity());
    }

    /**
     * Requests a graceful restart of the job when it runs on a different number of members than
     * the given count. Asserts that it is called on the coordinator thread.
     *
     * @param dataMembersWithPartitionsCount the member count compared against the size of the
     *                                       execution plan map
     * @return {@code true} if auto-scaling is disabled, if there is no execution plan map or its
     *         size already equals the count, or if the job is {@code RUNNING} and the termination
     *         request returned {@code null} as the tuple's second element; {@code false} otherwise
     */
    boolean maybeScaleUp(int dataMembersWithPartitionsCount) {
        this.mc.coordinationService().assertOnCoordinatorThread();
        if (!this.mc.jobConfig().isAutoScaling()) {
            return true;
        }

        boolean noPlan = this.mc.executionPlanMap() == null;
        if (noPlan || this.mc.executionPlanMap().size() == dataMembersWithPartitionsCount) {
            LoggingUtil.logFine(this.logger, "Not scaling up %s: not running or already running on all members", this.mc.jobIdString());
            return true;
        }

        if (this.mc.jobStatus() != JobStatus.RUNNING) {
            return false;
        }
        // NOTE(review): a null f1() is treated as "request accepted" here - confirm against requestTermination.
        boolean restartRequested = this.requestTermination(TerminationMode.RESTART_GRACEFUL, false, false).f1() == null;
        if (!restartRequested) {
            return false;
        }
        this.logger.info("Requested restart of " + this.mc.jobIdString() + " to make use of added member(s). Job was running on " + this.mc.executionPlanMap().size() + " members, cluster now has " + dataMembersWithPartitionsCount + " data members with assigned partitions");
        return true;
    }

    /**
     * Returns the raw job metrics currently held by this context.
     * <p>
     * The field itself is returned, not a copy.
     */
    List<RawJobMetrics> jobMetrics() {
        return this.jobMetrics;
    }

    /**
     * Replaces the stored raw job metrics.
     *
     * @param jobMetrics the new metrics list; must not be {@code null}, and (checked only when
     *                   assertions are enabled) must not contain {@code null} elements
     */
    private void setJobMetrics(List<RawJobMetrics> jobMetrics) {
        assert jobMetrics.stream().noneMatch(Objects::isNull) : "responses=" + jobMetrics;
        this.jobMetrics = Objects.requireNonNull(jobMetrics);
    }

    /**
     * Supplies job metrics to the given future.
     * <p>
     * If the job status is not {@code RUNNING}, the future is completed immediately with the
     * stored metrics. Otherwise a {@code GetLocalJobMetricsOperation} is invoked on the
     * participants and the responses are handed to {@code completeWithMetrics}.
     *
     * @param clientFuture the future to complete with the metrics
     */
    void collectMetrics(CompletableFuture<List<RawJobMetrics>> clientFuture) {
        if (this.mc.jobStatus() != JobStatus.RUNNING) {
            clientFuture.complete(this.jobMetrics);
            return;
        }
        long jobId = this.mc.jobId();
        long executionId = this.mc.executionId();
        this.mc.invokeOnParticipants(
                plan -> new GetLocalJobMetricsOperation(jobId, executionId),
                responses -> this.completeWithMetrics(clientFuture, (Collection<Map.Entry<MemberInfo, Object>>)responses),
                null,
                false);
    }

    /**
     * Completes {@code clientFuture} from the per-member responses to the metrics operation.
     * <ul>
     *   <li>If any response is an {@code ExecutionNotFoundException}, nothing is completed;
     *       {@code collectMetrics} is scheduled to run again after 100 ms.</li>
     *   <li>Otherwise, if any response is a {@code Throwable}, the future is completed
     *       exceptionally with the first one found.</li>
     *   <li>Otherwise the future is completed with the responses cast to {@code RawJobMetrics}.</li>
     * </ul>
     *
     * @param clientFuture the future to complete
     * @param metrics      one entry per member; the value is either the metrics or a throwable
     */
    private void completeWithMetrics(CompletableFuture<List<RawJobMetrics>> clientFuture, Collection<Map.Entry<MemberInfo, Object>> metrics) {
        if (metrics.stream().anyMatch(en -> en.getValue() instanceof ExecutionNotFoundException)) {
            LoggingUtil.logFinest(this.logger, "Rescheduling collectMetrics for %s, some members threw %s", this.mc.jobIdString(), ExecutionNotFoundException.class.getSimpleName());
            this.mc.nodeEngine().getExecutionService().schedule(() -> this.collectMetrics(clientFuture), 100L, TimeUnit.MILLISECONDS);
            return;
        }
        // The entry values are typed Object, so the stream must be narrowed to Throwable explicitly
        // after the isInstance filter; otherwise orElse(null) yields Object, not Throwable.
        Throwable firstThrowable = metrics.stream()
                .map(Map.Entry::getValue)
                .filter(Throwable.class::isInstance)
                .map(Throwable.class::cast)
                .findFirst()
                .orElse(null);
        if (firstThrowable != null) {
            clientFuture.completeExceptionally(firstThrowable);
        } else {
            clientFuture.complete(Util.toList(metrics, e -> (RawJobMetrics)e.getValue()));
        }
    }

    /**
     * Callback receiving the per-member responses to {@code StartExecutionOperation}.
     * <p>
     * Each response completes the future registered for the responding address. A throwable
     * response other than a (peeled) {@code TerminatedWithSnapshotException} additionally cancels
     * the execution invocations, which happens at most once per callback instance.
     */
    private class ExecutionFailureCallback implements BiConsumer<Address, Object> {
        /** Flipped to {@code true} by the first call that performs the cancellation. */
        private final AtomicBoolean invocationsCancelled = new AtomicBoolean();
        private final long executionId;
        private final Map<Address, CompletableFuture<Void>> startOperationResponses;

        ExecutionFailureCallback(long executionId, Map<Address, CompletableFuture<Void>> startOperationResponses) {
            this.executionId = executionId;
            this.startOperationResponses = startOperationResponses;
        }

        /**
         * Handles one member's response.
         *
         * @param address  the address the response came from; used to look up the future to complete
         * @param response a {@code Throwable} on failure, anything else counts as success
         */
        @Override
        public void accept(Address address, Object response) {
            LoggingUtil.logFine(MasterJobContext.this.logger, "%s received response to StartExecutionOperation from %s: %s", MasterJobContext.this.mc.jobIdString(), address, response);
            CompletableFuture<Void> memberFuture = startOperationResponses.get(address);
            if (!(response instanceof Throwable)) {
                memberFuture.complete(null);
                return;
            }
            Throwable failure = (Throwable)response;
            memberFuture.completeExceptionally(failure);
            boolean terminatedWithSnapshot = ExceptionUtil.peel(failure) instanceof TerminatedWithSnapshotException;
            if (!terminatedWithSnapshot) {
                cancelInvocations(null);
            }
        }

        /**
         * Cancels the execution invocations; only the first call has an effect.
         *
         * @param mode the termination mode passed on to {@code cancelExecutionInvocations}, may be {@code null}
         */
        void cancelInvocations(TerminationMode mode) {
            if (!invocationsCancelled.compareAndSet(false, true)) {
                return;
            }
            MasterJobContext.this.cancelExecutionInvocations(MasterJobContext.this.mc.jobId(), executionId, mode, null);
        }
    }

    /**
     * Immutable holder of a termination mode together with a flag telling whether the
     * termination was initiated by the user.
     */
    public static class TerminationRequest {
        private final TerminationMode mode;
        private final boolean userInitiated;

        /**
         * @param mode          the termination mode
         * @param userInitiated value later reported by {@link #isUserInitiated()}
         */
        public TerminationRequest(@Nonnull TerminationMode mode, boolean userInitiated) {
            this.mode = mode;
            this.userInitiated = userInitiated;
        }

        /** Returns the termination mode given at construction. */
        TerminationMode getMode() {
            return mode;
        }

        /** Returns the user-initiated flag given at construction. */
        boolean isUserInitiated() {
            return userInitiated;
        }
    }

    /**
     * An {@link Edge} that is configured as distributed and partitioned by entry key at
     * construction time, and that always reports {@code Integer.MIN_VALUE} as its priority.
     */
    public static class SnapshotRestoreEdge extends Edge {
        SnapshotRestoreEdge(Vertex source2, int sourceOrdinal, Vertex destination, int destOrdinal) {
            super(source2, sourceOrdinal, destination, destOrdinal);
            distributed();
            partitioned(Functions.entryKey());
        }

        /**
         * @return always {@code Integer.MIN_VALUE}
         */
        @Override
        public int getPriority() {
            return Integer.MIN_VALUE;
        }
    }
}

