package org.neo4j.coreedge.raft;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.neo4j.coreedge.catchup.storecopy.LocalDatabase;
import org.neo4j.coreedge.raft.MismatchedStoreIdService;
import org.neo4j.coreedge.raft.RaftMessages;
import org.neo4j.coreedge.raft.net.Inbound;
import org.neo4j.coreedge.server.StoreId;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

/* loaded from: input_file:org/neo4j/coreedge/raft/BatchingMessageHandler.class */
public class BatchingMessageHandler implements Runnable, Inbound.MessageHandler<RaftMessages.StoreIdAwareMessage>, MismatchedStoreIdService {
    private final Log log;
    private final Inbound.MessageHandler<RaftMessages.RaftMessage> innerHandler;
    private final BlockingQueue<RaftMessages.StoreIdAwareMessage> messageQueue;
    private final int maxBatch;
    private final List<RaftMessages.RaftMessage> batch;
    private final LocalDatabase localDatabase;
    private RaftStateMachine raftStateMachine;
    private final List<MismatchedStoreIdService.MismatchedStoreListener> listeners = new ArrayList();

    public BatchingMessageHandler(Inbound.MessageHandler<RaftMessages.RaftMessage> messageHandler, LogProvider logProvider, int i, int i2, LocalDatabase localDatabase, RaftStateMachine raftStateMachine) {
        this.innerHandler = messageHandler;
        this.localDatabase = localDatabase;
        this.raftStateMachine = raftStateMachine;
        this.log = logProvider.getLog(getClass());
        this.maxBatch = i2;
        this.batch = new ArrayList(i2);
        this.messageQueue = new ArrayBlockingQueue(i);
    }

    @Override // org.neo4j.coreedge.raft.net.Inbound.MessageHandler
    public void handle(RaftMessages.StoreIdAwareMessage storeIdAwareMessage) {
        try {
            this.messageQueue.put(storeIdAwareMessage);
        } catch (InterruptedException e) {
            this.log.warn("Not expecting to be interrupted.", e);
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        RaftMessages.StoreIdAwareMessage storeIdAwareMessage = null;
        try {
            storeIdAwareMessage = this.messageQueue.poll(1L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            this.log.warn("Not expecting to be interrupted.", e);
        }
        if (storeIdAwareMessage != null) {
            RaftMessages.RaftMessage message = storeIdAwareMessage.message();
            StoreId storeId = storeIdAwareMessage.storeId();
            if (!storeIdAwareMessage.storeId().equals(this.localDatabase.storeId())) {
                if (this.localDatabase.isEmpty()) {
                    this.log.info("StoreId mismatch but store was empty so downloading new store from %s. Expected: %s, Encountered: %s. ", new Object[]{message.from(), storeId, this.localDatabase.storeId()});
                    this.raftStateMachine.downloadSnapshot(message.from());
                    return;
                } else {
                    this.log.info("Discarding message[%s] owing to mismatched storeId and non-empty store. Expected: %s, Encountered: %s", new Object[]{message, storeId, this.localDatabase.storeId()});
                    this.listeners.forEach(mismatchedStoreListener -> {
                        mismatchedStoreListener.onMismatchedStore(new MismatchedStoreIdService.MismatchedStoreIdException(storeId, this.localDatabase.storeId()));
                    });
                    return;
                }
            }
            if (this.messageQueue.isEmpty()) {
                this.innerHandler.handle(storeIdAwareMessage.message());
                return;
            }
            this.batch.clear();
            this.batch.add(message);
            drain(this.messageQueue, this.batch, this.maxBatch - 1);
            collateAndHandleBatch(this.batch);
        }
    }

    private void drain(BlockingQueue<RaftMessages.StoreIdAwareMessage> blockingQueue, List<RaftMessages.RaftMessage> list, int i) {
        ArrayList arrayList = new ArrayList();
        blockingQueue.drainTo(arrayList, i);
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            list.add(((RaftMessages.StoreIdAwareMessage) it.next()).message());
        }
    }

    @Override // org.neo4j.coreedge.raft.MismatchedStoreIdService
    public void addMismatchedStoreListener(MismatchedStoreIdService.MismatchedStoreListener mismatchedStoreListener) {
        this.listeners.add(mismatchedStoreListener);
    }

    private void collateAndHandleBatch(List<RaftMessages.RaftMessage> list) {
        RaftMessages.NewEntry.Batch batch = null;
        for (RaftMessages.RaftMessage raftMessage : list) {
            if (raftMessage instanceof RaftMessages.NewEntry.Request) {
                RaftMessages.NewEntry.Request request = (RaftMessages.NewEntry.Request) raftMessage;
                if (batch == null) {
                    batch = new RaftMessages.NewEntry.Batch(list.size());
                }
                batch.add(request.content());
            } else {
                this.innerHandler.handle(raftMessage);
            }
        }
        if (batch != null) {
            this.innerHandler.handle(batch);
        }
    }
}
