package org.gradoop.flink.algorithms.gelly.randomjump.functions;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.api.common.aggregators.LongSumAggregator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.pregel.ComputeFunction;
import org.apache.flink.graph.pregel.MessageIterator;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;

/* loaded from: input_file:org/gradoop/flink/algorithms/gelly/randomjump/functions/VCIComputeFunction.class */
/**
 * Vertex-centric compute function that performs a random walk with random jumps.
 *
 * <p>A vertex becomes active either because its id is contained in the start indices broadcast
 * set and it has not been visited yet, or because it received at least one message. For every
 * activation the vertex is marked as visited (counted once via the visited-vertices aggregator)
 * and the walk continues either along a randomly chosen, not yet visited outgoing edge or by
 * jumping to a randomly chosen vertex taken from the vertex indices broadcast set.
 *
 * <p>No further work is done once the number of visited vertices accumulated from previous
 * supersteps has reached {@code verticesToVisit}.
 */
public class VCIComputeFunction extends ComputeFunction<Long, VCIVertexValue, Long, NullValue> {
    /** Name of the broadcast set containing the indices of the start vertices. */
    public static final String START_INDICES_BROADCAST_SET = "startIndices";
    /** Name of the broadcast set containing the indices of all vertices usable as jump targets. */
    public static final String VERTEX_INDICES_BROADCAST_SET = "vertexIndices";
    /** Name of the aggregator counting the vertices visited during a superstep. */
    public static final String VISITED_VERTICES_AGGREGATOR_NAME = "visitedVerticesAggregator";

    /** Probability value compared against a uniform random number to decide between walk and jump. */
    private final double jumpProbability;
    /** Number of visited vertices at which the computation stops doing work. */
    private final long verticesToVisit;
    /** Start vertex indices, read from the broadcast set before every superstep. */
    private List<Long> startIndices;
    /** Jump target indices, read from the broadcast set before every superstep. */
    private List<Long> vertexIndices;
    /** Aggregator receiving one count for every vertex visited for the first time. */
    private LongSumAggregator visitedVerticesAggregator = new LongSumAggregator();
    /** Sum of all aggregate values observed in previous supersteps. */
    private long currentVisitedCount = 0;

    /**
     * Creates the compute function.
     *
     * @param jumpProbability probability for jumping to a random vertex instead of walking
     *                        along an outgoing edge
     * @param verticesToVisit number of vertices to visit before the walk stops
     */
    public VCIComputeFunction(double jumpProbability, long verticesToVisit) {
        this.jumpProbability = jumpProbability;
        this.verticesToVisit = verticesToVisit;
    }

    /**
     * Reads both broadcast sets, fetches the aggregator for the current superstep and adds the
     * aggregate of the previous superstep (if there is one) to the running visited count.
     */
    @Override // org.apache.flink.graph.pregel.ComputeFunction
    public void preSuperstep() {
        // The explicit type witness is required: a cast is not an inference context.
        this.startIndices = (List<Long>) this.<Long>getBroadcastSet(START_INDICES_BROADCAST_SET);
        this.vertexIndices = (List<Long>) this.<Long>getBroadcastSet(VERTEX_INDICES_BROADCAST_SET);
        this.visitedVerticesAggregator = getIterationAggregator(VISITED_VERTICES_AGGREGATOR_NAME);
        LongValue previousAggregate = getPreviousIterationAggregate(VISITED_VERTICES_AGGREGATOR_NAME);
        if (previousAggregate != null) {
            this.currentVisitedCount += previousAggregate.getValue();
        }
    }

    /**
     * Continues the walk from the given vertex: once if it is an unvisited start vertex,
     * otherwise once per received message. The vertex value is written back only if it changed.
     *
     * @param vertex          the vertex currently being processed
     * @param messageIterator the messages sent to this vertex; only their number matters
     */
    @Override // org.apache.flink.graph.pregel.ComputeFunction
    public void compute(Vertex<Long, VCIVertexValue> vertex, MessageIterator<NullValue> messageIterator) {
        if (this.currentVisitedCount >= this.verticesToVisit) {
            return;
        }
        // Materialize the edges once so they can be traversed for every received message.
        List<Edge<Long, Long>> edges = Lists.newArrayList(getEdges());
        Tuple2<VCIVertexValue, Boolean> valueWithHasChanged = Tuple2.of(vertex.getValue(), false);
        if (this.startIndices.contains(vertex.getId()) && !valueWithHasChanged.f0.isVisited()) {
            valueWithHasChanged = walkToRandomNeighbor(valueWithHasChanged, edges);
        } else if (messageIterator.hasNext()) {
            // Each message represents one walker arriving at this vertex.
            for (NullValue ignored : messageIterator) {
                valueWithHasChanged = walkToRandomNeighbor(valueWithHasChanged, edges);
            }
        }
        if (valueWithHasChanged.f1) {
            setNewVertexValue(valueWithHasChanged.f0);
        }
    }

    /**
     * Marks the vertex as visited if necessary and moves the walk on: either along a random
     * outgoing edge whose value is not yet contained in the visited out edges of the vertex
     * value, or, if there is no such edge or the random decision says so, to a random vertex.
     *
     * @param valueWithHasChanged the vertex value and a flag telling whether it was modified
     * @param edges               the outgoing edges of the current vertex
     * @return the given tuple, possibly with an updated value and the flag set to {@code true}
     */
    private Tuple2<VCIVertexValue, Boolean> walkToRandomNeighbor(
            Tuple2<VCIVertexValue, Boolean> valueWithHasChanged, List<Edge<Long, Long>> edges) {
        VCIVertexValue vertexValue = valueWithHasChanged.f0;
        if (!vertexValue.isVisited()) {
            this.visitedVerticesAggregator.aggregate(1L);
            vertexValue.setVisited();
            valueWithHasChanged.f1 = true;
        }
        // A jump probability of exactly 0 must always walk, even if the random draw is 0.0.
        boolean walk = this.jumpProbability == 0.0d
                || this.jumpProbability < ThreadLocalRandom.current().nextDouble();
        if (!walk) {
            jumpToRandomVertex();
            return valueWithHasChanged;
        }
        // Collect (target, edge value) pairs of all edges not yet recorded as visited.
        List<Tuple2<Long, Long>> unvisitedNeighbors = new ArrayList<>();
        for (Edge<Long, Long> edge : edges) {
            if (!vertexValue.getVisitedOutEdges().contains(edge.getValue())) {
                unvisitedNeighbors.add(Tuple2.of(edge.getTarget(), edge.getValue()));
            }
        }
        if (unvisitedNeighbors.isEmpty()) {
            jumpToRandomVertex();
        } else {
            int randomIndex = ThreadLocalRandom.current().nextInt(unvisitedNeighbors.size());
            Tuple2<Long, Long> chosen = unvisitedNeighbors.get(randomIndex);
            vertexValue.addVisitedOutEdge(chosen.f1);
            sendMessageTo(chosen.f0, NullValue.getInstance());
            valueWithHasChanged.f1 = true;
        }
        return valueWithHasChanged;
    }

    /** Sends a message to a vertex chosen uniformly at random from the vertex indices. */
    private void jumpToRandomVertex() {
        int randomIndex = ThreadLocalRandom.current().nextInt(this.vertexIndices.size());
        Long randomTarget = this.vertexIndices.get(randomIndex);
        sendMessageTo(randomTarget, NullValue.getInstance());
    }
}
