package org.neo4j.coreedge.core.server;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.neo4j.coreedge.core.consensus.RaftMessages;
import org.neo4j.coreedge.identity.StoreId;
import org.neo4j.coreedge.messaging.Inbound;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

/* loaded from: input_file:org/neo4j/coreedge/core/server/BatchingMessageHandler.class */
/**
 * Buffers incoming messages in a bounded queue and forwards them to a delegate handler,
 * collapsing runs of {@link RaftMessages.NewEntry.Request} into a single
 * {@link RaftMessages.NewEntry.BatchRequest} where possible.
 *
 * <p>{@link #handle(RaftMessages.StoreIdAwareMessage)} is the producer side: it enqueues the
 * message, blocking while the queue is full. {@link #run()} is the consumer side: each
 * invocation takes at most {@code maxBatch} messages off the queue and passes them on.
 *
 * <p>The scratch list used to assemble a batch is an instance field reused by every
 * {@link #run()} call, so {@code run()} is presumably meant to be driven by a single thread
 * (NOTE(review): confirm against whatever schedules this Runnable).
 */
public class BatchingMessageHandler implements Runnable, Inbound.MessageHandler<RaftMessages.StoreIdAwareMessage> {
    private final Log log;
    private final BlockingQueue<RaftMessages.StoreIdAwareMessage> messageQueue;
    private final int maxBatch;
    private final List<RaftMessages.StoreIdAwareMessage> batch;
    private final Inbound.MessageHandler<RaftMessages.StoreIdAwareMessage> handler;

    /**
     * @param messageHandler delegate that receives the (possibly batched) messages
     * @param queueSize capacity of the bounded queue between {@link #handle} and {@link #run}
     * @param maxBatch maximum number of queued messages consumed by one {@link #run} invocation
     * @param logProvider source of the log used to report unexpected interruptions
     */
    public BatchingMessageHandler(Inbound.MessageHandler<RaftMessages.StoreIdAwareMessage> messageHandler, int queueSize, int maxBatch, LogProvider logProvider) {
        this.handler = messageHandler;
        this.log = logProvider.getLog(getClass());
        this.maxBatch = maxBatch;
        this.batch = new ArrayList<>(maxBatch);
        this.messageQueue = new ArrayBlockingQueue<>(queueSize);
    }

    /**
     * Enqueues the message for later processing by {@link #run()}, blocking while the queue is
     * full. If the calling thread is interrupted while waiting, the message is not enqueued, a
     * warning is logged and the thread's interrupt status is restored.
     *
     * @param storeIdAwareMessage the message to enqueue
     */
    @Override // org.neo4j.coreedge.messaging.Inbound.MessageHandler
    public void handle(RaftMessages.StoreIdAwareMessage storeIdAwareMessage) {
        try {
            this.messageQueue.put(storeIdAwareMessage);
        } catch (InterruptedException e) {
            this.log.warn("Not expecting to be interrupted.", e);
            // Catching InterruptedException clears the flag; set it again so the caller can react.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Processes one round of queued messages. Waits up to one second for a first message and
     * returns without doing anything if none arrives. A lone message is forwarded unchanged;
     * otherwise up to {@code maxBatch} messages are drained and collated before forwarding.
     * If interrupted while waiting, a warning is logged, the interrupt status is restored and
     * the method returns.
     */
    @Override // java.lang.Runnable
    public void run() {
        RaftMessages.StoreIdAwareMessage first = null;
        try {
            first = this.messageQueue.poll(1L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            this.log.warn("Not expecting to be interrupted.", e);
            // Catching InterruptedException clears the flag; set it again so the runner can react.
            Thread.currentThread().interrupt();
        }
        if (first == null) {
            return;
        }
        if (this.messageQueue.isEmpty()) {
            // Nothing else is waiting, so there is nothing to batch with.
            this.handler.handle(first);
            return;
        }
        this.batch.clear();
        this.batch.add(first);
        // One slot of the batch is already taken by the message polled above.
        drain(this.messageQueue, this.batch, this.maxBatch - 1);
        collateAndHandleBatch(this.batch);
    }

    /**
     * Moves up to {@code maxElements} currently queued messages from the queue to the end of
     * the target list, without blocking.
     */
    private void drain(BlockingQueue<RaftMessages.StoreIdAwareMessage> blockingQueue, List<RaftMessages.StoreIdAwareMessage> list, int maxElements) {
        blockingQueue.drainTo(list, maxElements);
    }

    /**
     * Forwards the given non-empty list of messages to the delegate. Contents of
     * {@link RaftMessages.NewEntry.Request} messages are accumulated into a
     * {@link RaftMessages.NewEntry.BatchRequest}, which is flushed whenever a message with a
     * different store id is encountered and once more at the end of the list. All other
     * messages are forwarded immediately, so they can reach the delegate before new-entry
     * requests that were queued ahead of them.
     */
    private void collateAndHandleBatch(List<RaftMessages.StoreIdAwareMessage> list) {
        RaftMessages.NewEntry.BatchRequest pending = null;
        StoreId currentStoreId = list.get(0).storeId();
        for (RaftMessages.StoreIdAwareMessage wrapped : list) {
            if (pending != null && !wrapped.storeId().equals(currentStoreId)) {
                // A batch carries a single store id, so flush before switching stores.
                this.handler.handle(new RaftMessages.StoreIdAwareMessage(currentStoreId, pending));
                pending = null;
            }
            currentStoreId = wrapped.storeId();
            RaftMessages.RaftMessage message = wrapped.message();
            if (message instanceof RaftMessages.NewEntry.Request) {
                RaftMessages.NewEntry.Request request = (RaftMessages.NewEntry.Request) message;
                if (pending == null) {
                    pending = new RaftMessages.NewEntry.BatchRequest(list.size());
                }
                pending.add(request.content());
            } else {
                this.handler.handle(wrapped);
            }
        }
        if (pending != null) {
            this.handler.handle(new RaftMessages.StoreIdAwareMessage(currentStoreId, pending));
        }
    }
}
