package io.prestosql.server;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.prestosql.Session;
import io.prestosql.cost.StatsAndCosts;
import io.prestosql.execution.DynamicFilterConfig;
import io.prestosql.execution.StageId;
import io.prestosql.execution.StageState;
import io.prestosql.execution.TaskId;
import io.prestosql.metadata.MetadataManager;
import io.prestosql.operator.StageExecutionDescriptor;
import io.prestosql.server.DynamicFilterService;
import io.prestosql.spi.QueryId;
import io.prestosql.spi.connector.DynamicFilter;
import io.prestosql.spi.connector.TestingColumnHandle;
import io.prestosql.spi.predicate.Domain;
import io.prestosql.spi.predicate.Range;
import io.prestosql.spi.predicate.TupleDomain;
import io.prestosql.spi.predicate.ValueSet;
import io.prestosql.spi.type.IntegerType;
import io.prestosql.spi.type.VarcharType;
import io.prestosql.sql.DynamicFilters;
import io.prestosql.sql.planner.Partitioning;
import io.prestosql.sql.planner.PartitioningHandle;
import io.prestosql.sql.planner.PartitioningScheme;
import io.prestosql.sql.planner.PlanFragment;
import io.prestosql.sql.planner.Symbol;
import io.prestosql.sql.planner.SystemPartitioningHandle;
import io.prestosql.sql.planner.iterative.rule.test.PlanBuilder;
import io.prestosql.sql.planner.plan.DynamicFilterId;
import io.prestosql.sql.planner.plan.ExchangeNode;
import io.prestosql.sql.planner.plan.FilterNode;
import io.prestosql.sql.planner.plan.JoinNode;
import io.prestosql.sql.planner.plan.PlanFragmentId;
import io.prestosql.sql.planner.plan.PlanNodeId;
import io.prestosql.sql.planner.plan.RemoteSourceNode;
import io.prestosql.sql.planner.plan.TableScanNode;
import io.prestosql.sql.tree.Expression;
import io.prestosql.sql.tree.SymbolReference;
import io.prestosql.testing.TestingHandles;
import io.prestosql.testing.TestingMetadata;
import io.prestosql.testing.TestingSession;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.testng.Assert;
import org.testng.annotations.Test;

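/**
 * Unit tests for {@link DynamicFilterService}: collection of per-task dynamic filter summaries,
 * creation of {@code DynamicFilter} instances, and lazy/replicated dynamic filter statistics.
 */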
public class TestDynamicFilterService {
    private static final Session session = TestingSession.testSessionBuilder().build();

    /**
     * Test double for the stage supplier passed to {@code DynamicFilterService.registerQuery}.
     *
     * <p>Holds, per stage and per task, the dynamic filter domains that a test has stored via
     * {@link #storeSummary}, and counts how often {@link #get()} has been invoked. Every stage is
     * reported with the single {@link StageState} supplied at construction.
     */
    private static class TestDynamicFiltersStageSupplier implements Supplier<List<DynamicFilterService.StageDynamicFilters>> {
        // stage -> task -> dynamic filter id -> domain reported by that task
        private final Map<StageId, Map<TaskId, Map<DynamicFilterId, Domain>>> stageDynamicFilters = new HashMap<>();
        private final StageState stageState;
        // number of times get() has been called
        private int requestCount;

        TestDynamicFiltersStageSupplier(StageState stageState) {
            this.stageState = stageState;
        }

        /**
         * Registers the given tasks under their stages, each with an empty set of reported domains.
         * A task that is already present has its stored domains replaced by an empty map.
         */
        void addTasks(List<TaskId> list) {
            for (TaskId taskId : list) {
                stageDynamicFilters
                        .computeIfAbsent(taskId.getStageId(), stageId -> new HashMap<>())
                        .put(taskId, new HashMap<>());
            }
        }

        /**
         * Records the domain reported by {@code taskId} for {@code dynamicFilterId}.
         * The task must have been registered through {@link #addTasks} beforehand; otherwise the
         * lookup chain dereferences {@code null}.
         */
        void storeSummary(DynamicFilterId dynamicFilterId, TaskId taskId, Domain domain) {
            stageDynamicFilters.get(taskId.getStageId()).get(taskId).put(dynamicFilterId, domain);
        }

        /** Returns how many times {@link #get()} has been called so far. */
        int getRequestCount() {
            return requestCount;
        }

        /**
         * Increments the request counter and returns one {@code StageDynamicFilters} per known stage,
         * built from the configured stage state, the stage's task count, and a snapshot of the
         * per-task domain maps.
         */
        @Override
        public List<DynamicFilterService.StageDynamicFilters> get() {
            requestCount++;
            return stageDynamicFilters.values().stream()
                    .map(tasks -> new DynamicFilterService.StageDynamicFilters(
                            stageState,
                            tasks.size(),
                            ImmutableList.copyOf(tasks.values())))
                    .collect(ImmutableList.toImmutableList());
        }
    }

    /**
     * Checks that the summary of a lazy dynamic filter produced by a three-task stage becomes
     * available only after all three tasks have reported a domain, and that the stage supplier is
     * not queried again once the filter has completed.
     */
    @Test
    public void testDynamicFilterSummaryCompletion() {
        DynamicFilterService dynamicFilterService = new DynamicFilterService(new DynamicFilterConfig());
        DynamicFilterId filterId = new DynamicFilterId("df");
        QueryId queryId = new QueryId("query");
        StageId stageId = new StageId(queryId, 0);
        List<TaskId> taskIds = ImmutableList.of(new TaskId(stageId, 0), new TaskId(stageId, 1), new TaskId(stageId, 2));

        TestDynamicFiltersStageSupplier stageSupplier = new TestDynamicFiltersStageSupplier(StageState.RUNNING);
        stageSupplier.addTasks(taskIds);
        dynamicFilterService.registerQuery(queryId, stageSupplier, ImmutableSet.of(filterId), ImmutableSet.of(filterId), ImmutableSet.of());

        // Nothing has been collected yet: no summary, one lazy filter, none completed.
        Assert.assertFalse(dynamicFilterService.getSummary(queryId, filterId).isPresent());
        DynamicFilterService.DynamicFiltersStats initialStats = dynamicFilterService.getDynamicFilteringStats(queryId, session);
        Assert.assertEquals(initialStats.getTotalDynamicFilters(), 1);
        Assert.assertEquals(initialStats.getDynamicFiltersCompleted(), 0);
        Assert.assertEquals(initialStats.getLazyDynamicFilters(), 1);
        Assert.assertEquals(initialStats.getReplicatedDynamicFilters(), 0);

        // One of three tasks has reported: still incomplete.
        stageSupplier.storeSummary(filterId, new TaskId(stageId, 0), Domain.singleValue(IntegerType.INTEGER, 1L));
        dynamicFilterService.collectDynamicFilters();
        Assert.assertFalse(dynamicFilterService.getSummary(queryId, filterId).isPresent());
        Assert.assertEquals(stageSupplier.getRequestCount(), 1);
        Assert.assertEquals(dynamicFilterService.getDynamicFilteringStats(queryId, session).getDynamicFiltersCompleted(), 0);

        // Two of three tasks have reported: still incomplete.
        stageSupplier.storeSummary(filterId, new TaskId(stageId, 1), Domain.singleValue(IntegerType.INTEGER, 2L));
        dynamicFilterService.collectDynamicFilters();
        Assert.assertFalse(dynamicFilterService.getSummary(queryId, filterId).isPresent());
        Assert.assertEquals(stageSupplier.getRequestCount(), 2);
        Assert.assertEquals(dynamicFilterService.getDynamicFilteringStats(queryId, session).getDynamicFiltersCompleted(), 0);

        // All tasks have reported: the summary is the union of the three single values.
        stageSupplier.storeSummary(filterId, new TaskId(stageId, 2), Domain.singleValue(IntegerType.INTEGER, 3L));
        dynamicFilterService.collectDynamicFilters();
        Optional<Domain> summary = dynamicFilterService.getSummary(queryId, filterId);
        Assert.assertTrue(summary.isPresent());
        Assert.assertEquals(summary.get(), Domain.multipleValues(IntegerType.INTEGER, ImmutableList.of(1L, 2L, 3L)));
        Assert.assertEquals(stageSupplier.getRequestCount(), 3);

        DynamicFilterService.DynamicFiltersStats finalStats = dynamicFilterService.getDynamicFilteringStats(queryId, session);
        Assert.assertEquals(finalStats.getDynamicFiltersCompleted(), 1);
        Assert.assertEquals(finalStats.getLazyDynamicFilters(), 1);
        Assert.assertEquals(finalStats.getReplicatedDynamicFilters(), 0);
        Assert.assertEquals(finalStats.getDynamicFilterDomainStats(), ImmutableList.of(new DynamicFilterService.DynamicFilterDomainStats(filterId, getExpectedDomainString(1L, 3L), 3, 0)));

        // A further collection round must not query the supplier again.
        dynamicFilterService.collectDynamicFilters();
        Assert.assertEquals(stageSupplier.getRequestCount(), 3);
    }

    /**
     * Walks a {@code DynamicFilter} built from three lazy dynamic filters (two on
     * {@code probeColumnA}, one on {@code probeColumnB}) through stepwise collection, asserting
     * after every step the current predicate, completion/awaitable flags, the state of the futures
     * obtained from {@code isBlocked()}, the supplier request count, and the reported statistics.
     */
    @Test
    public void testDynamicFilter() {
        DynamicFilterService dynamicFilterService = new DynamicFilterService(new DynamicFilterConfig());
        DynamicFilterId filterId1 = new DynamicFilterId("df1");
        DynamicFilterId filterId2 = new DynamicFilterId("df2");
        DynamicFilterId filterId3 = new DynamicFilterId("df3");
        Expression df1 = PlanBuilder.expression("DF_SYMBOL1");
        Expression df2 = PlanBuilder.expression("DF_SYMBOL2");
        Expression df3 = PlanBuilder.expression("DF_SYMBOL3");
        QueryId queryId = new QueryId("query");
        StageId stageId1 = new StageId(queryId, 1);
        StageId stageId2 = new StageId(queryId, 2);
        StageId stageId3 = new StageId(queryId, 3);

        TestDynamicFiltersStageSupplier stageSupplier = new TestDynamicFiltersStageSupplier(StageState.RUNNING);
        stageSupplier.addTasks(ImmutableList.of(new TaskId(stageId1, 0), new TaskId(stageId1, 1)));
        stageSupplier.addTasks(ImmutableList.of(new TaskId(stageId2, 0), new TaskId(stageId2, 1)));
        stageSupplier.addTasks(ImmutableList.of(new TaskId(stageId3, 0), new TaskId(stageId3, 1)));
        dynamicFilterService.registerQuery(
                queryId,
                stageSupplier,
                ImmutableSet.of(filterId1, filterId2, filterId3),
                ImmutableSet.of(filterId1, filterId2, filterId3),
                ImmutableSet.of());

        DynamicFilter dynamicFilter = dynamicFilterService.createDynamicFilter(
                queryId,
                ImmutableList.of(
                        new DynamicFilters.Descriptor(filterId1, df1),
                        new DynamicFilters.Descriptor(filterId2, df2),
                        new DynamicFilters.Descriptor(filterId3, df3)),
                ImmutableMap.of(
                        Symbol.from(df1), new TestingColumnHandle("probeColumnA"),
                        Symbol.from(df2), new TestingColumnHandle("probeColumnA"),
                        Symbol.from(df3), new TestingColumnHandle("probeColumnB")));

        // Initial state: no constraint, not complete, awaitable.
        Assert.assertTrue(dynamicFilter.getCurrentPredicate().isAll());
        Assert.assertFalse(dynamicFilter.isComplete());
        Assert.assertTrue(dynamicFilter.isAwaitable());
        DynamicFilterService.DynamicFiltersStats initialStats = dynamicFilterService.getDynamicFilteringStats(queryId, session);
        Assert.assertEquals(initialStats.getTotalDynamicFilters(), 3);
        Assert.assertEquals(initialStats.getDynamicFiltersCompleted(), 0);
        Assert.assertEquals(initialStats.getLazyDynamicFilters(), 3);
        Assert.assertEquals(initialStats.getReplicatedDynamicFilters(), 0);

        CompletableFuture<?> blockedFuture1 = dynamicFilter.isBlocked();
        Assert.assertFalse(blockedFuture1.isDone());

        // df1: first of two tasks reports -> nothing changes yet.
        stageSupplier.storeSummary(filterId1, new TaskId(stageId1, 0), Domain.singleValue(IntegerType.INTEGER, 1L));
        dynamicFilterService.collectDynamicFilters();
        Assert.assertTrue(dynamicFilter.getCurrentPredicate().isAll());
        Assert.assertFalse(dynamicFilter.isComplete());
        Assert.assertTrue(dynamicFilter.isAwaitable());
        Assert.assertFalse(blockedFuture1.isDone());
        Assert.assertEquals(stageSupplier.getRequestCount(), 1);

        // df1: second task reports -> df1 completes and narrows probeColumnA to {1, 2}.
        stageSupplier.storeSummary(filterId1, new TaskId(stageId1, 1), Domain.singleValue(IntegerType.INTEGER, 2L));
        dynamicFilterService.collectDynamicFilters();
        Assert.assertEquals(
                dynamicFilter.getCurrentPredicate(),
                TupleDomain.withColumnDomains(ImmutableMap.of(new TestingColumnHandle("probeColumnA"), Domain.multipleValues(IntegerType.INTEGER, ImmutableList.of(1L, 2L)))));
        Assert.assertTrue(blockedFuture1.isDone());
        Assert.assertEquals(stageSupplier.getRequestCount(), 2);
        Assert.assertEquals(dynamicFilterService.getDynamicFilteringStats(queryId, session).getDynamicFiltersCompleted(), 1);
        Assert.assertFalse(dynamicFilter.isComplete());
        Assert.assertTrue(dynamicFilter.isAwaitable());

        CompletableFuture<?> blockedFuture2 = dynamicFilter.isBlocked();
        Assert.assertFalse(blockedFuture2.isDone());

        // df2: first of two tasks reports -> predicate unchanged.
        stageSupplier.storeSummary(filterId2, new TaskId(stageId2, 0), Domain.singleValue(IntegerType.INTEGER, 2L));
        dynamicFilterService.collectDynamicFilters();
        Assert.assertEquals(
                dynamicFilter.getCurrentPredicate(),
                TupleDomain.withColumnDomains(ImmutableMap.of(new TestingColumnHandle("probeColumnA"), Domain.multipleValues(IntegerType.INTEGER, ImmutableList.of(1L, 2L)))));
        Assert.assertFalse(dynamicFilter.isComplete());
        Assert.assertTrue(dynamicFilter.isAwaitable());
        Assert.assertFalse(blockedFuture2.isDone());
        Assert.assertEquals(stageSupplier.getRequestCount(), 3);
        Assert.assertEquals(dynamicFilterService.getDynamicFilteringStats(queryId, session).getDynamicFiltersCompleted(), 1);

        // df2: second task reports -> {1, 2} intersected with {2, 3} leaves probeColumnA = 2.
        stageSupplier.storeSummary(filterId2, new TaskId(stageId2, 1), Domain.singleValue(IntegerType.INTEGER, 3L));
        dynamicFilterService.collectDynamicFilters();
        Assert.assertEquals(
                dynamicFilter.getCurrentPredicate(),
                TupleDomain.withColumnDomains(ImmutableMap.of(new TestingColumnHandle("probeColumnA"), Domain.singleValue(IntegerType.INTEGER, 2L))));
        Assert.assertTrue(blockedFuture2.isDone());
        Assert.assertEquals(stageSupplier.getRequestCount(), 4);
        Assert.assertEquals(dynamicFilterService.getDynamicFilteringStats(queryId, session).getDynamicFiltersCompleted(), 2);
        Assert.assertFalse(dynamicFilter.isComplete());
        Assert.assertTrue(dynamicFilter.isAwaitable());

        CompletableFuture<?> blockedFuture3 = dynamicFilter.isBlocked();
        Assert.assertFalse(blockedFuture3.isDone());

        // A filter created now over only the already-completed df1 and df2 is complete immediately.
        DynamicFilter completedFilter = dynamicFilterService.createDynamicFilter(
                queryId,
                ImmutableList.of(
                        new DynamicFilters.Descriptor(filterId1, df1),
                        new DynamicFilters.Descriptor(filterId2, df2)),
                ImmutableMap.of(
                        Symbol.from(df1), new TestingColumnHandle("probeColumnA"),
                        Symbol.from(df2), new TestingColumnHandle("probeColumnA")));
        Assert.assertTrue(completedFilter.isComplete());
        Assert.assertFalse(completedFilter.isAwaitable());
        Assert.assertTrue(completedFilter.isBlocked().isDone());
        Assert.assertEquals(
                completedFilter.getCurrentPredicate(),
                TupleDomain.withColumnDomains(ImmutableMap.of(new TestingColumnHandle("probeColumnA"), Domain.singleValue(IntegerType.INTEGER, 2L))));

        // df3: first of two tasks reports an empty domain -> predicate unchanged.
        stageSupplier.storeSummary(filterId3, new TaskId(stageId3, 0), Domain.none(IntegerType.INTEGER));
        dynamicFilterService.collectDynamicFilters();
        Assert.assertEquals(
                dynamicFilter.getCurrentPredicate(),
                TupleDomain.withColumnDomains(ImmutableMap.of(new TestingColumnHandle("probeColumnA"), Domain.singleValue(IntegerType.INTEGER, 2L))));
        Assert.assertFalse(dynamicFilter.isComplete());
        Assert.assertTrue(dynamicFilter.isAwaitable());
        Assert.assertFalse(blockedFuture3.isDone());
        Assert.assertEquals(stageSupplier.getRequestCount(), 5);
        Assert.assertEquals(dynamicFilterService.getDynamicFilteringStats(queryId, session).getDynamicFiltersCompleted(), 2);

        // df3: second task reports an empty domain -> the whole predicate becomes none.
        stageSupplier.storeSummary(filterId3, new TaskId(stageId3, 1), Domain.none(IntegerType.INTEGER));
        dynamicFilterService.collectDynamicFilters();
        Assert.assertEquals(dynamicFilter.getCurrentPredicate(), TupleDomain.none());
        Assert.assertEquals(stageSupplier.getRequestCount(), 6);
        Assert.assertTrue(blockedFuture3.isDone());

        DynamicFilterService.DynamicFiltersStats finalStats = dynamicFilterService.getDynamicFilteringStats(queryId, session);
        Assert.assertEquals(finalStats.getDynamicFiltersCompleted(), 3);
        Assert.assertEquals(finalStats.getLazyDynamicFilters(), 3);
        Assert.assertEquals(finalStats.getReplicatedDynamicFilters(), 0);
        // Compared as sets, so the order of the reported domain stats is not asserted.
        Assert.assertEquals(
                ImmutableSet.copyOf(finalStats.getDynamicFilterDomainStats()),
                ImmutableSet.of(
                        new DynamicFilterService.DynamicFilterDomainStats(filterId1, getExpectedDomainString(1L, 2L), 2, 0),
                        new DynamicFilterService.DynamicFilterDomainStats(filterId2, getExpectedDomainString(2L, 3L), 2, 0),
                        new DynamicFilterService.DynamicFilterDomainStats(filterId3, Domain.none(IntegerType.INTEGER).toString(session.toConnectorSession()), 0, 0)));

        // With every filter completed, another round must not query the supplier again.
        dynamicFilterService.collectDynamicFilters();
        Assert.assertTrue(dynamicFilter.isComplete());
        Assert.assertFalse(dynamicFilter.isAwaitable());
        Assert.assertTrue(dynamicFilter.isBlocked().isDone());
        Assert.assertEquals(stageSupplier.getRequestCount(), 6);
    }

    /**
     * Expects a dynamic filter to be reported complete as soon as one task of a two-task stage
     * reports {@code Domain.all}, and the supplier not to be queried on the following round.
     */
    @Test
    public void testShortCircuitOnAllTupleDomain() {
        DynamicFilterService service = new DynamicFilterService(new DynamicFilterConfig());
        DynamicFilterId filterId = new DynamicFilterId("df1");
        Expression df1 = PlanBuilder.expression("DF_SYMBOL1");
        QueryId queryId = new QueryId("query");
        StageId stageId = new StageId(queryId, 1);

        TestDynamicFiltersStageSupplier stageSupplier = new TestDynamicFiltersStageSupplier(StageState.RUNNING);
        stageSupplier.addTasks(ImmutableList.of(new TaskId(stageId, 0), new TaskId(stageId, 1)));
        service.registerQuery(queryId, stageSupplier, ImmutableSet.of(filterId), ImmutableSet.of(filterId), ImmutableSet.of());

        DynamicFilter dynamicFilter = service.createDynamicFilter(
                queryId,
                ImmutableList.of(new DynamicFilters.Descriptor(filterId, df1)),
                ImmutableMap.of(Symbol.from(df1), new TestingColumnHandle("probeColumnA")));

        // Before any collection the filter is unconstrained, incomplete and blocked.
        Assert.assertTrue(dynamicFilter.getCurrentPredicate().isAll());
        Assert.assertFalse(dynamicFilter.isComplete());
        Assert.assertFalse(dynamicFilter.isBlocked().isDone());

        // Only task 1 reports, but with an "all" domain.
        stageSupplier.storeSummary(filterId, new TaskId(stageId, 1), Domain.all(IntegerType.INTEGER));
        service.collectDynamicFilters();
        Assert.assertTrue(dynamicFilter.getCurrentPredicate().isAll());
        Assert.assertTrue(dynamicFilter.isComplete());
        Assert.assertTrue(dynamicFilter.isBlocked().isDone());
        Assert.assertEquals(stageSupplier.getRequestCount(), 1);

        // The request count stays at 1 after a second round.
        service.collectDynamicFilters();
        Assert.assertEquals(stageSupplier.getRequestCount(), 1);
    }

    /**
     * Registers a single dynamic filter as replicated (and not lazy) on a two-task stage in
     * {@code SCHEDULING} state and expects it to complete after only one task has reported.
     */
    @Test
    public void testReplicatedDynamicFilter() {
        DynamicFilterService service = new DynamicFilterService(new DynamicFilterConfig());
        DynamicFilterId filterId = new DynamicFilterId("df1");
        Expression df1 = PlanBuilder.expression("DF_SYMBOL1");
        QueryId queryId = new QueryId("query");
        StageId stageId = new StageId(queryId, 1);

        TestDynamicFiltersStageSupplier stageSupplier = new TestDynamicFiltersStageSupplier(StageState.SCHEDULING);
        stageSupplier.addTasks(ImmutableList.of(new TaskId(stageId, 0), new TaskId(stageId, 1)));
        service.registerQuery(queryId, stageSupplier, ImmutableSet.of(filterId), ImmutableSet.of(), ImmutableSet.of(filterId));

        DynamicFilter dynamicFilter = service.createDynamicFilter(
                queryId,
                ImmutableList.of(new DynamicFilters.Descriptor(filterId, df1)),
                ImmutableMap.of(Symbol.from(df1), new TestingColumnHandle("probeColumnA")));
        Assert.assertTrue(dynamicFilter.getCurrentPredicate().isAll());

        DynamicFilterService.DynamicFiltersStats statsBefore = service.getDynamicFilteringStats(queryId, session);
        Assert.assertEquals(statsBefore.getTotalDynamicFilters(), 1);
        Assert.assertEquals(statsBefore.getDynamicFiltersCompleted(), 0);
        Assert.assertEquals(statsBefore.getReplicatedDynamicFilters(), 1);
        Assert.assertEquals(statsBefore.getLazyDynamicFilters(), 0);

        // Not lazy, so the filter is neither awaitable nor blocking even though it is incomplete.
        Assert.assertFalse(dynamicFilter.isComplete());
        Assert.assertFalse(dynamicFilter.isAwaitable());
        Assert.assertTrue(dynamicFilter.isBlocked().isDone());

        // A single task reporting is enough for the filter to complete.
        stageSupplier.storeSummary(filterId, new TaskId(stageId, 0), Domain.singleValue(IntegerType.INTEGER, 1L));
        service.collectDynamicFilters();
        Assert.assertEquals(
                dynamicFilter.getCurrentPredicate(),
                TupleDomain.withColumnDomains(ImmutableMap.of(new TestingColumnHandle("probeColumnA"), Domain.singleValue(IntegerType.INTEGER, 1L))));
        Assert.assertTrue(dynamicFilter.isComplete());
        Assert.assertFalse(dynamicFilter.isAwaitable());
        Assert.assertEquals(stageSupplier.getRequestCount(), 1);

        DynamicFilterService.DynamicFiltersStats statsAfter = service.getDynamicFilteringStats(queryId, session);
        Assert.assertEquals(statsAfter.getTotalDynamicFilters(), 1);
        Assert.assertEquals(statsAfter.getDynamicFiltersCompleted(), 1);
        Assert.assertEquals(statsAfter.getReplicatedDynamicFilters(), 1);
        Assert.assertEquals(statsAfter.getLazyDynamicFilters(), 0);
        Assert.assertEquals(
                statsAfter.getDynamicFilterDomainStats(),
                ImmutableList.of(new DynamicFilterService.DynamicFilterDomainStats(
                        filterId,
                        Domain.singleValue(IntegerType.INTEGER, 1L).toString(session.toConnectorSession()),
                        1,
                        0)));

        // The request count stays at 1 after a second round.
        service.collectDynamicFilters();
        Assert.assertEquals(stageSupplier.getRequestCount(), 1);
    }

    /**
     * Expects that calling {@code cancel} on the future returned by {@code DynamicFilter.isBlocked()}
     * returns {@code false} and leaves the filter blocked and incomplete, and that the same future
     * is done once the remaining task has reported and the filter completes.
     */
    @Test
    public void testDynamicFilterCancellation() {
        DynamicFilterService dynamicFilterService = new DynamicFilterService(new DynamicFilterConfig());
        DynamicFilterId filterId = new DynamicFilterId("df");
        Expression df1 = PlanBuilder.expression("DF_SYMBOL1");
        QueryId queryId = new QueryId("query");
        StageId stageId = new StageId(queryId, 0);
        List<TaskId> taskIds = ImmutableList.of(new TaskId(stageId, 0), new TaskId(stageId, 1));

        TestDynamicFiltersStageSupplier stageSupplier = new TestDynamicFiltersStageSupplier(StageState.RUNNING);
        stageSupplier.addTasks(taskIds);
        dynamicFilterService.registerQuery(queryId, stageSupplier, ImmutableSet.of(filterId), ImmutableSet.of(filterId), ImmutableSet.of());

        TestingColumnHandle column = new TestingColumnHandle("probeColumnA");
        DynamicFilter dynamicFilter = dynamicFilterService.createDynamicFilter(
                queryId,
                ImmutableList.of(new DynamicFilters.Descriptor(filterId, df1)),
                ImmutableMap.of(Symbol.from(df1), column));
        Assert.assertFalse(dynamicFilter.isBlocked().isDone());
        Assert.assertFalse(dynamicFilter.isComplete());
        Assert.assertEquals(dynamicFilter.getCurrentPredicate(), TupleDomain.all());

        // First of two tasks reports: predicate is still unconstrained.
        stageSupplier.storeSummary(filterId, new TaskId(stageId, 0), Domain.singleValue(IntegerType.INTEGER, 1L));
        dynamicFilterService.collectDynamicFilters();
        Assert.assertEquals(dynamicFilter.getCurrentPredicate(), TupleDomain.all());

        // The cancel attempt is expected to return false and change nothing.
        CompletableFuture<?> blockedFuture = dynamicFilter.isBlocked();
        Assert.assertFalse(blockedFuture.isDone());
        Assert.assertFalse(blockedFuture.cancel(false));
        Assert.assertFalse(dynamicFilter.isBlocked().isDone());
        Assert.assertFalse(dynamicFilter.isComplete());

        // Second task reports: the previously obtained future completes along with the filter.
        stageSupplier.storeSummary(filterId, new TaskId(stageId, 1), Domain.singleValue(IntegerType.INTEGER, 2L));
        dynamicFilterService.collectDynamicFilters();
        Assert.assertTrue(blockedFuture.isDone());
        Assert.assertTrue(dynamicFilter.isComplete());
        Assert.assertEquals(
                dynamicFilter.getCurrentPredicate(),
                TupleDomain.withColumnDomains(ImmutableMap.of(column, Domain.multipleValues(IntegerType.INTEGER, ImmutableList.of(1L, 2L)))));
    }

    /**
     * Registers two dynamic filters of which only {@code df1} is in the lazy set and expects a
     * {@code DynamicFilter} over {@code df1} to be awaitable while one over {@code df2} is not.
     */
    @Test
    public void testIsAwaitable() {
        DynamicFilterService service = new DynamicFilterService(new DynamicFilterConfig());
        DynamicFilterId lazyFilterId = new DynamicFilterId("df1");
        DynamicFilterId nonLazyFilterId = new DynamicFilterId("df2");
        SymbolReference symbol = new Symbol("symbol").toSymbolReference();
        TestingColumnHandle column = new TestingColumnHandle("probeColumnA");
        QueryId queryId = new QueryId("query");
        StageId stageId = new StageId(queryId, 1);

        TestDynamicFiltersStageSupplier stageSupplier = new TestDynamicFiltersStageSupplier(StageState.SCHEDULING);
        stageSupplier.addTasks(ImmutableList.of(new TaskId(stageId, 0)));
        service.registerQuery(
                queryId,
                stageSupplier,
                ImmutableSet.of(lazyFilterId, nonLazyFilterId),
                ImmutableSet.of(lazyFilterId),
                ImmutableSet.of());

        DynamicFilter lazyFilter = service.createDynamicFilter(
                queryId,
                ImmutableList.of(new DynamicFilters.Descriptor(lazyFilterId, symbol)),
                ImmutableMap.of(Symbol.from(symbol), column));
        DynamicFilter nonLazyFilter = service.createDynamicFilter(
                queryId,
                ImmutableList.of(new DynamicFilters.Descriptor(nonLazyFilterId, symbol)),
                ImmutableMap.of(Symbol.from(symbol), column));

        Assert.assertTrue(lazyFilter.isAwaitable());
        Assert.assertFalse(nonLazyFilter.isAwaitable());
    }

    /**
     * Maps one dynamic filter id to two probe symbols/columns and expects the collected domain to
     * appear on both columns of the resulting predicate.
     */
    @Test
    public void testMultipleColumnMapping() {
        DynamicFilterService service = new DynamicFilterService(new DynamicFilterConfig());
        DynamicFilterId filterId = new DynamicFilterId("df1");
        Expression df1 = PlanBuilder.expression("DF_SYMBOL1");
        Expression df2 = PlanBuilder.expression("DF_SYMBOL2");
        QueryId queryId = new QueryId("query");
        StageId stageId = new StageId(queryId, 1);

        TestDynamicFiltersStageSupplier stageSupplier = new TestDynamicFiltersStageSupplier(StageState.SCHEDULED);
        stageSupplier.addTasks(ImmutableList.of(new TaskId(stageId, 0)));
        service.registerQuery(queryId, stageSupplier, ImmutableSet.of(filterId), ImmutableSet.of(filterId), ImmutableSet.of());

        TestingColumnHandle columnA = new TestingColumnHandle("probeColumnA");
        TestingColumnHandle columnB = new TestingColumnHandle("probeColumnB");
        // Both descriptors carry the same filter id but refer to different symbols.
        DynamicFilter dynamicFilter = service.createDynamicFilter(
                queryId,
                ImmutableList.of(
                        new DynamicFilters.Descriptor(filterId, df1),
                        new DynamicFilters.Descriptor(filterId, df2)),
                ImmutableMap.of(
                        Symbol.from(df1), columnA,
                        Symbol.from(df2), columnB));

        Domain reportedDomain = Domain.singleValue(IntegerType.INTEGER, 1L);
        stageSupplier.storeSummary(filterId, new TaskId(stageId, 0), reportedDomain);
        service.collectDynamicFilters();

        Assert.assertEquals(
                dynamicFilter.getCurrentPredicate(),
                TupleDomain.withColumnDomains(ImmutableMap.of(
                        columnA, reportedDomain,
                        columnB, reportedDomain)));
    }

    /**
     * Expects {@code getSourceStageInnerLazyDynamicFilters} to return the filter id only for a
     * fragment with {@code SOURCE_DISTRIBUTION} partitioning whose remote source is a
     * {@code REPLICATE} exchange, and an empty set for the other two combinations tried here.
     */
    @Test
    public void testSourceStageInnerLazyDynamicFilters() {
        DynamicFilterId filterId = new DynamicFilterId("filterId");

        PlanFragment sourceReplicated = createPlan(filterId, SystemPartitioningHandle.SOURCE_DISTRIBUTION, ExchangeNode.Type.REPLICATE);
        Assert.assertEquals(DynamicFilterService.getSourceStageInnerLazyDynamicFilters(sourceReplicated), ImmutableSet.of(filterId));

        PlanFragment fixedHashReplicated = createPlan(filterId, SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION, ExchangeNode.Type.REPLICATE);
        Assert.assertEquals(DynamicFilterService.getSourceStageInnerLazyDynamicFilters(fixedHashReplicated), ImmutableSet.of());

        PlanFragment sourceRepartitioned = createPlan(filterId, SystemPartitioningHandle.SOURCE_DISTRIBUTION, ExchangeNode.Type.REPARTITION);
        Assert.assertEquals(DynamicFilterService.getSourceStageInnerLazyDynamicFilters(sourceRepartitioned), ImmutableSet.of());
    }

    /**
     * Builds a plan fragment whose root is an inner join between a filtered table scan (probe
     * side, carrying a dynamic filter expression for {@code dynamicFilterId}) and a remote source
     * (build side).
     *
     * @param dynamicFilterId id used both in the probe-side filter expression and in the join's
     *        dynamic filter mapping to the build symbol
     * @param partitioningHandle partitioning of the returned fragment
     * @param type exchange type of the remote source node
     * @return the assembled fragment
     */
    private static PlanFragment createPlan(DynamicFilterId dynamicFilterId, PartitioningHandle partitioningHandle, ExchangeNode.Type type) {
        Symbol probeSymbol = new Symbol("column");
        Symbol buildSymbol = new Symbol("buildColumn");
        PlanNodeId tableScanNodeId = new PlanNodeId("plan_id");

        TableScanNode tableScan = TableScanNode.newInstance(
                tableScanNodeId,
                TestingHandles.TEST_TABLE_HANDLE,
                ImmutableList.of(probeSymbol),
                ImmutableMap.of(probeSymbol, new TestingMetadata.TestingColumnHandle("column")));

        FilterNode filterNode = new FilterNode(
                new PlanNodeId("filter_node_id"),
                tableScan,
                DynamicFilters.createDynamicFilterExpression(
                        MetadataManager.createTestMetadataManager(),
                        dynamicFilterId,
                        VarcharType.VARCHAR,
                        probeSymbol.toSymbolReference()));

        RemoteSourceNode remoteSource = new RemoteSourceNode(
                new PlanNodeId("remote_id"),
                new PlanFragmentId("plan_fragment_id"),
                ImmutableList.of(buildSymbol),
                Optional.empty(),
                type);

        JoinNode joinNode = new JoinNode(
                new PlanNodeId("join_id"),
                JoinNode.Type.INNER,
                filterNode,
                remoteSource,
                ImmutableList.of(),
                tableScan.getOutputSymbols(),
                remoteSource.getOutputSymbols(),
                Optional.empty(),
                Optional.empty(),
                Optional.empty(),
                Optional.empty(),
                Optional.empty(),
                ImmutableMap.of(dynamicFilterId, buildSymbol),
                Optional.empty());

        return new PlanFragment(
                new PlanFragmentId("plan_id"),
                joinNode,
                ImmutableMap.of(probeSymbol, VarcharType.VARCHAR),
                partitioningHandle,
                ImmutableList.of(tableScanNodeId),
                new PartitioningScheme(
                        Partitioning.create(SystemPartitioningHandle.SINGLE_DISTRIBUTION, ImmutableList.of()),
                        ImmutableList.of(probeSymbol)),
                StageExecutionDescriptor.ungroupedExecution(),
                StatsAndCosts.empty(),
                Optional.empty());
    }

    /**
     * Renders, for the shared test session, the string form of a non-nullable INTEGER domain made
     * of the single closed range {@code [j, j2]}.
     *
     * @param j inclusive lower bound of the range
     * @param j2 inclusive upper bound of the range
     * @return the domain's string representation under {@code session.toConnectorSession()}
     */
    private static String getExpectedDomainString(long j, long j2) {
        Range closedRange = Range.range(IntegerType.INTEGER, j, true, j2, true);
        Domain expectedDomain = Domain.create(ValueSet.ofRanges(closedRange), false);
        return expectedDomain.toString(session.toConnectorSession());
    }
}
