package io.trino.plugin.deltalake;

import com.google.common.collect.ImmutableList;
import com.google.common.reflect.ClassPath;
import io.airlift.concurrent.MoreFutures;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.spi.security.ConnectorIdentity;
import io.trino.sql.query.QueryAssertions;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.QueryRunner;
import io.trino.testing.TestingNames;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
/* loaded from: input_file:io/trino/plugin/deltalake/TestDeltaLakeLocalConcurrentWritesTest.class */
public class TestDeltaLakeLocalConcurrentWritesTest extends AbstractTestQueryFramework {
    /**
     * Builds the Delta Lake query runner shared by the tests in this class.
     *
     * <p>Two connector properties are set: {@code delta.unique-table-location=true} and
     * {@code delta.register-table-procedure.enabled=true}.
     *
     * @return the configured query runner
     * @throws Exception if the runner cannot be built
     */
    @Override
    protected QueryRunner createQueryRunner() throws Exception {
        return DeltaLakeQueryRunner.builder()
                .addDeltaProperty("delta.unique-table-location", "true")
                .addDeltaProperty("delta.register-table-procedure.enabled", "true")
                .build();
    }

    /**
     * Runs the blind-insert reconciliation scenario first against an unpartitioned table and then
     * against a partitioned one.
     */
    @Test
    public void testConcurrentInsertsReconciliationForBlindInserts() throws Exception {
        for (boolean partitioned : new boolean[] {false, true}) {
            testConcurrentInsertsReconciliationForBlindInserts(partitioned);
        }
    }

    /**
     * Issues three blind {@code INSERT ... VALUES} statements at the same time and checks that all
     * of them are committed.
     *
     * <p>Afterwards the table must hold exactly the three inserted rows, and {@code $history} must
     * list the writes as versions 1-3, each flagged as a blind append.
     *
     * @param partitioned whether the table is created with {@code partitioned_by = ARRAY['part']}
     * @throws Exception if table setup, a worker task, or an assertion fails
     */
    private void testConcurrentInsertsReconciliationForBlindInserts(boolean partitioned) throws Exception {
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_inserts_table_" + TestingNames.randomNameSuffix();
        assertUpdate("CREATE TABLE " + tableName + " (a INT, part INT) " + (partitioned ? " WITH (partitioned_by = ARRAY['part'])" : ""));
        try {
            // Every writer waits on the barrier first so that the three statements start together.
            executor.invokeAll(ImmutableList.<Callable<Void>>builder()
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (1, 10)");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (11, 20)");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (21, 30)");
                        return null;
                    })
                    .build())
                    .forEach(MoreFutures::getDone);

            ((QueryAssertions.QueryAssert) Assertions.assertThat(query("SELECT * FROM " + tableName))).matches("VALUES (1, 10), (11, 20), (21, 30)");
            assertQuery(
                    "SELECT version, operation, isolation_level, read_version, is_blind_append FROM \"" + tableName + "$history\"",
                    """
                    VALUES
                        (0, 'CREATE TABLE', 'WriteSerializable', 0, true),
                        (1, 'WRITE', 'WriteSerializable', 0, true),
                        (2, 'WRITE', 'WriteSerializable', 1, true),
                        (3, 'WRITE', 'WriteSerializable', 2, true)
                    """);
        } finally {
            try {
                assertUpdate("DROP TABLE " + tableName);
            } finally {
                // Shut the pool down even when the DROP fails so worker threads are not leaked.
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Runs the "insert selecting from the same table" scenario first against a partitioned table
     * and then against an unpartitioned one.
     */
    @Test
    public void testConcurrentInsertsSelectingFromTheSameTable() throws Exception {
        for (boolean partitioned : new boolean[] {true, false}) {
            testConcurrentInsertsSelectingFromTheSameTable(partitioned);
        }
    }

    /**
     * Starts three concurrent {@code INSERT ... SELECT COUNT(*)} statements that read the table
     * they write to.
     *
     * <p>At least one insert must succeed. An insert that fails must fail with
     * "Failed to write Delta Lake transaction log entry". With {@code n} successful inserts the
     * table is expected to hold {@code (0, 10)} plus {@code (1, 10) .. (n, 10)}, and
     * {@code $history} is expected to show versions {@code 1..n} as non-blind writes, each reading
     * the version before it.
     *
     * @param partitioned whether the table is created with {@code partitioned_by = ARRAY['part']}
     * @throws Exception if table setup or an assertion fails
     */
    private void testConcurrentInsertsSelectingFromTheSameTable(boolean partitioned) throws Exception {
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_inserts_select_from_same_table_" + TestingNames.randomNameSuffix();
        assertUpdate("CREATE TABLE " + tableName + " (a, part) " + (partitioned ? " WITH (partitioned_by = ARRAY['part'])" : "") + "  AS VALUES (0, 10)", 1L);
        try {
            // Submit every task before waiting on any result: the barrier needs all writers running.
            var attempts = IntStream.range(0, threads)
                    .mapToObj(threadNumber -> executor.submit(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        try {
                            getQueryRunner().execute("INSERT INTO " + tableName + " SELECT COUNT(*), 10 AS part FROM " + tableName);
                            return true;
                        } catch (Exception e) {
                            try {
                                // A losing writer is acceptable only when it failed on the log commit.
                                Assertions.assertThat(io.trino.testing.QueryAssertions.getTrinoExceptionCause(e)).hasMessage("Failed to write Delta Lake transaction log entry");
                                return false;
                            } catch (Throwable verifyFailure) {
                                // Keep the original query failure attached to the verification failure.
                                if (verifyFailure != e) {
                                    verifyFailure.addSuppressed(e);
                                }
                                throw verifyFailure;
                            }
                        }
                    }))
                    .collect(ImmutableList.toImmutableList());
            long successfulInserts = attempts.stream()
                    .map(MoreFutures::getFutureValue)
                    .filter(Boolean::booleanValue)
                    .count();
            Assertions.assertThat(successfulInserts).isGreaterThanOrEqualTo(1L);

            String expectedInsertedRows = LongStream.rangeClosed(1L, successfulInserts)
                    .mapToObj(rowCount -> "(%d, 10)".formatted(rowCount))
                    .collect(Collectors.joining(", ", ", ", ""));
            assertQuery("SELECT * FROM " + tableName, "VALUES (0, 10)" + expectedInsertedRows);

            String expectedWrites = LongStream.rangeClosed(1L, successfulInserts)
                    .mapToObj(version -> "(%s, 'WRITE', 'WriteSerializable', %s, false)".formatted(version, version - 1))
                    .collect(Collectors.joining(", ", ", ", ""));
            assertQuery(
                    "SELECT version, operation, isolation_level, read_version, is_blind_append FROM \"" + tableName + "$history\"",
                    "VALUES (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0, true)" + expectedWrites);
        } finally {
            try {
                assertUpdate("DROP TABLE " + tableName);
            } finally {
                // Shut the pool down even when the DROP fails so worker threads are not leaked.
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Runs the "insert selecting from a fixed table version" scenario first against a partitioned
     * table and then against an unpartitioned one.
     */
    @Test
    public void testConcurrentInsertsSelectingFromTheSameVersionedTable() throws Exception {
        for (boolean partitioned : new boolean[] {true, false}) {
            testConcurrentInsertsSelectingFromTheSameVersionedTable(partitioned);
        }
    }

    /**
     * Starts three concurrent inserts that each select from the target table
     * {@code FOR VERSION AS OF 0}.
     *
     * <p>All three inserts are expected to commit: the table must end up with the initial row plus
     * one row per writer, and {@code $history} must list versions 1-3 as blind-append writes.
     *
     * @param partitioned whether the table is created with {@code partitioned_by = ARRAY['part']}
     * @throws Exception if table setup, a worker task, or an assertion fails
     */
    private void testConcurrentInsertsSelectingFromTheSameVersionedTable(boolean partitioned) throws Exception {
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_inserts_select_from_same_versioned_table_" + TestingNames.randomNameSuffix();
        assertUpdate("CREATE TABLE " + tableName + " (a, part) " + (partitioned ? " WITH (partitioned_by = ARRAY['part'])" : "") + "  AS VALUES (0, 'a')", 1L);
        try {
            // Every writer waits on the barrier first so that the three statements start together.
            executor.invokeAll(ImmutableList.<Callable<Void>>builder()
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("INSERT INTO " + tableName + " SELECT 1, 'b' AS part FROM " + tableName + " FOR VERSION AS OF 0");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("INSERT INTO " + tableName + " SELECT 2, 'c' AS part FROM " + tableName + " FOR VERSION AS OF 0");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("INSERT INTO " + tableName + " SELECT 3, 'd' AS part FROM " + tableName + " FOR VERSION AS OF 0");
                        return null;
                    })
                    .build())
                    .forEach(MoreFutures::getDone);

            assertQuery("SELECT * FROM " + tableName, "VALUES (0, 'a'), (1, 'b'), (2, 'c'), (3, 'd')");
            assertQuery(
                    "SELECT version, operation, isolation_level, read_version, is_blind_append FROM \"" + tableName + "$history\"",
                    """
                    VALUES
                        (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0, true),
                        (1, 'WRITE', 'WriteSerializable', 0, true),
                        (2, 'WRITE', 'WriteSerializable', 1, true),
                        (3, 'WRITE', 'WriteSerializable', 2, true)
                    """);
        } finally {
            try {
                assertUpdate("DROP TABLE " + tableName);
            } finally {
                // Shut the pool down even when the DROP fails so worker threads are not leaked.
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Starts three concurrent inserts that all read from and write to partition {@code part = 10}
     * of the same table.
     *
     * <p>At least one insert must succeed. An insert that fails must fail with
     * "Failed to write Delta Lake transaction log entry". With {@code n} successful inserts the
     * table is expected to hold the three initial rows plus {@code (1, 10) .. (n, 10)}, and
     * {@code $history} is expected to show versions {@code 1..n} as non-blind writes, each reading
     * the version before it.
     */
    @Test
    public void testConcurrentInsertsSelectingFromTheSamePartition() throws Exception {
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_inserts_select_from_same_partition_" + TestingNames.randomNameSuffix();
        assertUpdate("CREATE TABLE " + tableName + " (a, part) WITH (partitioned_by = ARRAY['part']) AS VALUES (0, 10), (11, 20), (22, 30)", 3L);
        try {
            // Submit every task before waiting on any result: the barrier needs all writers running.
            var attempts = IntStream.range(0, threads)
                    .mapToObj(threadNumber -> executor.submit(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        try {
                            getQueryRunner().execute("INSERT INTO " + tableName + " SELECT COUNT(*) as a, 10 as part FROM " + tableName + " WHERE part = 10");
                            return true;
                        } catch (Exception e) {
                            try {
                                // A losing writer is acceptable only when it failed on the log commit.
                                Assertions.assertThat(io.trino.testing.QueryAssertions.getTrinoExceptionCause(e)).hasMessage("Failed to write Delta Lake transaction log entry");
                                return false;
                            } catch (Throwable verifyFailure) {
                                // Keep the original query failure attached to the verification failure.
                                if (verifyFailure != e) {
                                    verifyFailure.addSuppressed(e);
                                }
                                throw verifyFailure;
                            }
                        }
                    }))
                    .collect(ImmutableList.toImmutableList());
            long successfulInserts = attempts.stream()
                    .map(MoreFutures::getFutureValue)
                    .filter(Boolean::booleanValue)
                    .count();
            Assertions.assertThat(successfulInserts).isGreaterThanOrEqualTo(1L);

            String expectedInsertedRows = LongStream.rangeClosed(1L, successfulInserts)
                    .mapToObj(rowCount -> "(%d, 10)".formatted(rowCount))
                    .collect(Collectors.joining(", ", ", ", ""));
            assertQuery("SELECT * FROM " + tableName, "VALUES (0, 10), (11, 20), (22, 30)" + expectedInsertedRows);

            String expectedWrites = LongStream.rangeClosed(1L, successfulInserts)
                    .mapToObj(version -> "(%s, 'WRITE', 'WriteSerializable', %s, false)".formatted(version, version - 1))
                    .collect(Collectors.joining(", ", ", ", ""));
            assertQuery(
                    "SELECT version, operation, isolation_level, read_version, is_blind_append FROM \"" + tableName + "$history\"",
                    "VALUES (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0, true)" + expectedWrites);
        } finally {
            try {
                assertUpdate("DROP TABLE " + tableName);
            } finally {
                // Shut the pool down even when the DROP fails so worker threads are not leaked.
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Runs two partition-scoped {@code INSERT ... SELECT COUNT(*)} statements (partitions 10 and
     * 20) concurrently with one blind {@code INSERT ... VALUES} into partition 30.
     *
     * <p>All three writes are expected to commit. {@code $history} is expected to contain the
     * initial CTAS, two non-blind writes and one blind-append write.
     */
    @Test
    public void testConcurrentInsertsReconciliationForMixedInserts() throws Exception {
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_mixed_inserts_table_" + TestingNames.randomNameSuffix();
        assertUpdate("CREATE TABLE " + tableName + " (a, part) WITH (partitioned_by = ARRAY['part']) AS VALUES (0, 10), (11, 20)", 2L);
        try {
            // Every writer waits on the barrier first so that the three statements start together.
            executor.invokeAll(ImmutableList.<Callable<Void>>builder()
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("INSERT INTO " + tableName + " SELECT COUNT(*) AS a, 10 AS part FROM " + tableName + " WHERE part = 10");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("INSERT INTO " + tableName + " SELECT COUNT(*) AS a, 20 AS part FROM " + tableName + " WHERE part = 20");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (22, 30)");
                        return null;
                    })
                    .build())
                    .forEach(MoreFutures::getDone);

            assertQuery("SELECT * FROM " + tableName, "VALUES (0, 10), (1, 10), (11, 20), (1, 20), (22, 30)");
            assertQuery(
                    "SELECT operation, isolation_level, is_blind_append FROM \"" + tableName + "$history\"",
                    """
                    VALUES
                        ('CREATE TABLE AS SELECT', 'WriteSerializable', true),
                        ('WRITE', 'WriteSerializable', false),
                        ('WRITE', 'WriteSerializable', false),
                        ('WRITE', 'WriteSerializable', true)
                    """);
        } finally {
            try {
                assertUpdate("DROP TABLE " + tableName);
            } finally {
                // Shut the pool down even when the DROP fails so worker threads are not leaked.
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Runs three concurrent inserts that each read a different partition (10, 20, 30) of the table
     * and all write into partition 40.
     *
     * <p>All three writes are expected to commit, each recorded in {@code $history} as a non-blind
     * write.
     */
    @Test
    public void testConcurrentInsertsSelectingFromDifferentPartitionsOfSameTable() throws Exception {
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_mixed_inserts_table_" + TestingNames.randomNameSuffix();
        assertUpdate("CREATE TABLE " + tableName + " (a, part) WITH (partitioned_by = ARRAY['part']) AS VALUES (0, 10), (11, 20), (22, 30)", 3L);
        try {
            // Every writer waits on the barrier first so that the three statements start together.
            executor.invokeAll(ImmutableList.<Callable<Void>>builder()
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("INSERT INTO " + tableName + " SELECT a + 1,  40 as part FROM " + tableName + " WHERE part = 10");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("INSERT INTO " + tableName + " SELECT a + 2,  40 as part FROM " + tableName + " WHERE part = 20");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("INSERT INTO " + tableName + " SELECT a + 3,  40 as part FROM " + tableName + " WHERE part = 30");
                        return null;
                    })
                    .build())
                    .forEach(MoreFutures::getDone);

            assertQuery("SELECT * FROM " + tableName, "VALUES (0, 10), (11, 20), (22, 30), (1, 40), (13, 40), (25, 40)");
            assertQuery(
                    "SELECT operation, isolation_level, is_blind_append FROM \"" + tableName + "$history\"",
                    """
                    VALUES
                        ('CREATE TABLE AS SELECT', 'WriteSerializable', true),
                        ('WRITE', 'WriteSerializable', false),
                        ('WRITE', 'WriteSerializable', false),
                        ('WRITE', 'WriteSerializable', false)
                    """);
        } finally {
            try {
                assertUpdate("DROP TABLE " + tableName);
            } finally {
                // Shut the pool down even when the DROP fails so worker threads are not leaked.
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Runs three concurrent inserts, each reading from and writing to its own pair of partitions
     * ({@code (10, 20)}, {@code (30, 40)} and {@code (50, 60)}).
     *
     * <p>The table is seeded with a CTAS followed by one plain insert, so the concurrent writes
     * are expected to land as versions 2-4 in {@code $history}, all non-blind.
     */
    @Test
    public void testConcurrentInsertsSelectingFromMultipleNonoverlappingPartitionsOfSameTable() throws Exception {
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_mixed_inserts_table_" + TestingNames.randomNameSuffix();
        assertUpdate("CREATE TABLE " + tableName + " (a, part) WITH (partitioned_by = ARRAY['part']) AS VALUES (0, 10), (11, 20), (22, 30), (33, 40), (44, 50), (55, 60)", 6L);
        assertUpdate("INSERT INTO " + tableName + " VALUES (2, 10), (13, 20), (24, 30), (35, 40), (46, 50), (57, 60)", 6L);
        try {
            // Every writer waits on the barrier first so that the three statements start together.
            executor.invokeAll(ImmutableList.<Callable<Void>>builder()
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("INSERT INTO " + tableName + " SELECT a + 1, part FROM " + tableName + " WHERE part IN (10, 20)");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("INSERT INTO " + tableName + " SELECT a + 1, part FROM " + tableName + " WHERE part IN (30, 40)");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("INSERT INTO " + tableName + " SELECT a + 1, part FROM " + tableName + " WHERE part IN (50, 60)");
                        return null;
                    })
                    .build())
                    .forEach(MoreFutures::getDone);

            assertQuery(
                    "SELECT * FROM " + tableName,
                    """
                    VALUES
                        (0, 10), (1, 10), (2, 10), (3, 10),
                        (11, 20), (12, 20), (13, 20), (14, 20),
                        (22, 30), (23, 30),(24, 30), (25, 30),
                        (33, 40), (34, 40), (35, 40), (36, 40),
                        (44, 50), (45, 50), (46, 50), (47, 50),
                        (55, 60), (56,60), (57, 60), (58,60)
                    """);
            assertQuery(
                    "SELECT version, operation, isolation_level, read_version, is_blind_append FROM \"" + tableName + "$history\"",
                    """
                    VALUES
                        (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0, true),
                        (1, 'WRITE', 'WriteSerializable', 0, true),
                        (2, 'WRITE', 'WriteSerializable', 1, false),
                        (3, 'WRITE', 'WriteSerializable', 2, false),
                        (4, 'WRITE', 'WriteSerializable', 3, false)
                    """);
        } finally {
            try {
                assertUpdate("DROP TABLE " + tableName);
            } finally {
                // Shut the pool down even when the DROP fails so worker threads are not leaked.
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Starts five concurrent blind inserts into a table registered from the
     * {@code deltalake/serializable_partitioned_table} resource.
     *
     * <p>At least one insert must succeed. An insert that fails must fail with
     * "Failed to write Delta Lake transaction log entry".
     * NOTE(review): the resource name suggests the table uses the Serializable isolation level;
     * that is not visible from this method - confirm against the resource.
     */
    @Test
    public void testConcurrentSerializableBlindInsertsReconciliationFailure() throws Exception {
        int threads = 5;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_serializable_blind_inserts_table_reconciliation" + TestingNames.randomNameSuffix();
        registerTableFromResources(tableName, "deltalake/serializable_partitioned_table", getQueryRunner());
        ((QueryAssertions.QueryAssert) Assertions.assertThat(query("SELECT * FROM " + tableName))).matches("VALUES (0, 10), (33, 40)");
        try {
            // Submit every task before waiting on any result: the barrier needs all writers running.
            var attempts = IntStream.range(0, threads)
                    .mapToObj(threadNumber -> executor.submit(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        try {
                            getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (1, 10)");
                            return true;
                        } catch (Exception e) {
                            try {
                                // A losing writer is acceptable only when it failed on the log commit.
                                Assertions.assertThat(io.trino.testing.QueryAssertions.getTrinoExceptionCause(e)).hasMessage("Failed to write Delta Lake transaction log entry");
                                return false;
                            } catch (Throwable verifyFailure) {
                                // Keep the original query failure attached to the verification failure.
                                if (verifyFailure != e) {
                                    verifyFailure.addSuppressed(e);
                                }
                                throw verifyFailure;
                            }
                        }
                    }))
                    .collect(ImmutableList.toImmutableList());
            long successfulInserts = attempts.stream()
                    .map(MoreFutures::getFutureValue)
                    .filter(Boolean::booleanValue)
                    .count();
            Assertions.assertThat(successfulInserts).isGreaterThanOrEqualTo(1L);
        } finally {
            try {
                assertUpdate("DROP TABLE " + tableName);
            } finally {
                // Shut the pool down even when the DROP fails so worker threads are not leaked.
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Runs the insert reconciliation-failure scenario first against an unpartitioned table and
     * then against a partitioned one.
     */
    @Test
    public void testConcurrentInsertsReconciliationFailure() throws Exception {
        for (boolean partitioned : new boolean[] {false, true}) {
            testConcurrentInsertsReconciliationFailure(partitioned);
        }
    }

    /**
     * Starts five concurrent {@code INSERT ... SELECT * ... WHERE part = 10} statements that read
     * the table they write to.
     *
     * <p>At least one insert must succeed. An insert that fails must fail with
     * "Failed to write Delta Lake transaction log entry".
     *
     * @param partitioned whether the table is created with {@code partitioned_by = ARRAY['part']}
     * @throws Exception if table setup or an assertion fails
     */
    private void testConcurrentInsertsReconciliationFailure(boolean partitioned) throws Exception {
        int threads = 5;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_inserts_table_" + TestingNames.randomNameSuffix();
        assertUpdate("CREATE TABLE " + tableName + " (a, part)" + (partitioned ? " WITH (partitioned_by = ARRAY['part'])" : "") + " AS VALUES (1, 10)", 1L);
        try {
            // Submit every task before waiting on any result: the barrier needs all writers running.
            var attempts = IntStream.range(0, threads)
                    .mapToObj(threadNumber -> executor.submit(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        try {
                            getQueryRunner().execute("INSERT INTO " + tableName + " SELECT * FROM " + tableName + " WHERE part = 10");
                            return true;
                        } catch (Exception e) {
                            try {
                                // A losing writer is acceptable only when it failed on the log commit.
                                Assertions.assertThat(io.trino.testing.QueryAssertions.getTrinoExceptionCause(e)).hasMessage("Failed to write Delta Lake transaction log entry");
                                return false;
                            } catch (Throwable verifyFailure) {
                                // Keep the original query failure attached to the verification failure.
                                if (verifyFailure != e) {
                                    verifyFailure.addSuppressed(e);
                                }
                                throw verifyFailure;
                            }
                        }
                    }))
                    .collect(ImmutableList.toImmutableList());
            long successfulInserts = attempts.stream()
                    .map(MoreFutures::getFutureValue)
                    .filter(Boolean::booleanValue)
                    .count();
            Assertions.assertThat(successfulInserts).isGreaterThanOrEqualTo(1L);
        } finally {
            try {
                assertUpdate("DROP TABLE " + tableName);
            } finally {
                // Shut the pool down even when the DROP fails so worker threads are not leaked.
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Runs three concurrent {@code DELETE} statements, each removing a different partition
     * (10, 20 and 30) of the same table.
     *
     * <p>All three deletes are expected to commit: only the row in partition 40 remains, and
     * {@code $history} lists versions 1-3 as non-blind {@code DELETE} operations.
     */
    @Test
    public void testConcurrentDeletePushdownReconciliation() throws Exception {
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_inserts_table_" + TestingNames.randomNameSuffix();
        assertUpdate("CREATE TABLE " + tableName + " (a, part)  WITH (partitioned_by = ARRAY['part']) AS VALUES (1, 10), (11, 20), (21, 30), (31, 40)", 4L);
        try {
            // Every writer waits on the barrier first so that the three statements start together.
            executor.invokeAll(ImmutableList.<Callable<Void>>builder()
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("DELETE FROM " + tableName + " WHERE part = 10");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("DELETE FROM " + tableName + " WHERE part = 20");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("DELETE FROM " + tableName + " WHERE part = 30");
                        return null;
                    })
                    .build())
                    .forEach(MoreFutures::getDone);

            ((QueryAssertions.QueryAssert) Assertions.assertThat(query("SELECT * FROM " + tableName))).matches("VALUES (31, 40)");
            assertQuery(
                    "SELECT version, operation, isolation_level, is_blind_append FROM \"" + tableName + "$history\"",
                    """
                    VALUES
                        (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', true),
                        (1, 'DELETE', 'WriteSerializable', false),
                        (2, 'DELETE', 'WriteSerializable', false),
                        (3, 'DELETE', 'WriteSerializable', false)
                    """);
        } finally {
            try {
                assertUpdate("DROP TABLE " + tableName);
            } finally {
                // Shut the pool down even when the DROP fails so worker threads are not leaked.
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Starts three concurrent {@code DELETE} statements that all target partition
     * {@code part = 10}.
     *
     * <p>At least one delete must succeed. A delete that fails must fail with
     * "Failed to write Delta Lake transaction log entry". Afterwards only the rows of partitions
     * 20 and 30 remain, and {@code $history} is expected to show one non-blind {@code DELETE}
     * entry per successful statement.
     */
    @Test
    public void testConcurrentDeletePushdownFromTheSamePartition() throws Exception {
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_delete_from_same_partition_" + TestingNames.randomNameSuffix();
        assertUpdate("CREATE TABLE " + tableName + " (a, part) WITH (partitioned_by = ARRAY['part']) AS VALUES (0, 10), (11, 20), (22, 30)", 3L);
        try {
            // Submit every task before waiting on any result: the barrier needs all writers running.
            var attempts = IntStream.range(0, threads)
                    .mapToObj(threadNumber -> executor.submit(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        try {
                            getQueryRunner().execute("DELETE FROM " + tableName + "  WHERE part = 10");
                            return true;
                        } catch (Exception e) {
                            try {
                                // A losing writer is acceptable only when it failed on the log commit.
                                Assertions.assertThat(io.trino.testing.QueryAssertions.getTrinoExceptionCause(e)).hasMessage("Failed to write Delta Lake transaction log entry");
                                return false;
                            } catch (Throwable verifyFailure) {
                                // Keep the original query failure attached to the verification failure.
                                if (verifyFailure != e) {
                                    verifyFailure.addSuppressed(e);
                                }
                                throw verifyFailure;
                            }
                        }
                    }))
                    .collect(ImmutableList.toImmutableList());
            long successfulDeletes = attempts.stream()
                    .map(MoreFutures::getFutureValue)
                    .filter(Boolean::booleanValue)
                    .count();
            Assertions.assertThat(successfulDeletes).isGreaterThanOrEqualTo(1L);

            assertQuery("SELECT * FROM " + tableName, "VALUES (11, 20), (22, 30)");

            String expectedDeletes = LongStream.rangeClosed(1L, successfulDeletes)
                    .mapToObj(version -> "(%s, 'DELETE', 'WriteSerializable', false)".formatted(version))
                    .collect(Collectors.joining(", ", ", ", ""));
            assertQuery(
                    "SELECT version, operation, isolation_level, is_blind_append FROM \"" + tableName + "$history\"",
                    "VALUES (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', true)" + expectedDeletes);
        } finally {
            try {
                assertUpdate("DROP TABLE " + tableName);
            } finally {
                // Shut the pool down even when the DROP fails so worker threads are not leaked.
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Starts three concurrent {@code TRUNCATE TABLE} statements against the same table.
     *
     * <p>At least one truncate must succeed. A truncate that fails must fail with
     * "Failed to write Delta Lake transaction log entry". Afterwards the table must be empty, and
     * {@code $history} is expected to show one non-blind {@code TRUNCATE} entry per successful
     * statement.
     */
    @Test
    public void testConcurrentTruncateReconciliationFailure() throws Exception {
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_delete_from_same_partition_" + TestingNames.randomNameSuffix();
        assertUpdate("CREATE TABLE " + tableName + " (a, part) WITH (partitioned_by = ARRAY['part']) AS VALUES (0, 10), (11, 20), (22, 30)", 3L);
        try {
            // Submit every task before waiting on any result: the barrier needs all writers running.
            var attempts = IntStream.range(0, threads)
                    .mapToObj(threadNumber -> executor.submit(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        try {
                            getQueryRunner().execute("TRUNCATE TABLE " + tableName);
                            return true;
                        } catch (Exception e) {
                            try {
                                // A losing writer is acceptable only when it failed on the log commit.
                                Assertions.assertThat(io.trino.testing.QueryAssertions.getTrinoExceptionCause(e)).hasMessage("Failed to write Delta Lake transaction log entry");
                                return false;
                            } catch (Throwable verifyFailure) {
                                // Keep the original query failure attached to the verification failure.
                                if (verifyFailure != e) {
                                    verifyFailure.addSuppressed(e);
                                }
                                throw verifyFailure;
                            }
                        }
                    }))
                    .collect(ImmutableList.toImmutableList());
            long successfulTruncates = attempts.stream()
                    .map(MoreFutures::getFutureValue)
                    .filter(Boolean::booleanValue)
                    .count();
            Assertions.assertThat(successfulTruncates).isGreaterThanOrEqualTo(1L);

            ((QueryAssertions.QueryAssert) Assertions.assertThat(query("SELECT * FROM " + tableName))).returnsEmptyResult();

            String expectedTruncates = LongStream.rangeClosed(1L, successfulTruncates)
                    .mapToObj(version -> "(%s, 'TRUNCATE', 'WriteSerializable', false)".formatted(version))
                    .collect(Collectors.joining(", ", ", ", ""));
            assertQuery(
                    "SELECT version, operation, isolation_level, is_blind_append FROM \"" + tableName + "$history\"",
                    "VALUES (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', true)" + expectedTruncates);
        } finally {
            try {
                assertUpdate("DROP TABLE " + tableName);
            } finally {
                // Shut the pool down even when the DROP fails so worker threads are not leaked.
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Runs one unfiltered {@code DELETE} concurrently with two blind {@code INSERT ... VALUES}
     * statements.
     *
     * <p>All three operations are expected to commit. The final {@code sum(a)} is checked against
     * the set {@code {0, 21, 31, 52}}, and {@code $history} is expected to contain the CTAS, one
     * {@code DELETE} and two {@code WRITE} entries.
     */
    @Test
    public void testConcurrentDeletePushdownAndBlindInsertsReconciliation() throws Exception {
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_delete_and_inserts_table_" + TestingNames.randomNameSuffix();
        assertUpdate("CREATE TABLE " + tableName + " (a, part)  WITH (partitioned_by = ARRAY['part']) AS VALUES (1, 10), (11, 20)", 2L);
        try {
            // Every writer waits on the barrier first so that the three statements start together.
            executor.invokeAll(ImmutableList.<Callable<Void>>builder()
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("DELETE FROM " + tableName);
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (21, 30)");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (31, 40)");
                        return null;
                    })
                    .build())
                    .forEach(MoreFutures::getDone);

            long remainingSum = (long) computeActual("SELECT sum(a) FROM " + tableName).getOnlyValue();
            Assertions.assertThat(remainingSum).isIn(0L, 21L, 31L, 52L);
            assertQuery(
                    "SELECT operation, isolation_level FROM \"" + tableName + "$history\"",
                    """
                    VALUES
                        ('CREATE TABLE AS SELECT', 'WriteSerializable'),
                        ('DELETE', 'WriteSerializable'),
                        ('WRITE', 'WriteSerializable'),
                        ('WRITE', 'WriteSerializable')
                    """);
        } finally {
            try {
                assertUpdate("DROP TABLE " + tableName);
            } finally {
                // Shut the pool down even when the DROP fails so worker threads are not leaked.
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Runs a {@code TRUNCATE TABLE} and two single-row {@code INSERT}s concurrently against a
     * partitioned table and verifies the resulting data and the table's {@code $history}.
     *
     * <p>Every statement waits on a shared barrier before executing, so all three are issued together.
     * Afterwards {@code sum(a)} must be one of the listed values and the history must contain the
     * initial CTAS entry, one non-blind {@code TRUNCATE} entry and two blind-append {@code WRITE} entries.
     *
     * @throws Exception if the thread is interrupted while waiting for the tasks or for the executor to terminate
     */
    @Test
    public void testConcurrentTruncateAndBlindInsertsReconciliation() throws Exception {
        // One barrier party per pool thread: a mismatch would leave tasks blocked until the await timeout.
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_truncate_and_inserts_table_" + TestingNames.randomNameSuffix();
        assertUpdate("CREATE TABLE " + tableName + " (a, part)  WITH (partitioned_by = ARRAY['part']) AS VALUES (1, 10), (11, 20)", 2L);
        try {
            // The explicit element type gives the lambdas a functional-interface target;
            // Callable is fully qualified because it is not among this file's imports.
            executor.invokeAll(ImmutableList.<java.util.concurrent.Callable<Void>>builder()
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("TRUNCATE TABLE " + tableName);
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (21, 30)");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (31, 40)");
                        return null;
                    })
                    .build())
                    .forEach(MoreFutures::getDone);

            // NOTE(review): sum(a) over an empty table is NULL in SQL, so the 0L alternative would fail on
            // unboxing rather than match — confirm whether an empty table is a reachable outcome here.
            Assertions.assertThat(((Long) computeActual("SELECT sum(a) FROM " + tableName).getOnlyValue()).longValue()).isIn(0L, 21L, 31L, 52L, 64L);
            assertQuery(
                    "SELECT operation, isolation_level, is_blind_append FROM \"" + tableName + "$history\"",
                    "VALUES\n"
                            + "    ('CREATE TABLE AS SELECT', 'WriteSerializable', true),\n"
                            + "    ('TRUNCATE', 'WriteSerializable', false),\n"
                            + "    ('WRITE', 'WriteSerializable', true),\n"
                            + "    ('WRITE', 'WriteSerializable', true)\n");
        } finally {
            // Cleanup runs exactly once; the executor is shut down even when dropping the table fails.
            try {
                assertUpdate("DROP TABLE " + tableName);
            } finally {
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Runs two partition-only {@code DELETE}s and an {@code INSERT ... SELECT} concurrently, each
     * touching a different partition, and verifies the resulting data and the table's {@code $history}.
     *
     * <p>The deletes target {@code part = 10} and {@code part = 30}; the insert reads and writes
     * {@code part = 20}. Every statement waits on a shared barrier before executing. Afterwards the
     * table must contain exactly {@code (11, 20), (12, 20)} and the history must list two
     * {@code DELETE} entries and one non-blind {@code WRITE} on top of the two setup commits.
     *
     * @throws Exception if the thread is interrupted while waiting for the tasks or for the executor to terminate
     */
    @Test
    public void testConcurrentDeletePushdownAndNonBlindInsertsReconciliation() throws Exception {
        // One barrier party per pool thread: a mismatch would leave tasks blocked until the await timeout.
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_delete_and_inserts_table_" + TestingNames.randomNameSuffix();
        assertUpdate("CREATE TABLE " + tableName + " (a, part)  WITH (partitioned_by = ARRAY['part']) AS VALUES (1, 10), (11, 20), (21, 30)", 3L);
        // Second commit, adding another row to partition 10.
        assertUpdate("INSERT INTO " + tableName + " VALUES (2, 10)", 1L);
        try {
            // The explicit element type gives the lambdas a functional-interface target;
            // Callable is fully qualified because it is not among this file's imports.
            executor.invokeAll(ImmutableList.<java.util.concurrent.Callable<Void>>builder()
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("DELETE FROM " + tableName + " WHERE part = 10");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("INSERT INTO " + tableName + " SELECT a + 1, part FROM " + tableName + " WHERE part = 20");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("DELETE FROM " + tableName + " WHERE part = 30");
                        return null;
                    })
                    .build())
                    .forEach(MoreFutures::getDone);

            ((QueryAssertions.QueryAssert) Assertions.assertThat(query("SELECT * FROM " + tableName))).matches("VALUES (11, 20), (12, 20)");
            assertQuery(
                    "SELECT operation, isolation_level, is_blind_append FROM \"" + tableName + "$history\"",
                    "VALUES\n"
                            + "    ('CREATE TABLE AS SELECT', 'WriteSerializable', true),\n"
                            + "    ('WRITE', 'WriteSerializable', true),\n"
                            + "    ('DELETE', 'WriteSerializable', false),\n"
                            + "    ('DELETE', 'WriteSerializable', false),\n"
                            + "    ('WRITE', 'WriteSerializable', false)\n");
        } finally {
            // Cleanup runs exactly once; the executor is shut down even when dropping the table fails.
            try {
                assertUpdate("DROP TABLE " + tableName);
            } finally {
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Issues five identical partition {@code DELETE}s concurrently against a table registered from the
     * {@code deltalake/serializable_partitioned_table} resources and verifies that at least one succeeds
     * while every failure is a transaction-log write failure.
     *
     * <p>Each task waits on a shared barrier, then runs {@code DELETE ... WHERE part = 10}. A task
     * yields {@code true} on success and {@code false} when the Trino exception cause carries the
     * message {@code "Failed to write Delta Lake transaction log entry"}; any other failure is
     * rethrown with the original exception attached as suppressed. Afterwards only {@code (33, 40)}
     * may remain in the table.
     *
     * @throws Exception if registering the table fails or the thread is interrupted while waiting for the executor
     */
    @Test
    public void testConcurrentSerializableDeletesPushdownReconciliationFailure() throws Exception {
        // One barrier party per pool thread: a mismatch would leave tasks blocked until the await timeout.
        int threads = 5;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_serializable_delete_reconciliation" + TestingNames.randomNameSuffix();
        registerTableFromResources(tableName, "deltalake/serializable_partitioned_table", getQueryRunner());
        ((QueryAssertions.QueryAssert) Assertions.assertThat(query("SELECT * FROM " + tableName))).matches("VALUES (0, 10), (33, 40)");
        try {
            // Collect into a list first so that every task is submitted before any result is awaited.
            // Future is fully qualified because it is not among this file's imports.
            List<java.util.concurrent.Future<Boolean>> futures = IntStream.range(0, threads)
                    .mapToObj(threadNumber -> executor.submit(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        try {
                            getQueryRunner().execute("DELETE FROM " + tableName + " WHERE part = 10");
                            return true;
                        } catch (Exception e) {
                            try {
                                Assertions.assertThat(io.trino.testing.QueryAssertions.getTrinoExceptionCause(e)).hasMessage("Failed to write Delta Lake transaction log entry");
                            } catch (Throwable verifyFailure) {
                                // Keep the query failure visible when the message check itself fails.
                                if (verifyFailure != e) {
                                    verifyFailure.addSuppressed(e);
                                }
                                throw verifyFailure;
                            }
                            return false;
                        }
                    }))
                    .collect(ImmutableList.toImmutableList());

            long successfulDeletes = futures.stream()
                    .map(MoreFutures::getFutureValue)
                    .filter(Boolean::booleanValue)
                    .count();
            Assertions.assertThat(successfulDeletes).isGreaterThanOrEqualTo(1L);
            ((QueryAssertions.QueryAssert) Assertions.assertThat(query("SELECT * FROM " + tableName))).matches("VALUES (33, 40)");
        } finally {
            // Cleanup runs exactly once; the executor is shut down even when dropping the table fails.
            try {
                assertUpdate("DROP TABLE " + tableName);
            } finally {
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Issues five concurrent {@code TRUNCATE TABLE} statements against a table registered from the
     * {@code deltalake/serializable_partitioned_table} resources and verifies that at least one succeeds
     * while every failure is a transaction-log write failure.
     *
     * <p>Each task waits on a shared barrier, then truncates the table. A task yields {@code true} on
     * success and {@code false} when the Trino exception cause carries the message
     * {@code "Failed to write Delta Lake transaction log entry"}; any other failure is rethrown with
     * the original exception attached as suppressed. Afterwards the table must be empty.
     *
     * @throws Exception if registering the table fails or the thread is interrupted while waiting for the executor
     */
    @Test
    public void testConcurrentSerializableTruncateReconciliationFailure() throws Exception {
        // One barrier party per pool thread: a mismatch would leave tasks blocked until the await timeout.
        int threads = 5;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_serializable_delete_reconciliation" + TestingNames.randomNameSuffix();
        registerTableFromResources(tableName, "deltalake/serializable_partitioned_table", getQueryRunner());
        ((QueryAssertions.QueryAssert) Assertions.assertThat(query("SELECT * FROM " + tableName))).matches("VALUES (0, 10), (33, 40)");
        try {
            // Collect into a list first so that every task is submitted before any result is awaited.
            // Future is fully qualified because it is not among this file's imports.
            List<java.util.concurrent.Future<Boolean>> futures = IntStream.range(0, threads)
                    .mapToObj(threadNumber -> executor.submit(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        try {
                            getQueryRunner().execute("TRUNCATE TABLE " + tableName);
                            return true;
                        } catch (Exception e) {
                            try {
                                Assertions.assertThat(io.trino.testing.QueryAssertions.getTrinoExceptionCause(e)).hasMessage("Failed to write Delta Lake transaction log entry");
                            } catch (Throwable verifyFailure) {
                                // Keep the query failure visible when the message check itself fails.
                                if (verifyFailure != e) {
                                    verifyFailure.addSuppressed(e);
                                }
                                throw verifyFailure;
                            }
                            return false;
                        }
                    }))
                    .collect(ImmutableList.toImmutableList());

            long successfulTruncates = futures.stream()
                    .map(MoreFutures::getFutureValue)
                    .filter(Boolean::booleanValue)
                    .count();
            Assertions.assertThat(successfulTruncates).isGreaterThanOrEqualTo(1L);
            ((QueryAssertions.QueryAssert) Assertions.assertThat(query("SELECT * FROM " + tableName))).returnsEmptyResult();
        } finally {
            // Cleanup runs exactly once; the executor is shut down even when dropping the table fails.
            try {
                assertUpdate("DROP TABLE " + tableName);
            } finally {
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Runs three {@code UPDATE}s concurrently, each restricted to a different partition, and verifies
     * that all of them are applied and recorded in the table's {@code $history}.
     *
     * <p>Every statement waits on a shared barrier before executing. Afterwards the rows of partitions
     * 10, 20 and 30 must each be incremented by one, partition 40 must be untouched, and the history
     * must show versions 1 to 3 as non-blind {@code MERGE} entries after the initial CTAS.
     *
     * @throws Exception if the thread is interrupted while waiting for the tasks or for the executor to terminate
     */
    @Test
    public void testConcurrentUpdateReconciliation() throws Exception {
        // One barrier party per pool thread: a mismatch would leave tasks blocked until the await timeout.
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_updates_table_" + TestingNames.randomNameSuffix();
        assertUpdate("CREATE TABLE " + tableName + " (a, part)  WITH (partitioned_by = ARRAY['part']) AS VALUES (1, 10), (11, 20), (21, 30), (31, 40)", 4L);
        try {
            // The explicit element type gives the lambdas a functional-interface target;
            // Callable is fully qualified because it is not among this file's imports.
            executor.invokeAll(ImmutableList.<java.util.concurrent.Callable<Void>>builder()
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("UPDATE " + tableName + " SET a = a + 1 WHERE part = 10");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("UPDATE " + tableName + " SET a = a + 1  WHERE part = 20");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("UPDATE " + tableName + " SET a = a + 1  WHERE part = 30");
                        return null;
                    })
                    .build())
                    .forEach(MoreFutures::getDone);

            ((QueryAssertions.QueryAssert) Assertions.assertThat(query("SELECT * FROM " + tableName))).matches("VALUES (2, 10), (12, 20), (22, 30), (31, 40)");
            assertQuery(
                    "SELECT version, operation, isolation_level, is_blind_append FROM \"" + tableName + "$history\"",
                    "VALUES\n"
                            + "    (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', true),\n"
                            + "    (1, 'MERGE', 'WriteSerializable', false),\n"
                            + "    (2, 'MERGE', 'WriteSerializable', false),\n"
                            + "    (3, 'MERGE', 'WriteSerializable', false)\n");
        } finally {
            // Cleanup runs exactly once; the executor is shut down even when dropping the table fails.
            try {
                assertUpdate("DROP TABLE " + tableName);
            } finally {
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Runs three row-level {@code DELETE}s concurrently, each restricted to a different partition,
     * and verifies that all of them are applied and recorded in the table's {@code $history}.
     *
     * <p>Each delete filters on both {@code part} and {@code a}. Every statement waits on a shared
     * barrier before executing. Afterwards only {@code (31, 40)} may remain and the history must
     * show versions 2 to 4 as non-blind {@code MERGE} entries after the two setup commits.
     *
     * @throws Exception if the thread is interrupted while waiting for the tasks or for the executor to terminate
     */
    @Test
    public void testConcurrentDeleteReconciliation() throws Exception {
        // One barrier party per pool thread: a mismatch would leave tasks blocked until the await timeout.
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_deletes_table_" + TestingNames.randomNameSuffix();
        assertUpdate("CREATE TABLE " + tableName + " (a, part)  WITH (partitioned_by = ARRAY['part']) AS VALUES (1, 10), (11, 20), (21, 30), (31, 40)", 4L);
        // Second commit, adding another row to partition 10.
        assertUpdate("INSERT INTO " + tableName + " VALUES (2, 10)", 1L);
        try {
            // The explicit element type gives the lambdas a functional-interface target;
            // Callable is fully qualified because it is not among this file's imports.
            executor.invokeAll(ImmutableList.<java.util.concurrent.Callable<Void>>builder()
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("DELETE FROM " + tableName + " WHERE part = 10 AND a IN (1, 2)");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("DELETE FROM " + tableName + " WHERE part = 20 AND a = 11");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("DELETE FROM " + tableName + " WHERE part = 30 AND a = 21");
                        return null;
                    })
                    .build())
                    .forEach(MoreFutures::getDone);

            ((QueryAssertions.QueryAssert) Assertions.assertThat(query("SELECT * FROM " + tableName))).matches("VALUES (31, 40)");
            assertQuery(
                    "SELECT version, operation, isolation_level, is_blind_append FROM \"" + tableName + "$history\"",
                    "VALUES\n"
                            + "    (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', true),\n"
                            + "    (1, 'WRITE', 'WriteSerializable', true),\n"
                            + "    (2, 'MERGE', 'WriteSerializable', false),\n"
                            + "    (3, 'MERGE', 'WriteSerializable', false),\n"
                            + "    (4, 'MERGE', 'WriteSerializable', false)\n");
        } finally {
            // Cleanup runs exactly once; the executor is shut down even when dropping the table fails.
            try {
                assertUpdate("DROP TABLE " + tableName);
            } finally {
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Runs three {@code MERGE} statements concurrently — an insert-only merge, a delete merge and an
     * update merge, each aimed at a different partition — and verifies the resulting data and the
     * table's {@code $history}.
     *
     * <p>Every statement waits on a shared barrier before executing. Afterwards the table must contain
     * exactly {@code (1, 10), (11, 20), (12, 20), (32, 40)} and the history must list three
     * {@code MERGE} entries (one blind append, two non-blind) on top of the two setup commits.
     *
     * @throws Exception if the thread is interrupted while waiting for the tasks or for the executor to terminate
     */
    @Test
    public void testConcurrentMergeReconciliation() throws Exception {
        // One barrier party per pool thread: a mismatch would leave tasks blocked until the await timeout.
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_merges_table_" + TestingNames.randomNameSuffix();
        assertUpdate("CREATE TABLE " + tableName + " (a, part)  WITH (partitioned_by = ARRAY['part']) AS VALUES (1, 10), (11, 20), (21, 30), (31, 40)", 4L);
        // Second commit, adding another row to partition 30.
        assertUpdate("INSERT INTO " + tableName + " VALUES (22, 30)", 1L);
        try {
            // The explicit element type gives the lambdas a functional-interface target;
            // Callable is fully qualified because it is not among this file's imports.
            executor.invokeAll(ImmutableList.<java.util.concurrent.Callable<Void>>builder()
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        // ON (FALSE) never matches, so this merge only inserts.
                        getQueryRunner().execute(("MERGE INTO %s t USING (VALUES (12, 20)) AS s(a, part)\n"
                                + "  ON (FALSE)\n"
                                + "    WHEN NOT MATCHED THEN INSERT (a, part) VALUES(s.a, s.part)\n").formatted(tableName));
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute(("MERGE INTO %s t USING (VALUES (21, 30)) AS s(a, part)\n"
                                + "  ON (t.part = s.part)\n"
                                + "    WHEN MATCHED THEN DELETE\n").formatted(tableName));
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute(("MERGE INTO %s t USING (VALUES (32, 40)) AS s(a, part)\n"
                                + "  ON (t.part = s.part)\n"
                                + "    WHEN MATCHED THEN UPDATE SET a = s.a\n").formatted(tableName));
                        return null;
                    })
                    .build())
                    .forEach(MoreFutures::getDone);

            ((QueryAssertions.QueryAssert) Assertions.assertThat(query("SELECT * FROM " + tableName))).matches("VALUES (1, 10), (11, 20), (12, 20), (32, 40)");
            assertQuery(
                    "SELECT operation, isolation_level, is_blind_append FROM \"" + tableName + "$history\"",
                    "VALUES\n"
                            + "    ('CREATE TABLE AS SELECT', 'WriteSerializable', true),\n"
                            + "    ('WRITE', 'WriteSerializable', true),\n"
                            + "    ('MERGE', 'WriteSerializable', true),\n"
                            + "    ('MERGE', 'WriteSerializable', false),\n"
                            + "    ('MERGE', 'WriteSerializable', false)\n");
        } finally {
            // Cleanup runs exactly once; the executor is shut down even when dropping the table fails.
            try {
                assertUpdate("DROP TABLE " + tableName);
            } finally {
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Issues five identical insert-only {@code MERGE} statements concurrently against a table registered
     * from the {@code deltalake/serializable_partitioned_table} resources and verifies that at least one
     * succeeds while every failure is a transaction-log write failure.
     *
     * <p>Each task waits on a shared barrier, then merges the row {@code (12, 20)}. A task yields
     * {@code true} on success and {@code false} when the Trino exception cause carries the message
     * {@code "Failed to write Delta Lake transaction log entry"}; any other failure is rethrown with
     * the original exception attached as suppressed. Afterwards partition 20 must hold at least one row.
     *
     * @throws Exception if registering the table fails or the thread is interrupted while waiting for the executor
     */
    @Test
    public void testConcurrentSerializableMergeReconciliationFailure() throws Exception {
        // One barrier party per pool thread: a mismatch would leave tasks blocked until the await timeout.
        int threads = 5;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_serializable_merge_reconciliation" + TestingNames.randomNameSuffix();
        registerTableFromResources(tableName, "deltalake/serializable_partitioned_table", getQueryRunner());
        ((QueryAssertions.QueryAssert) Assertions.assertThat(query("SELECT * FROM " + tableName))).matches("VALUES (0, 10), (33, 40)");
        try {
            // Collect into a list first so that every task is submitted before any result is awaited.
            // Future is fully qualified because it is not among this file's imports.
            List<java.util.concurrent.Future<Boolean>> futures = IntStream.range(0, threads)
                    .mapToObj(threadNumber -> executor.submit(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        try {
                            // ON (FALSE) never matches, so this merge only inserts.
                            getQueryRunner().execute(("MERGE INTO %s t USING (VALUES (12, 20)) AS s(a, part)\n"
                                    + "  ON (FALSE)\n"
                                    + "    WHEN NOT MATCHED THEN INSERT (a, part) VALUES(s.a, s.part)\n").formatted(tableName));
                            return true;
                        } catch (Exception e) {
                            try {
                                Assertions.assertThat(io.trino.testing.QueryAssertions.getTrinoExceptionCause(e)).hasMessage("Failed to write Delta Lake transaction log entry");
                            } catch (Throwable verifyFailure) {
                                // Keep the query failure visible when the message check itself fails.
                                if (verifyFailure != e) {
                                    verifyFailure.addSuppressed(e);
                                }
                                throw verifyFailure;
                            }
                            return false;
                        }
                    }))
                    .collect(ImmutableList.toImmutableList());

            long successfulMerges = futures.stream()
                    .map(MoreFutures::getFutureValue)
                    .filter(Boolean::booleanValue)
                    .count();
            Assertions.assertThat(successfulMerges).isGreaterThanOrEqualTo(1L);
            Assertions.assertThat(((Long) computeScalar("SELECT count(*) FROM " + tableName + " WHERE part = 20")).longValue()).isGreaterThanOrEqualTo(1L);
        } finally {
            // Cleanup runs exactly once; the executor is shut down even when dropping the table fails.
            try {
                assertUpdate("DROP TABLE " + tableName);
            } finally {
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Runs two row-level {@code DELETE}s and an {@code INSERT ... SELECT} concurrently, each touching
     * a different partition, and verifies the resulting data and the table's {@code $history}.
     *
     * <p>The deletes filter on {@code part} plus a predicate on {@code a}; the insert reads and writes
     * {@code part = 20}. Every statement waits on a shared barrier before executing. Afterwards the
     * table must contain exactly {@code (11, 20), (12, 20)} and the history must list two non-blind
     * {@code MERGE} entries and one non-blind {@code WRITE} on top of the two setup commits.
     *
     * @throws Exception if the thread is interrupted while waiting for the tasks or for the executor to terminate
     */
    @Test
    public void testConcurrentDeleteAndNonBlindInsertsReconciliation() throws Exception {
        // One barrier party per pool thread: a mismatch would leave tasks blocked until the await timeout.
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_delete_and_inserts_table_" + TestingNames.randomNameSuffix();
        assertUpdate("CREATE TABLE " + tableName + " (a, part)  WITH (partitioned_by = ARRAY['part']) AS VALUES (1, 10), (11, 20), (21, 30)", 3L);
        // Second commit, adding another row to partition 10.
        assertUpdate("INSERT INTO " + tableName + " VALUES (2, 10)", 1L);
        try {
            // The explicit element type gives the lambdas a functional-interface target;
            // Callable is fully qualified because it is not among this file's imports.
            executor.invokeAll(ImmutableList.<java.util.concurrent.Callable<Void>>builder()
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("DELETE FROM " + tableName + " WHERE part = 10 AND a IN (1, 2)");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("INSERT INTO " + tableName + " SELECT a + 1, part FROM " + tableName + " WHERE part = 20");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("DELETE FROM " + tableName + " WHERE part = 30 AND a BETWEEN 20 AND 30");
                        return null;
                    })
                    .build())
                    .forEach(MoreFutures::getDone);

            ((QueryAssertions.QueryAssert) Assertions.assertThat(query("SELECT * FROM " + tableName))).matches("VALUES (11, 20), (12, 20)");
            assertQuery(
                    "SELECT operation, isolation_level, is_blind_append FROM \"" + tableName + "$history\"",
                    "VALUES\n"
                            + "    ('CREATE TABLE AS SELECT', 'WriteSerializable', true),\n"
                            + "    ('WRITE', 'WriteSerializable', true),\n"
                            + "    ('MERGE', 'WriteSerializable', false),\n"
                            + "    ('MERGE', 'WriteSerializable', false),\n"
                            + "    ('WRITE', 'WriteSerializable', false)\n");
        } finally {
            // Cleanup runs exactly once; the executor is shut down even when dropping the table fails.
            try {
                assertUpdate("DROP TABLE " + tableName);
            } finally {
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Runs a row-level {@code DELETE} ({@code WHERE a > 10}) and two single-row {@code INSERT}s
     * concurrently against a partitioned table and verifies the resulting data and the table's
     * {@code $history}.
     *
     * <p>Every statement waits on a shared barrier before executing, so all three are issued together.
     * Afterwards {@code sum(a)} must be one of the listed values and the history must contain the
     * initial CTAS entry, one {@code MERGE} entry and two {@code WRITE} entries.
     *
     * @throws Exception if the thread is interrupted while waiting for the tasks or for the executor to terminate
     */
    @Test
    public void testConcurrentDeleteAndBlindInsertsReconciliation() throws Exception {
        // One barrier party per pool thread: a mismatch would leave tasks blocked until the await timeout.
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_delete_and_inserts_table_" + TestingNames.randomNameSuffix();
        assertUpdate("CREATE TABLE " + tableName + " (a, part)  WITH (partitioned_by = ARRAY['part']) AS VALUES (1, 10), (11, 20)", 2L);
        try {
            // The explicit element type gives the lambdas a functional-interface target;
            // Callable is fully qualified because it is not among this file's imports.
            executor.invokeAll(ImmutableList.<java.util.concurrent.Callable<Void>>builder()
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("DELETE FROM " + tableName + " WHERE a > 10");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (8, 10)");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (21, 30)");
                        return null;
                    })
                    .build())
                    .forEach(MoreFutures::getDone);

            Assertions.assertThat(((Long) computeActual("SELECT sum(a) FROM " + tableName).getOnlyValue()).longValue()).isIn(1L, 9L, 22L, 30L);
            assertQuery(
                    "SELECT operation, isolation_level FROM \"" + tableName + "$history\"",
                    "VALUES\n"
                            + "    ('CREATE TABLE AS SELECT', 'WriteSerializable'),\n"
                            + "    ('MERGE', 'WriteSerializable'),\n"
                            + "    ('WRITE', 'WriteSerializable'),\n"
                            + "    ('WRITE', 'WriteSerializable')\n");
        } finally {
            // Cleanup runs exactly once; the executor is shut down even when dropping the table fails.
            try {
                assertUpdate("DROP TABLE " + tableName);
            } finally {
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Runs an unfiltered {@code UPDATE} and two single-row {@code INSERT}s concurrently against a
     * partitioned table and verifies the resulting data and the table's {@code $history}.
     *
     * <p>Every statement waits on a shared barrier before executing, so all three are issued together.
     * Afterwards {@code sum(a)} must be one of the listed values and the history must contain the
     * initial CTAS entry, one {@code MERGE} entry and two {@code WRITE} entries.
     *
     * @throws Exception if the thread is interrupted while waiting for the tasks or for the executor to terminate
     */
    @Test
    public void testConcurrentUpdateAndBlindInsertsReconciliation() throws Exception {
        // One barrier party per pool thread: a mismatch would leave tasks blocked until the await timeout.
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_update_and_inserts_table_" + TestingNames.randomNameSuffix();
        assertUpdate("CREATE TABLE " + tableName + " (a, part)  WITH (partitioned_by = ARRAY['part']) AS VALUES (1, 10), (11, 20)", 2L);
        try {
            // The explicit element type gives the lambdas a functional-interface target;
            // Callable is fully qualified because it is not among this file's imports.
            executor.invokeAll(ImmutableList.<java.util.concurrent.Callable<Void>>builder()
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("UPDATE " + tableName + " SET a = a + 1");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (13, 20)");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (21, 30)");
                        return null;
                    })
                    .build())
                    .forEach(MoreFutures::getDone);

            Assertions.assertThat(((Long) computeActual("SELECT sum(a) FROM " + tableName).getOnlyValue()).longValue()).isIn(48L, 49L, 49L, 50L);
            assertQuery(
                    "SELECT operation, isolation_level FROM \"" + tableName + "$history\"",
                    "VALUES\n"
                            + "    ('CREATE TABLE AS SELECT', 'WriteSerializable'),\n"
                            + "    ('MERGE', 'WriteSerializable'),\n"
                            + "    ('WRITE', 'WriteSerializable'),\n"
                            + "    ('WRITE', 'WriteSerializable')\n");
        } finally {
            // Cleanup runs exactly once; the executor is shut down even when dropping the table fails.
            try {
                assertUpdate("DROP TABLE " + tableName);
            } finally {
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Runs a deleting {@code MERGE} and two single-row {@code INSERT}s concurrently against a
     * partitioned table and verifies the resulting data and the table's {@code $history}.
     *
     * <p>The merge deletes rows matching {@code (11, 20)}, {@code (8, 10)} or {@code (21, 30)}; the
     * inserts add {@code (8, 10)} and {@code (21, 30)}. Every statement waits on a shared barrier
     * before executing. Afterwards {@code sum(a)} must be one of the listed values and the history
     * must contain the initial CTAS entry, one {@code MERGE} entry and two {@code WRITE} entries.
     *
     * @throws Exception if the thread is interrupted while waiting for the tasks or for the executor to terminate
     */
    @Test
    public void testConcurrentMergeAndBlindInsertsReconciliation() throws Exception {
        // One barrier party per pool thread: a mismatch would leave tasks blocked until the await timeout.
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_merge_and_inserts_table_" + TestingNames.randomNameSuffix();
        assertUpdate("CREATE TABLE " + tableName + " (a, part)  WITH (partitioned_by = ARRAY['part']) AS VALUES (1, 10), (11, 20)", 2L);
        try {
            // The explicit element type gives the lambdas a functional-interface target;
            // Callable is fully qualified because it is not among this file's imports.
            executor.invokeAll(ImmutableList.<java.util.concurrent.Callable<Void>>builder()
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute(("MERGE INTO %s t USING (VALUES (11, 20), (8, 10), (21, 30)) AS s(a, part)\n"
                                + "  ON (t.a = s.a AND t.part = s.part)\n"
                                + "    WHEN MATCHED THEN DELETE\n").formatted(tableName));
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (8, 10)");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (21, 30)");
                        return null;
                    })
                    .build())
                    .forEach(MoreFutures::getDone);

            Assertions.assertThat(((Long) computeActual("SELECT sum(a) FROM " + tableName).getOnlyValue()).longValue()).isIn(1L, 9L, 22L, 30L);
            assertQuery(
                    "SELECT operation, isolation_level FROM \"" + tableName + "$history\"",
                    "VALUES\n"
                            + "    ('CREATE TABLE AS SELECT', 'WriteSerializable'),\n"
                            + "    ('MERGE', 'WriteSerializable'),\n"
                            + "    ('WRITE', 'WriteSerializable'),\n"
                            + "    ('WRITE', 'WriteSerializable')\n");
        } finally {
            // Cleanup runs exactly once; the executor is shut down even when dropping the table fails.
            try {
                assertUpdate("DROP TABLE " + tableName);
            } finally {
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Runs a row-level {@code DELETE}, a partition-only {@code DELETE} and an {@code INSERT ... SELECT}
     * concurrently, each touching a different partition, and verifies the resulting data and the
     * table's {@code $history}.
     *
     * <p>Every statement waits on a shared barrier before executing. Afterwards the table must contain
     * exactly {@code (11, 20), (12, 20)} and the history must list one non-blind {@code MERGE}, one
     * {@code DELETE} and one non-blind {@code WRITE} on top of the two setup commits.
     *
     * @throws Exception if the thread is interrupted while waiting for the tasks or for the executor to terminate
     */
    @Test
    public void testConcurrentDeleteAndDeletePushdownAndNonBlindInsertsReconciliation() throws Exception {
        // One barrier party per pool thread: a mismatch would leave tasks blocked until the await timeout.
        int threads = 3;
        CyclicBarrier barrier = new CyclicBarrier(threads);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        String tableName = "test_concurrent_delete_and_inserts_table_" + TestingNames.randomNameSuffix();
        assertUpdate("CREATE TABLE " + tableName + " (a, part)  WITH (partitioned_by = ARRAY['part']) AS VALUES (1, 10), (11, 20), (21, 30)", 3L);
        // Second commit, adding another row to partition 10.
        assertUpdate("INSERT INTO " + tableName + " VALUES (2, 10)", 1L);
        try {
            // The explicit element type gives the lambdas a functional-interface target;
            // Callable is fully qualified because it is not among this file's imports.
            executor.invokeAll(ImmutableList.<java.util.concurrent.Callable<Void>>builder()
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("DELETE FROM " + tableName + " WHERE part = 10 AND a IN (1, 2)");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("INSERT INTO " + tableName + " SELECT a + 1, part FROM " + tableName + " WHERE part = 20");
                        return null;
                    })
                    .add(() -> {
                        barrier.await(10L, TimeUnit.SECONDS);
                        getQueryRunner().execute("DELETE FROM " + tableName + " WHERE part = 30");
                        return null;
                    })
                    .build())
                    .forEach(MoreFutures::getDone);

            ((QueryAssertions.QueryAssert) Assertions.assertThat(query("SELECT * FROM " + tableName))).matches("VALUES (11, 20), (12, 20)");
            assertQuery(
                    "SELECT operation, isolation_level, is_blind_append FROM \"" + tableName + "$history\"",
                    "VALUES\n"
                            + "    ('CREATE TABLE AS SELECT', 'WriteSerializable', true),\n"
                            + "    ('WRITE', 'WriteSerializable', true),\n"
                            + "    ('MERGE', 'WriteSerializable', false),\n"
                            + "    ('DELETE', 'WriteSerializable', false),\n"
                            + "    ('WRITE', 'WriteSerializable', false)\n");
        } finally {
            // Cleanup runs exactly once; the executor is shut down even when dropping the table fails.
            try {
                assertUpdate("DROP TABLE " + tableName);
            } finally {
                executor.shutdownNow();
                Assertions.assertThat(executor.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
            }
        }
    }

    /**
     * Copies the classpath resources found under {@code str2} into the {@code local:///<str>}
     * location of the query runner's file system and registers that location as table {@code str}.
     *
     * @param str name of the table to register; also used as the directory name below {@code local:///}
     * @param str2 classpath resource prefix, without a trailing slash, whose files are copied
     * @param queryRunner query runner that supplies the file system and executes {@code system.register_table}
     * @throws IOException declared for directory creation, which happens outside the wrapping {@code try}
     * @throws UncheckedIOException if scanning the classpath or copying a resource fails with an {@link IOException}
     */
    protected void registerTableFromResources(String str, String str2, QueryRunner queryRunner) throws IOException {
        TrinoFileSystem fileSystem = ((TrinoFileSystemFactory) TestingDeltaLakeUtils.getConnectorService(queryRunner, TrinoFileSystemFactory.class)).create(ConnectorIdentity.ofUser("test"));
        String tableLocation = "local:///" + str;
        fileSystem.createDirectory(Location.of(tableLocation));
        try {
            // A typed list (instead of a raw List) lets the loop variable be declared as ResourceInfo.
            List<ClassPath.ResourceInfo> resources = ClassPath.from(getClass().getClassLoader())
                    .getResources()
                    .stream()
                    .filter(candidate -> candidate.getResourceName().startsWith(str2 + "/"))
                    .collect(ImmutableList.toImmutableList());
            for (ClassPath.ResourceInfo resource : resources) {
                // Swap the leading resource prefix for the table location; both sides are quoted so that
                // regex metacharacters in either string are treated literally.
                String targetPath = resource.getResourceName().replaceFirst("^" + Pattern.quote(str2), Matcher.quoteReplacement(tableLocation));
                fileSystem.newOutputFile(Location.of(targetPath)).createOrOverwrite(resource.asByteSource().read());
            }
            queryRunner.execute(String.format("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')", str, tableLocation));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
