package org.neo4j.causalclustering.core;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.neo4j.causalclustering.core.BoundedPriorityQueue;
import org.neo4j.causalclustering.core.consensus.ContinuousJob;
import org.neo4j.causalclustering.core.consensus.RaftMessages;
import org.neo4j.causalclustering.core.consensus.log.RaftLogEntry;
import org.neo4j.causalclustering.core.replication.ReplicatedContent;
import org.neo4j.causalclustering.identity.ClusterId;
import org.neo4j.causalclustering.messaging.ComposableMessageHandler;
import org.neo4j.causalclustering.messaging.LifecycleMessageHandler;
import org.neo4j.helpers.ArrayUtil;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/neo4j/causalclustering/core/BatchingMessageHandler.class */
public class BatchingMessageHandler implements Runnable, LifecycleMessageHandler<RaftMessages.ReceivedInstantClusterIdAwareMessage<?>> {
    // Downstream handler that run() forwards each (possibly batched) message to.
    private final LifecycleMessageHandler<RaftMessages.ReceivedInstantClusterIdAwareMessage<?>> handler;
    private final Log log;
    // Bounded priority queue that buffers messages between handle() (producer) and run() (consumer).
    private final BoundedPriorityQueue<RaftMessages.ReceivedInstantClusterIdAwareMessage<?>> inQueue;
    // Job obtained from the job factory for this Runnable; started in start() and stopped in stop().
    private final ContinuousJob job;
    // Scratch list reused by batchNewEntries(); cleared at the start of every call.
    private final List<ReplicatedContent> contentBatch;
    // Scratch list reused by batchAppendEntries(); cleared at the start of every call.
    private final List<RaftLogEntry> entryBatch;
    // Entry-count and byte-size limits consulted while building batches.
    private final Config batchConfig;
    // Set to true by stop(); handle() drops (and debug-logs) messages once this is set.
    private volatile boolean stopped;
    // Last queue result seen by logQueueState(); lets it log only when the result changes.
    private volatile BoundedPriorityQueue.Result lastResult = BoundedPriorityQueue.Result.OK;
    // Number of non-OK offers counted since the counter was last reset (reset when the queue reports OK again).
    private AtomicLong droppedCount = new AtomicLong();
    // Decompiler rendering of the compiler's assertion flag; assigned in the static initializer.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/neo4j/causalclustering/core/BatchingMessageHandler$BatchingHandler.class */
    /**
     * Visitor that replaces the message taken from the queue with a batched equivalent.
     *
     * <p>Only {@code NewEntry.Request} and non-empty {@code AppendEntries.Request} messages are
     * batched. The batched message is wrapped with the receive instant and cluster id of the
     * message this handler was created for. A {@code null} result tells {@code run()} to forward
     * the original message unchanged. Message types not overridden here use whatever
     * {@code HandlerAdaptor} returns (presumably {@code null} - confirm against that class).
     */
    private class BatchingHandler extends RaftMessages.HandlerAdaptor<RaftMessages.ReceivedInstantClusterIdAwareMessage, RuntimeException> {
        /** Envelope of the first message; supplies the receive instant and cluster id for the batch. */
        private final RaftMessages.ReceivedInstantClusterIdAwareMessage<?> baseMessage;

        BatchingHandler(RaftMessages.ReceivedInstantClusterIdAwareMessage<?> baseMessage) {
            this.baseMessage = baseMessage;
        }

        /** Wraps the batch built by {@code batchNewEntries} in the base message's envelope. */
        @Override
        public RaftMessages.ReceivedInstantClusterIdAwareMessage handle(RaftMessages.NewEntry.Request request) throws RuntimeException {
            return RaftMessages.ReceivedInstantClusterIdAwareMessage.of(
                    baseMessage.receivedAt(),
                    baseMessage.clusterId(),
                    batchNewEntries(request));
        }

        /** Batches append requests that carry entries; returns {@code null} for entry-less requests. */
        @Override
        public RaftMessages.ReceivedInstantClusterIdAwareMessage handle(RaftMessages.AppendEntries.Request request) throws RuntimeException {
            if (request.entries().length != 0) {
                return RaftMessages.ReceivedInstantClusterIdAwareMessage.of(
                        baseMessage.receivedAt(),
                        baseMessage.clusterId(),
                        batchAppendEntries(request));
            }
            // Nothing to batch: the caller forwards the original message as-is.
            return null;
        }
    }

    /* loaded from: input_file:org/neo4j/causalclustering/core/BatchingMessageHandler$Config.class */
    /**
     * Limits applied while assembling a batch: the maximum number of items in one batch and the
     * maximum number of content bytes.
     */
    public static class Config {
        private final int maxBatchCount;
        private final long maxBatchBytes;

        /**
         * @param maxBatchCount upper bound on the number of items placed in one batch
         * @param maxBatchBytes upper bound on the content bytes placed in one batch
         */
        public Config(int maxBatchCount, long maxBatchBytes) {
            this.maxBatchBytes = maxBatchBytes;
            this.maxBatchCount = maxBatchCount;
        }
    }

    /* loaded from: input_file:org/neo4j/causalclustering/core/BatchingMessageHandler$ContentSize.class */
    /**
     * Stateless visitor that reports how many content bytes a message carries. It is handed to
     * the in-queue as its size function (see the constructor of the enclosing class).
     */
    private static class ContentSize extends RaftMessages.HandlerAdaptor<Long, RuntimeException> {
        private static final ContentSize INSTANCE = new ContentSize();

        private ContentSize() {
        }

        /**
         * Returns the content size of {@code message}, or {@code 0} when the dispatch yields
         * {@code null}.
         */
        static long of(RaftMessages.ReceivedInstantClusterIdAwareMessage<?> message) {
            Long size = (Long) message.dispatch(INSTANCE);
            return size != null ? size.longValue() : 0L;
        }

        /** Size of the single content item, or {@code 0} when its size is not reported. */
        @Override
        public Long handle(RaftMessages.NewEntry.Request request) throws RuntimeException {
            long bytes = request.content().size().orElse(0L);
            return Long.valueOf(bytes);
        }

        /** Sum of the reported sizes of all entries; entries without a size count as {@code 0}. */
        @Override
        public Long handle(RaftMessages.AppendEntries.Request request) throws RuntimeException {
            long total = 0;
            for (RaftLogEntry entry : request.entries()) {
                total += entry.content().size().orElse(0L);
            }
            return Long.valueOf(total);
        }
    }

    /* loaded from: input_file:org/neo4j/causalclustering/core/BatchingMessageHandler$MessagePriority.class */
    /**
     * Assigns a numeric priority to each message type and compares messages by that number in
     * ascending order. Entry-less append requests and all message types not handled here get the
     * base priority (10), append requests with entries get 20, new-entry requests get 30.
     * NOTE(review): presumably the queue serves the smallest number first - confirm against
     * BoundedPriorityQueue.
     */
    private class MessagePriority extends RaftMessages.HandlerAdaptor<Integer, RuntimeException> implements Comparator<RaftMessages.ReceivedInstantClusterIdAwareMessage<?>> {
        private final Integer BASE_PRIORITY = 10;

        private MessagePriority() {
        }

        @Override
        public Integer handle(RaftMessages.AppendEntries.Request request) {
            if (request.entries().length == 0) {
                return BASE_PRIORITY;
            }
            return 20;
        }

        @Override
        public Integer handle(RaftMessages.NewEntry.Request request) {
            return 30;
        }

        @Override
        public int compare(RaftMessages.ReceivedInstantClusterIdAwareMessage<?> left, RaftMessages.ReceivedInstantClusterIdAwareMessage<?> right) {
            int leftPriority = getPriority(left);
            int rightPriority = getPriority(right);
            return Integer.compare(leftPriority, rightPriority);
        }

        /** Dispatches to this visitor; a {@code null} result falls back to the base priority. */
        private int getPriority(RaftMessages.ReceivedInstantClusterIdAwareMessage<?> message) {
            Integer priority = (Integer) message.dispatch(this);
            if (priority == null) {
                return BASE_PRIORITY.intValue();
            }
            return priority.intValue();
        }
    }

    BatchingMessageHandler(LifecycleMessageHandler<RaftMessages.ReceivedInstantClusterIdAwareMessage<?>> lifecycleMessageHandler, BoundedPriorityQueue.Config config, Config config2, Function<Runnable, ContinuousJob> function, LogProvider logProvider) {
        this.handler = lifecycleMessageHandler;
        this.log = logProvider.getLog(getClass());
        this.batchConfig = config2;
        this.contentBatch = new ArrayList(config2.maxBatchCount);
        this.entryBatch = new ArrayList(config2.maxBatchCount);
        this.inQueue = new BoundedPriorityQueue<>(config, ContentSize::of, new MessagePriority());
        this.job = function.apply(this);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns a composable step that wraps a delegate handler in a new
     * {@code BatchingMessageHandler} built from the given configuration.
     *
     * @param inQueueConfig configuration of the bounded in-queue
     * @param batchConfig   count and byte limits used when building batches
     * @param jobFactory    factory for the job that runs the handler's processing loop
     * @param logProvider   source of the handler's log
     */
    public static ComposableMessageHandler composable(BoundedPriorityQueue.Config inQueueConfig, Config batchConfig, Function<Runnable, ContinuousJob> jobFactory, LogProvider logProvider) {
        return delegate -> new BatchingMessageHandler(delegate, inQueueConfig, batchConfig, jobFactory, logProvider);
    }

    /**
     * Starts the downstream handler first and then the job that drains the in-queue.
     *
     * @param clusterId passed through to the downstream handler
     * @throws Throwable whatever the downstream handler or the job throws while starting
     */
    @Override
    public void start(ClusterId clusterId) throws Throwable {
        handler.start(clusterId);
        job.start();
    }

    /**
     * Stops accepting messages, then stops the downstream handler and the draining job.
     *
     * <p>The job is stopped in a {@code finally} block so that a failure while stopping the
     * downstream handler cannot leave the job running.
     *
     * @throws Throwable whatever the downstream handler or the job throws while stopping
     */
    @Override
    public void stop() throws Throwable {
        // From here on handle() drops incoming messages instead of queueing them.
        stopped = true;
        try {
            handler.stop();
        } finally {
            // Must run even if the handler failed to stop, otherwise the job is never stopped.
            job.stop();
        }
    }

    /**
     * Offers the message to the in-queue, or drops it (with a debug log line) when this handler
     * has been stopped. The queue's verdict is passed to {@code logQueueState}.
     *
     * @param message the incoming message
     */
    @Override
    public void handle(RaftMessages.ReceivedInstantClusterIdAwareMessage<?> message) {
        if (!stopped) {
            BoundedPriorityQueue.Result offerResult = inQueue.offer(message);
            logQueueState(offerResult);
            return;
        }
        log.debug("This handler has been stopped, dropping the message: %s", message);
    }

    /**
     * Counts rejected offers and logs only when the queue's result differs from the previous
     * one: a warning when it starts rejecting, an info line (with the number of drops counted
     * so far, which is then reset) when it accepts again.
     *
     * @param result the outcome of the latest offer to the in-queue
     */
    private void logQueueState(BoundedPriorityQueue.Result result) {
        boolean accepted = result == BoundedPriorityQueue.Result.OK;
        if (!accepted) {
            droppedCount.incrementAndGet();
        }

        if (result == lastResult) {
            // No transition, nothing to report.
            return;
        }

        if (accepted) {
            long dropped = droppedCount.getAndSet(0L);
            log.info("Raft in-queue not dropping messages anymore. Dropped %d messages.", Long.valueOf(dropped));
        } else {
            log.warn("Raft in-queue dropping messages after: " + result);
        }
        lastResult = result;
    }

    /* JADX WARN: Type inference failed for: r0v10, types: [org.neo4j.causalclustering.core.consensus.RaftMessages$RaftMessage] */
    /**
     * One iteration of the draining loop: waits up to one second for a queued message, lets
     * {@code BatchingHandler} try to batch it with queued follow-ups, and forwards the batched
     * message (or the original one when no batch was produced) to the downstream handler.
     * NOTE(review): on interruption only a warning is logged; the interrupt status is not
     * restored.
     */
    @Override
    public void run() {
        try {
            Optional<RaftMessages.ReceivedInstantClusterIdAwareMessage<?>> polled = inQueue.poll(1, TimeUnit.SECONDS);
            if (!polled.isPresent()) {
                return;
            }

            RaftMessages.ReceivedInstantClusterIdAwareMessage<?> first = polled.get();
            RaftMessages.ReceivedInstantClusterIdAwareMessage<?> batched =
                    (RaftMessages.ReceivedInstantClusterIdAwareMessage<?>) first.message().dispatch(new BatchingHandler(first));

            // A null result means the message type is not batched; pass the original through.
            handler.handle(batched != null ? batched : first);
        } catch (InterruptedException e) {
            log.warn("Not expecting to be interrupted.", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds a batch from {@code request} plus as many directly following queued
     * {@code NewEntry.Request} messages as the batch limits allow.
     *
     * <p>Queued requests are taken from the head of the in-queue until the head is not a
     * {@code NewEntry.Request}, the batch holds {@code maxBatchCount} items, or adding the next
     * content would push the accumulated size over {@code maxBatchBytes}. Content that does not
     * report a size is never rejected by the byte limit and counts as zero bytes.
     *
     * <p>The returned batch is built from the shared {@code contentBatch} list, which is cleared
     * on the next call.
     *
     * @param request the first request, already removed from the queue; always part of the batch
     * @return a batch request over the collected contents
     */
    public RaftMessages.NewEntry.BatchRequest batchNewEntries(RaftMessages.NewEntry.Request request) {
        contentBatch.clear();
        contentBatch.add(request.content());
        long totalBytes = request.content().size().orElse(0L);

        while (contentBatch.size() < batchConfig.maxBatchCount) {
            Optional<BoundedPriorityQueue.Removable<RaftMessages.NewEntry.Request>> peeked = peekNext(RaftMessages.NewEntry.Request.class);
            if (!peeked.isPresent()) {
                break;
            }

            ReplicatedContent content = peeked.get().get().content();
            long contentBytes = content.size().orElse(0L);
            if (content.size().isPresent() && totalBytes + contentBytes > batchConfig.maxBatchBytes) {
                break;
            }

            contentBatch.add(content);
            // Keep the running total up to date so the byte limit applies to the whole batch,
            // not just to the first request plus the current candidate.
            totalBytes += contentBytes;

            boolean removed = peeked.get().remove();
            if (!$assertionsDisabled && !removed) {
                throw new AssertionError();
            }
        }
        return new RaftMessages.NewEntry.BatchRequest(contentBatch);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Merges {@code request} with directly following queued {@code AppendEntries.Request}
     * messages into a single request.
     *
     * <p>A queued request is merged only if it carries entries, has the same leader term as
     * {@code request}, and its {@code prevLogIndex} equals {@code request.prevLogIndex()} plus
     * the number of entries batched so far. Merging stops when the batch would exceed
     * {@code maxBatchCount} entries or {@code maxBatchBytes} content bytes.
     *
     * @param request the first request, already removed from the queue; expected to have at
     *                least one entry (the caller checks this before calling)
     * @return a request with the sender, leader term, prevLogIndex and prevLogTerm of
     *         {@code request}, all batched entries, and the highest leader commit seen
     */
    public RaftMessages.AppendEntries.Request batchAppendEntries(RaftMessages.AppendEntries.Request request) {
        entryBatch.clear();

        long totalBytes = 0;
        for (RaftLogEntry entry : request.entries()) {
            totalBytes += entry.content().size().orElse(0L);
            entryBatch.add(entry);
        }

        long highestCommit = request.leaderCommit();
        // Term of the newest batched entry; only consulted by the consistency assertion below.
        long lastTerm = ((RaftLogEntry) ArrayUtil.lastOf(request.entries())).term();

        while (entryBatch.size() < batchConfig.maxBatchCount) {
            Optional<BoundedPriorityQueue.Removable<RaftMessages.AppendEntries.Request>> peeked = peekNext(RaftMessages.AppendEntries.Request.class);
            if (!peeked.isPresent()) {
                break;
            }

            RaftMessages.AppendEntries.Request next = peeked.get().get();
            boolean hasEntries = next.entries().length != 0;
            if (!hasEntries || !consecutiveOrigin(request, next, entryBatch.size())) {
                break;
            }

            if (!$assertionsDisabled && lastTerm != next.prevLogTerm()) {
                throw new AssertionError();
            }

            RaftLogEntry[] nextEntries = next.entries();
            lastTerm = ((RaftLogEntry) ArrayUtil.lastOf(nextEntries)).term();

            if (nextEntries.length + entryBatch.size() > batchConfig.maxBatchCount) {
                break;
            }

            long nextBytes = 0;
            for (RaftLogEntry entry : nextEntries) {
                nextBytes += entry.content().size().orElse(0L);
            }
            if (nextBytes > 0 && totalBytes + nextBytes > batchConfig.maxBatchBytes) {
                break;
            }

            entryBatch.addAll(Arrays.asList(nextEntries));
            totalBytes += nextBytes;
            highestCommit = Long.max(highestCommit, next.leaderCommit());

            boolean removed = peeked.get().remove();
            if (!$assertionsDisabled && !removed) {
                throw new AssertionError();
            }
        }

        RaftLogEntry[] batchedEntries = entryBatch.toArray(RaftLogEntry.empty);
        return new RaftMessages.AppendEntries.Request(request.from(), request.leaderTerm(), request.prevLogIndex(), request.prevLogTerm(), batchedEntries, highestCommit);
    }

    /**
     * Tells whether {@code next} continues the batch started by {@code first}: both must have
     * the same leader term, and {@code next} must attach right after the entries batched so far.
     *
     * @param first        the request the batch started with
     * @param next         the candidate request from the queue
     * @param batchedCount number of entries already in the batch
     */
    private boolean consecutiveOrigin(RaftMessages.AppendEntries.Request first, RaftMessages.AppendEntries.Request next, int batchedCount) {
        if (next.leaderTerm() != first.leaderTerm()) {
            return false;
        }
        long expectedPrevLogIndex = first.prevLogIndex() + ((long) batchedCount);
        return next.prevLogIndex() == expectedPrevLogIndex;
    }

    /**
     * Peeks at the head of the in-queue and exposes it only when its payload is an instance of
     * {@code cls}.
     *
     * @param cls the accepted payload type
     * @param <M> the accepted payload type
     * @return a removable view of the head whose value is the payload cast to {@code M}, or
     *         empty when the queue has no head or the head carries a different payload type
     */
    private <M> Optional<BoundedPriorityQueue.Removable<M>> peekNext(Class<M> cls) {
        return inQueue.peek()
                .filter(head -> cls.isInstance(head.get().message()))
                .map(head -> head.map(envelope -> cls.cast(envelope.message())));
    }

    // Decompiler rendering of the compiler-generated assertion setup: the flag is true when
    // assertions are NOT enabled for this class, and guards the explicit AssertionError throws
    // in batchNewEntries() and batchAppendEntries().
    static {
        $assertionsDisabled = !BatchingMessageHandler.class.desiredAssertionStatus();
    }
}
