package io.trino.plugin.deltalake;

import io.trino.execution.FailureInjector;
import io.trino.operator.RetryPolicy;
import io.trino.plugin.deltalake.DeltaLakeQueryRunner;
import io.trino.plugin.exchange.filesystem.FileSystemExchangePlugin;
import io.trino.plugin.exchange.filesystem.containers.MinioStorage;
import io.trino.plugin.hive.containers.HiveMinioDataLake;
import io.trino.spi.ErrorType;
import io.trino.testing.BaseFailureRecoveryTest;
import io.trino.testing.QueryRunner;
import io.trino.testing.TestingNames;
import io.trino.tpch.TpchTable;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:io/trino/plugin/deltalake/BaseDeltaFailureRecoveryTest.class */
public abstract class BaseDeltaFailureRecoveryTest extends BaseFailureRecoveryTest {
    private final String bucketName;

    /* JADX INFO: Access modifiers changed from: protected */
    public BaseDeltaFailureRecoveryTest(RetryPolicy retryPolicy) {
        super(retryPolicy);
        this.bucketName = "test-delta-lake-" + retryPolicy.name().toLowerCase(Locale.ENGLISH) + "-failure-recovery-" + TestingNames.randomNameSuffix();
    }

    protected QueryRunner createQueryRunner(List<TpchTable<?>> list, Map<String, String> map, Map<String, String> map2) throws Exception {
        HiveMinioDataLake closeAfterClass = closeAfterClass(new HiveMinioDataLake(this.bucketName));
        closeAfterClass.start();
        MinioStorage closeAfterClass2 = closeAfterClass(new MinioStorage("test-exchange-spooling-" + TestingNames.randomNameSuffix()));
        closeAfterClass2.start();
        return ((DeltaLakeQueryRunner.Builder) ((DeltaLakeQueryRunner.Builder) ((DeltaLakeQueryRunner.Builder) DeltaLakeQueryRunner.builder().setCoordinatorProperties(map2)).addExtraProperties(map)).setAdditionalSetup(queryRunner -> {
            queryRunner.installPlugin(new FileSystemExchangePlugin());
            queryRunner.loadExchangeManager("filesystem", MinioStorage.getExchangeManagerProperties(closeAfterClass2));
        })).addMetastoreProperties(closeAfterClass.getHiveHadoop()).addS3Properties(closeAfterClass.getMinio(), this.bucketName).addDeltaProperty("delta.enable-non-concurrent-writes", "true").setInitialTables(list).build();
    }

    protected boolean areWriteRetriesSupported() {
        return true;
    }

    @Test
    protected void testDelete() {
        Optional of = Optional.of("CREATE TABLE <table> AS SELECT * FROM orders");
        Optional of2 = Optional.of("DROP TABLE <table>");
        assertThatQuery("DELETE FROM <table> WHERE orderkey = 1").withSetupQuery(of).withCleanupQuery(of2).experiencing(FailureInjector.InjectedFailureType.TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR)).at(boundaryCoordinatorStage()).failsAlways(abstractThrowableAssert -> {
            abstractThrowableAssert.hasMessageContaining("This error is injected by the failure injection service");
        });
        assertThatQuery("DELETE FROM <table> WHERE orderkey = 1").withSetupQuery(of).withCleanupQuery(of2).experiencing(FailureInjector.InjectedFailureType.TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR)).at(rootStage()).failsAlways(abstractThrowableAssert2 -> {
            abstractThrowableAssert2.hasMessageContaining("This error is injected by the failure injection service");
        });
        assertThatQuery("DELETE FROM <table> WHERE orderkey = 1").withSetupQuery(of).withCleanupQuery(of2).experiencing(FailureInjector.InjectedFailureType.TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR)).at(leafStage()).failsWithoutRetries(abstractThrowableAssert3 -> {
            abstractThrowableAssert3.hasMessageContaining("This error is injected by the failure injection service");
        }).finishesSuccessfully();
        assertThatQuery("DELETE FROM <table> WHERE orderkey = 1").withSetupQuery(of).withCleanupQuery(of2).experiencing(FailureInjector.InjectedFailureType.TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR)).at(boundaryDistributedStage()).failsWithoutRetries(abstractThrowableAssert4 -> {
            abstractThrowableAssert4.hasMessageContaining("This error is injected by the failure injection service");
        }).finishesSuccessfully();
        assertThatQuery("DELETE FROM <table> WHERE orderkey = 1").withSetupQuery(of).withCleanupQuery(of2).experiencing(FailureInjector.InjectedFailureType.TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR)).at(intermediateDistributedStage()).failsWithoutRetries(abstractThrowableAssert5 -> {
            abstractThrowableAssert5.hasMessageContaining("This error is injected by the failure injection service");
        });
        assertThatQuery("DELETE FROM <table> WHERE orderkey = 1").withSetupQuery(of).withCleanupQuery(of2).experiencing(FailureInjector.InjectedFailureType.TASK_MANAGEMENT_REQUEST_FAILURE).at(boundaryDistributedStage()).failsWithoutRetries(abstractThrowableAssert6 -> {
            abstractThrowableAssert6.hasMessageFindingMatch("Error 500 Internal Server Error|Error closing remote buffer, expected 204 got 500");
        }).finishesSuccessfully();
        assertThatQuery("DELETE FROM <table> WHERE orderkey = 1").withSetupQuery(of).withCleanupQuery(of2).experiencing(FailureInjector.InjectedFailureType.TASK_MANAGEMENT_REQUEST_TIMEOUT).at(boundaryDistributedStage()).failsWithoutRetries(abstractThrowableAssert7 -> {
            abstractThrowableAssert7.hasMessageContaining("Encountered too many errors talking to a worker node");
        }).finishesSuccessfully();
        if (getRetryPolicy() == RetryPolicy.QUERY) {
            assertThatQuery("DELETE FROM <table> WHERE orderkey = 1").withSetupQuery(of).withCleanupQuery(of2).experiencing(FailureInjector.InjectedFailureType.TASK_GET_RESULTS_REQUEST_FAILURE).at(boundaryDistributedStage()).failsWithoutRetries(abstractThrowableAssert8 -> {
                abstractThrowableAssert8.hasMessageFindingMatch("Error 500 Internal Server Error|Error closing remote buffer, expected 204 got 500");
            }).finishesSuccessfully();
            assertThatQuery("DELETE FROM <table> WHERE orderkey = 1").withSetupQuery(of).withCleanupQuery(of2).experiencing(FailureInjector.InjectedFailureType.TASK_GET_RESULTS_REQUEST_TIMEOUT).at(boundaryDistributedStage()).failsWithoutRetries(abstractThrowableAssert9 -> {
                abstractThrowableAssert9.hasMessageFindingMatch("Encountered too many errors talking to a worker node|Error closing remote buffer");
            }).finishesSuccessfully();
        }
    }

    @Test
    protected void testUpdate() {
        Optional of = Optional.of("CREATE TABLE <table> AS SELECT * FROM orders");
        Optional of2 = Optional.of("DROP TABLE <table>");
        assertThatQuery("UPDATE <table> SET shippriority = 101 WHERE custkey = 1").withSetupQuery(of).withCleanupQuery(of2).experiencing(FailureInjector.InjectedFailureType.TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR)).at(boundaryCoordinatorStage()).failsAlways(abstractThrowableAssert -> {
            abstractThrowableAssert.hasMessageContaining("This error is injected by the failure injection service");
        });
        assertThatQuery("UPDATE <table> SET shippriority = 101 WHERE custkey = 1").withSetupQuery(of).withCleanupQuery(of2).experiencing(FailureInjector.InjectedFailureType.TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR)).at(rootStage()).failsAlways(abstractThrowableAssert2 -> {
            abstractThrowableAssert2.hasMessageContaining("This error is injected by the failure injection service");
        });
        assertThatQuery("UPDATE <table> SET shippriority = 101 WHERE custkey = 1").withSetupQuery(of).withCleanupQuery(of2).experiencing(FailureInjector.InjectedFailureType.TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR)).at(leafStage()).failsWithoutRetries(abstractThrowableAssert3 -> {
            abstractThrowableAssert3.hasMessageContaining("This error is injected by the failure injection service");
        }).finishesSuccessfully();
        assertThatQuery("UPDATE <table> SET shippriority = 101 WHERE custkey = 1").withSetupQuery(of).withCleanupQuery(of2).experiencing(FailureInjector.InjectedFailureType.TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR)).at(boundaryDistributedStage()).failsWithoutRetries(abstractThrowableAssert4 -> {
            abstractThrowableAssert4.hasMessageContaining("This error is injected by the failure injection service");
        }).finishesSuccessfully();
        assertThatQuery("UPDATE <table> SET shippriority = 101 WHERE custkey = 1").withSetupQuery(of).withCleanupQuery(of2).experiencing(FailureInjector.InjectedFailureType.TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR)).at(intermediateDistributedStage()).failsWithoutRetries(abstractThrowableAssert5 -> {
            abstractThrowableAssert5.hasMessageContaining("This error is injected by the failure injection service");
        });
        assertThatQuery("UPDATE <table> SET shippriority = 101 WHERE custkey = 1").withSetupQuery(of).withCleanupQuery(of2).experiencing(FailureInjector.InjectedFailureType.TASK_MANAGEMENT_REQUEST_FAILURE).at(boundaryDistributedStage()).failsWithoutRetries(abstractThrowableAssert6 -> {
            abstractThrowableAssert6.hasMessageFindingMatch("Error 500 Internal Server Error|Error closing remote buffer, expected 204 got 500");
        }).finishesSuccessfully();
        assertThatQuery("UPDATE <table> SET shippriority = 101 WHERE custkey = 1").withSetupQuery(of).withCleanupQuery(of2).experiencing(FailureInjector.InjectedFailureType.TASK_MANAGEMENT_REQUEST_TIMEOUT).at(boundaryDistributedStage()).failsWithoutRetries(abstractThrowableAssert7 -> {
            abstractThrowableAssert7.hasMessageContaining("Encountered too many errors talking to a worker node");
        }).finishesSuccessfully();
        if (getRetryPolicy() == RetryPolicy.QUERY) {
            assertThatQuery("UPDATE <table> SET shippriority = 101 WHERE custkey = 1").withSetupQuery(of).withCleanupQuery(of2).experiencing(FailureInjector.InjectedFailureType.TASK_GET_RESULTS_REQUEST_FAILURE).at(boundaryDistributedStage()).failsWithoutRetries(abstractThrowableAssert8 -> {
                abstractThrowableAssert8.hasMessageFindingMatch("Error 500 Internal Server Error|Error closing remote buffer, expected 204 got 500");
            }).finishesSuccessfully();
            assertThatQuery("UPDATE <table> SET shippriority = 101 WHERE custkey = 1").withSetupQuery(of).withCleanupQuery(of2).experiencing(FailureInjector.InjectedFailureType.TASK_GET_RESULTS_REQUEST_TIMEOUT).at(boundaryDistributedStage()).failsWithoutRetries(abstractThrowableAssert9 -> {
                abstractThrowableAssert9.hasMessageFindingMatch("Encountered too many errors talking to a worker node|Error closing remote buffer");
            }).finishesSuccessfully();
        }
    }

    @Test
    protected void testRefreshMaterializedView() {
        Assertions.assertThatThrownBy(() -> {
            super.testRefreshMaterializedView();
        }).hasMessageContaining("This connector does not support creating materialized views");
    }

    @Test
    protected void testCreatePartitionedTable() {
        testTableModification(Optional.empty(), "CREATE TABLE <table> WITH (partitioned_by = ARRAY['p']) AS SELECT *, 'partition1' p FROM orders", Optional.of("DROP TABLE <table>"));
    }

    @Test
    protected void testInsertIntoNewPartition() {
        testTableModification(Optional.of("CREATE TABLE <table> WITH (partitioned_by = ARRAY['p']) AS SELECT *, 'partition1' p FROM orders"), "INSERT INTO <table> SELECT *, 'partition2' p FROM orders", Optional.of("DROP TABLE <table>"));
    }

    @Test
    protected void testInsertIntoExistingPartition() {
        testTableModification(Optional.of("CREATE TABLE <table> WITH (partitioned_by = ARRAY['p']) AS SELECT *, 'partition1' p FROM orders"), "INSERT INTO <table> SELECT *, 'partition1' p FROM orders", Optional.of("DROP TABLE <table>"));
    }
}
