package io.trino.execution.scheduler;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airlift.concurrent.Threads;
import io.airlift.tracing.Tracing;
import io.airlift.units.Duration;
import io.opentelemetry.api.trace.Span;
import io.trino.Session;
import io.trino.SessionTestUtils;
import io.trino.client.NodeVersion;
import io.trino.cost.StatsAndCosts;
import io.trino.execution.DynamicFilterConfig;
import io.trino.execution.MockRemoteTaskFactory;
import io.trino.execution.NodeTaskMap;
import io.trino.execution.RemoteTask;
import io.trino.execution.SqlStage;
import io.trino.execution.StageId;
import io.trino.execution.TableExecuteContextManager;
import io.trino.execution.TableInfo;
import io.trino.execution.scheduler.NodeSchedulerConfig;
import io.trino.execution.scheduler.ScheduleResult;
import io.trino.execution.scheduler.StageExecution;
import io.trino.failuredetector.NoOpFailureDetector;
import io.trino.metadata.FunctionManager;
import io.trino.metadata.InMemoryNodeManager;
import io.trino.metadata.InternalNode;
import io.trino.metadata.InternalNodeManager;
import io.trino.metadata.Metadata;
import io.trino.metadata.MetadataManager;
import io.trino.metadata.QualifiedObjectName;
import io.trino.metadata.TableHandle;
import io.trino.operator.RetryPolicy;
import io.trino.server.DynamicFilterService;
import io.trino.spi.QueryId;
import io.trino.spi.connector.CatalogHandle;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.FixedSplitSource;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.BigintType;
import io.trino.spi.type.TypeOperators;
import io.trino.spi.type.VarcharType;
import io.trino.split.ConnectorAwareSplitSource;
import io.trino.sql.DynamicFilters;
import io.trino.sql.planner.Partitioning;
import io.trino.sql.planner.PartitioningScheme;
import io.trino.sql.planner.PlanFragment;
import io.trino.sql.planner.PlanNodeIdAllocator;
import io.trino.sql.planner.Symbol;
import io.trino.sql.planner.SymbolAllocator;
import io.trino.sql.planner.SystemPartitioningHandle;
import io.trino.sql.planner.plan.DynamicFilterId;
import io.trino.sql.planner.plan.ExchangeNode;
import io.trino.sql.planner.plan.FilterNode;
import io.trino.sql.planner.plan.JoinNode;
import io.trino.sql.planner.plan.JoinType;
import io.trino.sql.planner.plan.PlanFragmentId;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.sql.planner.plan.RemoteSourceNode;
import io.trino.sql.planner.plan.TableScanNode;
import io.trino.testing.TestingHandles;
import io.trino.testing.TestingMetadata;
import io.trino.testing.TestingSession;
import io.trino.testing.TestingSplit;
import io.trino.util.FinalizerService;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;

@Execution(ExecutionMode.CONCURRENT)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
/* loaded from: input_file:io/trino/execution/scheduler/TestMultiSourcePartitionedScheduler.class */
public class TestMultiSourcePartitionedScheduler {
    // Plan node ids of the two table scans that make up the fragment built by createFragment().
    private static final PlanNodeId TABLE_SCAN_1_NODE_ID = new PlanNodeId("1");
    private static final PlanNodeId TABLE_SCAN_2_NODE_ID = new PlanNodeId("2");
    // Query id used for the stage id and for dynamic filter registration.
    private static final QueryId QUERY_ID = new QueryId("query");
    // Id of the single dynamic filter referenced by the fragment's filter nodes and join node.
    private static final DynamicFilterId DYNAMIC_FILTER_ID = new DynamicFilterId("filter1");
    // Executors passed to the mock remote task factory and the stage execution; shut down in destroyExecutor().
    private final ExecutorService queryExecutor = Executors.newCachedThreadPool(Threads.daemonThreadsNamed("stageExecutor-%s"));
    private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2, Threads.daemonThreadsNamed("stageScheduledExecutor-%s"));
    // Shared node manager; the constructor registers three nodes ("other1".."other3") in it.
    private final InMemoryNodeManager nodeManager = new InMemoryNodeManager(new InternalNode[0]);
    // Started in setUp() and destroyed in destroyExecutor(); handed to every NodeTaskMap created by the tests.
    private final FinalizerService finalizerService = new FinalizerService();
    // Collaborators needed to construct DynamicFilterService instances.
    private final Metadata metadata = MetadataManager.createTestMetadataManager();
    private final FunctionManager functionManager = FunctionManager.createTestingFunctionManager();
    private final TypeOperators typeOperators = new TypeOperators();
    // Session passed to createSplitPlacementPolicies() when building node selectors.
    private final Session session = TestingSession.testSessionBuilder().build();
    // Supplies the id of the local exchange node created in createFragment().
    private final PlanNodeIdAllocator planNodeIdAllocator = new PlanNodeIdAllocator();

    /* loaded from: input_file:io/trino/execution/scheduler/TestMultiSourcePartitionedScheduler$InMemoryNodeManagerByCatalog.class */
    /**
     * Node manager whose answer to {@code getActiveCatalogNodes} comes from a caller-supplied
     * function instead of from the parent class.
     */
    private static class InMemoryNodeManagerByCatalog extends InMemoryNodeManager {
        private final Function<CatalogHandle, Set<InternalNode>> nodesByCatalogs;

        /**
         * @param allNodes nodes forwarded unchanged to the {@link InMemoryNodeManager} constructor
         * @param catalogNodesLookup function consulted on every catalog lookup
         */
        public InMemoryNodeManagerByCatalog(Set<InternalNode> allNodes, Function<CatalogHandle, Set<InternalNode>> catalogNodesLookup) {
            super(allNodes);
            nodesByCatalogs = catalogNodesLookup;
        }

        /** Returns whatever the configured lookup function yields for {@code catalogHandle}. */
        public Set<InternalNode> getActiveCatalogNodes(CatalogHandle catalogHandle) {
            Set<InternalNode> catalogNodes = nodesByCatalogs.apply(catalogHandle);
            return catalogNodes;
        }
    }

    /* loaded from: input_file:io/trino/execution/scheduler/TestMultiSourcePartitionedScheduler$QueuedSplitSource.class */
    /**
     * Split source backed by an in-memory queue that tests fill on demand.
     *
     * <p>{@link #getNextBatch(int)} returns a future chained on {@code notEmptyFuture}, which is
     * completed by {@link #addSplits(int, boolean)}. Once a batch drains the queue while the source
     * is still open, a fresh incomplete future is installed so that later batch requests wait for
     * the next {@code addSplits} call.
     */
    private static class QueuedSplitSource implements ConnectorSplitSource {
        private final Supplier<ConnectorSplit> splitFactory;
        private final LinkedBlockingQueue<ConnectorSplit> queue = new LinkedBlockingQueue<>();
        private CompletableFuture<?> notEmptyFuture = new CompletableFuture<>();
        private boolean closed;

        /**
         * @param supplier factory invoked once per split added through {@link #addSplits(int, boolean)}
         * @throws NullPointerException if {@code supplier} is null
         */
        public QueuedSplitSource(Supplier<ConnectorSplit> supplier) {
            this.splitFactory = Objects.requireNonNull(supplier, "splitFactory is null");
        }

        /** Creates a source whose splits come from {@code TestingSplit::createRemoteSplit}. */
        public QueuedSplitSource() {
            this(TestingSplit::createRemoteSplit);
        }

        /**
         * Enqueues {@code count} new splits and wakes up pending batch requests.
         * Does nothing when the source has already been closed.
         *
         * @param count number of splits to create and enqueue
         * @param lastBatch when true, the source is closed after the splits are enqueued
         */
        synchronized void addSplits(int count, boolean lastBatch) {
            if (closed) {
                return;
            }
            for (int added = 0; added < count; added++) {
                queue.add(splitFactory.get());
            }
            if (lastBatch) {
                close();
            }
            // Completing after close() lets the chained batch computation observe the final state.
            notEmptyFuture.complete(null);
        }

        /**
         * Returns a future that yields up to {@code maxSize} queued splits, together with the
         * finished flag, once {@code notEmptyFuture} completes.
         */
        public CompletableFuture<ConnectorSplitSource.ConnectorSplitBatch> getNextBatch(int maxSize) {
            return notEmptyFuture
                    .thenApply(ignored -> getBatch(maxSize))
                    .thenApply(splits -> new ConnectorSplitSource.ConnectorSplitBatch(splits, isFinished()));
        }

        /** Drains at most {@code maxSize} splits and re-arms the wait future when the queue runs dry. */
        private synchronized List<ConnectorSplit> getBatch(int maxSize) {
            List<ConnectorSplit> batch = new ArrayList<>(maxSize);
            queue.drainTo(batch, maxSize);
            if (queue.isEmpty() && !closed && notEmptyFuture.isDone()) {
                // Queue exhausted but more splits may still arrive: make later callers wait again.
                notEmptyFuture = new CompletableFuture<>();
            }
            return ImmutableList.copyOf(batch);
        }

        /** Reports true only when the source is closed and every queued split has been handed out. */
        public synchronized boolean isFinished() {
            return closed && queue.isEmpty();
        }

        /** Marks the source as closed; subsequent {@link #addSplits(int, boolean)} calls are ignored. */
        public synchronized void close() {
            closed = true;
        }
    }

    /** Registers the three worker nodes ("other1" to "other3") in the shared node manager. */
    public TestMultiSourcePartitionedScheduler() {
        InternalNode[] initialNodes = {
                new InternalNode("other1", URI.create("http://127.0.0.1:11"), NodeVersion.UNKNOWN, false),
                new InternalNode("other2", URI.create("http://127.0.0.1:12"), NodeVersion.UNKNOWN, false),
                new InternalNode("other3", URI.create("http://127.0.0.1:13"), NodeVersion.UNKNOWN, false),
        };
        nodeManager.addNodes(initialNodes);
    }

    /** Starts the finalizer service once, before any test in this class runs. */
    @BeforeAll
    public void setUp() {
        finalizerService.start();
    }

    /** Shuts down both executors and then destroys the finalizer service after all tests have run. */
    @AfterAll
    public void destroyExecutor() {
        queryExecutor.shutdownNow();
        scheduledExecutor.shutdownNow();
        finalizerService.destroy();
    }

    /**
     * Feeds two fixed 60-split sources to a scheduler created with the integer argument 7
     * (presumably the split batch size) and calls {@code schedule()} 17 times. No call may leave
     * the result blocked, the three tasks may only appear on the first call, and each task must
     * end up with 40 splits.
     */
    @Test
    public void testScheduleSplitsBatchedNoBlocking() {
        PlanFragment fragment = createFragment();
        NodeTaskMap nodeTaskMap = new NodeTaskMap(finalizerService);
        StageExecution stage = createStageExecution(fragment, nodeTaskMap);
        Map<PlanNodeId, ConnectorSplitSource> sources = ImmutableMap.of(
                TABLE_SCAN_1_NODE_ID, createFixedSplitSource(60),
                TABLE_SCAN_2_NODE_ID, createFixedSplitSource(60));
        SplitPlacementPolicy placementPolicy = createSplitPlacementPolicies(session, stage, nodeTaskMap, nodeManager);
        StageScheduler scheduler = prepareScheduler(sources, placementPolicy, stage, 7);

        for (int round = 0; round <= 16; round++) {
            ScheduleResult result = scheduler.schedule();
            if (round != 16) {
                Assertions.assertThat(result.isFinished()).isFalse();
            } else {
                // Only the last round is expected to (effectively) finish the scheduler.
                assertEffectivelyFinished(result, scheduler);
            }
            Assertions.assertThat(result.getBlocked().isDone()).isTrue();
            int expectedNewTasks;
            if (round == 0) {
                expectedNewTasks = 3;
            } else {
                expectedNewTasks = 0;
            }
            Assertions.assertThat(result.getNewTasks().size()).isEqualTo(expectedNewTasks);
        }

        for (RemoteTask task : stage.getAllTasks()) {
            Assertions.assertThat(task.getPartitionedSplitsInfo().getCount()).isEqualTo(40);
        }
        stage.abort();
    }

    /**
     * Combines a fixed 10-split source with an initially empty queued source. The first call
     * creates three tasks and the second one blocks with {@code WAITING_FOR_SOURCE}. After two
     * final splits are queued, the third call schedules them and finishes with 12 splits overall,
     * four per task.
     */
    @Test
    public void testScheduleSplitsBatchedBlockingSplitSource() {
        PlanFragment fragment = createFragment();
        NodeTaskMap nodeTaskMap = new NodeTaskMap(finalizerService);
        StageExecution stage = createStageExecution(fragment, nodeTaskMap);
        QueuedSplitSource blockingSource = new QueuedSplitSource(TestingSplit::createRemoteSplit);
        Map<PlanNodeId, ConnectorSplitSource> sources = ImmutableMap.of(
                TABLE_SCAN_1_NODE_ID, createFixedSplitSource(10),
                TABLE_SCAN_2_NODE_ID, blockingSource);
        SplitPlacementPolicy placementPolicy = createSplitPlacementPolicies(session, stage, nodeTaskMap, nodeManager);
        StageScheduler scheduler = prepareScheduler(sources, placementPolicy, stage, 5);

        // First call: tasks are created and nothing blocks yet.
        ScheduleResult first = scheduler.schedule();
        Assertions.assertThat(first.isFinished()).isFalse();
        Assertions.assertThat(first.getBlocked().isDone()).isTrue();
        Assertions.assertThat(first.getNewTasks().size()).isEqualTo(3);

        // Second call: the queued source has produced nothing, so the scheduler waits on it.
        ScheduleResult second = scheduler.schedule();
        Assertions.assertThat(second.isFinished()).isFalse();
        Assertions.assertThat(second.getBlocked().isDone()).isFalse();
        Assertions.assertThat(second.getNewTasks().size()).isEqualTo(0);
        Assertions.assertThat(second.getBlockedReason()).isEqualTo(Optional.of(ScheduleResult.BlockedReason.WAITING_FOR_SOURCE));

        // Third call: two final splits arrive and scheduling completes.
        blockingSource.addSplits(2, true);
        ScheduleResult third = scheduler.schedule();
        Assertions.assertThat(third.getBlocked().isDone()).isTrue();
        Assertions.assertThat(third.getSplitsScheduled()).isEqualTo(2);
        Assertions.assertThat(third.getNewTasks().size()).isEqualTo(0);
        Assertions.assertThat(third.getBlockedReason()).isEqualTo(Optional.empty());
        Assertions.assertThat(third.isFinished()).isTrue();
        assertPartitionedSplitCount(stage, 12);
        assertEffectivelyFinished(third, scheduler);

        for (RemoteTask task : stage.getAllTasks()) {
            Assertions.assertThat(task.getPartitionedSplitsInfo().getCount()).isEqualTo(4);
        }
        stage.abort();
    }

    /**
     * Offers 400 splits (two fixed sources of 200 each) in a single {@code schedule()} call. Only
     * 300 are expected to be scheduled across the three new tasks, with the result blocked on
     * {@code SPLIT_QUEUES_FULL}.
     */
    @Test
    public void testScheduleSplitsTasksAreFull() {
        PlanFragment fragment = createFragment();
        NodeTaskMap nodeTaskMap = new NodeTaskMap(finalizerService);
        StageExecution stage = createStageExecution(fragment, nodeTaskMap);
        Map<PlanNodeId, ConnectorSplitSource> sources = ImmutableMap.of(
                TABLE_SCAN_1_NODE_ID, createFixedSplitSource(200),
                TABLE_SCAN_2_NODE_ID, createFixedSplitSource(200));
        SplitPlacementPolicy placementPolicy = createSplitPlacementPolicies(session, stage, nodeTaskMap, nodeManager);
        StageScheduler scheduler = prepareScheduler(sources, placementPolicy, stage, 200);

        ScheduleResult result = scheduler.schedule();
        Assertions.assertThat(result.getSplitsScheduled()).isEqualTo(300);
        Assertions.assertThat(result.isFinished()).isFalse();
        Assertions.assertThat(result.getBlocked().isDone()).isFalse();
        Assertions.assertThat(result.getNewTasks().size()).isEqualTo(3);
        Assertions.assertThat(result.getBlockedReason()).isEqualTo(Optional.of(ScheduleResult.BlockedReason.SPLIT_QUEUES_FULL));

        // Sum of the per-task partitioned split counts must match the scheduled total.
        int totalAssigned = 0;
        for (RemoteTask task : stage.getAllTasks()) {
            totalAssigned += task.getPartitionedSplitsInfo().getCount();
        }
        Assertions.assertThat(totalAssigned).isEqualTo(300);
        stage.abort();
    }

    /**
     * Verifies split distribution across nodes in two phases. First, 15 splits from one queued
     * source are spread 5/5/5 over three nodes; after a fourth node joins, the 3 splits from the
     * second source all land on the new task. Then a second stage sharing the same
     * {@link NodeTaskMap} schedules 20 splits and is expected to place 5 on each of four tasks.
     */
    @Test
    public void testBalancedSplitAssignment() {
        InternalNode[] initialNodes = {
                new InternalNode("other1", URI.create("http://127.0.0.1:11"), NodeVersion.UNKNOWN, false),
                new InternalNode("other2", URI.create("http://127.0.0.1:12"), NodeVersion.UNKNOWN, false),
                new InternalNode("other3", URI.create("http://127.0.0.1:13"), NodeVersion.UNKNOWN, false),
        };
        InMemoryNodeManager localNodeManager = new InMemoryNodeManager(initialNodes);
        NodeTaskMap nodeTaskMap = new NodeTaskMap(finalizerService);

        // Phase one: a stage fed by two queued sources.
        StageExecution firstStage = createStageExecution(createFragment(), nodeTaskMap);
        QueuedSplitSource firstSource = new QueuedSplitSource(TestingSplit::createRemoteSplit);
        QueuedSplitSource secondSource = new QueuedSplitSource(TestingSplit::createRemoteSplit);
        Map<PlanNodeId, ConnectorSplitSource> queuedSources = ImmutableMap.of(
                TABLE_SCAN_1_NODE_ID, firstSource,
                TABLE_SCAN_2_NODE_ID, secondSource);
        SplitPlacementPolicy firstPolicy = createSplitPlacementPolicies(session, firstStage, nodeTaskMap, localNodeManager);
        StageScheduler firstScheduler = prepareScheduler(queuedSources, firstPolicy, firstStage, 15);

        firstSource.addSplits(15, true);
        ScheduleResult afterFirstSource = firstScheduler.schedule();
        Assertions.assertThat(afterFirstSource.getBlocked().isDone()).isFalse();
        Assertions.assertThat(afterFirstSource.getNewTasks().size()).isEqualTo(3);
        Assertions.assertThat(firstStage.getAllTasks().size()).isEqualTo(3);
        for (RemoteTask task : firstStage.getAllTasks()) {
            Assertions.assertThat(task.getPartitionedSplitsInfo().getCount()).isEqualTo(5);
        }

        // A fourth node joins before the second source delivers its splits.
        InternalNode[] lateNodes = {
                new InternalNode("other4", URI.create("http://127.0.0.1:14"), NodeVersion.UNKNOWN, false),
        };
        localNodeManager.addNodes(lateNodes);
        secondSource.addSplits(3, true);
        ScheduleResult afterSecondSource = firstScheduler.schedule();
        assertEffectivelyFinished(afterSecondSource, firstScheduler);
        Assertions.assertThat(afterSecondSource.getBlocked().isDone()).isTrue();
        Assertions.assertThat(afterSecondSource.isFinished()).isTrue();
        Assertions.assertThat(afterSecondSource.getNewTasks().size()).isEqualTo(1);
        Assertions.assertThat(firstStage.getAllTasks().size()).isEqualTo(4);
        int[] expectedCounts = {5, 5, 5, 3};
        for (int taskIndex = 0; taskIndex < expectedCounts.length; taskIndex++) {
            RemoteTask task = firstStage.getAllTasks().get(taskIndex);
            Assertions.assertThat(task.getPartitionedSplitsInfo().getCount()).isEqualTo(expectedCounts[taskIndex]);
        }

        // Phase two: a second stage that reuses the same NodeTaskMap and node manager.
        StageExecution secondStage = createStageExecution(createFragment(), nodeTaskMap);
        Map<PlanNodeId, ConnectorSplitSource> fixedSources = ImmutableMap.of(
                TABLE_SCAN_1_NODE_ID, createFixedSplitSource(10),
                TABLE_SCAN_2_NODE_ID, createFixedSplitSource(10));
        SplitPlacementPolicy secondPolicy = createSplitPlacementPolicies(session, secondStage, nodeTaskMap, localNodeManager);
        StageScheduler secondScheduler = prepareScheduler(fixedSources, secondPolicy, secondStage, 10);

        ScheduleResult secondStageResult = secondScheduler.schedule();
        assertEffectivelyFinished(secondStageResult, secondScheduler);
        Assertions.assertThat(secondStageResult.getBlocked().isDone()).isTrue();
        Assertions.assertThat(secondStageResult.isFinished()).isTrue();
        Assertions.assertThat(secondStageResult.getNewTasks().size()).isEqualTo(4);
        Assertions.assertThat(secondStage.getAllTasks().size()).isEqualTo(4);
        for (RemoteTask task : secondStage.getAllTasks()) {
            Assertions.assertThat(task.getPartitionedSplitsInfo().getCount()).isEqualTo(5);
        }

        firstStage.abort();
        secondStage.abort();
    }

    /**
     * Schedules two sources that contain no splits at all. The first call is expected to report
     * two new tasks and to be (effectively) finished.
     */
    @Test
    public void testScheduleEmptySources() {
        PlanFragment fragment = createFragment();
        NodeTaskMap nodeTaskMap = new NodeTaskMap(finalizerService);
        StageExecution stage = createStageExecution(fragment, nodeTaskMap);
        Map<PlanNodeId, ConnectorSplitSource> emptySources = ImmutableMap.of(
                TABLE_SCAN_1_NODE_ID, createFixedSplitSource(0),
                TABLE_SCAN_2_NODE_ID, createFixedSplitSource(0));
        SplitPlacementPolicy placementPolicy = createSplitPlacementPolicies(session, stage, nodeTaskMap, nodeManager);
        StageScheduler scheduler = prepareScheduler(emptySources, placementPolicy, stage, 15);

        ScheduleResult result = scheduler.schedule();
        Assertions.assertThat(result.getNewTasks().size()).isEqualTo(2);
        assertEffectivelyFinished(result, scheduler);
        stage.abort();
    }

    /**
     * Uses two queued sources that never receive splits and a scheduler whose boolean supplier
     * always returns true. {@code start()} must create one task and move the stage from PLANNED to
     * SCHEDULING while the dynamic filter is still blocked. The following {@code schedule()} call
     * must unblock the dynamic filter without scheduling any split.
     */
    @Test
    public void testDynamicFiltersUnblockedOnBlockedBuildSource() {
        PlanFragment fragment = createFragment();
        NodeTaskMap nodeTaskMap = new NodeTaskMap(finalizerService);
        StageExecution stage = createStageExecution(fragment, nodeTaskMap);
        DynamicFilterService filterService = new DynamicFilterService(metadata, functionManager, typeOperators, new DynamicFilterConfig());
        filterService.registerQuery(
                QUERY_ID,
                SessionTestUtils.TEST_SESSION,
                ImmutableSet.of(DYNAMIC_FILTER_ID),
                ImmutableSet.of(DYNAMIC_FILTER_ID),
                ImmutableSet.of(DYNAMIC_FILTER_ID));
        Map<PlanNodeId, ConnectorSplitSource> sources = ImmutableMap.of(
                TABLE_SCAN_1_NODE_ID, new QueuedSplitSource(),
                TABLE_SCAN_2_NODE_ID, new QueuedSplitSource());
        SplitPlacementPolicy placementPolicy = createSplitPlacementPolicies(session, stage, nodeTaskMap, nodeManager);
        StageScheduler scheduler = prepareScheduler(sources, placementPolicy, stage, filterService, () -> true, 15);

        Symbol probeSymbol = new SymbolAllocator().newSymbol("DF_SYMBOL1", BigintType.BIGINT);
        DynamicFilter dynamicFilter = filterService.createDynamicFilter(
                QUERY_ID,
                ImmutableList.of(new DynamicFilters.Descriptor(DYNAMIC_FILTER_ID, probeSymbol.toSymbolReference())),
                ImmutableMap.of(probeSymbol, new TestingMetadata.TestingColumnHandle("probeColumnA")));

        // start() alone must create a task and move the stage into SCHEDULING.
        Assertions.assertThat(stage.getState()).isEqualTo(StageExecution.State.PLANNED);
        scheduler.start();
        Assertions.assertThat(stage.getAllTasks().size()).isEqualTo(1);
        Assertions.assertThat(stage.getState()).isEqualTo(StageExecution.State.SCHEDULING);

        // The dynamic filter starts out blocked and is released by the schedule() call.
        Assertions.assertThat(dynamicFilter.isBlocked().isDone()).isFalse();
        ScheduleResult result = scheduler.schedule();
        Assertions.assertThat(dynamicFilter.isBlocked().isDone()).isTrue();
        Assertions.assertThat(result.getSplitsScheduled()).isEqualTo(0);
    }

    /**
     * Schedules 400 splits with a boolean supplier that always returns true. The first call fills
     * three tasks with 100 splits each and blocks on {@code SPLIT_QUEUES_FULL}. After a fourth node
     * is added, a second call must neither create a task nor schedule a split.
     */
    @Test
    public void testNoNewTaskScheduledWhenChildStageBufferIsOverUtilized() {
        NodeTaskMap nodeTaskMap = new NodeTaskMap(finalizerService);
        InternalNode[] initialNodes = {
                new InternalNode("other1", URI.create("http://127.0.0.1:11"), NodeVersion.UNKNOWN, false),
                new InternalNode("other2", URI.create("http://127.0.0.1:12"), NodeVersion.UNKNOWN, false),
                new InternalNode("other3", URI.create("http://127.0.0.1:13"), NodeVersion.UNKNOWN, false),
        };
        InMemoryNodeManager localNodeManager = new InMemoryNodeManager(initialNodes);
        StageExecution stage = createStageExecution(createFragment(), nodeTaskMap);
        Map<PlanNodeId, ConnectorSplitSource> sources = ImmutableMap.of(
                TABLE_SCAN_1_NODE_ID, createFixedSplitSource(200),
                TABLE_SCAN_2_NODE_ID, createFixedSplitSource(200));
        SplitPlacementPolicy placementPolicy = createSplitPlacementPolicies(session, stage, nodeTaskMap, localNodeManager);
        DynamicFilterService filterService = new DynamicFilterService(metadata, functionManager, typeOperators, new DynamicFilterConfig());
        StageScheduler scheduler = prepareScheduler(sources, placementPolicy, stage, filterService, () -> true, 200);

        // First call: every initial node gets a task filled with 100 splits.
        ScheduleResult first = scheduler.schedule();
        Assertions.assertThat(first.getBlockedReason()).isEqualTo(Optional.of(ScheduleResult.BlockedReason.SPLIT_QUEUES_FULL));
        Assertions.assertThat(first.getNewTasks().size()).isEqualTo(3);
        Assertions.assertThat(first.getSplitsScheduled()).isEqualTo(300);
        for (RemoteTask task : first.getNewTasks()) {
            Assertions.assertThat(task.getPartitionedSplitsInfo().getCount()).isEqualTo(100);
        }

        // Second call: an extra node is available, yet no task or split may be added.
        InternalNode[] lateNodes = {
                new InternalNode("other4", URI.create("http://127.0.0.4:14"), NodeVersion.UNKNOWN, false),
        };
        localNodeManager.addNodes(lateNodes);
        ScheduleResult second = scheduler.schedule();
        Assertions.assertThat(second.getBlockedReason()).isEqualTo(Optional.of(ScheduleResult.BlockedReason.SPLIT_QUEUES_FULL));
        Assertions.assertThat(second.getNewTasks().size()).isEqualTo(0);
        Assertions.assertThat(second.getSplitsScheduled()).isEqualTo(0);
    }

    /**
     * Asserts that the partitioned split counts of all tasks of the stage add up to {@code i}.
     *
     * @param stageExecution stage whose tasks are inspected
     * @param i expected total number of partitioned splits
     */
    private static void assertPartitionedSplitCount(StageExecution stageExecution, int i) {
        int totalSplits = 0;
        for (RemoteTask task : stageExecution.getAllTasks()) {
            totalSplits += task.getPartitionedSplitsInfo().getCount();
        }
        Assertions.assertThat(totalSplits).isEqualTo(i);
    }

    /**
     * Asserts that scheduling is done, either already in {@code scheduleResult} or after exactly
     * one more {@code schedule()} call that creates no tasks and schedules no splits.
     *
     * @param scheduleResult most recent result returned by the scheduler
     * @param stageScheduler scheduler to poll once more when the result is not yet finished
     */
    private static void assertEffectivelyFinished(ScheduleResult scheduleResult, StageScheduler stageScheduler) {
        boolean alreadyFinished = scheduleResult.isFinished();
        // In both cases the given result must not be blocked.
        Assertions.assertThat(scheduleResult.getBlocked().isDone()).isTrue();
        if (!alreadyFinished) {
            ScheduleResult followUp = stageScheduler.schedule();
            Assertions.assertThat(followUp.isFinished()).isTrue();
            Assertions.assertThat(followUp.getBlocked().isDone()).isTrue();
            Assertions.assertThat(followUp.getNewTasks().size()).isEqualTo(0);
            Assertions.assertThat(followUp.getSplitsScheduled()).isEqualTo(0);
        }
    }

    /**
     * Convenience overload that builds a scheduler with a fresh {@link DynamicFilterService} and a
     * boolean supplier that always returns false.
     */
    private StageScheduler prepareScheduler(Map<PlanNodeId, ConnectorSplitSource> map, SplitPlacementPolicy splitPlacementPolicy, StageExecution stageExecution, int i) {
        DynamicFilterService freshFilterService = new DynamicFilterService(metadata, functionManager, typeOperators, new DynamicFilterConfig());
        return prepareScheduler(map, splitPlacementPolicy, stageExecution, freshFilterService, () -> false, i);
    }

    /**
     * Builds a {@code MultiSourcePartitionedScheduler} for the stage. Every connector split source
     * is wrapped in a {@link ConnectorAwareSplitSource} bound to the test catalog handle, keyed by
     * its original plan node id.
     *
     * @param map connector split sources keyed by the plan node id of their table scan
     * @param splitPlacementPolicy placement policy handed to the scheduler
     * @param stageExecution stage the scheduler operates on
     * @param dynamicFilterService dynamic filter service handed to the scheduler
     * @param booleanSupplier supplier passed as the scheduler's last constructor argument
     * @param i integer passed as the scheduler's fourth constructor argument (presumably the split batch size)
     */
    private StageScheduler prepareScheduler(Map<PlanNodeId, ConnectorSplitSource> map, SplitPlacementPolicy splitPlacementPolicy, StageExecution stageExecution, DynamicFilterService dynamicFilterService, BooleanSupplier booleanSupplier, int i) {
        return new MultiSourcePartitionedScheduler(
                stageExecution,
                (Map) map.entrySet().stream().collect(Collectors.toMap(
                        Map.Entry::getKey,
                        source -> new ConnectorAwareSplitSource(TestingHandles.TEST_CATALOG_HANDLE, source.getValue()))),
                splitPlacementPolicy,
                i,
                dynamicFilterService,
                new TableExecuteContextManager(),
                booleanSupplier);
    }

    /** Builds the test fragment with both table scans reading the shared test table handle. */
    private PlanFragment createFragment() {
        TableHandle sharedTable = TestingHandles.TEST_TABLE_HANDLE;
        return createFragment(sharedTable, sharedTable);
    }

    /**
     * Builds a source-distributed plan fragment. Two table scans, each wrapped in a dynamic-filter
     * {@link FilterNode}, feed a local repartitioning exchange. That exchange is the left side of
     * an inner join whose right side is a replicated remote source.
     *
     * @param tableHandle table read by the scan with id {@code TABLE_SCAN_1_NODE_ID}
     * @param tableHandle2 table read by the scan with id {@code TABLE_SCAN_2_NODE_ID}
     */
    private PlanFragment createFragment(TableHandle tableHandle, TableHandle tableHandle2) {
        Symbol probeSymbol = new Symbol(VarcharType.VARCHAR, "column");
        Symbol buildSymbol = new Symbol(VarcharType.VARCHAR, "buildColumn");

        // First scan and its dynamic filter.
        TableScanNode firstScan = TableScanNode.newInstance(
                TABLE_SCAN_1_NODE_ID,
                tableHandle,
                ImmutableList.of(probeSymbol),
                ImmutableMap.of(probeSymbol, new TestingMetadata.TestingColumnHandle("column")),
                false,
                Optional.empty());
        FilterNode firstFilter = new FilterNode(
                new PlanNodeId("filter_node_id"),
                firstScan,
                DynamicFilters.createDynamicFilterExpression(MetadataManager.createTestMetadataManager(), DYNAMIC_FILTER_ID, VarcharType.VARCHAR, probeSymbol.toSymbolReference()));

        // Second scan and its dynamic filter; it reuses the same symbol and filter node id.
        TableScanNode secondScan = TableScanNode.newInstance(
                TABLE_SCAN_2_NODE_ID,
                tableHandle2,
                ImmutableList.of(probeSymbol),
                ImmutableMap.of(probeSymbol, new TestingMetadata.TestingColumnHandle("column")),
                false,
                Optional.empty());
        FilterNode secondFilter = new FilterNode(
                new PlanNodeId("filter_node_id"),
                secondScan,
                DynamicFilters.createDynamicFilterExpression(MetadataManager.createTestMetadataManager(), DYNAMIC_FILTER_ID, VarcharType.VARCHAR, probeSymbol.toSymbolReference()));

        RemoteSourceNode buildSide = new RemoteSourceNode(
                new PlanNodeId("remote_id"),
                new PlanFragmentId("plan_fragment_id"),
                ImmutableList.of(buildSymbol),
                Optional.empty(),
                ExchangeNode.Type.REPLICATE,
                RetryPolicy.NONE);

        // Local exchange that merges both filtered scans into the probe side of the join.
        ExchangeNode probeSide = new ExchangeNode(
                planNodeIdAllocator.getNextId(),
                ExchangeNode.Type.REPARTITION,
                ExchangeNode.Scope.LOCAL,
                new PartitioningScheme(Partitioning.create(SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION, ImmutableList.of()), firstScan.getOutputSymbols()),
                ImmutableList.of(firstFilter, secondFilter),
                ImmutableList.of(firstScan.getOutputSymbols(), secondScan.getOutputSymbols()),
                Optional.empty());

        JoinNode join = new JoinNode(
                new PlanNodeId("join_id"),
                JoinType.INNER,
                probeSide,
                buildSide,
                ImmutableList.of(),
                firstScan.getOutputSymbols(),
                buildSide.getOutputSymbols(),
                false,
                Optional.empty(),
                Optional.empty(),
                Optional.empty(),
                Optional.empty(),
                Optional.empty(),
                ImmutableMap.of(DYNAMIC_FILTER_ID, buildSymbol),
                Optional.empty());

        return new PlanFragment(
                new PlanFragmentId("plan_id"),
                join,
                ImmutableSet.of(probeSymbol),
                SystemPartitioningHandle.SOURCE_DISTRIBUTION,
                Optional.empty(),
                ImmutableList.of(TABLE_SCAN_1_NODE_ID, TABLE_SCAN_2_NODE_ID),
                new PartitioningScheme(Partitioning.create(SystemPartitioningHandle.SINGLE_DISTRIBUTION, ImmutableList.of()), ImmutableList.of(probeSymbol)),
                StatsAndCosts.empty(),
                ImmutableList.of(),
                ImmutableMap.of(),
                Optional.empty());
    }

    /**
     * Creates a {@link FixedSplitSource} holding {@code i} freshly created {@link TestingSplit}s,
     * each built with {@code true} and an empty list as constructor arguments.
     */
    private static ConnectorSplitSource createFixedSplitSource(int i) {
        return new FixedSplitSource(
                IntStream.range(0, i)
                        .mapToObj(ignoredIndex -> new TestingSplit(true, ImmutableList.of()))
                        .toList());
    }

    /** Convenience overload that builds the placement policy for the test catalog handle. */
    private SplitPlacementPolicy createSplitPlacementPolicies(Session session, StageExecution stageExecution, NodeTaskMap nodeTaskMap, InternalNodeManager internalNodeManager) {
        CatalogHandle testCatalog = TestingHandles.TEST_CATALOG_HANDLE;
        return createSplitPlacementPolicies(session, stageExecution, nodeTaskMap, internalNodeManager, testCatalog);
    }

    /**
     * Builds a {@code DynamicSplitPlacementPolicy} backed by a uniform node selector for the given
     * catalog. The selector configuration excludes the coordinator, allows at most 100 splits per
     * node, sets the minimum pending splits per task to 0 and balances splits per stage.
     *
     * @param session session used to create the node selector
     * @param stageExecution stage whose current tasks the policy consults; must not be null
     * @param nodeTaskMap node-to-task bookkeeping shared with the selector factory
     * @param internalNodeManager source of the nodes the selector may choose from
     * @param catalogHandle catalog the node selector is created for
     */
    private SplitPlacementPolicy createSplitPlacementPolicies(Session session, StageExecution stageExecution, NodeTaskMap nodeTaskMap, InternalNodeManager internalNodeManager, CatalogHandle catalogHandle) {
        NodeSchedulerConfig schedulerConfig = new NodeSchedulerConfig()
                .setIncludeCoordinator(false)
                .setMaxSplitsPerNode(100)
                .setMinPendingSplitsPerTask(0)
                .setSplitsBalancingPolicy(NodeSchedulerConfig.SplitsBalancingPolicy.STAGE);
        NodeScheduler nodeScheduler = new NodeScheduler(
                new UniformNodeSelectorFactory(internalNodeManager, schedulerConfig, nodeTaskMap, new Duration(0.0d, TimeUnit.SECONDS)));
        NodeSelector nodeSelector = nodeScheduler.createNodeSelector(session, Optional.of(catalogHandle));
        // Evaluating the bound method reference already fails with a NullPointerException for a null stage.
        return new DynamicSplitPlacementPolicy(nodeSelector, stageExecution::getAllTasks);
    }

    /**
     * Creates a pipelined stage execution (stage 0 of {@code QUERY_ID}) for the fragment, backed by
     * mock remote tasks. An output buffer manager with argument 1 is registered for the fragment
     * itself, and one with argument 10 for every source fragment of its remote source nodes.
     *
     * @param planFragment fragment the stage executes
     * @param nodeTaskMap node-to-task bookkeeping passed to the SQL stage
     */
    private StageExecution createStageExecution(PlanFragment planFragment, NodeTaskMap nodeTaskMap) {
        StageId stageId = new StageId(QUERY_ID, 0);
        SqlStage sqlStage = SqlStage.createSqlStage(
                stageId,
                planFragment,
                ImmutableMap.of(
                        TABLE_SCAN_1_NODE_ID, new TableInfo(Optional.of("test"), new QualifiedObjectName("test", "test", "test"), TupleDomain.all()),
                        TABLE_SCAN_2_NODE_ID, new TableInfo(Optional.of("test"), new QualifiedObjectName("test", "test", "test"), TupleDomain.all())),
                new MockRemoteTaskFactory(queryExecutor, scheduledExecutor),
                SessionTestUtils.TEST_SESSION,
                true,
                nodeTaskMap,
                queryExecutor,
                Tracing.noopTracer(),
                Span.getInvalid(),
                new SplitSchedulerStats());

        // Output buffer managers: one for this fragment, then one per upstream source fragment.
        ImmutableMap.Builder outputBufferManagers = ImmutableMap.builder();
        outputBufferManagers.put(planFragment.getId(), new PartitionedPipelinedOutputBufferManager(SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION, 1));
        for (RemoteSourceNode remoteSource : planFragment.getRemoteSourceNodes()) {
            for (PlanFragmentId sourceFragmentId : remoteSource.getSourceFragmentIds()) {
                outputBufferManagers.put(sourceFragmentId, new PartitionedPipelinedOutputBufferManager(SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION, 10));
            }
        }

        return PipelinedStageExecution.createPipelinedStageExecution(
                sqlStage,
                outputBufferManagers.buildOrThrow(),
                TaskLifecycleListener.NO_OP,
                new NoOpFailureDetector(),
                queryExecutor,
                Optional.of(new int[]{0}),
                0);
    }
}
