package org.neo4j.graphalgo.beta.pregel;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import org.immutables.value.Value;
import org.neo4j.graphalgo.annotation.ValueClass;
import org.neo4j.graphalgo.api.Graph;
import org.neo4j.graphalgo.beta.pregel.PregelConfig;
import org.neo4j.graphalgo.beta.pregel.context.MasterComputeContext;
import org.neo4j.graphalgo.core.concurrency.ParallelUtil;
import org.neo4j.graphalgo.core.utils.mem.AllocationTracker;
import org.neo4j.graphalgo.core.utils.mem.MemoryEstimation;
import org.neo4j.graphalgo.core.utils.mem.MemoryEstimations;
import org.neo4j.graphalgo.core.utils.mem.MemoryUsage;
import org.neo4j.graphalgo.core.utils.paged.HugeAtomicBitSet;
import org.neo4j.graphalgo.core.utils.partition.Partition;
import org.neo4j.graphalgo.core.utils.partition.PartitionUtils;

/**
 * Runs a {@link PregelComputation} over a {@link Graph} in a sequence of iterations.
 *
 * <p>The node id space is split into range partitions (one {@link ComputeStep} per partition).
 * In every iteration all compute steps are (re-)initialised, the {@link Messenger} is prepared
 * for the iteration, the compute steps are executed through
 * {@link ParallelUtil#runWithConcurrency} and finally the computation's master compute step is
 * invoked. The loop stops when either the configured maximum number of iterations is reached
 * or no compute step has sent a message and all vote bits are set.
 *
 * <p>Instances are obtained through {@link #create}; callers should invoke {@link #release()}
 * once the messenger is no longer needed.
 *
 * @param <CONFIG> the concrete configuration type handed to the computation
 */
@Value.Style(builderVisibility = Value.Style.BuilderVisibility.PUBLIC, depluralize = true, deepImmutablesDetection = true)
public final class Pregel<CONFIG extends PregelConfig> {
    private final CONFIG config;
    private final PregelComputation<CONFIG> computation;
    private final Graph graph;
    private final NodeValue nodeValues;
    private final Messenger<?> messenger;
    private final int concurrency;
    private final ExecutorService executor;
    private final AllocationTracker tracker;

    /**
     * Outcome of a {@link Pregel#run()} invocation.
     */
    @ValueClass
    public interface PregelResult {
        /** @return the node values the computation operated on */
        NodeValue nodeValues();

        /**
         * @return the iteration counter at the time the run loop was left; see
         *         {@link Pregel#run()} for how this relates to convergence
         */
        int ranIterations();

        /** @return {@code true} if the run stopped because the termination condition was met */
        boolean didConverge();
    }

    /**
     * Creates a Pregel instance for the given graph and computation.
     *
     * <p>The node values are allocated from the computation's schema, the graph's node count
     * and the configured concurrency.
     *
     * @param graph              the graph to run on
     * @param config             the user supplied configuration
     * @param pregelComputation  the computation to execute
     * @param executorService    the executor used to run the compute steps
     * @param allocationTracker  tracker passed to all internally allocated data structures
     * @param <CONFIG>           the concrete configuration type
     * @return a new Pregel instance
     */
    public static <CONFIG extends PregelConfig> Pregel<CONFIG> create(Graph graph, CONFIG config, PregelComputation<CONFIG> pregelComputation, ExecutorService executorService, AllocationTracker allocationTracker) {
        // NOTE(review): the copy is intentionally discarded. Presumably the call is kept for the
        // checks performed while building the immutable copy of the config -- confirm before
        // removing it.
        ImmutablePregelConfig.copyOf(config);

        NodeValue initialNodeValues = NodeValue.of(
            pregelComputation.schema(),
            graph.nodeCount(),
            config.concurrency(),
            allocationTracker
        );
        return new Pregel<>(graph, config, pregelComputation, initialNodeValues, executorService, allocationTracker);
    }

    /**
     * Builds the memory estimation for a Pregel run.
     *
     * @param pregelSchema the schema used to estimate the node values
     * @param z            {@code true} to include the estimation of the {@link QueueMessenger}
     *                     ("message queues"), {@code false} to include the estimation of the
     *                     {@link ReducingMessenger} ("message arrays")
     * @return the combined memory estimation
     */
    public static MemoryEstimation memoryEstimation(PregelSchema pregelSchema, boolean z) {
        MemoryEstimations.Builder estimation = MemoryEstimations.builder(Pregel.class)
            .perNode("vote bits", MemoryUsage::sizeOfHugeAtomicBitset)
            .perThread("compute steps", MemoryEstimations.builder(ComputeStep.class).build())
            .add("node value", NodeValue.memoryEstimation(pregelSchema));

        if (z) {
            estimation.add("message queues", QueueMessenger.memoryEstimation());
        } else {
            estimation.add("message arrays", ReducingMessenger.memoryEstimation());
        }
        return estimation.build();
    }

    /**
     * Wires up the collaborators and selects the messenger implementation: a
     * {@link ReducingMessenger} if the computation provides a reducer, a {@link QueueMessenger}
     * otherwise.
     */
    private Pregel(Graph graph, CONFIG config, PregelComputation<CONFIG> pregelComputation, NodeValue nodeValue, ExecutorService executorService, AllocationTracker allocationTracker) {
        this.graph = graph;
        this.config = config;
        this.computation = pregelComputation;
        this.nodeValues = nodeValue;
        this.concurrency = config.concurrency();
        this.executor = executorService;
        this.tracker = allocationTracker;

        Optional<Reducer> reducer = pregelComputation.reducer();
        this.messenger = reducer.isPresent()
            ? new ReducingMessenger(graph, config, reducer.get(), allocationTracker)
            : new QueueMessenger(graph, config, allocationTracker);
    }

    /**
     * Executes the computation until it converges or {@code config.maxIterations()} is reached.
     *
     * <p>The run is considered converged as soon as, after an iteration, no compute step
     * reports a sent message and all vote bits are set.
     *
     * <p>Note on {@code ranIterations}: if the run converges, the reported value is the
     * zero-based index of the iteration in which convergence was detected; otherwise it equals
     * {@code config.maxIterations()}.
     *
     * @return the node values together with the convergence flag and the iteration counter
     */
    public PregelResult run() {
        boolean didConverge = false;
        HugeAtomicBitSet voteBits = HugeAtomicBitSet.create(graph.nodeCount(), tracker);
        List<ComputeStep<CONFIG, ?>> computeSteps = createComputeSteps(voteBits);

        // Declared outside the loop because its final value is reported in the result.
        int iteration = 0;
        for (; iteration < config.maxIterations(); iteration++) {
            for (ComputeStep<CONFIG, ?> computeStep : computeSteps) {
                computeStep.init(iteration);
            }
            messenger.initIteration(iteration);
            runComputeSteps(computeSteps);
            runMasterComputeStep(iteration);

            // The message check is evaluated first; the vote bits are only inspected when no
            // compute step has sent a message.
            boolean noMessagesSent = computeSteps.stream().noneMatch(ComputeStep::hasSendMessage);
            if (noMessagesSent && voteBits.allSet()) {
                didConverge = true;
                // Leave without incrementing the counter (see method documentation).
                break;
            }
        }

        return ImmutablePregelResult.builder()
            .nodeValues(nodeValues)
            .didConverge(didConverge)
            .ranIterations(iteration)
            .build();
    }

    /**
     * Releases the resources held by the messenger.
     */
    public void release() {
        messenger.release();
    }

    /**
     * Creates one compute step per range partition of the node id space.
     *
     * @param hugeAtomicBitSet the vote bits shared by all compute steps
     * @return the compute steps, in partition order
     */
    private List<ComputeStep<CONFIG, ?>> createComputeSteps(HugeAtomicBitSet hugeAtomicBitSet) {
        List<Partition> partitions = PartitionUtils.rangePartition(concurrency, graph.nodeCount());
        List<ComputeStep<CONFIG, ?>> computeSteps = new ArrayList<>(concurrency);

        for (Partition partition : partitions) {
            // The literal 0 is presumably the initial iteration; every step is re-initialised
            // with the actual iteration in run(). The graph is passed for both the first and
            // the last constructor parameter, as in the original call.
            computeSteps.add(new ComputeStep<>(
                graph,
                computation,
                config,
                0,
                partition,
                nodeValues,
                messenger,
                hugeAtomicBitSet,
                graph
            ));
        }
        return computeSteps;
    }

    /**
     * Runs the given compute steps with the configured concurrency on the executor.
     */
    private void runComputeSteps(Collection<ComputeStep<CONFIG, ?>> collection) {
        ParallelUtil.runWithConcurrency(concurrency, collection, executor);
    }

    /**
     * Invokes the computation's master compute step for the given iteration.
     *
     * @param i the current iteration
     */
    private void runMasterComputeStep(int i) {
        computation.masterCompute(new MasterComputeContext<>(config, graph, i, nodeValues));
    }
}
