package org.neo4j.causalclustering.core;

import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.neo4j.causalclustering.core.BatchingMessageHandler;
import org.neo4j.causalclustering.core.BoundedPriorityQueue;
import org.neo4j.causalclustering.core.consensus.ContinuousJob;
import org.neo4j.causalclustering.core.consensus.RaftMessages;
import org.neo4j.causalclustering.core.consensus.ReplicatedString;
import org.neo4j.causalclustering.core.consensus.log.RaftLogEntry;
import org.neo4j.causalclustering.core.replication.ReplicatedContent;
import org.neo4j.causalclustering.identity.ClusterId;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.messaging.LifecycleMessageHandler;
import org.neo4j.causalclustering.messaging.Message;
import org.neo4j.helpers.ArrayUtil;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.logging.NullLogProvider;

/**
 * Unit tests for {@link BatchingMessageHandler}.
 *
 * <p>Each test feeds wrapped Raft messages into the handler through {@code handle(...)}, drives it
 * by calling {@code run()} directly (or on an executor thread), and then verifies what reached the
 * mocked downstream handler. The job scheduler is replaced by a factory that always hands back a
 * mocked {@link ContinuousJob}, so no background thread is started by the handler itself.
 */
public class BatchingMessageHandlerTest {
    private static final BoundedPriorityQueue.Config IN_QUEUE_CONFIG = new BoundedPriorityQueue.Config(64, 1024);
    private static final BatchingMessageHandler.Config BATCH_CONFIG = new BatchingMessageHandler.Config(16, 256);

    /** Received-instant used by {@link #wrap(RaftMessages.RaftMessage)} so that wrapped messages compare equal. */
    private final Instant now = Instant.now();

    // Mockito can only mock the raw interface type, hence the unchecked cast.
    @SuppressWarnings("unchecked")
    private final LifecycleMessageHandler<RaftMessages.ReceivedInstantClusterIdAwareMessage<?>> downstreamHandler =
            (LifecycleMessageHandler<RaftMessages.ReceivedInstantClusterIdAwareMessage<?>>) Mockito.mock(LifecycleMessageHandler.class);

    private final ClusterId localClusterId = new ClusterId(UUID.randomUUID());
    private final ContinuousJob mockJob = Mockito.mock(ContinuousJob.class);
    /** Ignores the supplied runnable and always returns {@link #mockJob}. */
    private final Function<Runnable, ContinuousJob> jobSchedulerFactory = ignored -> mockJob;
    private final MemberId leader = new MemberId(UUID.randomUUID());

    private ExecutorService executor;

    @Before
    public void before() {
        executor = Executors.newCachedThreadPool();
    }

    /**
     * Shuts the executor down and waits for submitted tasks; if they do not finish within the
     * timeout the pool is stopped forcibly so no worker threads outlive the test.
     */
    @After
    public void after() throws InterruptedException {
        executor.shutdown();
        if (!executor.awaitTermination(60L, TimeUnit.SECONDS)) {
            executor.shutdownNow();
        }
    }

    @Test
    public void shouldInvokeInnerHandlerWhenRun() {
        BatchingMessageHandler batchHandler = newHandler();
        RaftMessages.NewEntry.Request message = new RaftMessages.NewEntry.Request(null, content("dummy"));

        batchHandler.handle(wrap(message));
        Mockito.verifyZeroInteractions(downstreamHandler);

        batchHandler.run();

        RaftMessages.NewEntry.BatchRequest expected =
                new RaftMessages.NewEntry.BatchRequest(Collections.singletonList(new ReplicatedString("dummy")));
        Mockito.verify(downstreamHandler).handle(wrap(expected));
    }

    @Test
    public void shouldInvokeHandlerOnQueuedMessage() throws Throwable {
        // Must be typed as BatchingMessageHandler (not Runnable) because handle(...) is called on it below.
        BatchingMessageHandler batchHandler = newHandler();
        ReplicatedString content = new ReplicatedString("dummy");
        RaftMessages.NewEntry.Request message = new RaftMessages.NewEntry.Request(null, content);

        Future<?> pendingRun = executor.submit(batchHandler);
        // NOTE(review): presumably gives the submitted run() time to start waiting on the
        // still-empty queue before the message is offered - confirm against BatchingMessageHandler.run().
        Thread.sleep(50L);
        batchHandler.handle(wrap(message));
        pendingRun.get();

        RaftMessages.NewEntry.BatchRequest expected =
                new RaftMessages.NewEntry.BatchRequest(Collections.singletonList(content));
        Mockito.verify(downstreamHandler).handle(wrap(expected));
    }

    @Test
    public void shouldBatchRequests() {
        BatchingMessageHandler batchHandler = newHandler();
        ReplicatedString contentA = new ReplicatedString("A");
        ReplicatedString contentB = new ReplicatedString("B");
        RaftMessages.NewEntry.Request messageA = new RaftMessages.NewEntry.Request(null, contentA);
        RaftMessages.NewEntry.Request messageB = new RaftMessages.NewEntry.Request(null, contentB);

        batchHandler.handle(wrap(messageA));
        batchHandler.handle(wrap(messageB));
        Mockito.verifyZeroInteractions(downstreamHandler);

        batchHandler.run();

        RaftMessages.NewEntry.BatchRequest expected =
                new RaftMessages.NewEntry.BatchRequest(Arrays.asList(contentA, contentB));
        Mockito.verify(downstreamHandler).handle(wrap(expected));
    }

    @Test
    public void shouldBatchUsingReceivedInstantOfFirstReceivedMessage() {
        BatchingMessageHandler batchHandler = newHandler();
        ReplicatedString content = new ReplicatedString("A");
        RaftMessages.NewEntry.Request message = new RaftMessages.NewEntry.Request(null, content);
        Instant firstReceived = Instant.ofEpochMilli(1L);
        Instant secondReceived = firstReceived.plusMillis(1L);

        batchHandler.handle(wrap(firstReceived, message));
        batchHandler.handle(wrap(secondReceived, message));
        batchHandler.run();

        // The batch is expected to carry the instant of the first message, not the second.
        RaftMessages.NewEntry.BatchRequest expected =
                new RaftMessages.NewEntry.BatchRequest(Arrays.asList(content, content));
        Mockito.verify(downstreamHandler).handle(wrap(firstReceived, expected));
    }

    @Test
    public void shouldBatchNewEntriesAndHandleOtherMessagesFirst() {
        BatchingMessageHandler batchHandler = newHandler();
        ReplicatedString contentA = new ReplicatedString("A");
        ReplicatedString contentC = new ReplicatedString("C");
        RaftMessages.NewEntry.Request newEntryA = new RaftMessages.NewEntry.Request(null, contentA);
        RaftMessages.Heartbeat heartbeatA = new RaftMessages.Heartbeat(null, 0L, 0L, 0L);
        RaftMessages.NewEntry.Request newEntryB = new RaftMessages.NewEntry.Request(null, contentC);
        RaftMessages.Heartbeat heartbeatB = new RaftMessages.Heartbeat(null, 1L, 1L, 1L);

        batchHandler.handle(wrap(newEntryA));
        batchHandler.handle(wrap(heartbeatA));
        batchHandler.handle(wrap(newEntryB));
        batchHandler.handle(wrap(heartbeatB));
        Mockito.verifyZeroInteractions(downstreamHandler);

        // Three runs: one per expected downstream delivery (two heartbeats and one batch).
        batchHandler.run();
        batchHandler.run();
        batchHandler.run();

        RaftMessages.NewEntry.BatchRequest expectedBatch =
                new RaftMessages.NewEntry.BatchRequest(Arrays.asList(contentA, contentC));
        Mockito.verify(downstreamHandler).handle(wrap(heartbeatA));
        Mockito.verify(downstreamHandler).handle(wrap(heartbeatB));
        Mockito.verify(downstreamHandler).handle(wrap(expectedBatch));
    }

    // NOTE(review): in the AppendEntries tests below the local names (leaderTerm, prevLogIndex,
    // prevLogTerm, leaderCommit) follow the usual Raft AppendEntries field order - confirm against
    // the RaftMessages.AppendEntries.Request constructor.

    @Test
    public void shouldBatchSingleEntryAppendEntries() {
        BatchingMessageHandler batchHandler = newHandler();
        long leaderTerm = 1L;
        long prevLogIndex = -1L;
        long prevLogTerm = -1L;
        long leaderCommit = 0L;
        RaftLogEntry entryA = new RaftLogEntry(0L, content("A"));
        RaftLogEntry entryB = new RaftLogEntry(0L, content("B"));
        RaftMessages.AppendEntries.Request appendA = new RaftMessages.AppendEntries.Request(
                leader, leaderTerm, prevLogIndex, prevLogTerm, new RaftLogEntry[]{entryA}, leaderCommit);
        RaftMessages.AppendEntries.Request appendB = new RaftMessages.AppendEntries.Request(
                leader, leaderTerm, prevLogIndex + 1L, 0L, new RaftLogEntry[]{entryB}, leaderCommit);

        batchHandler.handle(wrap(appendA));
        batchHandler.handle(wrap(appendB));
        Mockito.verifyZeroInteractions(downstreamHandler);

        batchHandler.run();

        RaftMessages.AppendEntries.Request expected = new RaftMessages.AppendEntries.Request(
                leader, leaderTerm, prevLogIndex, prevLogTerm, new RaftLogEntry[]{entryA, entryB}, leaderCommit);
        Mockito.verify(downstreamHandler).handle(wrap(expected));
    }

    @Test
    public void shouldBatchMultipleEntryAppendEntries() {
        BatchingMessageHandler batchHandler = newHandler();
        long leaderTerm = 1L;
        long prevLogIndex = -1L;
        long prevLogTerm = -1L;
        long leaderCommit = 0L;
        RaftLogEntry[] entriesA = entries(0L, 0, 2);
        RaftLogEntry[] entriesB = entries(1L, 3, 3);
        RaftLogEntry[] entriesC = entries(2L, 4, 8);
        RaftLogEntry[] entriesD = entries(3L, 9, 15);

        RaftMessages.AppendEntries.Request appendA = new RaftMessages.AppendEntries.Request(
                leader, leaderTerm, prevLogIndex, prevLogTerm, entriesA, leaderCommit);

        // Each follow-up request continues where the previous one ended.
        long prevLogIndexB = prevLogIndex + appendA.entries().length;
        long prevLogTermB = ((RaftLogEntry) ArrayUtil.lastOf(appendA.entries())).term();
        long leaderCommitB = leaderCommit + 2L;
        RaftMessages.AppendEntries.Request appendB = new RaftMessages.AppendEntries.Request(
                leader, leaderTerm, prevLogIndexB, prevLogTermB, entriesB, leaderCommitB);

        long prevLogIndexC = prevLogIndexB + appendB.entries().length;
        long prevLogTermC = ((RaftLogEntry) ArrayUtil.lastOf(appendB.entries())).term();
        long leaderCommitC = leaderCommitB + 5L;
        RaftMessages.AppendEntries.Request appendC = new RaftMessages.AppendEntries.Request(
                leader, leaderTerm, prevLogIndexC, prevLogTermC,
                (RaftLogEntry[]) ArrayUtil.concat(entriesC, entriesD), leaderCommitC);

        batchHandler.handle(wrap(appendA));
        batchHandler.handle(wrap(appendB));
        batchHandler.handle(wrap(appendC));
        Mockito.verifyZeroInteractions(downstreamHandler);

        batchHandler.run();

        // Expected: one request starting at the first request's position, holding every entry
        // and the commit value of the last request.
        RaftLogEntry[] allEntries = (RaftLogEntry[]) ArrayUtil.concatArrays(entriesA, entriesB, entriesC, entriesD);
        RaftMessages.AppendEntries.Request expected = new RaftMessages.AppendEntries.Request(
                leader, leaderTerm, prevLogIndex, prevLogTerm, allEntries, leaderCommitC);
        Mockito.verify(downstreamHandler).handle(wrap(expected));
    }

    @Test
    public void shouldNotBatchAppendEntriesDifferentLeaderTerms() {
        BatchingMessageHandler batchHandler = newHandler();
        long leaderTerm = 1L;
        long prevLogIndex = -1L;
        long prevLogTerm = -1L;
        long leaderCommit = 0L;
        RaftLogEntry[] entriesA = entries(0L, 0, 2);
        RaftLogEntry[] entriesB = entries(1L, 3, 3);

        RaftMessages.AppendEntries.Request appendA = new RaftMessages.AppendEntries.Request(
                leader, leaderTerm, prevLogIndex, prevLogTerm, entriesA, leaderCommit);
        // Same continuation as a batchable request, except for the second argument (leaderTerm + 1).
        RaftMessages.AppendEntries.Request appendB = new RaftMessages.AppendEntries.Request(
                leader,
                leaderTerm + 1L,
                prevLogIndex + appendA.entries().length,
                ((RaftLogEntry) ArrayUtil.lastOf(appendA.entries())).term(),
                entriesB,
                leaderCommit);

        batchHandler.handle(wrap(appendA));
        batchHandler.handle(wrap(appendB));
        Mockito.verifyZeroInteractions(downstreamHandler);

        batchHandler.run();
        batchHandler.run();

        // Both requests must arrive downstream unmerged.
        Mockito.verify(downstreamHandler).handle(wrap(appendA));
        Mockito.verify(downstreamHandler).handle(wrap(appendB));
    }

    @Test
    public void shouldPrioritiseCorrectly() {
        BatchingMessageHandler batchHandler = newHandler();
        RaftMessages.NewEntry.Request newEntry = new RaftMessages.NewEntry.Request(null, content(""));
        RaftMessages.AppendEntries.Request append = new RaftMessages.AppendEntries.Request(
                leader, 1L, -1L, -1L, entries(0L, 0, 0), 0L);
        RaftMessages.AppendEntries.Request emptyAppend = new RaftMessages.AppendEntries.Request(
                leader, 1L, -1L, -1L, RaftLogEntry.empty, 0L);
        RaftMessages.Heartbeat heartbeat = new RaftMessages.Heartbeat(null, 0L, 0L, 0L);

        // Deliberately enqueued in a different order than the expected delivery order.
        batchHandler.handle(wrap(newEntry));
        batchHandler.handle(wrap(append));
        batchHandler.handle(wrap(heartbeat));
        batchHandler.handle(wrap(emptyAppend));
        Mockito.verifyZeroInteractions(downstreamHandler);

        batchHandler.run();
        batchHandler.run();
        batchHandler.run();
        batchHandler.run();

        // Expected delivery order: heartbeat, empty append, append with entries, new-entry batch.
        InOrder inOrder = Mockito.inOrder(downstreamHandler);
        inOrder.verify(downstreamHandler).handle(wrap(heartbeat));
        inOrder.verify(downstreamHandler).handle(wrap(emptyAppend));
        inOrder.verify(downstreamHandler).handle(wrap(append));
        inOrder.verify(downstreamHandler).handle(
                wrap(new RaftMessages.NewEntry.BatchRequest(Collections.singletonList(content("")))));
    }

    @Test
    public void shouldDropMessagesAfterBeingStopped() throws Throwable {
        AssertableLogProvider logProvider = new AssertableLogProvider();
        BatchingMessageHandler batchHandler = new BatchingMessageHandler(
                downstreamHandler, IN_QUEUE_CONFIG, BATCH_CONFIG, jobSchedulerFactory, logProvider);
        RaftMessages.NewEntry.Request message = new RaftMessages.NewEntry.Request(null, null);

        batchHandler.stop();
        batchHandler.handle(wrap(message));
        batchHandler.run();

        // Nothing may be forwarded, and the drop must have been logged at debug level.
        Mockito.verify(downstreamHandler, Mockito.never())
                .handle(ArgumentMatchers.any(RaftMessages.ReceivedInstantClusterIdAwareMessage.class));
        logProvider.assertAtLeastOnce(
                AssertableLogProvider.inLog(BatchingMessageHandler.class)
                        .debug("This handler has been stopped, dropping the message: %s", wrap(message)));
    }

    @Test(timeout = 5000)
    public void shouldGiveUpAddingMessagesInTheQueueIfTheHandlerHasBeenStopped() throws Throwable {
        // NOTE(review): the (1, 1, 1024) queue config is presumably small enough that the second
        // handle(...) below cannot be accepted while the first message is still queued - confirm
        // against BoundedPriorityQueue.Config.
        BatchingMessageHandler batchHandler = new BatchingMessageHandler(
                downstreamHandler,
                new BoundedPriorityQueue.Config(1, 1, 1024L),
                BATCH_CONFIG,
                jobSchedulerFactory,
                NullLogProvider.getInstance());
        RaftMessages.NewEntry.Request message = new RaftMessages.NewEntry.Request(null, new ReplicatedString("dummy"));
        batchHandler.handle(wrap(message));

        CountDownLatch producerStarted = new CountDownLatch(1);
        Thread producer = new Thread(() -> {
            producerStarted.countDown();
            batchHandler.handle(wrap(message));
        });
        producer.start();

        producerStarted.await();
        batchHandler.stop();
        // Passing means the producer thread returned; otherwise the 5 s test timeout fails the test.
        producer.join();
    }

    @Test
    public void shouldDelegateStart() throws Throwable {
        BatchingMessageHandler batchHandler = newHandler();
        ClusterId clusterId = new ClusterId(UUID.randomUUID());

        batchHandler.start(clusterId);

        Mockito.verify(downstreamHandler).start(clusterId);
    }

    @Test
    public void shouldDelegateStop() throws Throwable {
        newHandler().stop();

        Mockito.verify(downstreamHandler).stop();
    }

    @Test
    public void shouldStartJob() throws Throwable {
        newHandler().start(new ClusterId(UUID.randomUUID()));

        Mockito.verify(mockJob).start();
    }

    @Test
    public void shouldStopJob() throws Throwable {
        newHandler().stop();

        Mockito.verify(mockJob).stop();
    }

    /**
     * Creates a handler wired to the mocked downstream handler and job factory, using the default
     * queue/batch configuration and a no-op log provider.
     */
    private BatchingMessageHandler newHandler() {
        return new BatchingMessageHandler(
                downstreamHandler, IN_QUEUE_CONFIG, BATCH_CONFIG, jobSchedulerFactory, NullLogProvider.getInstance());
    }

    /** Wraps {@code raftMessage} with the fixed {@link #now} instant and the local cluster id. */
    private RaftMessages.ReceivedInstantClusterIdAwareMessage<?> wrap(RaftMessages.RaftMessage raftMessage) {
        return wrap(now, raftMessage);
    }

    /** Wraps {@code raftMessage} with the given received-instant and the local cluster id. */
    private RaftMessages.ReceivedInstantClusterIdAwareMessage<?> wrap(Instant instant, RaftMessages.RaftMessage raftMessage) {
        return RaftMessages.ReceivedInstantClusterIdAwareMessage.of(instant, localClusterId, raftMessage);
    }

    /** Returns a {@link ReplicatedString} holding {@code str}. */
    private ReplicatedContent content(String str) {
        return new ReplicatedString(str);
    }

    /**
     * Builds one log entry per integer in {@code [min, max]} (both inclusive); every entry is
     * created with {@code term} and a {@link ReplicatedString} of its number as content.
     */
    private RaftLogEntry[] entries(long term, int min, int max) {
        RaftLogEntry[] result = new RaftLogEntry[max - min + 1];
        for (int value = min; value <= max; value++) {
            result[value - min] = new RaftLogEntry(term, new ReplicatedString(String.valueOf(value)));
        }
        return result;
    }
}
