package io.trino.plugin.deltalake;

import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.airlift.testing.Assertions;
import io.opentelemetry.api.trace.Span;
import io.trino.Session;
import io.trino.execution.DynamicFilterConfig;
import io.trino.execution.QueryStats;
import io.trino.metadata.QualifiedObjectName;
import io.trino.metadata.TableHandle;
import io.trino.plugin.hive.containers.HiveMinioDataLake;
import io.trino.security.AllowAllAccessControl;
import io.trino.spi.QueryId;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.predicate.TupleDomain;
import io.trino.split.SplitSource;
import io.trino.sql.planner.OptimizerConfig;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.QueryRunner;
import io.trino.testing.TestingNames;
import io.trino.tpch.TpchTable;
import io.trino.transaction.TransactionManager;
import java.util.ArrayList;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.parallel.Isolated;

@Isolated
/* loaded from: input_file:io/trino/plugin/deltalake/TestDeltaLakeDynamicFiltering.class */
public class TestDeltaLakeDynamicFiltering extends AbstractTestQueryFramework {
    private final String bucketName = "delta-lake-test-dynamic-filtering-" + TestingNames.randomNameSuffix();
    private HiveMinioDataLake hiveMinioDataLake;

    /* loaded from: input_file:io/trino/plugin/deltalake/TestDeltaLakeDynamicFiltering$BlockedDynamicFilter.class */
    private static class BlockedDynamicFilter implements DynamicFilter {
        private final CompletableFuture<?> isBlocked;

        public BlockedDynamicFilter(CompletableFuture<?> completableFuture) {
            this.isBlocked = (CompletableFuture) Objects.requireNonNull(completableFuture, "isBlocked is null");
        }

        public Set<ColumnHandle> getColumnsCovered() {
            return ImmutableSet.of();
        }

        public CompletableFuture<?> isBlocked() {
            return this.isBlocked;
        }

        public boolean isComplete() {
            return false;
        }

        public boolean isAwaitable() {
            return true;
        }

        public TupleDomain<ColumnHandle> getCurrentPredicate() {
            return TupleDomain.all();
        }
    }

    protected QueryRunner createQueryRunner() throws Exception {
        Verify.verify(new DynamicFilterConfig().isEnableDynamicFiltering(), "this class assumes dynamic filtering is enabled by default", new Object[0]);
        this.hiveMinioDataLake = closeAfterClass(new HiveMinioDataLake(this.bucketName));
        this.hiveMinioDataLake.start();
        DistributedQueryRunner build = DeltaLakeQueryRunner.builder().addMetastoreProperties(this.hiveMinioDataLake.getHiveHadoop()).addS3Properties(this.hiveMinioDataLake.getMinio(), this.bucketName).addDeltaProperty("delta.register-table-procedure.enabled", "true").build();
        ImmutableList.of(TpchTable.LINE_ITEM, TpchTable.ORDERS).forEach(tpchTable -> {
            String tableName = tpchTable.getTableName();
            this.hiveMinioDataLake.copyResources("io/trino/plugin/deltalake/testing/resources/databricks73/" + tableName, tableName);
            build.execute(String.format("CALL system.register_table(CURRENT_SCHEMA, '%1$s', 's3://%2$s/%1$s')", tableName, this.bucketName));
        });
        return build;
    }

    @Timeout(60)
    @Test
    public void testDynamicFiltering() {
        for (OptimizerConfig.JoinDistributionType joinDistributionType : OptimizerConfig.JoinDistributionType.values()) {
            QueryRunner.MaterializedResultWithPlan executeWithPlan = getDistributedQueryRunner().executeWithPlan(sessionWithDynamicFiltering(true, joinDistributionType), "SELECT * FROM lineitem JOIN orders ON lineitem.orderkey = orders.orderkey AND orders.totalprice > 59995 AND orders.totalprice < 60000");
            QueryRunner.MaterializedResultWithPlan executeWithPlan2 = getDistributedQueryRunner().executeWithPlan(sessionWithDynamicFiltering(false, joinDistributionType), "SELECT * FROM lineitem JOIN orders ON lineitem.orderkey = orders.orderkey AND orders.totalprice > 59995 AND orders.totalprice < 60000");
            Assertions.assertEqualsIgnoreOrder(executeWithPlan.result().getMaterializedRows(), executeWithPlan2.result().getMaterializedRows());
            Assertions.assertGreaterThan(Long.valueOf(getQueryStats(executeWithPlan2.queryId()).getPhysicalInputPositions()), Long.valueOf(getQueryStats(executeWithPlan.queryId()).getPhysicalInputPositions()));
        }
    }

    @Timeout(30)
    @Test
    public void testIncompleteDynamicFilterTimeout() throws Exception {
        String str = (String) getSession().getSchema().orElseThrow();
        QueryRunner queryRunner = getQueryRunner();
        TransactionManager transactionManager = queryRunner.getTransactionManager();
        Session beginTransactionId = Session.builder(getSession()).setCatalogSessionProperty(DeltaLakeQueryRunner.DELTA_CATALOG, "dynamic_filtering_wait_timeout", "1s").build().beginTransactionId(transactionManager.beginTransaction(true), transactionManager, new AllowAllAccessControl());
        TableHandle tableHandle = (TableHandle) queryRunner.getPlannerContext().getMetadata().getTableHandle(beginTransactionId, new QualifiedObjectName(DeltaLakeQueryRunner.DELTA_CATALOG, str, "orders")).orElseThrow();
        CompletableFuture completableFuture = new CompletableFuture();
        try {
            SplitSource splits = queryRunner.getSplitManager().getSplits(beginTransactionId, Span.getInvalid(), tableHandle, new BlockedDynamicFilter(completableFuture), Constraint.alwaysTrue());
            ArrayList arrayList = new ArrayList();
            while (!splits.isFinished()) {
                arrayList.addAll(((SplitSource.SplitBatch) splits.getNextBatch(1000).get()).getSplits());
            }
            splits.close();
            org.assertj.core.api.Assertions.assertThat(arrayList.isEmpty()).isFalse();
            completableFuture.complete(null);
        } catch (Throwable th) {
            completableFuture.complete(null);
            throw th;
        }
    }

    private Session sessionWithDynamicFiltering(boolean z, OptimizerConfig.JoinDistributionType joinDistributionType) {
        return Session.builder(noJoinReordering(joinDistributionType)).setSystemProperty("enable_dynamic_filtering", String.valueOf(z)).setCatalogSessionProperty(DeltaLakeQueryRunner.DELTA_CATALOG, "dynamic_filtering_wait_timeout", "1h").build();
    }

    private QueryStats getQueryStats(QueryId queryId) {
        return getDistributedQueryRunner().getCoordinator().getQueryManager().getFullQueryInfo(queryId).getQueryStats();
    }
}
