package org.neo4j.causalclustering.core.server;

import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.junit.Test;
import org.mockito.Mockito;
import org.neo4j.causalclustering.core.consensus.RaftMessages;
import org.neo4j.causalclustering.core.consensus.ReplicatedString;
import org.neo4j.causalclustering.core.replication.ReplicatedContent;
import org.neo4j.causalclustering.identity.ClusterId;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.messaging.Inbound;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.logging.NullLogProvider;

/* loaded from: input_file:org/neo4j/causalclustering/core/server/BatchingMessageHandlerTest.class */
/**
 * Tests for {@link BatchingMessageHandler}.
 *
 * <p>Every test wraps a mocked inner {@link Inbound.MessageHandler} in a
 * {@code BatchingMessageHandler}, feeds messages through {@code handle(...)} and then
 * checks which calls reach the mock once {@code run()} has been executed (or the
 * handler has been stopped).
 */
public class BatchingMessageHandlerTest {
    private static final int MAX_BATCH = 16;
    private static final int QUEUE_SIZE = 64;

    // Mockito can only hand back the raw type for a generic interface; the assignment is
    // safe because the mock accepts any argument type.
    @SuppressWarnings("unchecked")
    private final Inbound.MessageHandler<RaftMessages.ClusterIdAwareMessage> raftStateMachine =
            Mockito.mock(Inbound.MessageHandler.class);
    private final ClusterId localClusterId = new ClusterId(UUID.randomUUID());

    /**
     * Checks that {@code handle(...)} alone does not touch the inner handler and that the
     * message is delivered to it once {@code run()} is called.
     */
    @Test
    public void shouldInvokeInnerHandlerWhenRun() throws Exception {
        BatchingMessageHandler batchHandler = newBatchHandler(QUEUE_SIZE);
        RaftMessages.ClusterIdAwareMessage message = newEntry(null);

        batchHandler.handle(message);
        Mockito.verifyZeroInteractions(this.raftStateMachine);

        batchHandler.run();
        Mockito.verify(this.raftStateMachine).handle(message);
    }

    /**
     * Checks that a {@code run()} started on another thread before any message exists
     * delivers a message that is handed in afterwards.
     */
    @Test
    public void shouldInvokeHandlerOnQueuedMessage() throws Exception {
        // Must be typed as BatchingMessageHandler (not Runnable) so handle(...) is callable.
        BatchingMessageHandler batchHandler = newBatchHandler(QUEUE_SIZE);
        RaftMessages.ClusterIdAwareMessage message = newEntry(null);

        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            Future<?> pendingRun = executor.submit(batchHandler);

            // Give the background run() a head start before the message arrives. The test
            // stays valid if the thread has not got that far yet; presumably the delay is
            // there so that run() is usually already waiting for input.
            Thread.sleep(50L);

            batchHandler.handle(message);

            pendingRun.get();
            Mockito.verify(this.raftStateMachine).handle(message);
        } finally {
            // Do not leak the pool's worker thread, even when the test fails.
            executor.shutdownNow();
        }
    }

    /**
     * Checks that two queued new-entry requests reach the inner handler as one
     * {@link RaftMessages.NewEntry.BatchRequest} holding both contents in order.
     */
    @Test
    public void shouldBatchRequests() throws Exception {
        BatchingMessageHandler batchHandler = newBatchHandler(QUEUE_SIZE);
        ReplicatedString contentA = new ReplicatedString("A");
        ReplicatedString contentB = new ReplicatedString("B");

        batchHandler.handle(newEntry(contentA));
        batchHandler.handle(newEntry(contentB));
        Mockito.verifyZeroInteractions(this.raftStateMachine);

        batchHandler.run();

        RaftMessages.NewEntry.BatchRequest expectedBatch = new RaftMessages.NewEntry.BatchRequest(2);
        expectedBatch.add(contentA);
        expectedBatch.add(contentB);
        Mockito.verify(this.raftStateMachine)
                .handle(new RaftMessages.ClusterIdAwareMessage(this.localClusterId, expectedBatch));
    }

    /**
     * Checks that new-entry requests interleaved with heartbeats are merged into a single
     * batch request while each heartbeat is passed to the inner handler on its own.
     */
    @Test
    public void shouldBatchNewEntriesAndHandleOtherMessagesSingularly() throws Exception {
        BatchingMessageHandler batchHandler = newBatchHandler(QUEUE_SIZE);
        ReplicatedString contentA = new ReplicatedString("A");
        ReplicatedString contentC = new ReplicatedString("C");
        RaftMessages.ClusterIdAwareMessage firstHeartbeat = heartbeat(0L);
        RaftMessages.ClusterIdAwareMessage secondHeartbeat = heartbeat(1L);

        batchHandler.handle(newEntry(contentA));
        batchHandler.handle(firstHeartbeat);
        batchHandler.handle(newEntry(contentC));
        batchHandler.handle(secondHeartbeat);
        Mockito.verifyZeroInteractions(this.raftStateMachine);

        batchHandler.run();

        RaftMessages.NewEntry.BatchRequest expectedBatch = new RaftMessages.NewEntry.BatchRequest(2);
        expectedBatch.add(contentA);
        expectedBatch.add(contentC);
        Mockito.verify(this.raftStateMachine)
                .handle(new RaftMessages.ClusterIdAwareMessage(this.localClusterId, expectedBatch));
        Mockito.verify(this.raftStateMachine).handle(firstHeartbeat);
        Mockito.verify(this.raftStateMachine).handle(secondHeartbeat);
    }

    /**
     * Checks that a message handed in after {@code stop()} never reaches the inner handler
     * and that the drop is reported at debug level.
     */
    @Test
    public void shouldDropMessagesAfterBeingStopped() throws Exception {
        AssertableLogProvider logProvider = new AssertableLogProvider();
        BatchingMessageHandler batchHandler =
                new BatchingMessageHandler(this.raftStateMachine, QUEUE_SIZE, MAX_BATCH, logProvider);
        RaftMessages.ClusterIdAwareMessage message = newEntry(null);

        batchHandler.stop();
        batchHandler.handle(message);
        batchHandler.run();

        Mockito.verifyZeroInteractions(this.raftStateMachine);
        logProvider.assertAtLeastOnce(
                AssertableLogProvider.inLog(BatchingMessageHandler.class)
                        .debug("This handler has been stopped, dropping the message: %s", message));
    }

    /**
     * Checks that a {@code handle(...)} call made while the single-slot queue is already
     * occupied returns once the handler is stopped; the JUnit timeout fails the test if the
     * producer thread never finishes.
     */
    @Test(timeout = 5000)
    public void shouldGiveUpAddingMessagesInTheQueueIfTheHandlerHasBeenStopped() throws Exception {
        // Queue size 1: the first message fills the queue, so the second handle(...) cannot
        // be enqueued straight away.
        final BatchingMessageHandler batchHandler = newBatchHandler(1);
        final RaftMessages.ClusterIdAwareMessage message = newEntry(null);
        batchHandler.handle(message);

        final CountDownLatch producerStarted = new CountDownLatch(1);
        Thread producer = new Thread() {
            @Override
            public void run() {
                producerStarted.countDown();
                batchHandler.handle(message);
            }
        };
        producer.start();

        // Stop only after the producer thread is running, then wait for it to give up.
        producerStarted.await();
        batchHandler.stop();
        producer.join();
    }

    /** Creates a handler around the mocked state machine with the given queue size and no logging. */
    private BatchingMessageHandler newBatchHandler(int queueSize) {
        return new BatchingMessageHandler(this.raftStateMachine, queueSize, MAX_BATCH, NullLogProvider.getInstance());
    }

    /** Wraps a new-entry request (null sender, given content) in a message for the local cluster. */
    private RaftMessages.ClusterIdAwareMessage newEntry(ReplicatedContent content) {
        return new RaftMessages.ClusterIdAwareMessage(
                this.localClusterId, new RaftMessages.NewEntry.Request((MemberId) null, content));
    }

    /** Wraps a heartbeat (null sender, all three numeric fields set to {@code value}) for the local cluster. */
    private RaftMessages.ClusterIdAwareMessage heartbeat(long value) {
        return new RaftMessages.ClusterIdAwareMessage(
                this.localClusterId, new RaftMessages.Heartbeat((MemberId) null, value, value, value));
    }
}
