package org.neo4j.causalclustering.messaging.marshalling.v2;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.stream.ChunkedInput;
import io.netty.util.ReferenceCountUtil;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.UUID;
import java.util.stream.Stream;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.neo4j.causalclustering.core.consensus.RaftMessages;
import org.neo4j.causalclustering.core.consensus.log.RaftLogEntry;
import org.neo4j.causalclustering.core.consensus.protocol.v1.RaftProtocolClientInstallerV1;
import org.neo4j.causalclustering.core.consensus.protocol.v1.RaftProtocolServerInstallerV1;
import org.neo4j.causalclustering.core.consensus.protocol.v2.RaftProtocolClientInstallerV2;
import org.neo4j.causalclustering.core.consensus.protocol.v2.RaftProtocolServerInstallerV2;
import org.neo4j.causalclustering.core.replication.DistributedOperation;
import org.neo4j.causalclustering.core.replication.ReplicatedContent;
import org.neo4j.causalclustering.core.replication.session.GlobalSession;
import org.neo4j.causalclustering.core.replication.session.LocalOperationId;
import org.neo4j.causalclustering.core.state.machines.dummy.DummyRequest;
import org.neo4j.causalclustering.core.state.machines.locks.ReplicatedLockTokenRequest;
import org.neo4j.causalclustering.core.state.machines.token.ReplicatedTokenRequest;
import org.neo4j.causalclustering.core.state.machines.token.TokenType;
import org.neo4j.causalclustering.core.state.machines.tx.ReplicatedTransaction;
import org.neo4j.causalclustering.handlers.VoidPipelineWrapperFactory;
import org.neo4j.causalclustering.identity.ClusterId;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory;
import org.neo4j.kernel.impl.transaction.log.PhysicalTransactionRepresentation;
import org.neo4j.logging.FormattedLogProvider;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/neo4j/causalclustering/messaging/marshalling/v2/RaftMessageEncoderDecoderTest.class */
public class RaftMessageEncoderDecoderTest {
    private static final MemberId MEMBER_ID = new MemberId(UUID.randomUUID());
    private static final int[] PROTOCOLS = {1, 2};

    @Parameterized.Parameter
    public RaftMessages.RaftMessage raftMessage;

    @Parameterized.Parameter(1)
    public int raftProtocol;
    private final RaftMessageHandler handler = new RaftMessageHandler();
    private EmbeddedChannel outbound;
    private EmbeddedChannel inbound;

    /* loaded from: input_file:org/neo4j/causalclustering/messaging/marshalling/v2/RaftMessageEncoderDecoderTest$RaftMessageHandler.class */
    class RaftMessageHandler extends SimpleChannelInboundHandler<RaftMessages.ReceivedInstantClusterIdAwareMessage<RaftMessages.RaftMessage>> {
        private RaftMessages.ReceivedInstantClusterIdAwareMessage<RaftMessages.RaftMessage> msg;

        RaftMessageHandler() {
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void channelRead0(ChannelHandlerContext channelHandlerContext, RaftMessages.ReceivedInstantClusterIdAwareMessage<RaftMessages.RaftMessage> receivedInstantClusterIdAwareMessage) {
            this.msg = receivedInstantClusterIdAwareMessage;
        }

        RaftMessages.ReceivedInstantClusterIdAwareMessage<RaftMessages.RaftMessage> getRaftMessage() {
            return this.msg;
        }
    }

    @Parameterized.Parameters(name = "Raft v{1} with message {0}")
    public static Object[] data() {
        return setUpParams(new RaftMessages.RaftMessage[]{new RaftMessages.Heartbeat(MEMBER_ID, 1L, 2L, 3L), new RaftMessages.HeartbeatResponse(MEMBER_ID), new RaftMessages.NewEntry.Request(MEMBER_ID, new DummyRequest(new byte[]{1, 2, 3, 4, 5, 6, 7, 8})), new RaftMessages.NewEntry.Request(MEMBER_ID, ReplicatedTransaction.from(new byte[]{1, 2, 3, 4, 5, 6, 7, 8})), new RaftMessages.NewEntry.Request(MEMBER_ID, ReplicatedTransaction.from(new PhysicalTransactionRepresentation(Collections.emptyList()))), new RaftMessages.NewEntry.Request(MEMBER_ID, new DistributedOperation(new DistributedOperation(ReplicatedTransaction.from(new byte[]{1, 2, 3, 4, 5}), new GlobalSession(UUID.randomUUID(), MEMBER_ID), new LocalOperationId(1L, 2L)), new GlobalSession(UUID.randomUUID(), MEMBER_ID), new LocalOperationId(3L, 4L))), new RaftMessages.AppendEntries.Request(MEMBER_ID, 1L, 2L, 3L, new RaftLogEntry[]{new RaftLogEntry(0L, new ReplicatedTokenRequest(TokenType.LABEL, "name", new byte[]{2, 3, 4})), new RaftLogEntry(1L, new ReplicatedLockTokenRequest(MEMBER_ID, 2))}, 5L), new RaftMessages.AppendEntries.Response(MEMBER_ID, 1L, true, 2L, 3L), new RaftMessages.Vote.Request(MEMBER_ID, Long.MAX_VALUE, MEMBER_ID, Long.MIN_VALUE, 1L), new RaftMessages.Vote.Response(MEMBER_ID, 1L, true), new RaftMessages.PreVote.Request(MEMBER_ID, Long.MAX_VALUE, MEMBER_ID, Long.MIN_VALUE, 1L), new RaftMessages.PreVote.Response(MEMBER_ID, 1L, true), new RaftMessages.LogCompactionInfo(MEMBER_ID, Long.MAX_VALUE, Long.MIN_VALUE)});
    }

    private static Object[] setUpParams(RaftMessages.RaftMessage[] raftMessageArr) {
        return Arrays.stream(raftMessageArr).flatMap(RaftMessageEncoderDecoderTest::params).toArray();
    }

    private static Stream<Object[]> params(RaftMessages.RaftMessage raftMessage) {
        return Arrays.stream(PROTOCOLS).mapToObj(i -> {
            return new Object[]{raftMessage, Integer.valueOf(i)};
        });
    }

    @Before
    public void setupChannels() throws Exception {
        this.outbound = new EmbeddedChannel();
        this.inbound = new EmbeddedChannel();
        if (this.raftProtocol == 2) {
            new RaftProtocolClientInstallerV2(new NettyPipelineBuilderFactory(VoidPipelineWrapperFactory.VOID_WRAPPER), Collections.emptyList(), FormattedLogProvider.toOutputStream(System.out)).install(this.outbound);
            new RaftProtocolServerInstallerV2(this.handler, new NettyPipelineBuilderFactory(VoidPipelineWrapperFactory.VOID_WRAPPER), Collections.emptyList(), FormattedLogProvider.toOutputStream(System.out)).install(this.inbound);
        } else {
            if (this.raftProtocol != 1) {
                throw new IllegalArgumentException("Unknown raft protocol " + this.raftProtocol);
            }
            new RaftProtocolClientInstallerV1(new NettyPipelineBuilderFactory(VoidPipelineWrapperFactory.VOID_WRAPPER), Collections.emptyList(), FormattedLogProvider.toOutputStream(System.out)).install(this.outbound);
            new RaftProtocolServerInstallerV1(this.handler, new NettyPipelineBuilderFactory(VoidPipelineWrapperFactory.VOID_WRAPPER), Collections.emptyList(), FormattedLogProvider.toOutputStream(System.out)).install(this.inbound);
        }
    }

    @After
    public void cleanUp() {
        if (this.outbound != null) {
            this.outbound.close();
        }
        if (this.inbound != null) {
            this.inbound.close();
        }
        this.inbound = null;
        this.outbound = null;
    }

    @Test
    public void shouldEncodeDecodeRaftMessage() throws Exception {
        ClusterId clusterId = new ClusterId(UUID.randomUUID());
        this.outbound.writeOutbound(new Object[]{RaftMessages.ReceivedInstantClusterIdAwareMessage.of(Instant.now(), clusterId, this.raftMessage)});
        while (true) {
            Object readOutbound = this.outbound.readOutbound();
            if (readOutbound == null) {
                RaftMessages.ReceivedInstantClusterIdAwareMessage<RaftMessages.RaftMessage> raftMessage = this.handler.getRaftMessage();
                Assert.assertEquals(clusterId, raftMessage.clusterId());
                raftMessageEquals(this.raftMessage, raftMessage.message());
                Assert.assertNull(this.inbound.readInbound());
                ReferenceCountUtil.release(this.handler.msg);
                return;
            }
            this.inbound.writeInbound(new Object[]{readOutbound});
        }
    }

    private void raftMessageEquals(RaftMessages.RaftMessage raftMessage, RaftMessages.RaftMessage raftMessage2) throws Exception {
        if (raftMessage instanceof RaftMessages.NewEntry.Request) {
            Assert.assertEquals(raftMessage2.from(), raftMessage.from());
            Assert.assertEquals(raftMessage2.type(), raftMessage.type());
            contentEquals(((RaftMessages.NewEntry.Request) raftMessage).content(), ((RaftMessages.NewEntry.Request) raftMessage).content());
        } else if (raftMessage instanceof RaftMessages.AppendEntries.Request) {
            Assert.assertEquals(raftMessage2.from(), raftMessage.from());
            Assert.assertEquals(raftMessage2.type(), raftMessage.type());
            RaftLogEntry[] entries = ((RaftMessages.AppendEntries.Request) raftMessage).entries();
            RaftLogEntry[] entries2 = ((RaftMessages.AppendEntries.Request) raftMessage2).entries();
            for (int i = 0; i < entries.length; i++) {
                RaftLogEntry raftLogEntry = entries[i];
                RaftLogEntry raftLogEntry2 = entries2[i];
                Assert.assertEquals(raftLogEntry.term(), raftLogEntry2.term());
                contentEquals(raftLogEntry.content(), raftLogEntry2.content());
            }
        }
    }

    private void contentEquals(ReplicatedContent replicatedContent, ReplicatedContent replicatedContent2) throws Exception {
        if (replicatedContent instanceof ReplicatedTransaction) {
            ByteBuf buffer = Unpooled.buffer();
            ByteBuf buffer2 = Unpooled.buffer();
            encode(buffer, ((ReplicatedTransaction) replicatedContent).encode());
            encode(buffer2, ((ReplicatedTransaction) replicatedContent2).encode());
            Assert.assertEquals(buffer, buffer2);
            return;
        }
        if (!(replicatedContent instanceof DistributedOperation)) {
            Assert.assertEquals(replicatedContent, replicatedContent2);
            return;
        }
        Assert.assertEquals(((DistributedOperation) replicatedContent).globalSession(), ((DistributedOperation) replicatedContent2).globalSession());
        Assert.assertEquals(((DistributedOperation) replicatedContent).operationId(), ((DistributedOperation) replicatedContent2).operationId());
        contentEquals(((DistributedOperation) replicatedContent).content(), ((DistributedOperation) replicatedContent2).content());
    }

    private static void encode(ByteBuf byteBuf, ChunkedInput<ByteBuf> chunkedInput) throws Exception {
        while (!chunkedInput.isEndOfInput()) {
            ByteBuf byteBuf2 = (ByteBuf) chunkedInput.readChunk(UnpooledByteBufAllocator.DEFAULT);
            if (byteBuf2 != null) {
                byteBuf.writeBytes(byteBuf2);
                byteBuf2.release();
            }
        }
    }
}
