package org.neo4j.causalclustering.core.server;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.neo4j.causalclustering.core.consensus.RaftMessages;
import org.neo4j.causalclustering.identity.ClusterId;
import org.neo4j.causalclustering.messaging.Inbound;
import org.neo4j.function.Predicates;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

/* loaded from: input_file:org/neo4j/causalclustering/core/server/BatchingMessageHandler.class */
class BatchingMessageHandler extends LifecycleAdapter implements Runnable, Inbound.MessageHandler<RaftMessages.ClusterIdAwareMessage> {
    private final Inbound.MessageHandler<RaftMessages.ClusterIdAwareMessage> handler;
    private final Log log;
    private final int maxBatch;
    private final List<RaftMessages.ClusterIdAwareMessage> batch;
    private final BlockingQueue<RaftMessages.ClusterIdAwareMessage> messageQueue;
    private volatile boolean stopped;

    /* JADX INFO: Access modifiers changed from: package-private */
    public BatchingMessageHandler(Inbound.MessageHandler<RaftMessages.ClusterIdAwareMessage> messageHandler, int i, int i2, LogProvider logProvider) {
        this.handler = messageHandler;
        this.log = logProvider.getLog(getClass());
        this.maxBatch = i2;
        this.batch = new ArrayList(i2);
        this.messageQueue = new ArrayBlockingQueue(i);
    }

    public void stop() {
        this.stopped = true;
    }

    @Override // org.neo4j.causalclustering.messaging.Inbound.MessageHandler
    public void handle(RaftMessages.ClusterIdAwareMessage clusterIdAwareMessage) {
        if (this.stopped) {
            this.log.debug("This handler has been stopped, dropping the message: %s", new Object[]{clusterIdAwareMessage});
        } else {
            Predicates.awaitForever(() -> {
                return this.stopped || this.messageQueue.offer(clusterIdAwareMessage);
            }, 100L, TimeUnit.MILLISECONDS);
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        RaftMessages.ClusterIdAwareMessage clusterIdAwareMessage = null;
        try {
            clusterIdAwareMessage = this.messageQueue.poll(1L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            this.log.warn("Not expecting to be interrupted.", e);
        }
        if (clusterIdAwareMessage != null) {
            if (this.messageQueue.isEmpty()) {
                this.handler.handle(clusterIdAwareMessage);
                return;
            }
            this.batch.clear();
            this.batch.add(clusterIdAwareMessage);
            drain(this.messageQueue, this.batch, this.maxBatch - 1);
            collateAndHandleBatch(this.batch);
        }
    }

    private void drain(BlockingQueue<RaftMessages.ClusterIdAwareMessage> blockingQueue, List<RaftMessages.ClusterIdAwareMessage> list, int i) {
        ArrayList arrayList = new ArrayList();
        blockingQueue.drainTo(arrayList, i);
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            list.add((RaftMessages.ClusterIdAwareMessage) it.next());
        }
    }

    private void collateAndHandleBatch(List<RaftMessages.ClusterIdAwareMessage> list) {
        RaftMessages.NewEntry.BatchRequest batchRequest = null;
        ClusterId clusterId = list.get(0).clusterId();
        for (RaftMessages.ClusterIdAwareMessage clusterIdAwareMessage : list) {
            if (batchRequest != null && !clusterIdAwareMessage.clusterId().equals(clusterId)) {
                this.handler.handle(new RaftMessages.ClusterIdAwareMessage(clusterId, batchRequest));
                batchRequest = null;
            }
            clusterId = clusterIdAwareMessage.clusterId();
            RaftMessages.RaftMessage message = clusterIdAwareMessage.message();
            if (message instanceof RaftMessages.NewEntry.Request) {
                RaftMessages.NewEntry.Request request = (RaftMessages.NewEntry.Request) message;
                if (batchRequest == null) {
                    batchRequest = new RaftMessages.NewEntry.BatchRequest(list.size());
                }
                batchRequest.add(request.content());
            } else {
                this.handler.handle(clusterIdAwareMessage);
            }
        }
        if (batchRequest != null) {
            this.handler.handle(new RaftMessages.ClusterIdAwareMessage(clusterId, batchRequest));
        }
    }
}
