package org.gradoop.flink.model.impl.operators.split;

import java.io.Serializable;
import java.util.List;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.Operator;
import org.gradoop.common.model.api.entities.Edge;
import org.gradoop.common.model.api.entities.GraphHead;
import org.gradoop.common.model.api.entities.Vertex;
import org.gradoop.common.model.impl.properties.PropertyValue;
import org.gradoop.flink.model.api.epgm.BaseGraph;
import org.gradoop.flink.model.api.epgm.BaseGraphCollection;
import org.gradoop.flink.model.api.functions.Function;
import org.gradoop.flink.model.api.operators.UnaryBaseGraphToBaseGraphCollectionOperator;
import org.gradoop.flink.model.impl.functions.epgm.Id;
import org.gradoop.flink.model.impl.functions.epgm.InitGraphHead;
import org.gradoop.flink.model.impl.functions.epgm.PairTupleWithNewId;
import org.gradoop.flink.model.impl.functions.epgm.SourceId;
import org.gradoop.flink.model.impl.functions.tuple.Project2To1;
import org.gradoop.flink.model.impl.operators.split.functions.AddNewGraphsToEdge;
import org.gradoop.flink.model.impl.operators.split.functions.AddNewGraphsToVertex;
import org.gradoop.flink.model.impl.operators.split.functions.JoinEdgeTupleWithSourceGraphs;
import org.gradoop.flink.model.impl.operators.split.functions.JoinEdgeTupleWithTargetGraphs;
import org.gradoop.flink.model.impl.operators.split.functions.JoinVertexIdWithGraphIds;
import org.gradoop.flink.model.impl.operators.split.functions.MultipleGraphIdsGroupReducer;
import org.gradoop.flink.model.impl.operators.split.functions.SplitValues;

/* loaded from: input_file:org/gradoop/flink/model/impl/operators/split/Split.class */
/**
 * Unary operator that builds a graph collection from a single graph, driven by a
 * user-supplied function that maps each vertex to a list of {@link PropertyValue}s.
 *
 * <p>NOTE(review): judging by the helper names, every distinct value returned by the
 * function presumably identifies one output graph, and a vertex is placed in the graph of
 * each value it yields. The helper classes are not visible here, so confirm against them.
 *
 * @param <G>  graph head type
 * @param <V>  vertex type
 * @param <E>  edge type
 * @param <LG> type of the input graph
 * @param <GC> type of the resulting graph collection
 */
public class Split<G extends GraphHead, V extends Vertex, E extends Edge, LG extends BaseGraph<G, V, E, LG, GC>, GC extends BaseGraphCollection<G, V, E, LG, GC>> implements UnaryBaseGraphToBaseGraphCollectionOperator<LG, GC>, Serializable {
    /** User-defined function applied to every vertex; it is handed to {@link SplitValues}. */
    private final Function<V, List<PropertyValue>> function;

    /**
     * Creates the operator.
     *
     * @param function function that returns a list of property values for a vertex
     */
    public Split(Function<V, List<PropertyValue>> function) {
        this.function = function;
    }

    /**
     * Assembles the dataflow that produces the graph heads, vertices and edges of the
     * resulting collection and passes them to the collection factory of the input graph.
     *
     * <p>The erased bridge {@code execute(BaseGraph)} is generated by the compiler. It must
     * not be declared by hand, because it has the same erasure as this method.
     *
     * @param lg input graph
     * @return graph collection created by {@code lg.getCollectionFactory().fromDataSets(...)}
     */
    // Raw types are used because the generic signatures of the helper functions are not
    // visible in this file; the suppression is limited to this one method.
    @SuppressWarnings({"unchecked", "rawtypes"})
    @Override // org.gradoop.flink.model.api.operators.UnaryBaseGraphToValueOperator
    public GC execute(LG lg) {
        // Output of SplitValues for every vertex.
        // NOTE(review): presumably (vertex id, split value) pairs - confirm in SplitValues.
        Operator vertexSplitValues = lg.getVertices().flatMap(new SplitValues(this.function));

        // Field 1 of the tuples above, de-duplicated and paired with a new id.
        // NOTE(review): presumably (split value, new graph id) - confirm in PairTupleWithNewId.
        MapOperator splitValueGraphIds = vertexSplitValues
            .map(new Project2To1())
            .distinct()
            .map(new PairTupleWithNewId());

        // Join both sets on field 1 = field 0, then reduce all join results sharing field 0.
        GroupReduceOperator vertexGraphIds = vertexSplitValues
            .join(splitValueGraphIds)
            .where(1).equalTo(0)
            .with((JoinFunction) new JoinVertexIdWithGraphIds())
            .groupBy(0)
            .reduceGroup(new MultipleGraphIdsGroupReducer());

        // One graph head per entry of splitValueGraphIds.
        Operator graphHeads = splitValueGraphIds
            .map(new Project2To1())
            .map(new InitGraphHead(lg.getFactory().getGraphHeadFactory()));

        // Vertices joined (by vertex id) with their reduced graph ids.
        Operator vertices = lg.getVertices()
            .join(vertexGraphIds)
            .where((KeySelector) new Id()).equalTo(0)
            .with((JoinFunction) new AddNewGraphsToVertex());

        // Edges joined with the graph ids of their source vertex and then, via the
        // "f0.targetId" key, of their target vertex; AddNewGraphsToEdge emits the edges.
        Operator edges = lg.getEdges()
            .join(vertexGraphIds)
            .where((KeySelector) new SourceId()).equalTo(0)
            .with((JoinFunction) new JoinEdgeTupleWithSourceGraphs())
            .join(vertexGraphIds)
            .where("f0.targetId").equalTo(0)
            .with((JoinFunction) new JoinEdgeTupleWithTargetGraphs())
            .flatMap(new AddNewGraphsToEdge());

        // Calling fromDataSets with raw arguments erases its return type, hence the cast.
        return (GC) lg.getCollectionFactory().fromDataSets(graphHeads, vertices, edges);
    }
}
