/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.causalclustering.core;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.neo4j.causalclustering.core.consensus.ContinuousJob;
import org.neo4j.causalclustering.core.consensus.RaftMessages;
import org.neo4j.causalclustering.core.replication.ReplicatedContent;
import org.neo4j.causalclustering.identity.ClusterId;
import org.neo4j.causalclustering.messaging.ComposableMessageHandler;
import org.neo4j.causalclustering.messaging.LifecycleMessageHandler;
import org.neo4j.function.Predicates;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

/**
 * Message handler that buffers incoming Raft messages in a bounded queue and forwards
 * them to a delegate handler from a separately scheduled job.
 *
 * <p>Producers call {@link #handle}; the job created by the supplied factory invokes
 * {@link #run()}. When more than one message is waiting, consecutive
 * {@code NewEntry.Request} messages belonging to the same cluster id are collated into a
 * single {@code NewEntry.BatchRequest} before being handed to the delegate.
 */
class BatchingMessageHandler
implements Runnable,
LifecycleMessageHandler<RaftMessages.ReceivedInstantClusterIdAwareMessage<?>> {
    private final LifecycleMessageHandler<RaftMessages.ReceivedInstantClusterIdAwareMessage<?>> handler;
    private final Log log;
    private final int maxBatch;
    // Scratch list reused by every run() invocation; it is cleared before each batch is built.
    private final List<RaftMessages.ReceivedInstantClusterIdAwareMessage<?>> batch;
    private final BlockingQueue<RaftMessages.ReceivedInstantClusterIdAwareMessage<?>> messageQueue;
    private final ContinuousJob job;
    private final ContentHandler contentHandler = new ContentHandler();
    // Written by stop() and read by handle(), hence volatile.
    private volatile boolean stopped;

    /**
     * @param handler             delegate that receives single messages and collated batch requests
     * @param queueSize           capacity of the bounded queue between {@link #handle} and {@link #run()}
     * @param maxBatch            maximum number of queued messages processed by one {@link #run()} call
     * @param jobSchedulerFactory creates the job that will execute this handler's {@link #run()} method
     * @param logProvider         source of the log used by this handler
     */
    BatchingMessageHandler(LifecycleMessageHandler<RaftMessages.ReceivedInstantClusterIdAwareMessage<?>> handler, int queueSize, int maxBatch, Function<Runnable, ContinuousJob> jobSchedulerFactory, LogProvider logProvider) {
        this.handler = handler;
        this.log = logProvider.getLog(this.getClass());
        this.maxBatch = maxBatch;
        this.batch = new ArrayList<>(maxBatch);
        this.messageQueue = new ArrayBlockingQueue<>(queueSize);
        this.job = jobSchedulerFactory.apply(this);
    }

    /**
     * Returns a composable wrapper that puts a {@code BatchingMessageHandler}, configured with
     * the given parameters, in front of whatever delegate it is later applied to.
     */
    static ComposableMessageHandler composable(int queueSize, int maxBatch, Function<Runnable, ContinuousJob> jobSchedulerFactory, LogProvider logProvider) {
        return delegate -> new BatchingMessageHandler(delegate, queueSize, maxBatch, jobSchedulerFactory, logProvider);
    }

    /**
     * Starts the delegate handler first and then the job that drains the queue.
     */
    @Override
    public void start(ClusterId clusterId) throws Throwable {
        this.handler.start(clusterId);
        this.job.start();
    }

    /**
     * Marks this handler as stopped, then stops the delegate handler and the job.
     *
     * <p>The job is stopped even when stopping the delegate fails, so that a failing delegate
     * cannot leave the job running. The first failure is rethrown; a failure from the second
     * step is attached to it as a suppressed exception.
     *
     * @throws Throwable the failure raised while stopping the delegate or the job
     */
    @Override
    public void stop() throws Throwable {
        this.stopped = true;
        Throwable failure = null;
        try {
            this.handler.stop();
        }
        catch (Throwable t) {
            failure = t;
        }
        try {
            this.job.stop();
        }
        catch (Throwable t) {
            if (failure == null) {
                failure = t;
            } else {
                failure.addSuppressed(t);
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Enqueues a message for later processing by {@link #run()}.
     *
     * <p>If this handler has already been stopped the message is logged at debug level and
     * dropped. Otherwise the offer to the bounded queue is retried through
     * {@code Predicates.awaitForever} (100 ms check interval) until the queue accepts the
     * message or the handler is stopped in the meantime.
     */
    @Override
    public void handle(RaftMessages.ReceivedInstantClusterIdAwareMessage<?> message) {
        if (this.stopped) {
            this.log.debug("This handler has been stopped, dropping the message: %s", message);
            return;
        }
        Predicates.awaitForever(() -> this.stopped || this.messageQueue.offer(message), 100L, TimeUnit.MILLISECONDS);
    }

    /**
     * Processes at most {@code maxBatch} queued messages.
     *
     * <p>Waits up to one second for a first message. If nothing else is queued behind it the
     * message is passed to the delegate as-is; otherwise up to {@code maxBatch - 1} further
     * messages are drained and the whole batch is collated before being handed over.
     */
    @Override
    public void run() {
        RaftMessages.ReceivedInstantClusterIdAwareMessage<?> message = null;
        try {
            message = this.messageQueue.poll(1L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            // NOTE(review): the interrupt status is intentionally left cleared here. Presumably the
            // ContinuousJob re-invokes run() in a loop, in which case restoring the flag would make
            // every subsequent poll fail immediately - confirm against ContinuousJob before changing.
            this.log.warn("Not expecting to be interrupted.", e);
        }
        if (message == null) {
            return;
        }
        if (this.messageQueue.isEmpty()) {
            // Nothing to collate with: skip the batching machinery entirely.
            this.handler.handle(message);
            return;
        }
        this.batch.clear();
        this.batch.add(message);
        this.drain(this.messageQueue, this.batch, this.maxBatch - 1);
        this.collateAndHandleBatch(this.batch);
    }

    /**
     * Moves up to {@code maxElements} messages from the queue to the end of {@code batch}
     * without blocking.
     */
    private void drain(BlockingQueue<RaftMessages.ReceivedInstantClusterIdAwareMessage<?>> messageQueue, List<RaftMessages.ReceivedInstantClusterIdAwareMessage<?>> batch, int maxElements) {
        messageQueue.drainTo(batch, maxElements);
    }

    /**
     * Forwards a batch to the delegate, merging the replicated content of
     * {@code NewEntry.Request} messages into {@code NewEntry.BatchRequest} messages.
     *
     * <p>Messages that carry no replicated content (as reported by {@link ContentHandler}) are
     * passed to the delegate immediately, i.e. before any batch request that is still being
     * accumulated. A pending batch request is flushed when a message with a different cluster id
     * is encountered and once more at the end of the batch. The batch request takes its
     * received-at instant and cluster id from the first message merged into it.
     */
    private void collateAndHandleBatch(List<RaftMessages.ReceivedInstantClusterIdAwareMessage<?>> batch) {
        RaftMessages.ReceivedInstantClusterIdAwareMessage<RaftMessages.NewEntry.BatchRequest> batchRequest = null;
        for (RaftMessages.ReceivedInstantClusterIdAwareMessage<?> message : batch) {
            // A batch request must not mix cluster ids: flush the pending one on a change.
            if (batchRequest != null && !message.clusterId().equals(batchRequest.clusterId())) {
                this.handler.handle(batchRequest);
                batchRequest = null;
            }
            ReplicatedContent replicatedContent = message.dispatch(this.contentHandler);
            if (replicatedContent == null) {
                // Not a batchable message: hand it over unchanged.
                this.handler.handle(message);
                continue;
            }
            if (batchRequest == null) {
                batchRequest = RaftMessages.ReceivedInstantClusterIdAwareMessage.of(message.receivedAt(), message.clusterId(), new RaftMessages.NewEntry.BatchRequest(batch.size()));
            }
            ((RaftMessages.NewEntry.BatchRequest)batchRequest.message()).add(replicatedContent);
        }
        if (batchRequest != null) {
            this.handler.handle(batchRequest);
        }
    }

    /**
     * Message visitor that extracts the replicated content of a {@code NewEntry.Request} and
     * returns {@code null} for every other message type. A {@code null} result tells
     * {@code collateAndHandleBatch} that the message cannot be merged into a batch request.
     */
    class ContentHandler
    implements RaftMessages.Handler<ReplicatedContent, RuntimeException> {
        ContentHandler() {
        }

        /** The only batchable message type: yields the content to be merged. */
        @Override
        public ReplicatedContent handle(RaftMessages.NewEntry.Request request) throws RuntimeException {
            return request.content();
        }

        @Override
        public ReplicatedContent handle(RaftMessages.NewEntry.BatchRequest batchRequest) throws RuntimeException {
            return null;
        }

        @Override
        public ReplicatedContent handle(RaftMessages.Vote.Request request) throws RuntimeException {
            return null;
        }

        @Override
        public ReplicatedContent handle(RaftMessages.Vote.Response response) throws RuntimeException {
            return null;
        }

        @Override
        public ReplicatedContent handle(RaftMessages.PreVote.Request request) throws RuntimeException {
            return null;
        }

        @Override
        public ReplicatedContent handle(RaftMessages.PreVote.Response response) throws RuntimeException {
            return null;
        }

        @Override
        public ReplicatedContent handle(RaftMessages.AppendEntries.Request request) throws RuntimeException {
            return null;
        }

        @Override
        public ReplicatedContent handle(RaftMessages.AppendEntries.Response response) throws RuntimeException {
            return null;
        }

        @Override
        public ReplicatedContent handle(RaftMessages.Heartbeat heartbeat) throws RuntimeException {
            return null;
        }

        @Override
        public ReplicatedContent handle(RaftMessages.LogCompactionInfo logCompactionInfo) throws RuntimeException {
            return null;
        }

        @Override
        public ReplicatedContent handle(RaftMessages.HeartbeatResponse heartbeatResponse) throws RuntimeException {
            return null;
        }

        @Override
        public ReplicatedContent handle(RaftMessages.Timeout.Election election) throws RuntimeException {
            return null;
        }

        @Override
        public ReplicatedContent handle(RaftMessages.Timeout.Heartbeat heartbeat) throws RuntimeException {
            return null;
        }

        @Override
        public ReplicatedContent handle(RaftMessages.PruneRequest pruneRequest) throws RuntimeException {
            return null;
        }
    }
}

