package org.neo4j.causalclustering.messaging;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.neo4j.causalclustering.core.consensus.RaftMessages;
import org.neo4j.causalclustering.core.consensus.RaftProtocolClientInstaller;
import org.neo4j.causalclustering.core.consensus.RaftProtocolServerInstaller;
import org.neo4j.causalclustering.core.consensus.membership.MemberIdSet;
import org.neo4j.causalclustering.handlers.VoidPipelineWrapperFactory;
import org.neo4j.causalclustering.identity.ClusterId;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.net.Server;
import org.neo4j.causalclustering.protocol.ModifierProtocolInstaller;
import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory;
import org.neo4j.causalclustering.protocol.Protocol;
import org.neo4j.causalclustering.protocol.ProtocolInstallerRepository;
import org.neo4j.causalclustering.protocol.handshake.ApplicationProtocolRepository;
import org.neo4j.causalclustering.protocol.handshake.ApplicationSupportedProtocols;
import org.neo4j.causalclustering.protocol.handshake.HandshakeClientInitializer;
import org.neo4j.causalclustering.protocol.handshake.HandshakeServerInitializer;
import org.neo4j.causalclustering.protocol.handshake.ModifierProtocolRepository;
import org.neo4j.causalclustering.protocol.handshake.ModifierSupportedProtocols;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.ListenSocketAddress;
import org.neo4j.helpers.collection.Iterators;
import org.neo4j.logging.LogProvider;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.ports.allocation.PortAuthority;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/neo4j/causalclustering/messaging/SenderServiceIT.class */
public class SenderServiceIT {
    private final LogProvider logProvider = NullLogProvider.getInstance();
    private final ApplicationSupportedProtocols supportedApplicationProtocol = new ApplicationSupportedProtocols(Protocol.ApplicationProtocolCategory.RAFT, Collections.emptyList());
    private final Collection<ModifierSupportedProtocols> supportedModifierProtocols = Collections.emptyList();
    private final ApplicationProtocolRepository applicationProtocolRepository = new ApplicationProtocolRepository(Protocol.ApplicationProtocols.values(), this.supportedApplicationProtocol);
    private final ModifierProtocolRepository modifierProtocolRepository = new ModifierProtocolRepository(Protocol.ModifierProtocols.values(), this.supportedModifierProtocols);

    @Parameterized.Parameter
    public boolean blocking;

    @Parameterized.Parameters(name = "blocking={0}")
    public static Iterable<Boolean> params() {
        return Iterators.asSet(new Boolean[]{true, false});
    }

    @Test
    public void shouldSendAndReceive() throws Throwable {
        int allocatePort = PortAuthority.allocatePort();
        final Semaphore semaphore = new Semaphore(0);
        Server raftServer = raftServer(new ChannelInboundHandlerAdapter() { // from class: org.neo4j.causalclustering.messaging.SenderServiceIT.1
            public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
                semaphore.release();
            }
        }, allocatePort);
        raftServer.start();
        SenderService raftSender = raftSender();
        raftSender.start();
        AdvertisedSocketAddress advertisedSocketAddress = new AdvertisedSocketAddress("localhost", allocatePort);
        MemberId memberId = new MemberId(UUID.randomUUID());
        raftSender.send(advertisedSocketAddress, RaftMessages.ClusterIdAwareMessage.of(new ClusterId(UUID.randomUUID()), new RaftMessages.NewEntry.Request(memberId, new MemberIdSet(Iterators.asSet(new MemberId[]{memberId})))), this.blocking);
        Assert.assertTrue(semaphore.tryAcquire(15L, TimeUnit.SECONDS));
        raftSender.stop();
        raftServer.stop();
    }

    private Server raftServer(ChannelInboundHandler channelInboundHandler, int i) {
        NettyPipelineBuilderFactory nettyPipelineBuilderFactory = new NettyPipelineBuilderFactory(VoidPipelineWrapperFactory.VOID_WRAPPER);
        return new Server(new HandshakeServerInitializer(this.applicationProtocolRepository, this.modifierProtocolRepository, new ProtocolInstallerRepository(Collections.singletonList(new RaftProtocolServerInstaller.Factory(channelInboundHandler, nettyPipelineBuilderFactory, this.logProvider)), ModifierProtocolInstaller.allServerInstallers), nettyPipelineBuilderFactory, this.logProvider), (ChannelInboundHandler) null, this.logProvider, this.logProvider, new ListenSocketAddress("localhost", i), "raft-server");
    }

    private SenderService raftSender() {
        NettyPipelineBuilderFactory nettyPipelineBuilderFactory = new NettyPipelineBuilderFactory(VoidPipelineWrapperFactory.VOID_WRAPPER);
        return new SenderService(new HandshakeClientInitializer(this.applicationProtocolRepository, this.modifierProtocolRepository, new ProtocolInstallerRepository(Collections.singletonList(new RaftProtocolClientInstaller.Factory(nettyPipelineBuilderFactory, this.logProvider)), ModifierProtocolInstaller.allClientInstallers), nettyPipelineBuilderFactory, Duration.ofSeconds(5L), this.logProvider), this.logProvider);
    }
}
