package org.neo4j.graphalgo.beta.pregel;

import java.util.LinkedList;
import java.util.Queue;
import java.util.stream.LongStream;
import org.jctools.queues.MpscLinkedQueue;
import org.jetbrains.annotations.Nullable;
import org.neo4j.graphalgo.api.Graph;
import org.neo4j.graphalgo.beta.pregel.Messages;
import org.neo4j.graphalgo.config.GraphCreateConfig;
import org.neo4j.graphalgo.core.concurrency.ParallelUtil;
import org.neo4j.graphalgo.core.utils.mem.AllocationTracker;
import org.neo4j.graphalgo.core.utils.mem.MemoryEstimation;
import org.neo4j.graphalgo.core.utils.mem.MemoryEstimations;
import org.neo4j.graphalgo.core.utils.mem.MemoryUsage;
import org.neo4j.graphalgo.core.utils.paged.HugeObjectArray;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/neo4j/graphalgo/beta/pregel/QueueMessenger.class */
public class QueueMessenger implements Messenger<QueueIterator> {
    private static final Double TERMINATION_SYMBOL = Double.valueOf(Double.NaN);
    private static final Queue<Double> EMPTY_QUEUE = new LinkedList();
    private final Graph graph;
    private final PregelConfig config;
    private final HugeObjectArray<MpscLinkedQueue<Double>> messageQueues;

    /* loaded from: input_file:org/neo4j/graphalgo/beta/pregel/QueueMessenger$QueueIterator.class */
    public static abstract class QueueIterator implements Messages.MessageIterator {
        Queue<Double> queue;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/neo4j/graphalgo/beta/pregel/QueueMessenger$QueueIterator$Async.class */
        public static class Async extends QueueIterator {
            Async() {
            }

            @Override // java.util.Iterator
            public boolean hasNext() {
                return this.queue.peek() != null;
            }

            @Override // org.neo4j.graphalgo.beta.pregel.QueueMessenger.QueueIterator, org.neo4j.graphalgo.beta.pregel.Messages.MessageIterator
            public boolean isEmpty() {
                return this.queue.isEmpty();
            }

            @Override // org.neo4j.graphalgo.beta.pregel.QueueMessenger.QueueIterator, java.util.Iterator
            public /* bridge */ /* synthetic */ Double next() {
                return super.next();
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/neo4j/graphalgo/beta/pregel/QueueMessenger$QueueIterator$Sync.class */
        public static class Sync extends QueueIterator {
            private boolean reachedEnd = false;

            Sync() {
            }

            @Override // org.neo4j.graphalgo.beta.pregel.QueueMessenger.QueueIterator
            void init(@Nullable Queue<Double> queue) {
                super.init(queue);
                this.reachedEnd = false;
            }

            @Override // java.util.Iterator
            public boolean hasNext() {
                if (this.reachedEnd || this.queue.isEmpty()) {
                    return false;
                }
                if (!Double.isNaN(this.queue.peek().doubleValue())) {
                    return true;
                }
                this.queue.poll();
                this.reachedEnd = true;
                return false;
            }

            @Override // org.neo4j.graphalgo.beta.pregel.QueueMessenger.QueueIterator, org.neo4j.graphalgo.beta.pregel.Messages.MessageIterator
            public boolean isEmpty() {
                return this.queue.isEmpty() || this.reachedEnd || Double.isNaN(this.queue.peek().doubleValue());
            }

            @Override // org.neo4j.graphalgo.beta.pregel.QueueMessenger.QueueIterator, java.util.Iterator
            public /* bridge */ /* synthetic */ Double next() {
                return super.next();
            }
        }

        void init(@Nullable Queue<Double> queue) {
            this.queue = queue;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.Iterator
        public Double next() {
            return this.queue.poll();
        }

        @Override // org.neo4j.graphalgo.beta.pregel.Messages.MessageIterator
        public abstract boolean isEmpty();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public QueueMessenger(Graph graph, PregelConfig pregelConfig, AllocationTracker allocationTracker) {
        this.graph = graph;
        this.config = pregelConfig;
        this.messageQueues = initQueues(allocationTracker);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static MemoryEstimation memoryEstimation() {
        return MemoryEstimations.setup(GraphCreateConfig.IMPLICIT_GRAPH_NAME, (graphDimensions, i) -> {
            return MemoryEstimations.builder((Class<?>) QueueMessenger.class).fixed(HugeObjectArray.class.getSimpleName(), MemoryUsage.sizeOfInstance(HugeObjectArray.class)).perNode("node queue", MemoryEstimations.builder((Class<?>) MpscLinkedQueue.class).fixed("messages", graphDimensions.averageDegree() * 8).build()).build();
        });
    }

    private HugeObjectArray<MpscLinkedQueue<Double>> initQueues(AllocationTracker allocationTracker) {
        HugeObjectArray<MpscLinkedQueue<Double>> newArray = HugeObjectArray.newArray(new MpscLinkedQueue().getClass(), this.graph.nodeCount(), allocationTracker);
        ParallelUtil.parallelStreamConsume(LongStream.range(0L, this.graph.nodeCount()), this.config.concurrency(), longStream -> {
            longStream.forEach(j -> {
                newArray.set(j, new MpscLinkedQueue());
            });
        });
        return newArray;
    }

    @Override // org.neo4j.graphalgo.beta.pregel.Messenger
    public void initIteration(int i) {
        if (this.config.isAsynchronous() || i <= 0) {
            return;
        }
        ParallelUtil.parallelStreamConsume(LongStream.range(0L, this.graph.nodeCount()), this.config.concurrency(), longStream -> {
            longStream.forEach(j -> {
                MpscLinkedQueue<Double> mpscLinkedQueue = this.messageQueues.get(j);
                if (mpscLinkedQueue.isEmpty()) {
                    return;
                }
                mpscLinkedQueue.add(TERMINATION_SYMBOL);
            });
        });
    }

    @Override // org.neo4j.graphalgo.beta.pregel.Messenger
    public void sendTo(long j, double d) {
        this.messageQueues.get(j).add(Double.valueOf(d));
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.neo4j.graphalgo.beta.pregel.Messenger
    public QueueIterator messageIterator() {
        return this.config.isAsynchronous() ? new QueueIterator.Async() : new QueueIterator.Sync();
    }

    @Override // org.neo4j.graphalgo.beta.pregel.Messenger
    public void initMessageIterator(QueueIterator queueIterator, long j, boolean z) {
        queueIterator.init(z ? EMPTY_QUEUE : (Queue) this.messageQueues.get(j));
    }

    @Override // org.neo4j.graphalgo.beta.pregel.Messenger
    public void release() {
        this.messageQueues.release();
    }
}
