package org.neo4j.driver.internal.async.connection;

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.hamcrest.Matchers;
import org.hamcrest.junit.MatcherAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
import org.neo4j.driver.internal.handlers.NoOpResponseHandler;
import org.neo4j.driver.internal.logging.DevNullLogging;
import org.neo4j.driver.internal.messaging.request.PullAllMessage;
import org.neo4j.driver.internal.messaging.request.ResetMessage;
import org.neo4j.driver.internal.messaging.request.RunMessage;
import org.neo4j.driver.internal.metrics.InternalAbstractMetrics;
import org.neo4j.driver.internal.shaded.io.netty.channel.Channel;
import org.neo4j.driver.internal.shaded.io.netty.channel.DefaultEventLoop;
import org.neo4j.driver.internal.shaded.io.netty.channel.EventLoop;
import org.neo4j.driver.internal.shaded.io.netty.channel.embedded.EmbeddedChannel;
import org.neo4j.driver.internal.shaded.io.netty.channel.pool.ChannelPool;
import org.neo4j.driver.internal.shaded.io.netty.util.internal.ConcurrentSet;
import org.neo4j.driver.internal.spi.ResponseHandler;
import org.neo4j.driver.internal.util.FakeClock;
import org.neo4j.driver.internal.util.Iterables;
import org.neo4j.driver.internal.util.ServerVersion;
import org.neo4j.driver.util.DaemonThreadFactory;

/* loaded from: input_file:org/neo4j/driver/internal/async/connection/DirectConnectionTest.class */
class DirectConnectionTest {
    private static final NoOpResponseHandler NO_OP_HANDLER = NoOpResponseHandler.INSTANCE;
    private ExecutorService executor;
    private EventLoop eventLoop;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/neo4j/driver/internal/async/connection/DirectConnectionTest$ThreadTrackingInboundMessageDispatcher.class */
    public static class ThreadTrackingInboundMessageDispatcher extends InboundMessageDispatcher {
        final Set<String> queueThreadNames;

        ThreadTrackingInboundMessageDispatcher(Channel channel) {
            super(channel, DevNullLogging.DEV_NULL_LOGGING);
            this.queueThreadNames = new ConcurrentSet();
        }

        public void enqueue(ResponseHandler responseHandler) {
            this.queueThreadNames.add(Thread.currentThread().getName());
            super.enqueue(responseHandler);
        }
    }

    DirectConnectionTest() {
    }

    @AfterEach
    void tearDown() throws Exception {
        shutdownEventLoop();
    }

    @Test
    void shouldBeOpenAfterCreated() {
        Assertions.assertTrue(newConnection(newChannel()).isOpen());
    }

    @Test
    void shouldNotBeOpenAfterRelease() {
        DirectConnection newConnection = newConnection(newChannel());
        newConnection.release();
        Assertions.assertFalse(newConnection.isOpen());
    }

    @Test
    void shouldSendResetOnRelease() {
        EmbeddedChannel newChannel = newChannel();
        newConnection(newChannel).release();
        newChannel.runPendingTasks();
        Assertions.assertEquals(1, newChannel.outboundMessages().size());
        Assertions.assertEquals(ResetMessage.RESET, newChannel.readOutbound());
    }

    @Test
    void shouldWriteInEventLoopThread() throws Exception {
        testWriteInEventLoop("WriteSingleMessage", directConnection -> {
            directConnection.write(new RunMessage("RETURN 1"), NO_OP_HANDLER);
        });
        testWriteInEventLoop("WriteMultipleMessages", directConnection2 -> {
            directConnection2.write(new RunMessage("RETURN 1"), NO_OP_HANDLER, PullAllMessage.PULL_ALL, NO_OP_HANDLER);
        });
    }

    @Test
    void shouldWriteAndFlushInEventLoopThread() throws Exception {
        testWriteInEventLoop("WriteAndFlushSingleMessage", directConnection -> {
            directConnection.writeAndFlush(new RunMessage("RETURN 1"), NO_OP_HANDLER);
        });
        testWriteInEventLoop("WriteAndFlushMultipleMessages", directConnection2 -> {
            directConnection2.writeAndFlush(new RunMessage("RETURN 1"), NO_OP_HANDLER, PullAllMessage.PULL_ALL, NO_OP_HANDLER);
        });
    }

    @Test
    void shouldWriteForceReleaseInEventLoopThread() throws Exception {
        testWriteInEventLoop("ReleaseTestEventLoop", (v0) -> {
            v0.release();
        });
    }

    @Test
    void shouldFlushInEventLoopThread() throws Exception {
        EmbeddedChannel embeddedChannel = (EmbeddedChannel) Mockito.spy(new EmbeddedChannel());
        initializeEventLoop(embeddedChannel, "Flush");
        ChannelAttributes.setProtocolVersion(embeddedChannel, 4);
        newConnection(embeddedChannel).flush();
        shutdownEventLoop();
        ((EmbeddedChannel) Mockito.verify(embeddedChannel)).flush();
    }

    @Test
    void shouldEnableAutoReadWhenReleased() {
        EmbeddedChannel newChannel = newChannel();
        newChannel.config().setAutoRead(false);
        newConnection(newChannel).release();
        newChannel.runPendingTasks();
        Assertions.assertTrue(newChannel.config().isAutoRead());
    }

    @Test
    void shouldNotDisableAutoReadWhenReleased() {
        EmbeddedChannel newChannel = newChannel();
        newChannel.config().setAutoRead(true);
        DirectConnection newConnection = newConnection(newChannel);
        newConnection.release();
        newConnection.disableAutoRead();
        Assertions.assertTrue(newChannel.config().isAutoRead());
    }

    @Test
    void shouldWriteSingleMessage() {
        EmbeddedChannel newChannel = newChannel();
        newConnection(newChannel).write(PullAllMessage.PULL_ALL, NO_OP_HANDLER);
        Assertions.assertEquals(0, newChannel.outboundMessages().size());
        newChannel.flushOutbound();
        Assertions.assertEquals(1, newChannel.outboundMessages().size());
        Assertions.assertEquals(PullAllMessage.PULL_ALL, Iterables.single(newChannel.outboundMessages()));
    }

    @Test
    void shouldWriteMultipleMessage() {
        EmbeddedChannel newChannel = newChannel();
        newConnection(newChannel).write(PullAllMessage.PULL_ALL, NO_OP_HANDLER, ResetMessage.RESET, NO_OP_HANDLER);
        Assertions.assertEquals(0, newChannel.outboundMessages().size());
        newChannel.flushOutbound();
        Assertions.assertEquals(2, newChannel.outboundMessages().size());
        Assertions.assertEquals(PullAllMessage.PULL_ALL, newChannel.outboundMessages().poll());
        Assertions.assertEquals(ResetMessage.RESET, newChannel.outboundMessages().poll());
    }

    @Test
    void shouldWriteAndFlushSingleMessage() {
        EmbeddedChannel newChannel = newChannel();
        newConnection(newChannel).writeAndFlush(PullAllMessage.PULL_ALL, NO_OP_HANDLER);
        newChannel.runPendingTasks();
        Assertions.assertEquals(1, newChannel.outboundMessages().size());
        Assertions.assertEquals(PullAllMessage.PULL_ALL, Iterables.single(newChannel.outboundMessages()));
    }

    @Test
    void shouldWriteAndFlushMultipleMessage() {
        EmbeddedChannel newChannel = newChannel();
        newConnection(newChannel).writeAndFlush(PullAllMessage.PULL_ALL, NO_OP_HANDLER, ResetMessage.RESET, NO_OP_HANDLER);
        newChannel.runPendingTasks();
        Assertions.assertEquals(2, newChannel.outboundMessages().size());
        Assertions.assertEquals(PullAllMessage.PULL_ALL, newChannel.outboundMessages().poll());
        Assertions.assertEquals(ResetMessage.RESET, newChannel.outboundMessages().poll());
    }

    @Test
    void shouldNotWriteSingleMessageWhenReleased() {
        ResponseHandler responseHandler = (ResponseHandler) Mockito.mock(ResponseHandler.class);
        DirectConnection newConnection = newConnection(newChannel());
        newConnection.release();
        newConnection.write(new RunMessage("RETURN 1"), responseHandler);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(IllegalStateException.class);
        ((ResponseHandler) Mockito.verify(responseHandler)).onFailure((Throwable) forClass.capture());
        assertConnectionReleasedError((IllegalStateException) forClass.getValue());
    }

    @Test
    void shouldNotWriteMultipleMessagesWhenReleased() {
        ResponseHandler responseHandler = (ResponseHandler) Mockito.mock(ResponseHandler.class);
        ResponseHandler responseHandler2 = (ResponseHandler) Mockito.mock(ResponseHandler.class);
        DirectConnection newConnection = newConnection(newChannel());
        newConnection.release();
        newConnection.write(new RunMessage("RETURN 1"), responseHandler, PullAllMessage.PULL_ALL, responseHandler2);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(IllegalStateException.class);
        ((ResponseHandler) Mockito.verify(responseHandler)).onFailure((Throwable) forClass.capture());
        assertConnectionReleasedError((IllegalStateException) forClass.getValue());
    }

    @Test
    void shouldNotWriteAndFlushSingleMessageWhenReleased() {
        ResponseHandler responseHandler = (ResponseHandler) Mockito.mock(ResponseHandler.class);
        DirectConnection newConnection = newConnection(newChannel());
        newConnection.release();
        newConnection.writeAndFlush(new RunMessage("RETURN 1"), responseHandler);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(IllegalStateException.class);
        ((ResponseHandler) Mockito.verify(responseHandler)).onFailure((Throwable) forClass.capture());
        assertConnectionReleasedError((IllegalStateException) forClass.getValue());
    }

    @Test
    void shouldNotWriteAndFlushMultipleMessagesWhenReleased() {
        ResponseHandler responseHandler = (ResponseHandler) Mockito.mock(ResponseHandler.class);
        ResponseHandler responseHandler2 = (ResponseHandler) Mockito.mock(ResponseHandler.class);
        DirectConnection newConnection = newConnection(newChannel());
        newConnection.release();
        newConnection.writeAndFlush(new RunMessage("RETURN 1"), responseHandler, PullAllMessage.PULL_ALL, responseHandler2);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(IllegalStateException.class);
        ((ResponseHandler) Mockito.verify(responseHandler)).onFailure((Throwable) forClass.capture());
        assertConnectionReleasedError((IllegalStateException) forClass.getValue());
    }

    @Test
    void shouldNotWriteSingleMessageWhenTerminated() {
        ResponseHandler responseHandler = (ResponseHandler) Mockito.mock(ResponseHandler.class);
        DirectConnection newConnection = newConnection(newChannel());
        newConnection.terminateAndRelease("42");
        newConnection.write(new RunMessage("RETURN 1"), responseHandler);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(IllegalStateException.class);
        ((ResponseHandler) Mockito.verify(responseHandler)).onFailure((Throwable) forClass.capture());
        assertConnectionTerminatedError((IllegalStateException) forClass.getValue());
    }

    @Test
    void shouldNotWriteMultipleMessagesWhenTerminated() {
        ResponseHandler responseHandler = (ResponseHandler) Mockito.mock(ResponseHandler.class);
        ResponseHandler responseHandler2 = (ResponseHandler) Mockito.mock(ResponseHandler.class);
        DirectConnection newConnection = newConnection(newChannel());
        newConnection.terminateAndRelease("42");
        newConnection.write(new RunMessage("RETURN 1"), responseHandler, PullAllMessage.PULL_ALL, responseHandler2);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(IllegalStateException.class);
        ((ResponseHandler) Mockito.verify(responseHandler)).onFailure((Throwable) forClass.capture());
        assertConnectionTerminatedError((IllegalStateException) forClass.getValue());
    }

    @Test
    void shouldNotWriteAndFlushSingleMessageWhenTerminated() {
        ResponseHandler responseHandler = (ResponseHandler) Mockito.mock(ResponseHandler.class);
        DirectConnection newConnection = newConnection(newChannel());
        newConnection.terminateAndRelease("42");
        newConnection.writeAndFlush(new RunMessage("RETURN 1"), responseHandler);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(IllegalStateException.class);
        ((ResponseHandler) Mockito.verify(responseHandler)).onFailure((Throwable) forClass.capture());
        assertConnectionTerminatedError((IllegalStateException) forClass.getValue());
    }

    @Test
    void shouldNotWriteAndFlushMultipleMessagesWhenTerminated() {
        ResponseHandler responseHandler = (ResponseHandler) Mockito.mock(ResponseHandler.class);
        ResponseHandler responseHandler2 = (ResponseHandler) Mockito.mock(ResponseHandler.class);
        DirectConnection newConnection = newConnection(newChannel());
        newConnection.terminateAndRelease("42");
        newConnection.writeAndFlush(new RunMessage("RETURN 1"), responseHandler, PullAllMessage.PULL_ALL, responseHandler2);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(IllegalStateException.class);
        ((ResponseHandler) Mockito.verify(responseHandler)).onFailure((Throwable) forClass.capture());
        assertConnectionTerminatedError((IllegalStateException) forClass.getValue());
    }

    @Test
    void shouldReturnServerAddressWhenReleased() {
        EmbeddedChannel newChannel = newChannel();
        BoltServerAddress boltServerAddress = new BoltServerAddress("host", 4242);
        ChannelAttributes.setServerAddress(newChannel, boltServerAddress);
        DirectConnection newConnection = newConnection(newChannel);
        newConnection.release();
        Assertions.assertEquals(boltServerAddress, newConnection.serverAddress());
    }

    @Test
    void shouldReturnServerVersionWhenReleased() {
        EmbeddedChannel newChannel = newChannel();
        ServerVersion serverVersion = ServerVersion.v3_2_0;
        ChannelAttributes.setServerVersion(newChannel, serverVersion);
        DirectConnection newConnection = newConnection(newChannel);
        newConnection.release();
        Assertions.assertEquals(serverVersion, newConnection.serverVersion());
    }

    @Test
    void shouldReturnSameCompletionStageFromRelease() {
        EmbeddedChannel newChannel = newChannel();
        DirectConnection newConnection = newConnection(newChannel);
        CompletionStage release = newConnection.release();
        CompletionStage release2 = newConnection.release();
        CompletionStage release3 = newConnection.release();
        newChannel.runPendingTasks();
        Assertions.assertEquals(1, newChannel.outboundMessages().size());
        Assertions.assertEquals(ResetMessage.RESET, newChannel.outboundMessages().poll());
        Assertions.assertEquals(release, release2);
        Assertions.assertEquals(release2, release3);
    }

    @Test
    void shouldEnableAutoRead() {
        EmbeddedChannel newChannel = newChannel();
        newChannel.config().setAutoRead(false);
        newConnection(newChannel).enableAutoRead();
        Assertions.assertTrue(newChannel.config().isAutoRead());
    }

    @Test
    void shouldDisableAutoRead() {
        EmbeddedChannel newChannel = newChannel();
        newChannel.config().setAutoRead(true);
        newConnection(newChannel).disableAutoRead();
        Assertions.assertFalse(newChannel.config().isAutoRead());
    }

    @Test
    void shouldSetTerminationReasonOnChannelWhenTerminated() {
        EmbeddedChannel newChannel = newChannel();
        newConnection(newChannel).terminateAndRelease("Something really bad has happened");
        Assertions.assertEquals("Something really bad has happened", ChannelAttributes.terminationReason(newChannel));
    }

    @Test
    void shouldCloseChannelWhenTerminated() {
        EmbeddedChannel newChannel = newChannel();
        DirectConnection newConnection = newConnection(newChannel);
        Assertions.assertTrue(newChannel.isActive());
        newConnection.terminateAndRelease("test");
        Assertions.assertFalse(newChannel.isActive());
    }

    @Test
    void shouldReleaseChannelWhenTerminated() {
        EmbeddedChannel newChannel = newChannel();
        ChannelPool channelPool = (ChannelPool) Mockito.mock(ChannelPool.class);
        DirectConnection newConnection = newConnection(newChannel, channelPool);
        ((ChannelPool) Mockito.verify(channelPool, Mockito.never())).release((Channel) ArgumentMatchers.any());
        newConnection.terminateAndRelease("test");
        ((ChannelPool) Mockito.verify(channelPool)).release(newChannel);
    }

    @Test
    void shouldNotReleaseChannelMultipleTimesWhenTerminatedMultipleTimes() {
        EmbeddedChannel newChannel = newChannel();
        ChannelPool channelPool = (ChannelPool) Mockito.mock(ChannelPool.class);
        DirectConnection newConnection = newConnection(newChannel, channelPool);
        ((ChannelPool) Mockito.verify(channelPool, Mockito.never())).release((Channel) ArgumentMatchers.any());
        newConnection.terminateAndRelease("reason 1");
        newConnection.terminateAndRelease("reason 2");
        newConnection.terminateAndRelease("reason 3");
        Assertions.assertEquals("reason 1", ChannelAttributes.terminationReason(newChannel));
        ((ChannelPool) Mockito.verify(channelPool)).release(newChannel);
    }

    @Test
    void shouldNotReleaseAfterTermination() {
        EmbeddedChannel newChannel = newChannel();
        ChannelPool channelPool = (ChannelPool) Mockito.mock(ChannelPool.class);
        DirectConnection newConnection = newConnection(newChannel, channelPool);
        ((ChannelPool) Mockito.verify(channelPool, Mockito.never())).release((Channel) ArgumentMatchers.any());
        newConnection.terminateAndRelease("test");
        Assertions.assertTrue(newConnection.release().toCompletableFuture().isDone());
        ((ChannelPool) Mockito.verify(channelPool)).release(newChannel);
    }

    @Test
    void shouldSendResetMessageWhenReset() {
        EmbeddedChannel newChannel = newChannel();
        newConnection(newChannel).reset();
        newChannel.runPendingTasks();
        Assertions.assertEquals(1, newChannel.outboundMessages().size());
        Assertions.assertEquals(ResetMessage.RESET, newChannel.readOutbound());
    }

    @Test
    void shouldCompleteResetFutureWhenSuccessResponseArrives() {
        EmbeddedChannel newChannel = newChannel();
        CompletableFuture completableFuture = newConnection(newChannel).reset().toCompletableFuture();
        newChannel.runPendingTasks();
        Assertions.assertFalse(completableFuture.isDone());
        ChannelAttributes.messageDispatcher(newChannel).handleSuccessMessage(Collections.emptyMap());
        Assertions.assertTrue(completableFuture.isDone());
        Assertions.assertFalse(completableFuture.isCompletedExceptionally());
    }

    @Test
    void shouldCompleteResetFutureWhenFailureResponseArrives() {
        EmbeddedChannel newChannel = newChannel();
        CompletableFuture completableFuture = newConnection(newChannel).reset().toCompletableFuture();
        newChannel.runPendingTasks();
        Assertions.assertFalse(completableFuture.isDone());
        ChannelAttributes.messageDispatcher(newChannel).handleFailureMessage("Neo.TransientError.Transaction.Terminated", "Message");
        Assertions.assertTrue(completableFuture.isDone());
        Assertions.assertFalse(completableFuture.isCompletedExceptionally());
    }

    @Test
    void shouldDoNothingInResetWhenClosed() {
        EmbeddedChannel newChannel = newChannel();
        DirectConnection newConnection = newConnection(newChannel);
        newConnection.release();
        newChannel.runPendingTasks();
        CompletableFuture completableFuture = newConnection.reset().toCompletableFuture();
        newChannel.runPendingTasks();
        Assertions.assertEquals(1, newChannel.outboundMessages().size());
        Assertions.assertEquals(ResetMessage.RESET, newChannel.readOutbound());
        Assertions.assertTrue(completableFuture.isDone());
        Assertions.assertFalse(completableFuture.isCompletedExceptionally());
    }

    @Test
    void shouldEnableAutoReadWhenDoingReset() {
        EmbeddedChannel newChannel = newChannel();
        newChannel.config().setAutoRead(false);
        newConnection(newChannel).reset();
        newChannel.runPendingTasks();
        Assertions.assertTrue(newChannel.config().isAutoRead());
    }

    private void testWriteInEventLoop(String str, Consumer<DirectConnection> consumer) throws Exception {
        EmbeddedChannel embeddedChannel = (EmbeddedChannel) Mockito.spy(new EmbeddedChannel());
        initializeEventLoop(embeddedChannel, str);
        ThreadTrackingInboundMessageDispatcher threadTrackingInboundMessageDispatcher = new ThreadTrackingInboundMessageDispatcher(embeddedChannel);
        ChannelAttributes.setProtocolVersion(embeddedChannel, 4);
        ChannelAttributes.setMessageDispatcher(embeddedChannel, threadTrackingInboundMessageDispatcher);
        consumer.accept(newConnection(embeddedChannel));
        shutdownEventLoop();
        MatcherAssert.assertThat(Iterables.single(threadTrackingInboundMessageDispatcher.queueThreadNames), Matchers.startsWith(str));
    }

    private void initializeEventLoop(Channel channel, String str) {
        this.executor = Executors.newSingleThreadExecutor(DaemonThreadFactory.daemon(str));
        this.eventLoop = new DefaultEventLoop(this.executor);
        Mockito.when(channel.eventLoop()).thenReturn(this.eventLoop);
    }

    private void shutdownEventLoop() throws Exception {
        if (this.eventLoop != null) {
            this.eventLoop.shutdownGracefully();
        }
        if (this.executor != null) {
            this.executor.shutdown();
            Assertions.assertTrue(this.executor.awaitTermination(30L, TimeUnit.SECONDS));
        }
    }

    private static EmbeddedChannel newChannel() {
        EmbeddedChannel embeddedChannel = new EmbeddedChannel();
        InboundMessageDispatcher inboundMessageDispatcher = new InboundMessageDispatcher(embeddedChannel, DevNullLogging.DEV_NULL_LOGGING);
        ChannelAttributes.setProtocolVersion(embeddedChannel, 4);
        ChannelAttributes.setMessageDispatcher(embeddedChannel, inboundMessageDispatcher);
        return embeddedChannel;
    }

    private static DirectConnection newConnection(Channel channel) {
        return newConnection(channel, (ChannelPool) Mockito.mock(ChannelPool.class));
    }

    private static DirectConnection newConnection(Channel channel, ChannelPool channelPool) {
        return new DirectConnection(channel, channelPool, new FakeClock(), InternalAbstractMetrics.DEV_NULL_METRICS);
    }

    private static void assertConnectionReleasedError(IllegalStateException illegalStateException) {
        MatcherAssert.assertThat(illegalStateException.getMessage(), Matchers.startsWith("Connection has been released"));
    }

    private static void assertConnectionTerminatedError(IllegalStateException illegalStateException) {
        MatcherAssert.assertThat(illegalStateException.getMessage(), Matchers.startsWith("Connection has been terminated"));
    }
}
