package org.neo4j.driver.integration.reactive;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import org.hamcrest.Matchers;
import org.hamcrest.junit.MatcherAssert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.neo4j.driver.Record;
import org.neo4j.driver.Statement;
import org.neo4j.driver.Value;
import org.neo4j.driver.Values;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.exceptions.ServiceUnavailableException;
import org.neo4j.driver.internal.util.EnabledOnNeo4jWith;
import org.neo4j.driver.internal.util.Iterables;
import org.neo4j.driver.internal.util.Neo4jFeature;
import org.neo4j.driver.reactive.RxSession;
import org.neo4j.driver.reactive.RxStatementResult;
import org.neo4j.driver.reactive.RxTransaction;
import org.neo4j.driver.summary.ResultSummary;
import org.neo4j.driver.summary.StatementType;
import org.neo4j.driver.types.Node;
import org.neo4j.driver.util.DatabaseExtension;
import org.neo4j.driver.util.ParallelizableIT;
import org.neo4j.driver.util.TestUtil;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

/**
 * Integration tests for explicit reactive transactions ({@link RxTransaction}) opened from an
 * {@link RxSession}.
 *
 * <p>Each test obtains a fresh reactive session from the driver exposed by the shared
 * {@link DatabaseExtension} and then drives a transaction either through Reactor's
 * {@code Flux.usingWhen} (commit on completion, rollback on error) or by blocking on individual
 * publishers with {@code TestUtil.await}.
 *
 * <p>The class is annotated to run only when the {@code BOLT_V4} feature is available.
 */
@EnabledOnNeo4jWith(Neo4jFeature.BOLT_V4)
@ParallelizableIT
class RxTransactionIT {

    @RegisterExtension
    static final DatabaseExtension neo4j = new DatabaseExtension();

    /** Session under test; re-created before every test by {@link #setUp()}. */
    private RxSession session;

    @BeforeEach
    void setUp() {
        session = neo4j.driver().rxSession();
    }

    // ---------------------------------------------------------------------------------------
    // Commit / rollback basics
    // ---------------------------------------------------------------------------------------

    /** Committing a transaction that ran nothing must still advance the session bookmark. */
    @Test
    void shouldBePossibleToCommitEmptyTx() {
        String bookmarkBefore = session.lastBookmark();

        StepVerifier.create(Mono.from(session.beginTransaction()).flatMap(tx -> Mono.from(tx.commit())))
                .verifyComplete();

        String bookmarkAfter = session.lastBookmark();
        Assertions.assertNotNull(bookmarkAfter);
        Assertions.assertNotEquals(bookmarkBefore, bookmarkAfter);
    }

    /** Rolling back a transaction that ran nothing must leave the session bookmark untouched. */
    @Test
    void shouldBePossibleToRollbackEmptyTx() {
        String bookmarkBefore = session.lastBookmark();

        StepVerifier.create(Mono.from(session.beginTransaction()).flatMap(tx -> Mono.from(tx.rollback())))
                .verifyComplete();

        Assertions.assertEquals(bookmarkBefore, session.lastBookmark());
    }

    @Test
    void shouldBePossibleToRunSingleStatementAndCommit() {
        StepVerifier.create(Flux.usingWhen(
                        session.beginTransaction(),
                        tx -> Flux.from(tx.run("CREATE (n:Node {id: 42}) RETURN n").records())
                                .map(record -> record.get(0).asNode().get("id").asInt()),
                        RxTransaction::commit,
                        RxTransaction::rollback))
                .expectNext(42)
                .verifyComplete();

        Assertions.assertEquals(1, countNodes(42));
    }

    @Test
    void shouldBePossibleToRunSingleStatementAndRollback() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));

        assertCanRunCreate(tx);
        assertCanRollback(tx);

        Assertions.assertEquals(0, countNodes(4242));
    }

    /** Runs three CREATE statements one after another, awaiting each before starting the next. */
    @ParameterizedTest
    @MethodSource("commit")
    void shouldBePossibleToRunMultipleStatements(boolean commit) {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));

        TestUtil.await(tx.run("CREATE (n:Node {id: 1})").records());
        TestUtil.await(tx.run("CREATE (n:Node {id: 2})").records());
        TestUtil.await(tx.run("CREATE (n:Node {id: 1})").records());

        assertCanCommitOrRollback(commit, tx);
        verifyCommittedOrRolledBack(commit);
    }

    /** Same as above, but the three record streams are concatenated and awaited only once. */
    @ParameterizedTest
    @MethodSource("commit")
    void shouldBePossibleToRunMultipleStatementsWithoutWaiting(boolean commit) {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));

        TestUtil.await(Flux.from(tx.run("CREATE (n:Node {id: 1})").records())
                .concatWith(tx.run("CREATE (n:Node {id: 2})").records())
                .concatWith(tx.run("CREATE (n:Node {id: 1})").records()));

        assertCanCommitOrRollback(commit, tx);
        verifyCommittedOrRolledBack(commit);
    }

    /** Same as above, but only {@code keys()} is subscribed to; the records are never requested. */
    @ParameterizedTest
    @MethodSource("commit")
    void shouldBePossibleToRunMultipleStatementsWithoutStreaming(boolean commit) {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));

        TestUtil.await(Flux.from(tx.run("CREATE (n:Node {id: 1})").keys())
                .concatWith(tx.run("CREATE (n:Node {id: 2})").keys())
                .concatWith(tx.run("CREATE (n:Node {id: 1})").keys()));

        assertCanCommitOrRollback(commit, tx);
        verifyCommittedOrRolledBack(commit);
    }

    // ---------------------------------------------------------------------------------------
    // Behaviour after a failed statement
    // ---------------------------------------------------------------------------------------

    @Test
    void shouldFailToCommitAfterSingleWrongStatement() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));

        assertFailToRunWrongStatement(tx);

        Assertions.assertThrows(ClientException.class, () -> TestUtil.await(tx.commit()));
    }

    @Test
    void shouldAllowRollbackAfterSingleWrongStatement() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));

        assertFailToRunWrongStatement(tx);
        assertCanRollback(tx);
    }

    @Test
    void shouldFailToCommitAfterCoupleCorrectAndSingleWrongStatement() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));

        assertCanRunCreate(tx);
        assertCanRunReturnOne(tx);
        assertFailToRunWrongStatement(tx);

        Assertions.assertThrows(ClientException.class, () -> TestUtil.await(tx.commit()));
    }

    @Test
    void shouldAllowRollbackAfterCoupleCorrectAndSingleWrongStatement() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));

        assertCanRunCreate(tx);
        assertCanRunReturnOne(tx);
        assertFailToRunWrongStatement(tx);
        assertCanRollback(tx);
    }

    @Test
    void shouldNotAllowNewStatementsAfterAnIncorrectStatement() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));

        assertFailToRunWrongStatement(tx);

        RxStatementResult result = tx.run("CREATE ()");
        Exception e = Assertions.assertThrows(Exception.class, () -> TestUtil.await(result.records()));
        MatcherAssert.assertThat(e.getMessage(), Matchers.startsWith("Cannot run more statements in this transaction"));

        assertCanRollback(tx);
    }

    @Test
    void shouldFailBoBeginTxWithInvalidBookmark() {
        RxSession sessionWithBadBookmark =
                neo4j.driver().rxSession(template -> template.withBookmarks("InvalidBookmark"));

        ClientException e = Assertions.assertThrows(
                ClientException.class, () -> TestUtil.await(sessionWithBadBookmark.beginTransaction()));
        MatcherAssert.assertThat(e.getMessage(), Matchers.containsString("InvalidBookmark"));
    }

    // ---------------------------------------------------------------------------------------
    // Repeated / conflicting commit and rollback
    // ---------------------------------------------------------------------------------------

    @Test
    void shouldBePossibleToCommitWhenCommitted() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));

        assertCanRunCreate(tx);
        assertCanCommit(tx);

        // A second commit on an already committed transaction must complete without error.
        StepVerifier.create(Mono.from(tx.commit())).verifyComplete();
    }

    @Test
    void shouldBePossibleToRollbackWhenRolledBack() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));

        assertCanRunCreate(tx);
        assertCanRollback(tx);

        // A second rollback on an already rolled back transaction must complete without error.
        StepVerifier.create(Mono.from(tx.rollback())).verifyComplete();
    }

    @Test
    void shouldFailToCommitWhenRolledBack() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));

        assertCanRunCreate(tx);
        assertCanRollback(tx);

        ClientException e = Assertions.assertThrows(ClientException.class, () -> TestUtil.await(tx.commit()));
        MatcherAssert.assertThat(e.getMessage(), Matchers.containsString("transaction has been rolled back"));
    }

    @Test
    void shouldFailToRollbackWhenCommitted() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));

        assertCanRunCreate(tx);
        assertCanCommit(tx);

        ClientException e = Assertions.assertThrows(ClientException.class, () -> TestUtil.await(tx.rollback()));
        MatcherAssert.assertThat(e.getMessage(), Matchers.containsString("transaction has been committed"));
    }

    /** The statement error (not a secondary commit/rollback error) must be what the subscriber sees. */
    @Test
    void shouldAllowRollbackAfterFailedCommit() {
        StepVerifier.create(Flux.usingWhen(
                        session.beginTransaction(),
                        tx -> Flux.from(tx.run("WRONG").records()),
                        RxTransaction::commit,
                        RxTransaction::rollback))
                .verifyErrorSatisfies(error ->
                        MatcherAssert.assertThat(error.getMessage(), Matchers.containsString("Invalid input")));
    }

    // ---------------------------------------------------------------------------------------
    // Keys and result summary
    // ---------------------------------------------------------------------------------------

    @Test
    void shouldExposeStatementKeysForColumnsWithAliases() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));

        Assertions.assertEquals(
                Arrays.asList("one", "two", "three", "five"),
                TestUtil.await(tx.run("RETURN 1 AS one, 2 AS two, 3 AS three, 4 AS five").keys()));

        assertCanRollback(tx);
    }

    @Test
    void shouldExposeStatementKeysForColumnsWithoutAliases() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));

        Assertions.assertEquals(
                Arrays.asList("1", "2", "3", "5"),
                TestUtil.await(tx.run("RETURN 1, 2, 3, 5").keys()));

        assertCanRollback(tx);
    }

    @Test
    void shouldExposeResultSummaryForSimpleQuery() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));
        String query = "CREATE (p1:Person {name: $name1})-[:KNOWS]->(p2:Person {name: $name2}) RETURN p1, p2";
        Value params = Values.parameters("name1", "Bob", "name2", "John");

        RxStatementResult result = tx.run(query, params);
        TestUtil.await(result.records()); // drain the records before asking for the summary
        ResultSummary summary = TestUtil.await(Mono.from(result.summary()));

        Assertions.assertEquals(new Statement(query, params), summary.statement());
        Assertions.assertEquals(2, summary.counters().nodesCreated());
        Assertions.assertEquals(2, summary.counters().labelsAdded());
        Assertions.assertEquals(2, summary.counters().propertiesSet());
        Assertions.assertEquals(1, summary.counters().relationshipsCreated());
        Assertions.assertEquals(StatementType.READ_WRITE, summary.statementType());
        Assertions.assertFalse(summary.hasPlan());
        Assertions.assertFalse(summary.hasProfile());
        Assertions.assertNull(summary.plan());
        Assertions.assertNull(summary.profile());
        Assertions.assertEquals(0, summary.notifications().size());
        MatcherAssert.assertThat(
                summary,
                org.neo4j.driver.internal.util.Matchers.containsResultAvailableAfterAndResultConsumedAfter());

        assertCanRollback(tx);
    }

    @Test
    void shouldExposeResultSummaryForExplainQuery() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));
        String query = "EXPLAIN MATCH (n) RETURN n";

        RxStatementResult result = tx.run(query);
        TestUtil.await(result.records()); // drain the records before asking for the summary
        ResultSummary summary = TestUtil.await(Mono.from(result.summary()));

        Assertions.assertEquals(new Statement(query), summary.statement());
        Assertions.assertEquals(0, summary.counters().nodesCreated());
        Assertions.assertEquals(0, summary.counters().propertiesSet());
        Assertions.assertEquals(StatementType.READ_ONLY, summary.statementType());
        Assertions.assertTrue(summary.hasPlan());
        Assertions.assertFalse(summary.hasProfile());
        Assertions.assertNotNull(summary.plan());
        // Only the textual form of the plan is checked: it must mention some kind of scan.
        MatcherAssert.assertThat(summary.plan().toString().toLowerCase(), Matchers.containsString("scan"));
        Assertions.assertNull(summary.profile());
        Assertions.assertEquals(0, summary.notifications().size());
        MatcherAssert.assertThat(
                summary,
                org.neo4j.driver.internal.util.Matchers.containsResultAvailableAfterAndResultConsumedAfter());

        assertCanRollback(tx);
    }

    @Test
    void shouldExposeResultSummaryForProfileQuery() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));
        String query = "PROFILE MERGE (n {name: $name}) ON CREATE SET n.created = timestamp() "
                + "ON MATCH SET n.counter = coalesce(n.counter, 0) + 1";
        Value params = Values.parameters("name", "Bob");

        RxStatementResult result = tx.run(query, params);
        TestUtil.await(result.records()); // drain the records before asking for the summary
        ResultSummary summary = TestUtil.await(Mono.from(result.summary()));

        Assertions.assertEquals(new Statement(query, params), summary.statement());
        Assertions.assertEquals(1, summary.counters().nodesCreated());
        Assertions.assertEquals(2, summary.counters().propertiesSet());
        Assertions.assertEquals(0, summary.counters().relationshipsCreated());
        Assertions.assertEquals(StatementType.WRITE_ONLY, summary.statementType());
        Assertions.assertTrue(summary.hasPlan());
        Assertions.assertTrue(summary.hasProfile());
        Assertions.assertNotNull(summary.plan());
        Assertions.assertNotNull(summary.profile());
        // Only the textual form of the profile is checked: it must mention db hits.
        MatcherAssert.assertThat(summary.profile().toString().toLowerCase(), Matchers.containsString("hits"));
        Assertions.assertEquals(0, summary.notifications().size());
        MatcherAssert.assertThat(
                summary,
                org.neo4j.driver.internal.util.Matchers.containsResultAvailableAfterAndResultConsumedAfter());

        assertCanRollback(tx);
    }

    // ---------------------------------------------------------------------------------------
    // Record streaming
    // ---------------------------------------------------------------------------------------

    /** Takes only the first of three records; the transaction must remain usable for rollback. */
    @Test
    void shouldCancelRecordStream() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));

        StepVerifier.create(Flux.from(tx.run("UNWIND ['a', 'b', 'c'] AS x RETURN x").records())
                        .limitRate(1)
                        .take(1)
                        .map(record -> record.get(0).asString()))
                .expectNext("a")
                .verifyComplete();

        assertCanRollback(tx);
    }

    @Test
    void shouldForEachWithEmptyCursor() {
        testForEach("MATCH (n:SomeReallyStrangeLabel) RETURN n", 0);
    }

    @Test
    void shouldForEachWithNonEmptyCursor() {
        testForEach("UNWIND range(1, 12555) AS x CREATE (n:Node {id: x}) RETURN n", 12555);
    }

    /** An exception thrown from a per-record callback must reach the subscriber as the very same instance. */
    @Test
    void shouldFailForEachWhenActionFails() {
        RuntimeException failure = new RuntimeException();

        StepVerifier.create(Flux.usingWhen(
                        session.beginTransaction(),
                        tx -> Flux.from(tx.run("RETURN 'Hi!'").records()).doOnNext(record -> {
                            throw failure;
                        }),
                        RxTransaction::commit,
                        RxTransaction::rollback))
                .expectErrorSatisfies(error -> Assertions.assertEquals(failure, error))
                .verify();
    }

    @Test
    void shouldConvertToListWithEmptyCursor() {
        testList("CREATE (:Person)-[:KNOWS]->(:Person)", Collections.emptyList());
    }

    @Test
    void shouldConvertToListWithNonEmptyCursor() {
        testList("UNWIND [1, '1', 2, '2', 3, '3'] AS x RETURN x", Arrays.asList(1L, "1", 2L, "2", 3L, "3"));
    }

    @Test
    void shouldConvertToTransformedListWithEmptyCursor() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));

        int mappedCount = TestUtil.await(
                        Flux.from(tx.run("CREATE ()").records()).map(record -> record.get(0).asMap()))
                .size();
        Assertions.assertEquals(0, mappedCount);

        assertCanRollback(tx);
    }

    @Test
    void shouldConvertToTransformedListWithNonEmptyCursor() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));

        List<String> mapped = TestUtil.await(Flux.from(tx.run("UNWIND ['a', 'b', 'c'] AS x RETURN x").records())
                .map(record -> record.get(0).asString() + "!"));
        Assertions.assertEquals(Arrays.asList("a!", "b!", "c!"), mapped);

        assertCanRollback(tx);
    }

    /** An exception thrown from a mapping function must reach the subscriber as the very same instance. */
    @Test
    void shouldFailWhenListTransformationFunctionFails() {
        RuntimeException failure = new RuntimeException();

        StepVerifier.create(Flux.usingWhen(
                        session.beginTransaction(),
                        tx -> Flux.from(tx.run("RETURN 'Hi!'").records()).map(record -> {
                            throw failure;
                        }),
                        RxTransaction::commit,
                        RxTransaction::rollback))
                .expectErrorSatisfies(error -> Assertions.assertEquals(failure, error))
                .verify();
    }

    @Test
    void shouldFailToCommitWhenServerIsRestarted() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));
        RxStatementResult result = tx.run("RETURN 1");

        Assertions.assertThrows(ServiceUnavailableException.class, () -> {
            // The database is killed at the moment the record stream is subscribed to.
            TestUtil.await(Flux.from(result.records()).doOnSubscribe(subscription -> neo4j.killDb()));
            TestUtil.await(tx.commit());
        });

        assertCanRollback(tx);
    }

    // ---------------------------------------------------------------------------------------
    // Flux#single() on top of a record stream
    // ---------------------------------------------------------------------------------------

    @Test
    void shouldFailSingleWithEmptyCursor() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));
        RxStatementResult result = tx.run("MATCH (n:NoSuchLabel) RETURN n");

        // single() on an empty stream is what produces the NoSuchElementException being asserted.
        NoSuchElementException e = Assertions.assertThrows(
                NoSuchElementException.class, () -> TestUtil.await(Flux.from(result.records()).single()));
        MatcherAssert.assertThat(e.getMessage(), Matchers.containsString("Source was empty"));

        assertCanRollback(tx);
    }

    @Test
    void shouldFailSingleWithMultiRecordCursor() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));
        RxStatementResult result = tx.run("UNWIND ['a', 'b'] AS x RETURN x");

        // single() on a two-element stream is what produces the IndexOutOfBoundsException being asserted.
        IndexOutOfBoundsException e = Assertions.assertThrows(
                IndexOutOfBoundsException.class, () -> TestUtil.await(Flux.from(result.records()).single()));
        MatcherAssert.assertThat(e.getMessage(), Matchers.startsWith("Source emitted more than one item"));

        assertCanRollback(tx);
    }

    @Test
    void shouldReturnSingleWithSingleRecordCursor() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));

        Record record = TestUtil.await(Flux.from(tx.run("RETURN 'Hello!'").records()).single());
        Assertions.assertEquals("Hello!", record.get(0).asString());

        assertCanRollback(tx);
    }

    @Test
    void shouldPropagateFailureFromFirstRecordInSingleAsync() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));
        RxStatementResult result = tx.run("UNWIND [0] AS x RETURN 10 / x");

        // The stream has to be subscribed to for the server-side division error to surface.
        ClientException e = Assertions.assertThrows(
                ClientException.class, () -> TestUtil.await(Flux.from(result.records()).single()));
        MatcherAssert.assertThat(e.getMessage(), Matchers.containsString("/ by zero"));

        assertCanRollback(tx);
    }

    @Test
    void shouldPropagateFailureFromSecondRecordInSingleAsync() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));
        RxStatementResult result = tx.run("UNWIND [1, 0] AS x RETURN 10 / x");

        // The stream has to be subscribed to for the server-side division error to surface.
        ClientException e = Assertions.assertThrows(
                ClientException.class, () -> TestUtil.await(Flux.from(result.records()).single()));
        MatcherAssert.assertThat(e.getMessage(), Matchers.containsString("/ by zero"));

        assertCanRollback(tx);
    }

    @Test
    void shouldConsumeEmptyCursor() {
        testConsume("MATCH (n:NoSuchLabel) RETURN n");
    }

    @Test
    void shouldConsumeNonEmptyCursor() {
        testConsume("RETURN 42");
    }

    // ---------------------------------------------------------------------------------------
    // Transaction state transitions
    // ---------------------------------------------------------------------------------------

    @Test
    void shouldDoNothingWhenCommittedSecondTime() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));

        assertCanCommit(tx);

        // The second commit is expected to be finished as soon as it has been subscribed to.
        Assertions.assertTrue(Mono.from(tx.commit()).toFuture().isDone());
    }

    @Test
    void shouldFailToCommitAfterRollback() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));

        assertCanRollback(tx);

        ClientException e = Assertions.assertThrows(ClientException.class, () -> TestUtil.await(tx.commit()));
        Assertions.assertEquals("Can't commit, transaction has been rolled back", e.getMessage());
    }

    @Test
    void shouldFailToCommitAfterTermination() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));

        assertFailToRunWrongStatement(tx);

        ClientException e = Assertions.assertThrows(ClientException.class, () -> TestUtil.await(tx.commit()));
        MatcherAssert.assertThat(e.getMessage(), Matchers.startsWith("Transaction can't be committed"));

        assertCanRollback(tx);
    }

    @Test
    void shouldDoNothingWhenRolledBackSecondTime() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));

        assertCanRollback(tx);

        // The second rollback is expected to be finished as soon as it has been subscribed to.
        Assertions.assertTrue(Mono.from(tx.rollback()).toFuture().isDone());
    }

    @Test
    void shouldFailToRollbackAfterCommit() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));

        assertCanCommit(tx);

        ClientException e = Assertions.assertThrows(ClientException.class, () -> TestUtil.await(tx.rollback()));
        Assertions.assertEquals("Can't rollback, transaction has been committed", e.getMessage());
    }

    @Test
    void shouldRollbackAfterTermination() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));

        assertFailToRunWrongStatement(tx);
        assertCanRollback(tx);
    }

    /**
     * Despite its name this runs for both outcomes: after the transaction has been committed or
     * rolled back, running another statement in it must fail.
     */
    @ParameterizedTest
    @MethodSource("commit")
    void shouldFailToRunQueryAfterCommit(boolean commit) {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));
        TestUtil.await(tx.run("CREATE (:MyLabel)").records());
        assertCanCommitOrRollback(commit, tx);

        // The node is visible from the session only when the transaction was committed.
        Record record = TestUtil.await(
                Flux.from(session.run("MATCH (n:MyLabel) RETURN count(n)").records()).single());
        Assertions.assertEquals(commit ? 1 : 0, record.get(0).asInt());

        ClientException e = Assertions.assertThrows(
                ClientException.class, () -> TestUtil.await(tx.run("CREATE (:MyOtherLabel)").records()));
        MatcherAssert.assertThat(
                e.getMessage(),
                Matchers.containsString("Cannot run more statements in this transaction, it has been "));
    }

    @Test
    void shouldFailToRunQueryWhenTerminated() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));

        assertFailToRunWrongStatement(tx);

        ClientException e = Assertions.assertThrows(
                ClientException.class, () -> TestUtil.await(tx.run("CREATE (:MyOtherLabel)").records()));
        MatcherAssert.assertThat(e.getMessage(), Matchers.startsWith("Cannot run more statements in this transaction"));

        assertCanRollback(tx);
    }

    @Test
    void shouldUpdateSessionBookmarkAfterCommit() {
        String bookmarkBefore = session.lastBookmark();

        TestUtil.await(Flux.usingWhen(
                session.beginTransaction(),
                tx -> tx.run("CREATE (:MyNode)").records(),
                RxTransaction::commit,
                RxTransaction::rollback));

        String bookmarkAfter = session.lastBookmark();
        Assertions.assertNotNull(bookmarkAfter);
        Assertions.assertNotEquals(bookmarkBefore, bookmarkAfter);
    }

    // ---------------------------------------------------------------------------------------
    // Failure propagation and laziness
    // ---------------------------------------------------------------------------------------

    @Test
    void shouldFailToCommitWhenQueriesFailAndErrorNotConsumed() throws InterruptedException {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));

        // Third statement fails; the concatenated stream must surface that error.
        Flux<Record> records = Flux.from(tx.run("CREATE (:TestNode)").records())
                .concatWith(tx.run("CREATE (:TestNode)").records())
                .concatWith(tx.run("RETURN 10 / 0").records())
                .concatWith(tx.run("CREATE (:TestNode)").records());

        ClientException e = Assertions.assertThrows(ClientException.class, () -> TestUtil.await(records));
        Assertions.assertEquals("/ by zero", e.getMessage());

        assertCanRollback(tx);
    }

    /**
     * {@code run} is called for "RETURN 1" and "RETURN 2" first, yet their records are subscribed
     * to last; the emitted values must follow subscription order (4, 3, 2, 1).
     */
    @Test
    void shouldNotRunUntilPublisherIsConnected() throws Throwable {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));

        RxStatementResult result1 = tx.run("RETURN 1");
        RxStatementResult result2 = tx.run("RETURN 2");

        StepVerifier.create(Flux.from(tx.run("RETURN 4").records())
                        .concatWith(tx.run("RETURN 3").records())
                        .concatWith(result2.records())
                        .concatWith(result1.records())
                        .map(record -> record.get(0).asInt()))
                .expectNext(4)
                .expectNext(3)
                .expectNext(2)
                .expectNext(1)
                .verifyComplete();

        assertCanRollback(tx);
    }

    /** An invalid statement whose result is never subscribed to must not affect commit or rollback. */
    @ParameterizedTest
    @MethodSource("commit")
    void shouldNotPropagateRunFailureIfNotExecuted(boolean commit) {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));

        tx.run("RETURN ILLEGAL"); // result intentionally ignored

        assertCanCommitOrRollback(commit, tx);
    }

    @Test
    void shouldPropagateRunFailureOnRecord() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));
        RxStatementResult result = tx.run("RETURN 42 / 0");

        // Fetching the keys succeeds; the failure only shows up once records are requested.
        TestUtil.await(result.keys());

        ClientException e = Assertions.assertThrows(ClientException.class, () -> TestUtil.await(result.records()));
        MatcherAssert.assertThat(e.getMessage(), Matchers.containsString("/ by zero"));

        assertCanRollback(tx);
    }

    @Test
    void shouldFailToCommitWhenPullAllFailureIsConsumed() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));
        RxStatementResult result =
                tx.run("FOREACH (value IN [1,2, 'aaa'] | CREATE (:Person {name: 10 / value}))");

        ClientException runError =
                Assertions.assertThrows(ClientException.class, () -> TestUtil.await(result.records()));
        MatcherAssert.assertThat(runError.code(), Matchers.containsString("TypeError"));

        ClientException commitError =
                Assertions.assertThrows(ClientException.class, () -> TestUtil.await(tx.commit()));
        MatcherAssert.assertThat(commitError.getMessage(), Matchers.startsWith("Transaction can't be committed"));

        assertCanRollback(tx);
    }

    @Test
    void shouldNotPropagateRunFailureFromSummary() {
        RxTransaction tx = TestUtil.await(Mono.from(session.beginTransaction()));
        RxStatementResult result = tx.run("RETURN Wrong");

        ClientException e = Assertions.assertThrows(ClientException.class, () -> TestUtil.await(result.records()));
        MatcherAssert.assertThat(e.code(), Matchers.containsString("SyntaxError"));

        // After the error has been seen on records(), summary() must not throw it again.
        TestUtil.await(result.summary());

        assertCanRollback(tx);
    }

    /** For every record of an outer query, runs an inner CREATE in the same transaction. */
    @Test
    void shouldHandleNestedQueries() throws Throwable {
        int size = 12555;

        StepVerifier.create(Flux.usingWhen(
                        session.beginTransaction(),
                        tx -> Flux.from(tx.run(
                                                "UNWIND range(1, $size) AS x RETURN x",
                                                Collections.singletonMap("size", size))
                                        .records())
                                .limitRate(20)
                                .flatMap(row -> tx.run(
                                                "CREATE (n:Node {id: $x}) RETURN n.id",
                                                Collections.singletonMap("x", row.get("x").asInt()))
                                        .records())
                                .map(created -> created.get(0).asInt()),
                        RxTransaction::commit,
                        RxTransaction::rollback))
                .expectNextCount(size)
                .verifyComplete();
    }

    // ---------------------------------------------------------------------------------------
    // Helpers
    // ---------------------------------------------------------------------------------------

    /**
     * Counts {@code :Node} nodes whose {@code id} property equals the given value, using an
     * auto-commit query on the test session.
     *
     * @param id value to match against the {@code id} property
     * @return the value of {@code count(n)} returned by the query
     */
    private int countNodes(Object id) {
        RxStatementResult result =
                session.run("MATCH (n:Node {id: $id}) RETURN count(n)", Values.parameters("id", id));
        return TestUtil.await(Flux.from(result.records()).single().map(record -> record.get(0).asInt()));
    }

    /**
     * Runs {@code query} in a transaction, counts the emitted records and then verifies the
     * summary: statement text, empty parameters and the number of records seen.
     *
     * @param query statement to run (without parameters)
     * @param expectedSeenRecords number of records the query is expected to emit
     */
    private void testForEach(String query, int expectedSeenRecords) {
        StepVerifier.create(Flux.usingWhen(
                        session.beginTransaction(),
                        tx -> {
                            RxStatementResult result = tx.run(query);
                            AtomicInteger recordsSeen = new AtomicInteger();
                            return Flux.from(result.records())
                                    .doOnNext(record -> recordsSeen.incrementAndGet())
                                    .then(Mono.from(result.summary()))
                                    .doOnSuccess(summary -> {
                                        Assertions.assertNotNull(summary);
                                        Assertions.assertEquals(query, summary.statement().text());
                                        Assertions.assertEquals(
                                                Collections.emptyMap(),
                                                summary.statement().parameters().asMap());
                                        Assertions.assertEquals(expectedSeenRecords, recordsSeen.get());
                                    });
                        },
                        RxTransaction::commit,
                        RxTransaction::rollback))
                .expectNextCount(1)
                .verifyComplete();
    }

    /**
     * Runs {@code query} in a transaction, collects the first column of every record as a plain
     * object and compares the resulting list with {@code expectedList}.
     *
     * @param query statement to run
     * @param expectedList expected first-column values, in order
     */
    private <T> void testList(String query, List<T> expectedList) {
        List<Object> actualList = new ArrayList<>();

        Flux<List<Record>> collected = Flux.usingWhen(
                session.beginTransaction(),
                tx -> Flux.from(tx.run(query).records()).collectList(),
                RxTransaction::commit,
                RxTransaction::rollback);

        StepVerifier.create(collected.single())
                .consumeNextWith(allRecords -> {
                    for (Record record : allRecords) {
                        actualList.add(record.get(0).asObject());
                    }
                })
                .verifyComplete();

        Assertions.assertEquals(expectedList, actualList);
    }

    /**
     * Runs {@code query} in a transaction and subscribes to the summary only, asserting that
     * exactly one non-null summary is emitted.
     *
     * @param query statement to run
     */
    private void testConsume(String query) {
        StepVerifier.create(Flux.usingWhen(
                                session.beginTransaction(),
                                tx -> tx.run(query).summary(),
                                RxTransaction::commit,
                                RxTransaction::rollback)
                        .single())
                .consumeNextWith(summary -> Assertions.assertNotNull(summary))
                .verifyComplete();
    }

    /**
     * Checks the node counts left behind by the "multiple statements" tests, which create two
     * nodes with id 1 and one node with id 2.
     *
     * @param commit {@code true} if the transaction was committed, {@code false} if rolled back
     */
    private void verifyCommittedOrRolledBack(boolean commit) {
        if (commit) {
            Assertions.assertEquals(2, countNodes(1));
            Assertions.assertEquals(1, countNodes(2));
        } else {
            Assertions.assertEquals(0, countNodes(1));
            Assertions.assertEquals(0, countNodes(2));
        }
    }

    /** Commits when {@code commit} is {@code true}, otherwise rolls back; either must succeed. */
    private void assertCanCommitOrRollback(boolean commit, RxTransaction tx) {
        if (commit) {
            assertCanCommit(tx);
        } else {
            assertCanRollback(tx);
        }
    }

    /** Asserts that committing completes without emitting any item. */
    private void assertCanCommit(RxTransaction tx) {
        MatcherAssert.assertThat(TestUtil.await(tx.commit()), Matchers.equalTo(Collections.emptyList()));
    }

    /** Asserts that rolling back completes without emitting any item. */
    private void assertCanRollback(RxTransaction tx) {
        MatcherAssert.assertThat(TestUtil.await(tx.rollback()), Matchers.equalTo(Collections.emptyList()));
    }

    /** Argument source for the parameterized tests: {@code true} = commit, {@code false} = rollback. */
    private static Stream<Boolean> commit() {
        return Stream.of(true, false);
    }

    /** Creates a {@code :Node {id: 4242}} in the transaction and verifies the returned node. */
    private static void assertCanRunCreate(RxTransaction tx) {
        RxStatementResult result = tx.run("CREATE (n:Node {id: 4242}) RETURN n");

        Record record = TestUtil.await(Flux.from(result.records()).single());

        Node node = record.get(0).asNode();
        Assertions.assertEquals("Node", Iterables.single(node.labels()));
        Assertions.assertEquals(4242, node.get("id").asInt());
    }

    /** Runs the incomplete statement {@code RETURN} and asserts that it fails with a syntax error. */
    private static void assertFailToRunWrongStatement(RxTransaction tx) {
        RxStatementResult result = tx.run("RETURN");

        Exception e = Assertions.assertThrows(Exception.class, () -> TestUtil.await(result.records()));
        MatcherAssert.assertThat(
                e, Matchers.is(org.neo4j.driver.internal.util.Matchers.syntaxError("Unexpected end of input")));
    }

    /** Runs {@code RETURN 42} in the transaction and asserts that exactly one record holding 42 comes back. */
    private void assertCanRunReturnOne(RxTransaction tx) {
        List<Record> records = TestUtil.await(tx.run("RETURN 42").records());

        MatcherAssert.assertThat(records.size(), Matchers.equalTo(1));
        Assertions.assertEquals(42, records.get(0).get(0).asInt());
    }
}
