package io.trino.execution.scheduler;

import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.base.VerifyException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.concurrent.MoreFutures;
import io.airlift.concurrent.SetThreadName;
import io.airlift.http.client.HttpUriBuilder;
import io.airlift.log.Logger;
import io.airlift.stats.TimeStat;
import io.airlift.units.Duration;
import io.trino.Session;
import io.trino.SystemSessionProperties;
import io.trino.exchange.DirectExchangeInput;
import io.trino.execution.BasicStageStats;
import io.trino.execution.ExecutionFailureInfo;
import io.trino.execution.NodeTaskMap;
import io.trino.execution.QueryState;
import io.trino.execution.QueryStateMachine;
import io.trino.execution.RemoteTask;
import io.trino.execution.RemoteTaskFactory;
import io.trino.execution.SqlStage;
import io.trino.execution.SqlTaskManager;
import io.trino.execution.StageId;
import io.trino.execution.StageInfo;
import io.trino.execution.StateMachine;
import io.trino.execution.TableExecuteContextManager;
import io.trino.execution.TaskFailureListener;
import io.trino.execution.TaskId;
import io.trino.execution.scheduler.StageExecution;
import io.trino.execution.scheduler.policy.ExecutionPolicy;
import io.trino.execution.scheduler.policy.ExecutionSchedule;
import io.trino.execution.scheduler.policy.StagesScheduleResult;
import io.trino.failuredetector.FailureDetector;
import io.trino.metadata.InternalNode;
import io.trino.metadata.Metadata;
import io.trino.operator.RetryPolicy;
import io.trino.server.DynamicFilterService;
import io.trino.spi.ErrorCode;
import io.trino.spi.ErrorType;
import io.trino.spi.QueryId;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.CatalogHandle;
import io.trino.split.SplitSource;
import io.trino.sql.planner.NodePartitionMap;
import io.trino.sql.planner.NodePartitioningManager;
import io.trino.sql.planner.PartitioningHandle;
import io.trino.sql.planner.PlanFragment;
import io.trino.sql.planner.SplitSourceFactory;
import io.trino.sql.planner.SubPlan;
import io.trino.sql.planner.SystemPartitioningHandle;
import io.trino.sql.planner.optimizations.PlanNodeSearcher;
import io.trino.sql.planner.plan.ExchangeNode;
import io.trino.sql.planner.plan.PlanFragmentId;
import io.trino.sql.planner.plan.PlanNode;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.sql.planner.plan.RemoteSourceNode;
import io.trino.sql.planner.plan.TableScanNode;
import io.trino.util.Failures;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.concurrent.GuardedBy;

/* loaded from: input_file:io/trino/execution/scheduler/PipelinedQueryScheduler.class */
public class PipelinedQueryScheduler implements QueryScheduler {
    private static final Logger log = Logger.get(PipelinedQueryScheduler.class);
    // Collaborators and settings held by the scheduler. All of them are final; where they are assigned is not
    // visible in this part of the file (presumably the constructor — confirm).
    private final QueryStateMachine queryStateMachine;
    private final NodePartitioningManager nodePartitioningManager;
    private final NodeScheduler nodeScheduler;
    private final int splitBatchSize;
    private final ExecutorService executor;
    private final ScheduledExecutorService schedulerExecutor;
    private final FailureDetector failureDetector;
    private final ExecutionPolicy executionPolicy;
    private final SplitSchedulerStats schedulerStats;
    private final DynamicFilterService dynamicFilterService;
    private final TableExecuteContextManager tableExecuteContextManager;
    private final SplitSourceFactory splitSourceFactory;
    private final StageManager stageManager;
    private final CoordinatorStagesScheduler coordinatorStagesScheduler;
    // Retry-related settings. NOTE(review): units and semantics of the delay values are not visible here — confirm
    // against the code that reads them.
    private final RetryPolicy retryPolicy;
    private final int maxQueryRetryAttempts;
    private final Duration retryInitialDelay;
    private final Duration retryMaxDelay;
    private final double retryDelayScaleFactor;

    // Mutable state. The annotation declares that access is expected while holding this object's monitor.
    @GuardedBy("this")
    private boolean started;

    @GuardedBy("this")
    private Future<Void> distributedStagesSchedulingTask;
    // Atomic counter. NOTE(review): presumably the index of the current query attempt — confirm against its users.
    private final AtomicInteger currentAttempt = new AtomicInteger();

    // Holder for a DistributedStagesScheduler; starts out empty (get() returns null).
    // NOTE(review): the field is a final AtomicReference yet is annotated @GuardedBy — confirm whether the
    // annotation is still meaningful.
    @GuardedBy("this")
    private final AtomicReference<DistributedStagesScheduler> distributedStagesScheduler = new AtomicReference<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/execution/scheduler/PipelinedQueryScheduler$CoordinatorStagesScheduler.class */
    public static class CoordinatorStagesScheduler {
        // Bucket-to-partition array with a single bucket mapped to partition 0. It is the value registered for the
        // output stage and for every child of a coordinator stage.
        private static final int[] SINGLE_PARTITION = {0};
        private final QueryStateMachine queryStateMachine;
        private final NodeScheduler nodeScheduler;
        // Immutable copies (see the constructor) keyed by the id of the fragment whose output is consumed by a
        // coordinator stage or by the query output.
        private final Map<PlanFragmentId, PipelinedOutputBufferManager> outputBuffersForStagesConsumedByCoordinator;
        private final Map<PlanFragmentId, Optional<int[]>> bucketToPartitionForStagesConsumedByCoordinator;
        // Listener exposed through getTaskLifecycleListener().
        private final TaskLifecycleListener taskLifecycleListener;
        private final StageManager stageManager;
        // Immutable copy; initialize() verifies that each execution's only child stage is the next element.
        private final List<StageExecution> stageExecutions;
        // Shared holder; get() may return null, which the listeners in initialize() check for.
        private final AtomicReference<DistributedStagesScheduler> distributedStagesScheduler;
        private final SqlTaskManager coordinatorTaskManager;
        // Flipped to true by the first call to schedule(); later calls do nothing.
        private final AtomicBoolean scheduled = new AtomicBoolean();

        /**
         * Builds and initializes the scheduler for the stages returned by
         * {@code stageManager.getCoordinatorStagesInTopologicalOrder()}.
         *
         * <p>The first stage execution is created with a {@link QueryOutputTaskLifecycleListener}; every following
         * execution is created with the listener exposed by the execution created just before it. The listener
         * exposed by the last execution created (or the query output listener when there are no coordinator
         * stages) is the one later returned by {@link #getTaskLifecycleListener()}.
         *
         * @param queryStateMachine state machine of the query being scheduled
         * @param nodeScheduler used later by {@link #schedule()} to select the current node
         * @param stageManager source of the coordinator stages and their children
         * @param failureDetector passed through to each stage execution
         * @param executor passed through to each stage execution
         * @param atomicReference holder of the distributed stages scheduler; may be empty at this point
         * @param sqlTaskManager task manager on which source task failure listeners are registered
         * @return the initialized scheduler
         */
        public static CoordinatorStagesScheduler create(QueryStateMachine queryStateMachine, NodeScheduler nodeScheduler, StageManager stageManager, FailureDetector failureDetector, Executor executor, AtomicReference<DistributedStagesScheduler> atomicReference, SqlTaskManager sqlTaskManager) {
            Map<PlanFragmentId, PipelinedOutputBufferManager> outputBuffers = createOutputBuffersForStagesConsumedByCoordinator(stageManager);
            Map<PlanFragmentId, Optional<int[]>> bucketToPartition = createBucketToPartitionForStagesConsumedByCoordinator(stageManager);

            // Must be declared with the interface type: inside the loop it is replaced by the listener of the
            // stage execution that was just created, which is not a QueryOutputTaskLifecycleListener.
            TaskLifecycleListener taskLifecycleListener = new QueryOutputTaskLifecycleListener(queryStateMachine);
            ImmutableList.Builder<StageExecution> stageExecutions = ImmutableList.builder();
            for (SqlStage stage : stageManager.getCoordinatorStagesInTopologicalOrder()) {
                PipelinedStageExecution stageExecution = PipelinedStageExecution.createPipelinedStageExecution(stage, outputBuffers, taskLifecycleListener, failureDetector, executor, bucketToPartition.get(stage.getFragment().getId()), 0);
                stageExecutions.add(stageExecution);
                taskLifecycleListener = stageExecution.getTaskLifecycleListener();
            }

            CoordinatorStagesScheduler coordinatorStagesScheduler = new CoordinatorStagesScheduler(queryStateMachine, nodeScheduler, outputBuffers, bucketToPartition, taskLifecycleListener, stageManager, stageExecutions.build(), atomicReference, sqlTaskManager);
            coordinatorStagesScheduler.initialize();
            return coordinatorStagesScheduler;
        }

        /**
         * Creates a single-stream output buffer manager for the output stage and for every child of every
         * coordinator stage, keyed by fragment id.
         *
         * @param stageManager source of the output stage, the coordinator stages and their children
         * @return immutable map from fragment id to its output buffer manager; building fails on duplicate keys
         */
        private static Map<PlanFragmentId, PipelinedOutputBufferManager> createOutputBuffersForStagesConsumedByCoordinator(StageManager stageManager) {
            ImmutableMap.Builder<PlanFragmentId, PipelinedOutputBufferManager> result = ImmutableMap.builder();

            // The output stage is consumed directly as the query result.
            SqlStage rootStage = stageManager.getOutputStage();
            result.put(rootStage.getFragment().getId(), createSingleStreamOutputBuffer(rootStage));

            for (SqlStage coordinatorStage : stageManager.getCoordinatorStagesInTopologicalOrder()) {
                for (SqlStage childStage : stageManager.getChildren(coordinatorStage.getStageId())) {
                    result.put(childStage.getFragment().getId(), createSingleStreamOutputBuffer(childStage));
                }
            }
            return result.buildOrThrow();
        }

        /**
         * Creates a partitioned output buffer manager with exactly one partition for the given stage.
         *
         * @param sqlStage stage whose output partitioning handle must report single-node
         * @return buffer manager for the stage's output partitioning handle with a partition count of 1
         * @throws IllegalArgumentException if the output partitioning handle is not single-node
         */
        private static PipelinedOutputBufferManager createSingleStreamOutputBuffer(SqlStage sqlStage) {
            PlanFragment fragment = sqlStage.getFragment();
            PartitioningHandle outputPartitioning = fragment.getOutputPartitioningScheme().getPartitioning().getHandle();
            boolean singleNode = outputPartitioning.isSingleNode();
            Preconditions.checkArgument(singleNode, "partitioning is expected to be single node: " + outputPartitioning);
            return new PartitionedPipelinedOutputBufferManager(outputPartitioning, 1);
        }

        /**
         * Registers the shared {@code SINGLE_PARTITION} mapping for the output stage and for every child of every
         * coordinator stage, keyed by fragment id.
         *
         * @param stageManager source of the output stage, the coordinator stages and their children
         * @return immutable map from fragment id to the single-partition mapping; building fails on duplicate keys
         */
        private static Map<PlanFragmentId, Optional<int[]>> createBucketToPartitionForStagesConsumedByCoordinator(StageManager stageManager) {
            ImmutableMap.Builder<PlanFragmentId, Optional<int[]>> result = ImmutableMap.builder();

            SqlStage rootStage = stageManager.getOutputStage();
            result.put(rootStage.getFragment().getId(), Optional.of(SINGLE_PARTITION));

            for (SqlStage coordinatorStage : stageManager.getCoordinatorStagesInTopologicalOrder()) {
                for (SqlStage childStage : stageManager.getChildren(coordinatorStage.getStageId())) {
                    result.put(childStage.getFragment().getId(), Optional.of(SINGLE_PARTITION));
                }
            }
            return result.buildOrThrow();
        }

        /**
         * Stores the collaborators after null-checking each one in declaration order. The two maps and the list are
         * defensively copied into immutable collections. Listener wiring is done separately by
         * {@link #initialize()}.
         *
         * @throws NullPointerException if any argument is null; the message names the offending argument
         */
        private CoordinatorStagesScheduler(QueryStateMachine queryStateMachine, NodeScheduler nodeScheduler, Map<PlanFragmentId, PipelinedOutputBufferManager> map, Map<PlanFragmentId, Optional<int[]>> map2, TaskLifecycleListener taskLifecycleListener, StageManager stageManager, List<StageExecution> list, AtomicReference<DistributedStagesScheduler> atomicReference, SqlTaskManager sqlTaskManager) {
            this.queryStateMachine = Objects.requireNonNull(queryStateMachine, "queryStateMachine is null");
            this.nodeScheduler = Objects.requireNonNull(nodeScheduler, "nodeScheduler is null");
            this.outputBuffersForStagesConsumedByCoordinator = ImmutableMap.copyOf(
                    Objects.requireNonNull(map, "outputBuffersForStagesConsumedByCoordinator is null"));
            this.bucketToPartitionForStagesConsumedByCoordinator = ImmutableMap.copyOf(
                    Objects.requireNonNull(map2, "bucketToPartitionForStagesConsumedByCoordinator is null"));
            this.taskLifecycleListener = Objects.requireNonNull(taskLifecycleListener, "taskLifecycleListener is null");
            this.stageManager = Objects.requireNonNull(stageManager, "stageManager is null");
            this.stageExecutions = ImmutableList.copyOf(
                    Objects.requireNonNull(list, "stageExecutions is null"));
            this.distributedStagesScheduler = Objects.requireNonNull(atomicReference, "distributedStagesScheduler is null");
            this.coordinatorTaskManager = Objects.requireNonNull(sqlTaskManager, "coordinatorTaskManager is null");
        }

        /**
         * Wires the state-change listeners between the coordinator stage executions, the stage manager and the
         * query state machine. Called from {@link #create} right after construction.
         *
         * <ul>
         * <li>Every execution mirrors its terminal state onto the matching stage in the stage manager, and fails the
         * query when the execution fails or is aborted.</li>
         * <li>Every execution cancels the next execution in the list once it is flushing or done.</li>
         * <li>The first execution moves the query to finishing or canceled.</li>
         * <li>The last execution cancels the distributed stages scheduler, if one is set, once it is flushing or
         * done.</li>
         * </ul>
         *
         * @throws com.google.common.base.VerifyException if an execution does not have exactly one child stage, or
         *         that child is not the next execution in the list
         */
        private void initialize() {
            for (StageExecution stageExecution : this.stageExecutions) {
                stageExecution.addStateChangeListener(state -> {
                    // Ignore stage transitions once the query state machine already reports done.
                    if (this.queryStateMachine.isDone()) {
                        return;
                    }
                    if (state == StageExecution.State.FAILED) {
                        RuntimeException failureCause = stageExecution.getFailureCause()
                                .map(failure -> failure.toException())
                                .orElseGet(() -> new VerifyException(String.format("stage execution for stage %s is failed but failure cause is not present", stageExecution.getStageId())));
                        this.stageManager.get(stageExecution.getStageId()).fail(failureCause);
                        this.queryStateMachine.transitionToFailed(failureCause);
                    } else if (state == StageExecution.State.ABORTED) {
                        this.stageManager.get(stageExecution.getStageId()).abort();
                        this.queryStateMachine.transitionToFailed(new TrinoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, "Query stage was aborted"));
                    } else if (state.isDone()) {
                        this.stageManager.get(stageExecution.getStageId()).finish();
                    }
                });
            }

            // Walk adjacent pairs: the execution at childIndex - 1 must have the execution at childIndex as its
            // only child stage.
            for (int childIndex = 1; childIndex < this.stageExecutions.size(); childIndex++) {
                StageExecution parent = this.stageExecutions.get(childIndex - 1);
                StageExecution child = this.stageExecutions.get(childIndex);
                Set<SqlStage> children = this.stageManager.getChildren(parent.getStageId());
                Verify.verify(children.size() == 1, "exactly one child stage is expected");
                Verify.verify(Iterables.getOnlyElement(children).getStageId().equals(child.getStageId()), "stage execution order doesn't match the stage order");
                parent.addStateChangeListener(state -> {
                    if (state == StageExecution.State.FLUSHING || state.isDone()) {
                        child.cancel();
                    }
                });
            }

            // The remaining listeners attach to the first and last execution, so there is nothing to do when the
            // list is empty.
            if (this.stageExecutions.isEmpty()) {
                return;
            }

            StageExecution first = this.stageExecutions.get(0);
            first.addStateChangeListener(state -> {
                if (state == StageExecution.State.FINISHED) {
                    this.queryStateMachine.transitionToFinishing();
                } else if (state == StageExecution.State.CANCELED) {
                    this.queryStateMachine.transitionToCanceled();
                }
            });

            StageExecution last = this.stageExecutions.get(this.stageExecutions.size() - 1);
            last.addStateChangeListener(state -> {
                if (state == StageExecution.State.FLUSHING || state.isDone()) {
                    // The holder may still be empty when this fires.
                    DistributedStagesScheduler scheduler = this.distributedStagesScheduler.get();
                    if (scheduler != null) {
                        scheduler.cancel();
                    }
                }
            });
        }

        /**
         * Schedules one task per coordinator stage execution on the node returned by
         * {@code selectCurrentNode()}. Only the first invocation does any work; later calls return immediately.
         *
         * <p>A {@link TaskFailureReporter} is registered as output task failure listener on the query and as source
         * task failure listener for every task that was actually created. The query is moved to running when a task
         * was created while the query state is {@code STARTING}.
         */
        public synchronized void schedule() {
            if (!this.scheduled.compareAndSet(false, true)) {
                return;
            }

            TaskFailureReporter failureReporter = new TaskFailureReporter(this.distributedStagesScheduler);
            this.queryStateMachine.addOutputTaskFailureListener(failureReporter);

            NodeSelector nodeSelector = this.nodeScheduler.createNodeSelector(this.queryStateMachine.getSession(), Optional.empty());
            InternalNode currentNode = nodeSelector.selectCurrentNode();

            for (StageExecution execution : this.stageExecutions) {
                Optional<RemoteTask> task = execution.scheduleTask(currentNode, 0, ImmutableMultimap.of());
                execution.schedulingComplete();
                if (task.isPresent()) {
                    this.coordinatorTaskManager.addSourceTaskFailureListener(task.get().getTaskId(), failureReporter);
                }
                if (this.queryStateMachine.getQueryState() == QueryState.STARTING && task.isPresent()) {
                    this.queryStateMachine.transitionToRunning();
                }
            }
        }

        /**
         * @return the immutable map of output buffer managers, keyed by fragment id, that was supplied at
         *         construction time
         */
        public Map<PlanFragmentId, PipelinedOutputBufferManager> getOutputBuffersForStagesConsumedByCoordinator() {
            return outputBuffersForStagesConsumedByCoordinator;
        }

        /**
         * @return the immutable map of bucket-to-partition mappings, keyed by fragment id, that was supplied at
         *         construction time
         */
        public Map<PlanFragmentId, Optional<int[]>> getBucketToPartitionForStagesConsumedByCoordinator() {
            return bucketToPartitionForStagesConsumedByCoordinator;
        }

        /**
         * @return the task lifecycle listener that was supplied at construction time
         */
        public TaskLifecycleListener getTaskLifecycleListener() {
            return taskLifecycleListener;
        }

        /**
         * Cancels every coordinator stage execution whose stage id equals the given one. Does nothing when no
         * execution matches.
         *
         * @param stageId id of the stage to cancel
         */
        public void cancelStage(StageId stageId) {
            this.stageExecutions.stream()
                    .filter(execution -> execution.getStageId().equals(stageId))
                    .forEach(StageExecution::cancel);
        }

        /**
         * Cancels all coordinator stage executions, in list order.
         */
        public void cancel() {
            for (StageExecution execution : this.stageExecutions) {
                execution.cancel();
            }
        }

        /**
         * Aborts all coordinator stage executions, in list order.
         */
        public void abort() {
            for (StageExecution execution : this.stageExecutions) {
                execution.abort();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/execution/scheduler/PipelinedQueryScheduler$DistributedStagesScheduler.class */
    public static class DistributedStagesScheduler {
        // State machine of this scheduler; create() builds it from the query id and the scheduled executor.
        private final DistributedStagesSchedulerStateMachine stateMachine;
        private final QueryStateMachine queryStateMachine;
        private final SplitSchedulerStats schedulerStats;
        private final StageManager stageManager;
        // Produced in create() by the execution policy from all distributed stage executions.
        private final ExecutionSchedule executionSchedule;
        // One scheduler and one execution per distributed stage, keyed by stage id (both populated in create()).
        private final Map<StageId, StageScheduler> stageSchedulers;
        private final Map<StageId, StageExecution> stageExecutions;
        private final DynamicFilterService dynamicFilterService;
        // NOTE(review): presumably guards against starting the scheduler twice — its readers are outside this view.
        private final AtomicBoolean started = new AtomicBoolean();

        /**
         * Builds and initializes the scheduler for the stages returned by
         * {@code stageManager.getDistributedStagesInTopologicalOrder()}.
         *
         * <p>Steps, in order: create the scheduler state machine; compute bucket-to-partition mappings and output
         * buffer managers (seeded with the ones owned by the coordinator stages scheduler); create one
         * {@link StageExecution} per distributed stage; create one {@link StageScheduler} per execution; assemble
         * the scheduler and call its {@code initialize()}.
         *
         * @param queryStateMachine state machine of the query being scheduled
         * @param splitSchedulerStats passed through to the scheduler
         * @param nodeScheduler passed through to the stage schedulers
         * @param nodePartitioningManager resolves node partition maps for partitioning handles
         * @param stageManager source of the distributed stages, their parents and children
         * @param coordinatorStagesScheduler supplies the listener and the mappings of coordinator-consumed stages
         * @param executionPolicy creates the execution schedule from the stage executions
         * @param failureDetector passed through to each stage execution
         * @param scheduledExecutorService executor for the state machine, stage executions and stage schedulers
         * @param splitSourceFactory passed through to the stage schedulers
         * @param i passed through to the stage schedulers (NOTE(review): presumably the split batch size — confirm)
         * @param dynamicFilterService passed through to the stage schedulers and the scheduler
         * @param tableExecuteContextManager passed through to the stage schedulers
         * @param retryPolicy when not {@code NONE}, the coordinator listener is wrapped in a bridge
         * @param i2 passed through to each stage execution (NOTE(review): presumably the attempt number — confirm)
         * @return the initialized scheduler
         * @throws NullPointerException if a parent or child stage has no execution registered yet
         */
        public static DistributedStagesScheduler create(QueryStateMachine queryStateMachine, SplitSchedulerStats splitSchedulerStats, NodeScheduler nodeScheduler, NodePartitioningManager nodePartitioningManager, StageManager stageManager, CoordinatorStagesScheduler coordinatorStagesScheduler, ExecutionPolicy executionPolicy, FailureDetector failureDetector, ScheduledExecutorService scheduledExecutorService, SplitSourceFactory splitSourceFactory, int i, DynamicFilterService dynamicFilterService, TableExecuteContextManager tableExecuteContextManager, RetryPolicy retryPolicy, int i2) {
            DistributedStagesSchedulerStateMachine stateMachine = new DistributedStagesSchedulerStateMachine(queryStateMachine.getQueryId(), scheduledExecutorService);

            // Memoizes node partition maps per partitioning handle. The cache key is the handle only, so the
            // partition count of the first request for a handle is the one used to resolve it.
            Map<PartitioningHandle, NodePartitionMap> partitioningCacheMap = new HashMap<>();
            BiFunction<PartitioningHandle, Optional<Integer>, NodePartitionMap> partitioningCache = (partitioningHandle, partitionCount) ->
                    partitioningCacheMap.computeIfAbsent(partitioningHandle, handle -> nodePartitioningManager.getNodePartitioningMap(
                            queryStateMachine.getSession(),
                            // Scaled-writer hash distribution is resolved as fixed hash distribution.
                            handle.equals(SystemPartitioningHandle.SCALED_WRITER_HASH_DISTRIBUTION) ? SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION : handle,
                            partitionCount));

            Map<PlanFragmentId, Optional<int[]>> bucketToPartitionMap = createBucketToPartitionMap(coordinatorStagesScheduler.getBucketToPartitionForStagesConsumedByCoordinator(), stageManager, partitioningCache);
            Map<PlanFragmentId, PipelinedOutputBufferManager> outputBufferManagers = createOutputBufferManagers(coordinatorStagesScheduler.getOutputBuffersForStagesConsumedByCoordinator(), stageManager, bucketToPartitionMap);

            TaskLifecycleListener coordinatorTaskLifecycleListener = coordinatorStagesScheduler.getTaskLifecycleListener();
            if (retryPolicy != RetryPolicy.NONE) {
                // With retries enabled the coordinator listener is wrapped in a bridge, and the bridge is told that
                // no more source tasks will arrive only when this scheduler reaches FINISHED.
                TaskLifecycleListenerBridge bridge = new TaskLifecycleListenerBridge(coordinatorTaskLifecycleListener);
                coordinatorTaskLifecycleListener = bridge;
                stateMachine.addStateChangeListener(state -> {
                    if (state == DistributedStagesSchedulerState.FINISHED) {
                        bridge.notifyNoMoreSourceTasks();
                    }
                });
            }

            // Insertion-ordered so that the executions keep the topological order of the stages; a parent is
            // expected to be registered before any of its children is processed.
            Map<StageId, StageExecution> stageExecutions = new LinkedHashMap<>();
            for (SqlStage stage : stageManager.getDistributedStagesInTopologicalOrder()) {
                Optional<SqlStage> parentStage = stageManager.getParent(stage.getStageId());
                TaskLifecycleListener taskLifecycleListener;
                if (parentStage.isEmpty() || parentStage.get().getFragment().getPartitioning().isCoordinatorOnly()) {
                    // No parent, or the parent runs on the coordinator: report to the coordinator listener.
                    taskLifecycleListener = coordinatorTaskLifecycleListener;
                } else {
                    StageId parentStageId = parentStage.get().getStageId();
                    StageExecution parentStageExecution = Objects.requireNonNull(stageExecutions.get(parentStageId), () -> "execution is null for stage: " + parentStageId);
                    taskLifecycleListener = parentStageExecution.getTaskLifecycleListener();
                }
                PlanFragment fragment = stage.getFragment();
                stageExecutions.put(stage.getStageId(), PipelinedStageExecution.createPipelinedStageExecution(stageManager.get(fragment.getId()), outputBufferManagers, taskLifecycleListener, failureDetector, scheduledExecutorService, bucketToPartitionMap.get(fragment.getId()), i2));
            }

            ImmutableMap.Builder<StageId, StageScheduler> stageSchedulers = ImmutableMap.builder();
            for (StageExecution stageExecution : stageExecutions.values()) {
                List<StageExecution> childStageExecutions = stageManager.getChildren(stageExecution.getStageId()).stream()
                        .map(childStage -> Objects.requireNonNull(stageExecutions.get(childStage.getStageId()), () -> "stage execution not found for stage: " + childStage))
                        .collect(ImmutableList.toImmutableList());
                stageSchedulers.put(stageExecution.getStageId(), createStageScheduler(queryStateMachine, stageExecution, splitSourceFactory, childStageExecutions, partitioningCache, nodeScheduler, nodePartitioningManager, i, dynamicFilterService, scheduledExecutorService, tableExecuteContextManager));
            }

            DistributedStagesScheduler distributedStagesScheduler = new DistributedStagesScheduler(stateMachine, queryStateMachine, splitSchedulerStats, stageManager, executionPolicy.createExecutionSchedule(stageExecutions.values()), stageSchedulers.buildOrThrow(), ImmutableMap.copyOf(stageExecutions), dynamicFilterService);
            distributedStagesScheduler.initialize();
            return distributedStagesScheduler;
        }

        /**
         * Extends the given mappings with one entry per child of every distributed stage. The mapping is computed
         * from the consuming (parent) stage's fragment and stored under each child's fragment id.
         *
         * @param map mappings already known; copied into the result first
         * @param stageManager source of the distributed stages and their children
         * @param biFunction resolves a node partition map for a partitioning handle and optional partition count
         * @return immutable map from fragment id to bucket-to-partition mapping; building fails on duplicate keys
         */
        private static Map<PlanFragmentId, Optional<int[]>> createBucketToPartitionMap(Map<PlanFragmentId, Optional<int[]>> map, StageManager stageManager, BiFunction<PartitioningHandle, Optional<Integer>, NodePartitionMap> biFunction) {
            ImmutableMap.Builder<PlanFragmentId, Optional<int[]>> result = ImmutableMap.builder();
            result.putAll(map);
            for (SqlStage parentStage : stageManager.getDistributedStagesInTopologicalOrder()) {
                PlanFragment parentFragment = parentStage.getFragment();
                Optional<int[]> mapping = getBucketToPartition(
                        parentFragment.getPartitioning(),
                        biFunction,
                        parentFragment.getRoot(),
                        parentFragment.getRemoteSourceNodes(),
                        parentFragment.getPartitionCount());
                for (SqlStage childStage : stageManager.getChildren(parentStage.getStageId())) {
                    result.put(childStage.getFragment().getId(), mapping);
                }
            }
            return result.buildOrThrow();
        }

        /**
         * Determines the bucket-to-partition mapping for a fragment.
         *
         * <ul>
         * <li>Source distribution or scaled-writer round-robin distribution: a one-element array holding 0.</li>
         * <li>The plan contains a table scan: empty when every remote source is a replicated exchange, otherwise
         * the mapping of the resolved node partition map.</li>
         * <li>Otherwise: the mapping of the resolved node partition map, which must have at least one node.</li>
         * </ul>
         *
         * @param partitioningHandle partitioning of the fragment
         * @param biFunction resolves a node partition map for a partitioning handle and optional partition count
         * @param planNode root of the fragment's plan
         * @param list remote source nodes of the fragment
         * @param optional partition count of the fragment, if any
         * @return the mapping, or empty in the replicated table-scan case
         * @throws TrinoException with {@code NO_NODES_AVAILABLE} (raised through {@code Failures.checkCondition})
         *         when the resolved partition map has no nodes in the last case
         */
        private static Optional<int[]> getBucketToPartition(PartitioningHandle partitioningHandle, BiFunction<PartitioningHandle, Optional<Integer>, NodePartitionMap> biFunction, PlanNode planNode, List<RemoteSourceNode> list, Optional<Integer> optional) {
            if (partitioningHandle.equals(SystemPartitioningHandle.SOURCE_DISTRIBUTION) || partitioningHandle.equals(SystemPartitioningHandle.SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION)) {
                return Optional.of(new int[1]);
            }

            boolean hasTableScan = PlanNodeSearcher.searchFrom(planNode)
                    .where(TableScanNode.class::isInstance)
                    .findFirst()
                    .isPresent();
            if (hasTableScan) {
                boolean allReplicated = list.stream()
                        .allMatch(remoteSource -> remoteSource.getExchangeType() == ExchangeNode.Type.REPLICATE);
                if (allReplicated) {
                    return Optional.empty();
                }
                // A non-replicated remote source needs the node partition map.
                NodePartitionMap partitionMap = biFunction.apply(partitioningHandle, optional);
                return Optional.of(partitionMap.getBucketToPartition());
            }

            NodePartitionMap partitionMap = biFunction.apply(partitioningHandle, optional);
            Failures.checkCondition(!partitionMap.getPartitionToNode().isEmpty(), StandardErrorCode.NO_NODES_AVAILABLE, "No worker nodes available");
            return Optional.of(partitionMap.getBucketToPartition());
        }

        /**
         * Extends the given output buffer managers with one entry per child of every distributed stage, chosen by
         * the child's output partitioning handle: broadcast, scaled-writer round-robin, or partitioned.
         *
         * @param map managers already known; copied into the result first
         * @param stageManager source of the distributed stages and their children
         * @param map2 bucket-to-partition mappings keyed by fragment id; must hold a present value for every child
         *        that ends up with a partitioned manager
         * @return immutable map from fragment id to output buffer manager; building fails on duplicate keys
         * @throws IllegalArgumentException if the mapping for a partitioned child is empty
         */
        private static Map<PlanFragmentId, PipelinedOutputBufferManager> createOutputBufferManagers(Map<PlanFragmentId, PipelinedOutputBufferManager> map, StageManager stageManager, Map<PlanFragmentId, Optional<int[]>> map2) {
            ImmutableMap.Builder<PlanFragmentId, PipelinedOutputBufferManager> result = ImmutableMap.builder();
            result.putAll(map);
            for (SqlStage parentStage : stageManager.getDistributedStagesInTopologicalOrder()) {
                for (SqlStage childStage : stageManager.getChildren(parentStage.getStageId())) {
                    PlanFragmentId childFragmentId = childStage.getFragment().getId();
                    PartitioningHandle outputPartitioning = childStage.getFragment().getOutputPartitioningScheme().getPartitioning().getHandle();

                    PipelinedOutputBufferManager outputBufferManager;
                    if (outputPartitioning.equals(SystemPartitioningHandle.FIXED_BROADCAST_DISTRIBUTION)) {
                        outputBufferManager = new BroadcastPipelinedOutputBufferManager();
                    } else if (outputPartitioning.equals(SystemPartitioningHandle.SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION)) {
                        outputBufferManager = new ScaledPipelinedOutputBufferManager();
                    } else {
                        Optional<int[]> bucketToPartition = map2.get(childFragmentId);
                        Preconditions.checkArgument(bucketToPartition.isPresent(), "bucketToPartition is expected to be present for fragment: %s", childFragmentId);
                        // Partition ids are zero-based, so the count is the largest id plus one.
                        int partitionCount = Ints.max(bucketToPartition.get()) + 1;
                        outputBufferManager = new PartitionedPipelinedOutputBufferManager(outputPartitioning, partitionCount);
                    }
                    result.put(childFragmentId, outputBufferManager);
                }
            }
            return result.buildOrThrow();
        }

        private static StageScheduler createStageScheduler(QueryStateMachine queryStateMachine, StageExecution stageExecution, SplitSourceFactory splitSourceFactory, List<StageExecution> list, BiFunction<PartitioningHandle, Optional<Integer>, NodePartitionMap> biFunction, NodeScheduler nodeScheduler, NodePartitioningManager nodePartitioningManager, int i, DynamicFilterService dynamicFilterService, ScheduledExecutorService scheduledExecutorService, TableExecuteContextManager tableExecuteContextManager) {
            List<InternalNode> partitionToNode;
            BucketNodeMap asBucketNodeMap;
            Session session = queryStateMachine.getSession();
            PlanFragment fragment = stageExecution.getFragment();
            PartitioningHandle partitioning = fragment.getPartitioning();
            Optional<Integer> partitionCount = fragment.getPartitionCount();
            final Map<PlanNodeId, SplitSource> createSplitSources = splitSourceFactory.createSplitSources(session, fragment);
            if (!createSplitSources.isEmpty()) {
                queryStateMachine.addStateChangeListener(new StateMachine.StateChangeListener<QueryState>() { // from class: io.trino.execution.scheduler.PipelinedQueryScheduler.DistributedStagesScheduler.1
                    private final AtomicReference<Collection<SplitSource>> splitSourcesReference;

                    {
                        this.splitSourcesReference = new AtomicReference<>(createSplitSources.values());
                    }

                    @Override // io.trino.execution.StateMachine.StateChangeListener
                    public void stateChanged(QueryState queryState) {
                        Collection<SplitSource> andSet;
                        if (!queryState.isDone() || (andSet = this.splitSourcesReference.getAndSet(null)) == null) {
                            return;
                        }
                        DistributedStagesScheduler.closeSplitSources(andSet);
                    }
                });
            }
            if (partitioning.equals(SystemPartitioningHandle.SOURCE_DISTRIBUTION)) {
                Map.Entry entry = (Map.Entry) Iterables.getOnlyElement(createSplitSources.entrySet());
                PlanNodeId planNodeId = (PlanNodeId) entry.getKey();
                SplitSource splitSource = (SplitSource) entry.getValue();
                NodeSelector createNodeSelector = nodeScheduler.createNodeSelector(session, Optional.of(splitSource.getCatalogHandle()).filter(catalogHandle -> {
                    return !catalogHandle.getType().isInternal();
                }));
                Objects.requireNonNull(stageExecution);
                return SourcePartitionedScheduler.newSourcePartitionedSchedulerAsStageScheduler(stageExecution, planNodeId, splitSource, new DynamicSplitPlacementPolicy(createNodeSelector, stageExecution::getAllTasks), i, dynamicFilterService, tableExecuteContextManager, () -> {
                    return list.stream().anyMatch((v0) -> {
                        return v0.isAnyTaskBlocked();
                    });
                });
            }
            if (partitioning.equals(SystemPartitioningHandle.SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION)) {
                Supplier supplier = () -> {
                    return (Collection) list.stream().map((v0) -> {
                        return v0.getTaskStatuses();
                    }).flatMap((v0) -> {
                        return v0.stream();
                    }).collect(ImmutableList.toImmutableList());
                };
                Objects.requireNonNull(stageExecution);
                ScaledWriterScheduler scaledWriterScheduler = new ScaledWriterScheduler(stageExecution, supplier, stageExecution::getTaskStatuses, nodeScheduler.createNodeSelector(session, Optional.empty()), scheduledExecutorService, SystemSessionProperties.getWriterMinSize(session));
                ListenableFuture<Void> whenAllStages = whenAllStages(list, (v0) -> {
                    return v0.isDone();
                });
                Objects.requireNonNull(scaledWriterScheduler);
                whenAllStages.addListener(scaledWriterScheduler::finish, MoreExecutors.directExecutor());
                return scaledWriterScheduler;
            }
            if (createSplitSources.isEmpty()) {
                List<InternalNode> partitionToNode2 = biFunction.apply(partitioning, partitionCount).getPartitionToNode();
                Failures.checkCondition(!partitionToNode2.isEmpty(), StandardErrorCode.NO_NODES_AVAILABLE, "No worker nodes available", new Object[0]);
                return new FixedCountScheduler(stageExecution, partitionToNode2);
            }
            List<PlanNodeId> partitionedSources = fragment.getPartitionedSources();
            Optional<CatalogHandle> catalogHandle2 = partitioning.getCatalogHandle();
            Preconditions.checkArgument(catalogHandle2.isPresent(), "No catalog handle for partitioning handle: %s", partitioning);
            if (fragment.getRemoteSourceNodes().stream().allMatch(remoteSourceNode -> {
                return remoteSourceNode.getExchangeType() == ExchangeNode.Type.REPLICATE;
            })) {
                asBucketNodeMap = nodePartitioningManager.getBucketNodeMap(session, partitioning);
                partitionToNode = new ArrayList(nodeScheduler.createNodeSelector(session, catalogHandle2).allNodes());
                Collections.shuffle(partitionToNode);
            } else {
                NodePartitionMap apply = biFunction.apply(partitioning, partitionCount);
                partitionToNode = apply.getPartitionToNode();
                asBucketNodeMap = apply.asBucketNodeMap();
            }
            return new FixedSourcePartitionedScheduler(stageExecution, createSplitSources, partitionedSources, partitionToNode, asBucketNodeMap, i, nodeScheduler.createNodeSelector(session, catalogHandle2), dynamicFilterService, tableExecuteContextManager);
        }

        /**
         * Closes every split source in {@code collection} on a best-effort basis.
         *
         * <p>Any {@link Throwable} raised while advancing to or closing a source is logged as a
         * warning and does not prevent the remaining sources from being processed.
         *
         * @param collection the split sources to close
         */
        private static void closeSplitSources(Collection<SplitSource> collection) {
            for (Iterator<SplitSource> sources = collection.iterator(); sources.hasNext(); ) {
                try {
                    SplitSource source = sources.next();
                    source.close();
                } catch (Throwable closeFailure) {
                    PipelinedQueryScheduler.log.warn(closeFailure, "Error closing split source");
                }
            }
        }

        /**
         * Builds a future that is completed once every stage in {@code collection} has reported,
         * through its state change listener, at least one state accepted by {@code predicate}.
         *
         * @param collection the stages to watch; must not be empty
         * @param predicate the condition a reported stage state has to satisfy
         * @return a future that is set to {@code null} when the last outstanding stage matches
         * @throws IllegalArgumentException if {@code collection} is empty
         */
        private static ListenableFuture<Void> whenAllStages(Collection<StageExecution> collection, Predicate<StageExecution.State> predicate) {
            Preconditions.checkArgument(!collection.isEmpty(), "stages is empty");
            // Concurrent set: the listeners registered below may fire on different threads.
            Set<StageId> pendingStageIds = collection.stream()
                    .map(StageExecution::getStageId)
                    .collect(Collectors.toCollection(Sets::newConcurrentHashSet));
            SettableFuture<Void> allStagesMatched = SettableFuture.create();
            for (StageExecution stage : collection) {
                stage.addStateChangeListener(newState -> {
                    if (!predicate.test(newState)) {
                        return;
                    }
                    // remove() is true only for the first matching notification of this stage.
                    if (!pendingStageIds.remove(stage.getStageId())) {
                        return;
                    }
                    if (pendingStageIds.isEmpty()) {
                        allStagesMatched.set(null);
                    }
                });
            }
            return allStagesMatched;
        }

        private DistributedStagesScheduler(DistributedStagesSchedulerStateMachine distributedStagesSchedulerStateMachine, QueryStateMachine queryStateMachine, SplitSchedulerStats splitSchedulerStats, StageManager stageManager, ExecutionSchedule executionSchedule, Map<StageId, StageScheduler> map, Map<StageId, StageExecution> map2, DynamicFilterService dynamicFilterService) {
            this.stateMachine = (DistributedStagesSchedulerStateMachine) Objects.requireNonNull(distributedStagesSchedulerStateMachine, "stateMachine is null");
            this.queryStateMachine = (QueryStateMachine) Objects.requireNonNull(queryStateMachine, "queryStateMachine is null");
            this.schedulerStats = (SplitSchedulerStats) Objects.requireNonNull(splitSchedulerStats, "schedulerStats is null");
            this.stageManager = (StageManager) Objects.requireNonNull(stageManager, "stageManager is null");
            this.executionSchedule = (ExecutionSchedule) Objects.requireNonNull(executionSchedule, "executionSchedule is null");
            this.stageSchedulers = ImmutableMap.copyOf((Map) Objects.requireNonNull(map, "stageSchedulers is null"));
            this.stageExecutions = ImmutableMap.copyOf((Map) Objects.requireNonNull(map2, "stageExecutions is null"));
            this.dynamicFilterService = (DynamicFilterService) Objects.requireNonNull(dynamicFilterService, "dynamicFilterService is null");
        }

        /**
         * Registers the state change listeners that tie the stage executions to this scheduler.
         *
         * <p>Two kinds of listeners are installed:
         * <ul>
         * <li>for each stage that has children, a listener that cancels all child stage executions
         * once the stage reports {@code FLUSHING} or any done state;</li>
         * <li>for every stage, a listener that (while this scheduler is not yet done) notifies the
         * dynamic filter service when the stage can no longer schedule tasks, fails the scheduler
         * when the stage fails, and marks the scheduler finished once all stages are done.</li>
         * </ul>
         */
        private void initialize() {
            for (StageExecution parent : this.stageExecutions.values()) {
                List<StageExecution> children = this.stageManager.getChildren(parent.getStageId()).stream()
                        .map(childStage -> Objects.requireNonNull(
                                this.stageExecutions.get(childStage.getStageId()),
                                () -> "stage execution not found for stage: " + childStage))
                        .collect(ImmutableList.toImmutableList());
                if (children.isEmpty()) {
                    continue;
                }
                parent.addStateChangeListener(newState -> {
                    if (newState == StageExecution.State.FLUSHING || newState.isDone()) {
                        for (StageExecution child : children) {
                            child.cancel();
                        }
                    }
                });
            }

            // Ids of stages that reached a non-failed done state; shared by all listeners below.
            Set<StageId> finishedStages = Sets.newConcurrentHashSet();
            for (StageExecution stage : this.stageExecutions.values()) {
                stage.addStateChangeListener(newState -> {
                    if (this.stateMachine.getState().isDone()) {
                        return;
                    }
                    int taskCount = stage.getAllTasks().size();
                    if (!newState.canScheduleMoreTasks()) {
                        this.dynamicFilterService.stageCannotScheduleMoreTasks(stage.getStageId(), stage.getAttemptId(), taskCount);
                    }
                    if (newState == StageExecution.State.FAILED) {
                        RuntimeException cause = stage.getFailureCause()
                                .<RuntimeException>map(failureInfo -> failureInfo.toException())
                                .orElseGet(() -> new VerifyException(String.format("stage execution for stage %s is failed by failure cause is not present", stage.getStageId())));
                        fail(cause, Optional.of(stage.getStageId()));
                        return;
                    }
                    if (!newState.isDone()) {
                        return;
                    }
                    finishedStages.add(stage.getStageId());
                    if (finishedStages.containsAll(this.stageExecutions.keySet())) {
                        this.stateMachine.transitionToFinished();
                    }
                });
            }
        }

        /**
         * Runs the scheduling loop for all distributed stages until the execution schedule is finished.
         *
         * <p>May be invoked only once. Each iteration asks the execution schedule which stages to
         * schedule, lets the matching stage scheduler do some work, records scheduler statistics,
         * and, if any stage is blocked, waits up to one second for one of the blocked futures
         * (or the schedule's reschedule future) to complete before trying again.
         *
         * <p>Any failure raised by the loop is reported through {@link #fail(Throwable, Optional)}
         * rather than propagated. Regardless of the outcome, every stage scheduler is closed exactly
         * once afterwards; a failure while closing is reported through {@code fail} as well.
         *
         * @throws IllegalStateException if scheduling was already started
         */
        public void schedule() {
            Preconditions.checkState(this.started.compareAndSet(false, true), "already started");

            try (SetThreadName ignored = new SetThreadName("Query-%s", this.queryStateMachine.getQueryId())) {
                this.stageSchedulers.values().forEach(StageScheduler::start);
                while (!this.executionSchedule.isFinished()) {
                    List<ListenableFuture<?>> blockedStages = new ArrayList<>();
                    StagesScheduleResult stagesScheduleResult = this.executionSchedule.getStagesToSchedule();
                    for (StageExecution stageExecution : stagesScheduleResult.getStagesToSchedule()) {
                        stageExecution.beginScheduling();

                        // perform some scheduling work
                        ScheduleResult result = this.stageSchedulers.get(stageExecution.getStageId()).schedule();

                        // the first created task moves the scheduler out of PLANNED
                        if (this.stateMachine.getState() == DistributedStagesSchedulerState.PLANNED && stageExecution.getAllTasks().size() > 0) {
                            this.stateMachine.transitionToRunning();
                        }

                        if (result.isFinished()) {
                            stageExecution.schedulingComplete();
                        } else if (!result.getBlocked().isDone()) {
                            blockedStages.add(result.getBlocked());
                        }

                        this.schedulerStats.getSplitsScheduledPerIteration().add(result.getSplitsScheduled());
                        if (result.getBlockedReason().isPresent()) {
                            switch (result.getBlockedReason().get()) {
                                case WRITER_SCALING:
                                    // no dedicated statistic for this reason
                                    break;
                                case WAITING_FOR_SOURCE:
                                    this.schedulerStats.getWaitingForSource().update(1L);
                                    break;
                                case SPLIT_QUEUES_FULL:
                                    this.schedulerStats.getSplitQueuesFull().update(1L);
                                    break;
                                default:
                                    throw new UnsupportedOperationException("Unknown blocked reason: " + result.getBlockedReason().get());
                            }
                        }
                    }

                    // wait for a state change and then schedule again
                    if (!blockedStages.isEmpty()) {
                        ImmutableList.Builder<ListenableFuture<?>> futures = ImmutableList.builder();
                        futures.addAll(blockedStages);
                        // the reschedule future lets the schedule itself wake the loop up
                        stagesScheduleResult.getRescheduleFuture().ifPresent(rescheduleFuture -> futures.add(rescheduleFuture));
                        try (TimeStat.BlockTimer timer = this.schedulerStats.getSleepTime().time()) {
                            MoreFutures.tryGetFutureValue(MoreFutures.whenAnyComplete(futures.build()), 1, TimeUnit.SECONDS);
                        }
                        // only the per-stage blocked futures are cancelled, not the reschedule future
                        for (ListenableFuture<?> blockedStage : blockedStages) {
                            blockedStage.cancel(true);
                        }
                    }
                }

                for (StageExecution stageExecution : this.stageExecutions.values()) {
                    StageExecution.State state = stageExecution.getState();
                    if (state != StageExecution.State.SCHEDULED && state != StageExecution.State.RUNNING && state != StageExecution.State.FLUSHING && !state.isDone()) {
                        throw new TrinoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, String.format("Scheduling is complete, but stage %s is in state %s", stageExecution.getStageId(), state));
                    }
                }
            } catch (Throwable t) {
                // record the failure before the schedulers are closed
                fail(t, Optional.empty());
            } finally {
                // close each scheduler once; a failing close must not stop the others from closing
                for (StageScheduler scheduler : this.stageSchedulers.values()) {
                    try {
                        scheduler.close();
                    } catch (Throwable t) {
                        fail(t, Optional.empty());
                    }
                }
            }
        }

        /**
         * Cancels the execution of the given stage, if this scheduler manages a stage with that id.
         * Unknown stage ids are ignored.
         *
         * @param stageId id of the stage to cancel
         */
        public void cancelStage(StageId stageId) {
            Optional.ofNullable(this.stageExecutions.get(stageId))
                    .ifPresent(StageExecution::cancel);
        }

        /**
         * Requests the transition of the scheduler state to canceled and then cancels every stage execution.
         */
        public void cancel() {
            this.stateMachine.transitionToCanceled();
            for (StageExecution stage : this.stageExecutions.values()) {
                stage.cancel();
            }
        }

        /**
         * Requests the transition of the scheduler state to aborted and then aborts every stage execution.
         */
        public void abort() {
            this.stateMachine.transitionToAborted();
            for (StageExecution stage : this.stageExecutions.values()) {
                stage.abort();
            }
        }

        /**
         * Records {@code th} as the failure of this scheduler and aborts every stage execution.
         *
         * @param th the failure to report
         * @param optional id of the stage the failure originated from, if known
         */
        public void fail(Throwable th, Optional<StageId> optional) {
            this.stateMachine.transitionToFailed(th, optional);
            for (StageExecution stage : this.stageExecutions.values()) {
                stage.abort();
            }
        }

        /**
         * Handles a failure reported for a single task.
         *
         * <p>The report is ignored unless the task's stage is managed by this scheduler and the
         * stage currently lists a task with the given id. Otherwise the task is failed, the
         * scheduler is moved to the failed state with the task's stage as the failed stage, and
         * all stage executions are aborted.
         *
         * @param taskId id of the failed task
         * @param th the failure cause
         */
        public void reportTaskFailure(TaskId taskId, Throwable th) {
            StageId owningStageId = taskId.getStageId();
            StageExecution owningStage = this.stageExecutions.get(owningStageId);
            if (owningStage == null) {
                return;
            }
            boolean taskKnown = owningStage.getAllTasks().stream()
                    .anyMatch(task -> task.getTaskId().equals(taskId));
            if (!taskKnown) {
                return;
            }

            owningStage.failTask(taskId, th);
            this.stateMachine.transitionToFailed(th, Optional.of(taskId.getStageId()));
            for (StageExecution stage : this.stageExecutions.values()) {
                stage.abort();
            }
        }

        /**
         * Registers a listener for changes of this scheduler's state by delegating to the state machine.
         *
         * @param stateChangeListener the listener to register
         */
        public void addStateChangeListener(StateMachine.StateChangeListener<DistributedStagesSchedulerState> stateChangeListener) {
            this.stateMachine.addStateChangeListener(stateChangeListener);
        }

        /**
         * Returns the failure recorded by the state machine, or an empty optional if none was recorded.
         */
        public Optional<StageFailureInfo> getFailureCause() {
            return this.stateMachine.getFailureCause();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/execution/scheduler/PipelinedQueryScheduler$DistributedStagesSchedulerState.class */
    /**
     * Lifecycle states of the distributed stages scheduler.
     *
     * <p>Each state carries two flags: whether it is a done (terminal) state and whether it is a
     * failure state. A failure state is always a done state.
     */
    public enum DistributedStagesSchedulerState {
        PLANNED(false, false),
        RUNNING(false, false),
        FINISHED(true, false),
        CANCELED(true, false),
        ABORTED(true, true),
        FAILED(true, true);

        /** All states for which {@link #isDone()} returns {@code true}. */
        public static final Set<DistributedStagesSchedulerState> TERMINAL_STATES = Stream.of(values())
                .filter(DistributedStagesSchedulerState::isDone)
                .collect(ImmutableSet.toImmutableSet());

        private final boolean doneState;
        private final boolean failureState;

        /**
         * @param doneState whether the state is terminal
         * @param failureState whether the state represents a failure; requires {@code doneState}
         * @throws IllegalArgumentException if a failure state is declared as not done
         */
        DistributedStagesSchedulerState(boolean doneState, boolean failureState) {
            Preconditions.checkArgument(!failureState || doneState, "%s is a non-done failure state", name());
            this.doneState = doneState;
            this.failureState = failureState;
        }

        /** Returns {@code true} if this is a terminal state. */
        public boolean isDone() {
            return this.doneState;
        }

        /** Returns {@code true} if this state represents a failure. */
        public boolean isFailure() {
            return this.failureState;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/execution/scheduler/PipelinedQueryScheduler$DistributedStagesSchedulerStateMachine.class */
    public static class DistributedStagesSchedulerStateMachine {
        private final QueryId queryId;
        private final StateMachine<DistributedStagesSchedulerState> state;
        private final AtomicReference<StageFailureInfo> failureCause = new AtomicReference<>();

        public DistributedStagesSchedulerStateMachine(QueryId queryId, Executor executor) {
            this.queryId = (QueryId) Objects.requireNonNull(queryId, "queryId is null");
            Objects.requireNonNull(executor, "executor is null");
            this.state = new StateMachine<>("Distributed stages scheduler", executor, DistributedStagesSchedulerState.PLANNED, DistributedStagesSchedulerState.TERMINAL_STATES);
        }

        public DistributedStagesSchedulerState getState() {
            return this.state.get();
        }

        public boolean transitionToRunning() {
            return this.state.setIf(DistributedStagesSchedulerState.RUNNING, distributedStagesSchedulerState -> {
                return !distributedStagesSchedulerState.isDone();
            });
        }

        public boolean transitionToFinished() {
            return this.state.setIf(DistributedStagesSchedulerState.FINISHED, distributedStagesSchedulerState -> {
                return !distributedStagesSchedulerState.isDone();
            });
        }

        public boolean transitionToCanceled() {
            return this.state.setIf(DistributedStagesSchedulerState.CANCELED, distributedStagesSchedulerState -> {
                return !distributedStagesSchedulerState.isDone();
            });
        }

        public boolean transitionToAborted() {
            return this.state.setIf(DistributedStagesSchedulerState.ABORTED, distributedStagesSchedulerState -> {
                return !distributedStagesSchedulerState.isDone();
            });
        }

        public boolean transitionToFailed(Throwable th, Optional<StageId> optional) {
            Objects.requireNonNull(th, "throwable is null");
            this.failureCause.compareAndSet(null, new StageFailureInfo(Failures.toFailure(th), optional));
            boolean z = this.state.setIf(DistributedStagesSchedulerState.FAILED, distributedStagesSchedulerState -> {
                return !distributedStagesSchedulerState.isDone();
            });
            if (z) {
                PipelinedQueryScheduler.log.error(th, "Failure in distributed stage for query %s", new Object[]{this.queryId});
            } else {
                PipelinedQueryScheduler.log.debug(th, "Failure in distributed stage for query %s after finished", new Object[]{this.queryId});
            }
            return z;
        }

        public Optional<StageFailureInfo> getFailureCause() {
            return Optional.ofNullable(this.failureCause.get());
        }

        public void addStateChangeListener(StateMachine.StateChangeListener<DistributedStagesSchedulerState> stateChangeListener) {
            this.state.addStateChangeListener(stateChangeListener);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/execution/scheduler/PipelinedQueryScheduler$QueryOutputTaskLifecycleListener.class */
    /**
     * Task lifecycle listener that feeds task information into the query state machine as inputs
     * for the query results.
     */
    public static class QueryOutputTaskLifecycleListener implements TaskLifecycleListener {
        private final QueryStateMachine queryStateMachine;

        private QueryOutputTaskLifecycleListener(QueryStateMachine queryStateMachine) {
            this.queryStateMachine = Objects.requireNonNull(queryStateMachine, "queryStateMachine is null");
        }

        /**
         * Registers the new task as an input for the query results. The input location is the
         * task's own URI with the path segments {@code results} and {@code 0} appended.
         */
        @Override // io.trino.execution.scheduler.TaskLifecycleListener
        public void taskCreated(PlanFragmentId planFragmentId, RemoteTask remoteTask) {
            TaskId taskId = remoteTask.getTaskId();
            String resultsLocation = HttpUriBuilder.uriBuilderFrom(remoteTask.getTaskStatus().getSelf())
                    .appendPath("results")
                    .appendPath("0")
                    .build()
                    .toString();
            DirectExchangeInput input = new DirectExchangeInput(taskId, resultsLocation);
            this.queryStateMachine.updateInputsForQueryResults(ImmutableList.of(input), false);
        }

        /** Tells the query state machine that no further inputs will be added. */
        @Override // io.trino.execution.scheduler.TaskLifecycleListener
        public void noMoreTasks(PlanFragmentId planFragmentId) {
            this.queryStateMachine.updateInputsForQueryResults(ImmutableList.of(), true);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/execution/scheduler/PipelinedQueryScheduler$StageFailureInfo.class */
    /**
     * Immutable pair of a failure description and the id of the stage it originated from, if known.
     */
    public static class StageFailureInfo {
        private final ExecutionFailureInfo failureInfo;
        private final Optional<StageId> failedStageId;

        /**
         * @param executionFailureInfo description of the failure
         * @param optional id of the failed stage, or empty when the failure is not tied to a stage
         * @throws NullPointerException if either argument is null
         */
        private StageFailureInfo(ExecutionFailureInfo executionFailureInfo, Optional<StageId> optional) {
            this.failureInfo = Objects.requireNonNull(executionFailureInfo, "failureInfo is null");
            this.failedStageId = Objects.requireNonNull(optional, "failedStageId is null");
        }

        /** Returns the description of the failure. */
        public ExecutionFailureInfo getFailureInfo() {
            return failureInfo;
        }

        /** Returns the id of the failed stage, if one was recorded. */
        public Optional<StageId> getFailedStageId() {
            return failedStageId;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/execution/scheduler/PipelinedQueryScheduler$TaskFailureReporter.class */
    /**
     * Forwards reported task failures to the distributed stages scheduler currently held in the
     * shared reference.
     */
    public static class TaskFailureReporter implements TaskFailureListener {
        private final AtomicReference<DistributedStagesScheduler> distributedStagesScheduler;

        private TaskFailureReporter(AtomicReference<DistributedStagesScheduler> atomicReference) {
            this.distributedStagesScheduler = atomicReference;
        }

        /**
         * Handles a task failure notification.
         *
         * <p>A {@code TrinoException} carrying the {@code REMOTE_TASK_FAILED} error code is only
         * logged at debug level. Any other failure is logged as a warning and passed on to the
         * scheduler, when one is set.
         */
        @Override // io.trino.execution.TaskFailureListener
        public void onTaskFailed(TaskId taskId, Throwable th) {
            if (isRemoteTaskFailure(th)) {
                PipelinedQueryScheduler.log.debug("Task failure discovered while fetching task results: %s", taskId);
                return;
            }
            PipelinedQueryScheduler.log.warn(th, "Reported task failure: %s", taskId);
            DistributedStagesScheduler scheduler = this.distributedStagesScheduler.get();
            if (scheduler == null) {
                return;
            }
            scheduler.reportTaskFailure(taskId, th);
        }

        /** Returns whether {@code failure} is a {@code TrinoException} with the {@code REMOTE_TASK_FAILED} error code. */
        private static boolean isRemoteTaskFailure(Throwable failure) {
            if (!(failure instanceof TrinoException)) {
                return false;
            }
            TrinoException trinoException = (TrinoException) failure;
            return StandardErrorCode.REMOTE_TASK_FAILED.toErrorCode().equals(trinoException.getErrorCode());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/execution/scheduler/PipelinedQueryScheduler$TaskLifecycleListenerBridge.class */
    /**
     * Listener wrapper that passes {@code taskCreated} notifications straight through to the wrapped
     * listener but holds back {@code noMoreTasks} notifications until
     * {@link #notifyNoMoreSourceTasks()} is called.
     *
     * <p>All methods synchronize on this instance. After {@code notifyNoMoreSourceTasks()} has been
     * called, every further call on this bridge fails with an {@link IllegalStateException}.
     */
    public static class TaskLifecycleListenerBridge implements TaskLifecycleListener {
        private final TaskLifecycleListener listener;

        @GuardedBy("this")
        private final Set<PlanFragmentId> noMoreSourceTasks = new HashSet<>();

        @GuardedBy("this")
        private boolean done;

        private TaskLifecycleListenerBridge(TaskLifecycleListener taskLifecycleListener) {
            this.listener = Objects.requireNonNull(taskLifecycleListener, "listener is null");
        }

        /** Forwards the notification to the wrapped listener immediately. */
        @Override // io.trino.execution.scheduler.TaskLifecycleListener
        public synchronized void taskCreated(PlanFragmentId planFragmentId, RemoteTask remoteTask) {
            Preconditions.checkState(!done, "unexpected state");
            listener.taskCreated(planFragmentId, remoteTask);
        }

        /** Remembers the fragment; the wrapped listener is not notified yet. */
        @Override // io.trino.execution.scheduler.TaskLifecycleListener
        public synchronized void noMoreTasks(PlanFragmentId planFragmentId) {
            Preconditions.checkState(!done, "unexpected state");
            noMoreSourceTasks.add(planFragmentId);
        }

        /**
         * Marks the bridge as done and delivers the held-back {@code noMoreTasks} notifications,
         * one per remembered fragment, to the wrapped listener.
         */
        public synchronized void notifyNoMoreSourceTasks() {
            Preconditions.checkState(!done, "unexpected state");
            done = true;
            for (PlanFragmentId fragmentId : noMoreSourceTasks) {
                listener.noMoreTasks(fragmentId);
            }
        }
    }

    public PipelinedQueryScheduler(QueryStateMachine queryStateMachine, SubPlan subPlan, NodePartitioningManager nodePartitioningManager, NodeScheduler nodeScheduler, RemoteTaskFactory remoteTaskFactory, boolean z, int i, ExecutorService executorService, ScheduledExecutorService scheduledExecutorService, FailureDetector failureDetector, NodeTaskMap nodeTaskMap, ExecutionPolicy executionPolicy, SplitSchedulerStats splitSchedulerStats, DynamicFilterService dynamicFilterService, TableExecuteContextManager tableExecuteContextManager, Metadata metadata, SplitSourceFactory splitSourceFactory, SqlTaskManager sqlTaskManager) {
        this.queryStateMachine = (QueryStateMachine) Objects.requireNonNull(queryStateMachine, "queryStateMachine is null");
        this.nodePartitioningManager = (NodePartitioningManager) Objects.requireNonNull(nodePartitioningManager, "nodePartitioningManager is null");
        this.nodeScheduler = (NodeScheduler) Objects.requireNonNull(nodeScheduler, "nodeScheduler is null");
        this.splitBatchSize = i;
        this.executor = (ExecutorService) Objects.requireNonNull(executorService, "queryExecutor is null");
        this.schedulerExecutor = (ScheduledExecutorService) Objects.requireNonNull(scheduledExecutorService, "schedulerExecutor is null");
        this.failureDetector = (FailureDetector) Objects.requireNonNull(failureDetector, "failureDetector is null");
        this.executionPolicy = (ExecutionPolicy) Objects.requireNonNull(executionPolicy, "executionPolicy is null");
        this.schedulerStats = (SplitSchedulerStats) Objects.requireNonNull(splitSchedulerStats, "schedulerStats is null");
        this.dynamicFilterService = (DynamicFilterService) Objects.requireNonNull(dynamicFilterService, "dynamicFilterService is null");
        this.tableExecuteContextManager = (TableExecuteContextManager) Objects.requireNonNull(tableExecuteContextManager, "tableExecuteContextManager is null");
        this.splitSourceFactory = (SplitSourceFactory) Objects.requireNonNull(splitSourceFactory, "splitSourceFactory is null");
        this.stageManager = StageManager.create(queryStateMachine, metadata, remoteTaskFactory, nodeTaskMap, splitSchedulerStats, subPlan, z);
        this.coordinatorStagesScheduler = CoordinatorStagesScheduler.create(queryStateMachine, nodeScheduler, this.stageManager, failureDetector, scheduledExecutorService, this.distributedStagesScheduler, sqlTaskManager);
        this.retryPolicy = SystemSessionProperties.getRetryPolicy(queryStateMachine.getSession());
        Verify.verify(this.retryPolicy == RetryPolicy.NONE || this.retryPolicy == RetryPolicy.QUERY, "unexpected retry policy: %s", this.retryPolicy);
        this.maxQueryRetryAttempts = SystemSessionProperties.getQueryRetryAttempts(queryStateMachine.getSession());
        this.retryInitialDelay = SystemSessionProperties.getRetryInitialDelay(queryStateMachine.getSession());
        this.retryMaxDelay = SystemSessionProperties.getRetryMaxDelay(queryStateMachine.getSession());
        this.retryDelayScaleFactor = SystemSessionProperties.getRetryDelayScaleFactor(queryStateMachine.getSession());
    }

    /**
     * Starts scheduling of the query. Calls after the first one are no-ops, and nothing is scheduled
     * when the query is already done.
     *
     * <p>A query state listener is registered first: once the query is done it cancels (on
     * {@code FINISHED}) or aborts (on {@code FAILED}) the coordinator and distributed stages
     * schedulers and the stage manager, and then publishes the final stage info. Afterwards the
     * distributed stages scheduler for the current attempt is created, the coordinator stages are
     * scheduled, and the distributed scheduling loop, if a scheduler was created, is submitted to
     * the executor.
     */
    @Override // io.trino.execution.scheduler.QueryScheduler
    public synchronized void start() {
        if (this.started) {
            return;
        }
        this.started = true;
        if (this.queryStateMachine.isDone()) {
            return;
        }

        this.queryStateMachine.addStateChangeListener(newState -> {
            if (!newState.isDone()) {
                return;
            }
            // Read the current scheduler under the lock; act on it outside of the lock.
            DistributedStagesScheduler currentScheduler;
            synchronized (this) {
                currentScheduler = this.distributedStagesScheduler.get();
            }
            if (newState == QueryState.FINISHED) {
                this.coordinatorStagesScheduler.cancel();
                if (currentScheduler != null) {
                    currentScheduler.cancel();
                }
                this.stageManager.finish();
            } else if (newState == QueryState.FAILED) {
                this.coordinatorStagesScheduler.abort();
                if (currentScheduler != null) {
                    currentScheduler.abort();
                }
                this.stageManager.abort();
            }
            this.queryStateMachine.updateQueryInfo(Optional.ofNullable(getStageInfo()));
        });

        Optional<DistributedStagesScheduler> attemptScheduler = createDistributedStagesScheduler(this.currentAttempt.get());
        this.coordinatorStagesScheduler.schedule();
        if (attemptScheduler.isPresent()) {
            DistributedStagesScheduler scheduler = attemptScheduler.get();
            this.distributedStagesSchedulingTask = this.executor.submit(scheduler::schedule, null);
        }
    }

    /**
     * Creates the distributed stages scheduler for the given attempt, stores it in
     * {@code distributedStagesScheduler}, and registers the listener that maps the scheduler's
     * state changes onto query state transitions and query-level retries.
     *
     * @param i the attempt number; a non-zero value is only accepted for {@link RetryPolicy#QUERY}
     * @return the newly created scheduler, or an empty {@link Optional} when the query state
     *         machine is already done
     * @throws VerifyException if {@code i} is non-zero and the retry policy is not {@code QUERY}
     * @throws IllegalArgumentException if the retry policy is neither {@code QUERY} nor {@code NONE}
     */
    private synchronized Optional<DistributedStagesScheduler> createDistributedStagesScheduler(int i) {
        Verify.verify(i == 0 || this.retryPolicy == RetryPolicy.QUERY, "unexpected attempt %s for retry policy %s", i, this.retryPolicy);
        if (this.queryStateMachine.isDone()) {
            return Optional.empty();
        }
        switch (this.retryPolicy) {
            case QUERY:
            case NONE:
                if (i > 0) {
                    // Only retries (attempt > 0) are announced to the dynamic filter service.
                    this.dynamicFilterService.registerQueryRetry(this.queryStateMachine.getQueryId(), i);
                }
                DistributedStagesScheduler scheduler = DistributedStagesScheduler.create(this.queryStateMachine, this.schedulerStats, this.nodeScheduler, this.nodePartitioningManager, this.stageManager, this.coordinatorStagesScheduler, this.executionPolicy, this.failureDetector, this.schedulerExecutor, this.splitSourceFactory, this.splitBatchSize, this.dynamicFilterService, this.tableExecuteContextManager, this.retryPolicy, i);
                this.distributedStagesScheduler.set(scheduler);
                scheduler.addStateChangeListener(state -> {
                    // Leave STARTING as soon as the scheduler is running or has already completed.
                    if (this.queryStateMachine.getQueryState() == QueryState.STARTING && (state == DistributedStagesSchedulerState.RUNNING || state.isDone())) {
                        this.queryStateMachine.transitionToRunning();
                    }
                    // Non-failure completion: finish every distributed stage.
                    if (state.isDone() && !state.isFailure()) {
                        this.stageManager.getDistributedStagesInTopologicalOrder().forEach(stage -> {
                            this.stageManager.get(stage.getStageId()).finish();
                        });
                    }
                    // FINISHED/CANCELED are mapped onto query transitions here only when there are no coordinator stages.
                    if (this.stageManager.getCoordinatorStagesInTopologicalOrder().isEmpty()) {
                        if (state == DistributedStagesSchedulerState.FINISHED) {
                            this.queryStateMachine.transitionToFinishing();
                        } else if (state == DistributedStagesSchedulerState.CANCELED) {
                            this.queryStateMachine.transitionToCanceled();
                        }
                    }
                    if (state == DistributedStagesSchedulerState.FAILED) {
                        StageFailureInfo failure = scheduler.getFailureCause().orElseGet(() -> {
                            return new StageFailureInfo(Failures.toFailure(new VerifyException("distributedStagesScheduler failed but failure cause is not present")), Optional.empty());
                        });
                        if (shouldRetry(failure.getFailureInfo().getErrorCode())) {
                            // Scale in floating point and clamp before narrowing to long: truncating the
                            // power to long first would drop fractional scale factors, and the long
                            // multiplication could overflow for large attempt numbers.
                            long maxDelayMillis = this.retryMaxDelay.toMillis();
                            double scaledDelayMillis = this.retryInitialDelay.toMillis() * Math.pow(this.retryDelayScaleFactor, this.currentAttempt.get());
                            long delayMillis = scaledDelayMillis >= maxDelayMillis ? maxDelayMillis : (long) scaledDelayMillis;
                            this.currentAttempt.incrementAndGet();
                            scheduleRetryWithDelay(delayMillis);
                        } else {
                            // No retry: fail the stage named in the failure info (if any), abort the others, then fail the query.
                            this.stageManager.getDistributedStagesInTopologicalOrder().forEach(stage -> {
                                if (failure.getFailedStageId().isPresent() && failure.getFailedStageId().get().equals(stage.getStageId())) {
                                    stage.fail(failure.getFailureInfo().toException());
                                } else {
                                    stage.abort();
                                }
                            });
                            this.queryStateMachine.transitionToFailed(failure.getFailureInfo().toException());
                        }
                    }
                });
                return Optional.of(scheduler);
            default:
                throw new IllegalArgumentException("Unexpected retry policy: " + this.retryPolicy);
        }
    }

    /**
     * Decides whether a failed attempt should be followed by another query attempt.
     *
     * @param errorCode error code of the failure; passed on to {@link #isRetryableErrorCode}
     * @return {@code true} only when the retry policy is {@code QUERY}, the current attempt is
     *         below {@code maxQueryRetryAttempts}, and the error code is retryable
     */
    private boolean shouldRetry(ErrorCode errorCode) {
        if (retryPolicy != RetryPolicy.QUERY) {
            return false;
        }
        if (currentAttempt.get() >= maxQueryRetryAttempts) {
            return false;
        }
        return isRetryableErrorCode(errorCode);
    }

    /**
     * Tells whether a failure with the given error code may be retried.
     *
     * @param errorCode the error code to classify; {@code null} is treated as retryable
     * @return {@code true} for a {@code null} code, for codes of type {@code INTERNAL_ERROR} or
     *         {@code EXTERNAL}, and for the {@code CLUSTER_OUT_OF_MEMORY} code
     */
    private static boolean isRetryableErrorCode(ErrorCode errorCode) {
        if (errorCode == null) {
            return true;
        }
        ErrorType type = errorCode.getType();
        if (type == ErrorType.INTERNAL_ERROR || type == ErrorType.EXTERNAL) {
            return true;
        }
        int outOfMemoryCode = StandardErrorCode.CLUSTER_OUT_OF_MEMORY.toErrorCode().getCode();
        return errorCode.getCode() == outOfMemoryCode;
    }

    /**
     * Submits {@link #scheduleRetry()} to the scheduler executor to run after the given delay.
     * If the submission itself throws, the query is transitioned to failed with that throwable.
     *
     * @param j delay before the retry runs, in milliseconds
     */
    private void scheduleRetryWithDelay(long j) {
        try {
            schedulerExecutor.schedule(this::scheduleRetry, j, TimeUnit.MILLISECONDS);
        } catch (Throwable submissionFailure) {
            // Any failure to enqueue the retry fails the query.
            queryStateMachine.transitionToFailed(submissionFailure);
        }
    }

    /**
     * Starts the next query attempt: waits for the previously submitted scheduling task to
     * complete, then creates a scheduler for the current attempt and submits its
     * {@code schedule} method to the executor.
     *
     * <p>Any failure along the way (missing previous task, timeout, execution failure,
     * interruption, or a failure while creating/submitting the new scheduler) transitions the
     * query to failed. If the waiting thread was interrupted, its interrupt flag is restored.
     */
    private synchronized void scheduleRetry() {
        try {
            Preconditions.checkState(this.distributedStagesSchedulingTask != null, "schedulingTask is expected to be set");
            // Wait (up to five minutes) for the previous scheduling task before starting a new one.
            this.distributedStagesSchedulingTask.get(5L, TimeUnit.MINUTES);
            createDistributedStagesScheduler(this.currentAttempt.get()).ifPresent(nextScheduler -> {
                this.distributedStagesSchedulingTask = this.executor.submit(nextScheduler::schedule, null);
            });
        } catch (InterruptedException interrupted) {
            this.queryStateMachine.transitionToFailed(interrupted);
            // Do not swallow the interruption: restore the flag for the owner of this thread.
            Thread.currentThread().interrupt();
        } catch (Throwable th) {
            this.queryStateMachine.transitionToFailed(th);
        }
    }

    /**
     * Cancels the given stage on the coordinator stages scheduler and, when a distributed stages
     * scheduler is currently set, on that scheduler as well.
     *
     * <p>The call runs inside a {@link SetThreadName} scope formatted as {@code Query-<queryId>};
     * try-with-resources guarantees the scope is closed exactly once on every exit path.
     *
     * @param stageId the stage to cancel
     */
    @Override // io.trino.execution.scheduler.QueryScheduler
    public synchronized void cancelStage(StageId stageId) {
        try (SetThreadName ignored = new SetThreadName("Query-%s", this.queryStateMachine.getQueryId())) {
            this.coordinatorStagesScheduler.cancelStage(stageId);
            // The distributed scheduler reference may not be set; skip it in that case.
            DistributedStagesScheduler scheduler = this.distributedStagesScheduler.get();
            if (scheduler != null) {
                scheduler.cancelStage(stageId);
            }
        }
    }

    /**
     * Asks the stage manager to fail the given task remotely with the supplied cause.
     *
     * <p>The call runs inside a {@link SetThreadName} scope formatted as {@code Query-<queryId>};
     * try-with-resources guarantees the scope is closed exactly once on every exit path.
     *
     * @param taskId the task to fail
     * @param th the failure cause handed to the stage manager
     */
    @Override // io.trino.execution.scheduler.QueryScheduler
    public void failTask(TaskId taskId, Throwable th) {
        try (SetThreadName ignored = new SetThreadName("Query-%s", this.queryStateMachine.getQueryId())) {
            this.stageManager.failTaskRemotely(taskId, th);
        }
    }

    /**
     * Returns the basic stage statistics reported by the stage manager.
     *
     * @return the stage manager's {@link BasicStageStats}
     */
    @Override // io.trino.execution.scheduler.QueryScheduler
    public BasicStageStats getBasicStageStats() {
        // Pure delegation; no state is kept here.
        return stageManager.getBasicStageStats();
    }

    /**
     * Returns the stage information reported by the stage manager.
     *
     * @return the stage manager's {@link StageInfo}
     */
    @Override // io.trino.execution.scheduler.QueryScheduler
    public StageInfo getStageInfo() {
        // Pure delegation; no state is kept here.
        return stageManager.getStageInfo();
    }

    /**
     * Returns the user memory reservation reported by the stage manager.
     *
     * @return the value of the stage manager's {@code getUserMemoryReservation()}
     */
    @Override // io.trino.execution.scheduler.QueryScheduler
    public long getUserMemoryReservation() {
        // Pure delegation; no state is kept here.
        return stageManager.getUserMemoryReservation();
    }

    /**
     * Returns the total memory reservation reported by the stage manager.
     *
     * @return the value of the stage manager's {@code getTotalMemoryReservation()}
     */
    @Override // io.trino.execution.scheduler.QueryScheduler
    public long getTotalMemoryReservation() {
        // Pure delegation; no state is kept here.
        return stageManager.getTotalMemoryReservation();
    }

    /**
     * Returns the total CPU time reported by the stage manager.
     *
     * @return the stage manager's total CPU time as a {@link Duration}
     */
    @Override // io.trino.execution.scheduler.QueryScheduler
    public Duration getTotalCpuTime() {
        // Pure delegation; no state is kept here.
        return stageManager.getTotalCpuTime();
    }
}
