package org.neo4j.driver.integration.reactive;

import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.hamcrest.junit.MatcherAssert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.neo4j.driver.Record;
import org.neo4j.driver.Values;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.internal.shaded.reactor.core.publisher.Flux;
import org.neo4j.driver.internal.shaded.reactor.core.publisher.Mono;
import org.neo4j.driver.internal.shaded.reactor.test.StepVerifier;
import org.neo4j.driver.internal.util.EnabledOnNeo4jWith;
import org.neo4j.driver.internal.util.Neo4jFeature;
import org.neo4j.driver.reactive.RxStatementResult;
import org.neo4j.driver.summary.ResultSummary;
import org.neo4j.driver.summary.StatementType;
import org.neo4j.driver.util.DatabaseExtension;
import org.neo4j.driver.util.ParallelizableIT;

/**
 * Integration tests for {@link RxStatementResult}.
 *
 * <p>Each test runs a statement through a reactive session obtained from the shared
 * {@link DatabaseExtension} and uses {@link StepVerifier} to assert what the
 * {@code keys()}, {@code records()} and {@code summary()} publishers emit, and in which
 * combinations and orders they may be consumed.
 *
 * <p>The class is only enabled against servers that support Bolt v4
 * (see {@link Neo4jFeature#BOLT_V4}).
 *
 * <p>NOTE(review): the sessions opened by these tests are never closed explicitly —
 * confirm whether {@link DatabaseExtension} cleans them up.
 */
@EnabledOnNeo4jWith(Neo4jFeature.BOLT_V4)
@ParallelizableIT
class RxStatementResultIT {

    /** Read-only statement producing the four records 1, 2, 3, 4 under the key {@code a}. */
    private static final String UNWIND_QUERY = "UNWIND [1,2,3,4] AS a RETURN a";

    /**
     * Creates a single {@code Person} node and returns it under the key {@code n}.
     * Uses the {@code $name} parameter form; the legacy {@code {name}} form is not
     * accepted by the Bolt v4 capable servers this class is restricted to.
     */
    private static final String CREATE_PERSON_AND_RETURN = "CREATE (n:Person {name:$name}) RETURN n";

    /** Same as {@link #CREATE_PERSON_AND_RETURN} but without a {@code RETURN} clause. */
    private static final String CREATE_PERSON = "CREATE (n:Person {name:$name})";

    /** Number of records streamed by the large-result test. */
    private static final int LARGE_RESULT_SIZE = 100000;

    @RegisterExtension
    static final DatabaseExtension neo4j = new DatabaseExtension();

    /** Records of the UNWIND statement can be streamed to completion. */
    @Test
    void shouldAllowIteratingOverResultStream() {
        verifyCanAccessFullRecords(sessionRunUnwind());
    }

    /** A result of {@value #LARGE_RESULT_SIZE} records is streamed completely and in order. */
    @Test
    void shouldAllowIteratingOverLargeResultStream() {
        RxStatementResult result = neo4j.driver().rxSession()
                .run("UNWIND range(1, $size) AS x RETURN x", Values.parameters("size", LARGE_RESULT_SIZE));
        Flux<Integer> values = Flux.from(result.records())
                .limitRate(100)
                .map(record -> record.get("x").asInt());

        // Chain through the step returned by each expectNext call instead of relying on
        // the verifier builder mutating itself in place.
        StepVerifier.Step<Integer> step = StepVerifier.create(values);
        for (int expected = 1; expected <= LARGE_RESULT_SIZE; expected++) {
            step = step.expectNext(expected);
        }
        step.expectComplete().verify();
    }

    /** Keys, then records, then summary can be consumed from the same result. */
    @Test
    void shouldReturnKeysRecordsAndSummaryInOrder() {
        RxStatementResult result = sessionRunUnwind();
        verifyCanAccessKeys(result);
        verifyCanAccessFullRecords(result);
        verifyCanAccessSummary(result);
    }

    /** Subscribing to records a second time yields an empty stream. */
    @Test
    void shouldSecondVisitOfRecordReceiveEmptyRecordStream() {
        RxStatementResult result = sessionRunUnwind();
        verifyCanAccessFullRecords(result);
        verifyRecordsAlreadyDiscarded(result);
    }

    /** Once the summary has been consumed, the record stream completes without emitting. */
    @Test
    void shouldReturnKeysSummaryAndDiscardRecords() {
        RxStatementResult result = sessionRunUnwind();
        verifyCanAccessKeys(result);
        verifyCanAccessSummary(result);
        verifyRecordsAlreadyDiscarded(result);
    }

    /** The summary can be consumed without touching keys or records. */
    @Test
    void shouldAllowOnlySummary() {
        verifyCanAccessSummary(sessionRunUnwind());
    }

    /** Keys and summary stay available, repeatedly, after the records were consumed. */
    @Test
    void shouldAllowAccessKeysAndSummaryAfterRecord() {
        RxStatementResult result = sessionRunUnwind();
        verifyCanAccessFullRecords(result);

        // Deliberately performed twice: both publishers must be re-subscribable.
        verifyCanAccessKeys(result);
        verifyCanAccessSummary(result);
        verifyCanAccessKeys(result);
        verifyCanAccessSummary(result);
    }

    /** Looking up a key that is not part of the record yields a null value. */
    @Test
    void shouldGiveHelpfulFailureMessageWhenAccessNonExistingField() {
        RxStatementResult result = neo4j.driver().rxSession()
                .run(CREATE_PERSON_AND_RETURN, Values.parameters("name", "Tom Hanks"));

        StepVerifier.create(Flux.from(result.records()).single())
                .assertNext(record -> Assertions.assertTrue(record.get("m").isNull()))
                .expectComplete()
                .verify();
    }

    /** Looking up a property that the returned node does not have yields a null value. */
    @Test
    void shouldGiveHelpfulFailureMessageWhenAccessNonExistingPropertyOnNode() {
        RxStatementResult result = neo4j.driver().rxSession()
                .run(CREATE_PERSON_AND_RETURN, Values.parameters("name", "Tom Hanks"));

        StepVerifier.create(Flux.from(result.records()).single())
                .assertNext(record -> Assertions.assertTrue(record.get("n").get("age").isNull()))
                .expectComplete()
                .verify();
    }

    /** The returned column name is exposed both by {@code keys()} and by each record. */
    @Test
    void shouldHaveFieldNamesInResult() {
        RxStatementResult result = neo4j.driver().rxSession().run("CREATE (n:TestNode {name:'test'}) RETURN n");

        StepVerifier.create(result.keys()).expectNext("n").expectComplete().verify();
        StepVerifier.create(result.records())
                .assertNext(record -> Assertions.assertEquals("[n]", record.keys().toString()))
                .expectComplete()
                .verify();
    }

    /** A statement without a RETURN clause produces neither keys nor records. */
    @Test
    void shouldReturnEmptyKeyAndRecordOnEmptyResult() {
        RxStatementResult result = neo4j.driver().rxSession()
                .run(CREATE_PERSON, Values.parameters("name", "Tom Hanks"));

        StepVerifier.create(result.keys()).expectComplete().verify();
        StepVerifier.create(result.records()).expectComplete().verify();
    }

    /**
     * For an invalid statement, keys complete empty, the record stream fails, and the
     * summary consumed after that failure is still delivered.
     */
    @Test
    void shouldOnlyErrorRecordAfterFailure() {
        RxStatementResult result = neo4j.driver().rxSession().run("INVALID");

        // All three publishers are obtained up front, before any of them is subscribed to.
        Flux<String> keys = Flux.from(result.keys());
        Flux<Record> records = Flux.from(result.records());
        Mono<ResultSummary> summary = Mono.from(result.summary());

        StepVerifier.create(keys).verifyComplete();
        StepVerifier.create(records)
                .expectErrorSatisfies(RxStatementResultIT::assertInvalidInputError)
                .verify();
        StepVerifier.create(summary)
                .assertNext(resultSummary -> {
                    MatcherAssert.assertThat(resultSummary.statement().text(), Matchers.equalTo("INVALID"));
                    Assertions.assertNotNull(resultSummary.server().address());
                    Assertions.assertNotNull(resultSummary.server().version());
                })
                .verifyComplete();
    }

    /**
     * For an invalid statement whose records were never consumed, the summary fails,
     * and fails again with the same kind of error on a second subscription.
     */
    @Test
    void shouldErrorOnSummaryIfNoRecord() {
        RxStatementResult result = neo4j.driver().rxSession().run("INVALID");

        Flux<String> keys = Flux.from(result.keys());
        Mono<ResultSummary> summary = Mono.from(result.summary());

        StepVerifier.create(keys).verifyComplete();
        StepVerifier.create(summary)
                .expectErrorSatisfies(RxStatementResultIT::assertInvalidInputError)
                .verify();
        // Second subscription to the very same Mono must observe the error as well.
        StepVerifier.create(summary)
                .expectErrorSatisfies(RxStatementResultIT::assertInvalidInputError)
                .verify();
    }

    /** After taking only the first record, the summary of the statement is still available. */
    @Test
    void shouldDiscardRecords() {
        RxStatementResult result = neo4j.driver().rxSession().run("UNWIND [1,2] AS a RETURN a");

        StepVerifier.create(Flux.from(result.records()).limitRate(1).take(1L))
                .assertNext(record -> MatcherAssert.assertThat(record.get("a").asInt(), Matchers.equalTo(1)))
                .thenCancel()
                .verify();
        StepVerifier.create(result.summary())
                .assertNext(resultSummary -> {
                    MatcherAssert.assertThat(resultSummary, Matchers.notNullValue());
                    MatcherAssert.assertThat(resultSummary.statementType(), Matchers.equalTo(StatementType.READ_ONLY));
                })
                .expectComplete()
                .verify();
    }

    /**
     * The five records computed before the division by zero (x = 5..1) are delivered,
     * then the stream fails with an error mentioning "/ by zero".
     */
    @Test
    void shouldStreamCorrectRecordsBackBeforeError() {
        RxStatementResult result = neo4j.driver().rxSession()
                .run("CYPHER runtime=interpreted UNWIND range(5, 0, -1) AS x RETURN x / x");
        Flux<Integer> values = Flux.from(result.records()).map(record -> record.get(0).asInt());

        StepVerifier.create(values)
                .expectNext(1)
                .expectNext(1)
                .expectNext(1)
                .expectNext(1)
                .expectNext(1)
                .expectErrorSatisfies(error ->
                        MatcherAssert.assertThat(error.getMessage(), CoreMatchers.containsString("/ by zero")))
                .verify();
    }

    /**
     * Asserts that the summary of {@code rxStatementResult} describes the read-only
     * {@link #UNWIND_QUERY} statement with no nodes created.
     */
    private void verifyCanAccessSummary(RxStatementResult rxStatementResult) {
        StepVerifier.create(rxStatementResult.summary())
                .assertNext(resultSummary -> {
                    MatcherAssert.assertThat(resultSummary.statement().text(), Matchers.equalTo(UNWIND_QUERY));
                    MatcherAssert.assertThat(resultSummary.counters().nodesCreated(), Matchers.equalTo(0));
                    MatcherAssert.assertThat(resultSummary.statementType(), Matchers.equalTo(StatementType.READ_ONLY));
                })
                .verifyComplete();
    }

    /** Asserts that the record stream of {@code rxStatementResult} completes without emitting anything. */
    private void verifyRecordsAlreadyDiscarded(RxStatementResult rxStatementResult) {
        Flux<Integer> values = Flux.from(rxStatementResult.records()).map(record -> record.get("a").asInt());
        StepVerifier.create(values).expectComplete().verify();
    }

    /** Asserts that the record stream of {@code rxStatementResult} emits 1, 2, 3, 4 and then completes. */
    private void verifyCanAccessFullRecords(RxStatementResult rxStatementResult) {
        Flux<Integer> values = Flux.from(rxStatementResult.records()).map(record -> record.get("a").asInt());
        StepVerifier.create(values)
                .expectNext(1)
                .expectNext(2)
                .expectNext(3)
                .expectNext(4)
                .expectComplete()
                .verify();
    }

    /** Asserts that {@code rxStatementResult} exposes exactly the single key {@code a}. */
    private void verifyCanAccessKeys(RxStatementResult rxStatementResult) {
        StepVerifier.create(rxStatementResult.keys()).expectNext("a").verifyComplete();
    }

    /** Runs {@link #UNWIND_QUERY} in a fresh reactive session and returns its result. */
    private RxStatementResult sessionRunUnwind() {
        return neo4j.driver().rxSession().run(UNWIND_QUERY);
    }

    /**
     * Asserts that {@code error} is a {@link ClientException} whose message contains
     * "Invalid input", as checked for the {@code INVALID} statement in the error tests.
     */
    private static void assertInvalidInputError(Throwable error) {
        MatcherAssert.assertThat(error, CoreMatchers.instanceOf(ClientException.class));
        MatcherAssert.assertThat(error.getMessage(), CoreMatchers.containsString("Invalid input"));
    }
}
