package org.neo4j.bolt.v1.transport.integration;

import java.util.Collections;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.commons.lang3.StringUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.neo4j.bolt.runtime.BoltConnectionReadLimiter;
import org.neo4j.bolt.v1.messaging.Neo4jPackV1;
import org.neo4j.bolt.v1.messaging.message.DiscardAllMessage;
import org.neo4j.bolt.v1.messaging.message.InitMessage;
import org.neo4j.bolt.v1.messaging.message.RunMessage;
import org.neo4j.bolt.v1.messaging.util.MessageMatchers;
import org.neo4j.bolt.v1.transport.socket.client.SocketConnection;
import org.neo4j.bolt.v1.transport.socket.client.TransportConnection;
import org.neo4j.collection.RawIterator;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.helpers.HostnamePort;
import org.neo4j.internal.kernel.api.exceptions.ProcedureException;
import org.neo4j.internal.kernel.api.procs.Neo4jTypes;
import org.neo4j.internal.kernel.api.procs.ProcedureSignature;
import org.neo4j.kernel.api.ResourceTracker;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.proc.CallableProcedure;
import org.neo4j.kernel.api.proc.Context;
import org.neo4j.kernel.impl.proc.Procedures;
import org.neo4j.kernel.impl.util.ValueUtils;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.test.TestGraphDatabaseFactory;
import org.neo4j.test.rule.fs.EphemeralFileSystemRule;

/* loaded from: input_file:org/neo4j/bolt/v1/transport/integration/BoltChannelAutoReadLimiterIT.class */
public class BoltChannelAutoReadLimiterIT {
    private AssertableLogProvider logProvider;
    private EphemeralFileSystemRule fsRule = new EphemeralFileSystemRule();
    private Neo4jWithSocket server = new Neo4jWithSocket(getClass(), getTestGraphDatabaseFactory(), this.fsRule, getSettingsFunction());

    @Rule
    public RuleChain ruleChain = RuleChain.outerRule(this.fsRule).around(this.server);
    private HostnamePort address;
    private TransportConnection connection;
    private TransportTestUtil util;

    protected TestGraphDatabaseFactory getTestGraphDatabaseFactory() {
        TestGraphDatabaseFactory testGraphDatabaseFactory = new TestGraphDatabaseFactory();
        this.logProvider = new AssertableLogProvider();
        testGraphDatabaseFactory.setInternalLogProvider(this.logProvider);
        return testGraphDatabaseFactory;
    }

    protected Consumer<Map<String, String>> getSettingsFunction() {
        return map -> {
        };
    }

    @Before
    public void setup() throws Exception {
        installSleepProcedure(this.server.graphDatabaseService());
        this.address = this.server.lookupDefaultConnector();
        this.connection = new SocketConnection();
        this.util = new TransportTestUtil(new Neo4jPackV1());
    }

    @Test
    public void largeNumberOfSlowRunningJobsShouldChangeAutoReadState() throws Exception {
        String repeat = StringUtils.repeat(" ", 8192);
        this.connection.connect(this.address).send(this.util.defaultAcceptedVersions()).send(this.util.chunk(InitMessage.init("TestClient/1.1", Collections.emptyMap())));
        MatcherAssert.assertThat(this.connection, this.util.eventuallyReceivesSelectedProtocolVersion());
        MatcherAssert.assertThat(this.connection, this.util.eventuallyReceives(MessageMatchers.msgSuccess()));
        for (int i = 0; i < 1000; i++) {
            this.connection.send(this.util.chunk(RunMessage.run("CALL boltissue.sleep( $data )", ValueUtils.asMapValue(Collections.singletonMap("data", repeat))), DiscardAllMessage.discardAll()));
        }
        for (int i2 = 0; i2 < 1000; i2++) {
            MatcherAssert.assertThat(this.connection, this.util.eventuallyReceives(MessageMatchers.msgSuccess(), MessageMatchers.msgSuccess()));
        }
        this.logProvider.assertAtLeastOnce(new AssertableLogProvider.LogMatcher[]{AssertableLogProvider.inLog(BoltConnectionReadLimiter.class).warn(CoreMatchers.containsString("disabled"), new Object[]{CoreMatchers.anything(), CoreMatchers.anything()})});
        this.logProvider.assertAtLeastOnce(new AssertableLogProvider.LogMatcher[]{AssertableLogProvider.inLog(BoltConnectionReadLimiter.class).warn(CoreMatchers.containsString("enabled"), new Object[]{CoreMatchers.anything(), CoreMatchers.anything()})});
    }

    private static void installSleepProcedure(GraphDatabaseService graphDatabaseService) throws ProcedureException {
        ((Procedures) ((GraphDatabaseAPI) graphDatabaseService).getDependencyResolver().resolveDependency(Procedures.class)).register(new CallableProcedure.BasicProcedure(ProcedureSignature.procedureSignature(new String[]{"boltissue", "sleep"}).in("data", Neo4jTypes.NTString).out(ProcedureSignature.VOID).build()) { // from class: org.neo4j.bolt.v1.transport.integration.BoltChannelAutoReadLimiterIT.1
            public RawIterator<Object[], ProcedureException> apply(Context context, Object[] objArr, ResourceTracker resourceTracker) throws ProcedureException {
                try {
                    Thread.sleep(50L);
                    return RawIterator.empty();
                } catch (InterruptedException e) {
                    throw new ProcedureException(Status.General.UnknownError, e, "Interrupted", new Object[0]);
                }
            }
        });
    }
}
