package org.neo4j.causalclustering.catchup;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;
import java.nio.channels.ClosedChannelException;
import java.time.Clock;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Assert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.neo4j.causalclustering.catchup.storecopy.GetStoreIdRequest;
import org.neo4j.causalclustering.net.Server;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.Exceptions;
import org.neo4j.helpers.ListenSocketAddress;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.kernel.lifecycle.LifecycleException;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.ports.allocation.PortAuthority;

/* loaded from: input_file:org/neo4j/causalclustering/catchup/CatchUpClientIT.class */
/**
 * Integration tests for {@link CatchUpClient} that run a real client against a real
 * {@link Server} on a freshly allocated local port.
 *
 * <p>Each test wires purpose-built Netty handlers into the client and/or server pipeline and
 * then checks which exception a blocking request ends with:
 * <ul>
 *   <li>the client-side handler closes the channel,</li>
 *   <li>the server-side handler closes the channel,</li>
 *   <li>nothing ever answers and the inactivity timeout is zero.</li>
 * </ul>
 */
class CatchUpClientIT {
    /** Host name used for both the listen address and the advertised address in every test. */
    private static final String LOCALHOST = "localhost";

    /** Owns the server and client of the running test; recreated before every test. */
    private LifeSupport lifeSupport;

    /**
     * Inactivity timeout handed to every {@link CatchUpClient} built by
     * {@link #catchupClient(ChannelHandler...)}. It is read when the client is constructed, so a
     * test that needs another value must assign it before creating its client.
     */
    private int inactivityTimeoutMillis = 10000;

    @BeforeEach
    void initLifeCycles() {
        lifeSupport = new LifeSupport();
    }

    @AfterEach
    void shutdownLifeSupport() {
        lifeSupport.stop();
        lifeSupport.shutdown();
    }

    /**
     * A client whose outbound encoder closes the channel must fail the blocking request with a
     * {@link ClosedChannelException} wrapped in an {@link ExecutionException}, and the encoder
     * must actually have been invoked.
     */
    @Test
    void shouldCloseHandlerIfChannelIsClosedInClient() throws LifecycleException {
        int port = PortAuthority.allocatePort();
        ListenSocketAddress listenAddress = new ListenSocketAddress(LOCALHOST, port);
        AtomicBoolean wasClosedByClient = new AtomicBoolean(false);

        Server server = catchupServer(listenAddress);
        CatchUpClient client = closingChannelCatchupClient(wasClosedByClient);
        startComponents(server, client);

        assertClosedChannelException(LOCALHOST, port, client);
        Assertions.assertTrue(wasClosedByClient.get(), "client-side encoder was never invoked");
    }

    /**
     * A server whose inbound decoder closes the channel must make the client's blocking request
     * fail with {@link ClosedChannelException} as root cause, and the decoder must actually have
     * been invoked.
     */
    @Test
    void shouldCloseHandlerIfChannelIsClosedOnServer() {
        int port = PortAuthority.allocatePort();
        ListenSocketAddress listenAddress = new ListenSocketAddress(LOCALHOST, port);
        AtomicBoolean wasClosedByServer = new AtomicBoolean(false);

        Server server = closingChannelCatchupServer(listenAddress, wasClosedByServer);
        CatchUpClient client = emptyClient();
        startComponents(server, client);

        // Block lambda on purpose: the request returns a value, and a statement body keeps the
        // lambda unambiguously void-compatible for assertThrows.
        CatchUpClientException failure = Assertions.assertThrows(CatchUpClientException.class, () -> {
            client.makeBlockingRequest(new AdvertisedSocketAddress(LOCALHOST, port), new GetStoreIdRequest(), neverCompletingAdaptor());
        });

        Assertions.assertEquals(ClosedChannelException.class, Exceptions.rootCause(failure).getClass());
        Assertions.assertTrue(wasClosedByServer.get(), "server-side decoder was never invoked");
    }

    /**
     * With an inactivity timeout of zero and a server that has no handlers installed, the
     * blocking request must fail with {@link TimeoutException} as root cause.
     */
    @Test
    void shouldTimeoutDueToInactivity() {
        int port = PortAuthority.allocatePort();
        ListenSocketAddress listenAddress = new ListenSocketAddress(LOCALHOST, port);
        // Must be assigned before emptyClient() is called; the client captures the value on construction.
        inactivityTimeoutMillis = 0;

        Server server = catchupServer(listenAddress);
        CatchUpClient client = emptyClient();
        startComponents(server, client);

        CatchUpClientException failure = Assertions.assertThrows(CatchUpClientException.class, () -> {
            client.makeBlockingRequest(new AdvertisedSocketAddress(LOCALHOST, port), new GetStoreIdRequest(), neverCompletingAdaptor());
        });

        Assertions.assertEquals(TimeoutException.class, Exceptions.rootCause(failure).getClass());
    }

    /**
     * Registers the server and then the client with the life support, and initialises and starts
     * them in that order.
     */
    private void startComponents(Server server, CatchUpClient client) {
        lifeSupport.add(server);
        lifeSupport.add(client);
        lifeSupport.init();
        lifeSupport.start();
    }

    /**
     * Builds a client whose only pipeline handler encodes every {@link GetStoreIdRequest} as the
     * single byte {@code 1}.
     */
    private CatchUpClient emptyClient() {
        return catchupClient(new MessageToByteEncoder<GetStoreIdRequest>() {
            @Override
            protected void encode(ChannelHandlerContext ctx, GetStoreIdRequest request, ByteBuf out) {
                out.writeByte(1);
            }
        });
    }

    /**
     * Issues a blocking store-id request and asserts that it fails with a
     * {@link CatchUpClientException} caused by an {@link ExecutionException} which in turn is
     * caused by a {@link ClosedChannelException}.
     *
     * @param hostname host of the server to contact
     * @param port port of the server to contact
     * @param client client used to send the request
     */
    private void assertClosedChannelException(String hostname, int port, CatchUpClient client) {
        CatchUpClientException failure = Assertions.assertThrows(CatchUpClientException.class, () -> {
            client.makeBlockingRequest(new AdvertisedSocketAddress(hostname, port), new GetStoreIdRequest(), neverCompletingAdaptor());
        });

        // Explicit null checks turn a missing cause into a readable assertion failure instead of an NPE.
        Throwable cause = failure.getCause();
        Assertions.assertNotNull(cause, "CatchUpClientException carried no cause");
        Assertions.assertEquals(ExecutionException.class, cause.getClass());

        Throwable nestedCause = cause.getCause();
        Assertions.assertNotNull(nestedCause, "ExecutionException carried no cause");
        Assertions.assertEquals(ClosedChannelException.class, nestedCause.getClass());
    }

    /**
     * Returns a fresh, unmodified {@link CatchUpResponseAdaptor}.
     * NOTE(review): the name implies the default adaptor never completes the request on its own;
     * confirm against CatchUpResponseAdaptor.
     */
    private CatchUpResponseAdaptor<Object> neverCompletingAdaptor() {
        return new CatchUpResponseAdaptor<>();
    }

    /**
     * Builds a client whose encoder records its invocation in {@code wasClosedByClient} and then
     * closes the channel instead of writing anything.
     */
    private CatchUpClient closingChannelCatchupClient(final AtomicBoolean wasClosedByClient) {
        return catchupClient(new MessageToByteEncoder<Object>() {
            @Override
            protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) {
                wasClosedByClient.set(true);
                ctx.channel().close();
            }
        });
    }

    /**
     * Builds a server whose decoder records its invocation in {@code wasClosedByServer} and then
     * closes the channel without producing any message.
     */
    private Server closingChannelCatchupServer(ListenSocketAddress listenAddress, final AtomicBoolean wasClosedByServer) {
        return catchupServer(listenAddress, new ByteToMessageDecoder() {
            @Override
            protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
                wasClosedByServer.set(true);
                ctx.channel().close();
            }
        });
    }

    /**
     * Creates a {@link CatchUpClient} using the current {@link #inactivityTimeoutMillis} whose
     * channel pipeline consists solely of the given handlers.
     */
    private CatchUpClient catchupClient(ChannelHandler... handlers) {
        // The response handler supplied by the client is intentionally ignored: only the test handlers are installed.
        return new CatchUpClient(NullLogProvider.getInstance(), Clock.systemUTC(), inactivityTimeoutMillis, ignoredResponseHandler -> {
            return new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel channel) {
                    channel.pipeline().addLast(handlers);
                }
            };
        });
    }

    /**
     * Creates a {@link Server} named {@code "empty-test-server"} listening on the given address
     * whose child channel pipeline consists solely of the given handlers.
     */
    private Server catchupServer(ListenSocketAddress listenAddress, ChannelHandler... handlers) {
        return new Server(channel -> {
            channel.pipeline().addLast(handlers);
        }, listenAddress, "empty-test-server");
    }
}
