package com.facebook.presto.spark.classloader_interface;

import com.facebook.presto.spark.classloader_interface.PrestoSparkTaskOutput;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.spark.Partition;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkContext;
import org.apache.spark.SparkEnv;
import org.apache.spark.TaskContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.rdd.ShuffledRDD;
import org.apache.spark.rdd.ShuffledRDDPartition;
import org.apache.spark.rdd.ZippedPartitionsPartition;
import org.apache.spark.shuffle.ShuffleHandle;
import org.apache.spark.storage.BlockId;
import org.apache.spark.storage.BlockManagerId;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.Iterator;
import scala.collection.JavaConversions;
import scala.collection.Seq;

/* loaded from: input_file:com/facebook/presto/spark/classloader_interface/PrestoSparkNativeTaskRdd.class */
public class PrestoSparkNativeTaskRdd<T extends PrestoSparkTaskOutput> extends PrestoSparkTaskRdd<T> {
    public static <T extends PrestoSparkTaskOutput> PrestoSparkNativeTaskRdd<T> create(SparkContext sparkContext, Optional<PrestoSparkTaskSourceRdd> optional, Map<String, RDD<Tuple2<MutablePartitionId, PrestoSparkMutableRow>>> map, PrestoSparkTaskProcessor<T> prestoSparkTaskProcessor) {
        Objects.requireNonNull(sparkContext, "context is null");
        Objects.requireNonNull(optional, "taskSourceRdd is null");
        Objects.requireNonNull(map, "shuffleInputRddMap is null");
        Objects.requireNonNull(prestoSparkTaskProcessor, "taskProcessor is null");
        ImmutableList.Builder builder = ImmutableList.builder();
        ImmutableList.Builder builder2 = ImmutableList.builder();
        for (Map.Entry<String, RDD<Tuple2<MutablePartitionId, PrestoSparkMutableRow>>> entry : map.entrySet()) {
            builder.add(entry.getKey());
            builder2.add(entry.getValue());
        }
        return new PrestoSparkNativeTaskRdd<>(sparkContext, optional, builder.build(), builder2.build(), prestoSparkTaskProcessor);
    }

    @Override // com.facebook.presto.spark.classloader_interface.PrestoSparkTaskRdd
    public Iterator<Tuple2<MutablePartitionId, T>> compute(Partition partition, TaskContext taskContext) {
        PrestoSparkTaskSourceRdd taskSourceRdd = getTaskSourceRdd();
        List<Partition> seqAsJavaList = JavaConversions.seqAsJavaList(((ZippedPartitionsPartition) partition).partitions());
        int size = (taskSourceRdd != null ? 1 : 0) + getShuffleInputRdds().size();
        Preconditions.checkState(seqAsJavaList.size() == size, String.format("Unexpected partitions size. Expected: %s. Actual: %s.", Integer.valueOf(size), Integer.valueOf(seqAsJavaList.size())));
        return getTaskProcessor().process(taskSourceRdd != null ? taskSourceRdd.iterator(seqAsJavaList.get(seqAsJavaList.size() - 1), taskContext) : ScalaUtils.emptyScalaIterator(), getShuffleReadDescriptors(seqAsJavaList), getShuffleWriteDescriptor(partition));
    }

    private PrestoSparkNativeTaskRdd(SparkContext sparkContext, Optional<PrestoSparkTaskSourceRdd> optional, List<String> list, List<RDD<Tuple2<MutablePartitionId, PrestoSparkMutableRow>>> list2, PrestoSparkTaskProcessor<T> prestoSparkTaskProcessor) {
        super(sparkContext, optional, list, list2, prestoSparkTaskProcessor);
    }

    private Map<String, PrestoSparkShuffleReadDescriptor> getShuffleReadDescriptors(List<Partition> list) {
        ImmutableMap.Builder builder = ImmutableMap.builder();
        int size = list.size();
        List<RDD<Tuple2<MutablePartitionId, PrestoSparkMutableRow>>> shuffleInputRdds = getShuffleInputRdds();
        List<String> shuffleInputFragmentIds = getShuffleInputFragmentIds();
        Preconditions.checkState(size >= shuffleInputRdds.size() && size >= shuffleInputFragmentIds.size(), String.format("Size of shuffleInputRdds %d or shuffleInputFragmentIds %d is not equal to number of partitions %d", Integer.valueOf(shuffleInputRdds.size()), Integer.valueOf(shuffleInputFragmentIds.size()), Integer.valueOf(size)));
        for (int i = 0; i < shuffleInputRdds.size(); i++) {
            ShuffledRDDPartition shuffledRDDPartition = (Partition) list.get(i);
            Preconditions.checkState(shuffledRDDPartition != null);
            Preconditions.checkState(shuffledRDDPartition instanceof ShuffledRDDPartition, "partition is required to be ShuffledRddPartition, but got: %s", shuffledRDDPartition.getClass().getName());
            RDD<Tuple2<MutablePartitionId, PrestoSparkMutableRow>> rdd = shuffleInputRdds.get(i);
            Preconditions.checkState(rdd != null);
            Preconditions.checkState(rdd instanceof ShuffledRDD, "ShuffledRdd is required but got: %s", rdd.getClass().getName());
            ShuffleHandle shuffleHandle = ((ShuffleDependency) rdd.dependencies().head()).shuffleHandle();
            builder.put(shuffleInputFragmentIds.get(i), new PrestoSparkShuffleReadDescriptor(shuffledRDDPartition, shuffleHandle, rdd.getNumPartitions(), getBlockIds(shuffledRDDPartition, shuffleHandle), getPartitionIds(shuffledRDDPartition, shuffleHandle), getPartitionSize(shuffledRDDPartition, shuffleHandle)));
        }
        return builder.build();
    }

    private Optional<PrestoSparkShuffleWriteDescriptor> getShuffleWriteDescriptor(Partition partition) {
        Preconditions.checkState(SparkEnv.get().shuffleManager() instanceof PrestoSparkNativeExecutionShuffleManager, "Native execution requires to use PrestoSparkNativeExecutionShuffleManager. But got: %s", SparkEnv.get().shuffleManager().getClass().getName());
        PrestoSparkNativeExecutionShuffleManager prestoSparkNativeExecutionShuffleManager = (PrestoSparkNativeExecutionShuffleManager) SparkEnv.get().shuffleManager();
        return prestoSparkNativeExecutionShuffleManager.getShuffleHandle(partition.index()).map(shuffleHandle -> {
            return new PrestoSparkShuffleWriteDescriptor(shuffleHandle, prestoSparkNativeExecutionShuffleManager.getNumOfPartitions(shuffleHandle.shuffleId()));
        });
    }

    private List<String> getBlockIds(ShuffledRDDPartition shuffledRDDPartition, ShuffleHandle shuffleHandle) {
        return (List) JavaConversions.asJavaCollection(SparkEnv.get().mapOutputTracker().getMapSizesByExecutorId(shuffleHandle.shuffleId(), shuffledRDDPartition.idx(), shuffledRDDPartition.idx() + 1)).stream().map(tuple2 -> {
            return ((BlockManagerId) tuple2._1).executorId();
        }).collect(Collectors.toList());
    }

    private List<String> getPartitionIds(ShuffledRDDPartition shuffledRDDPartition, ShuffleHandle shuffleHandle) {
        return (List) JavaConversions.asJavaCollection(SparkEnv.get().mapOutputTracker().getMapSizesByExecutorId(shuffleHandle.shuffleId(), shuffledRDDPartition.idx(), shuffledRDDPartition.idx() + 1)).stream().map(tuple2 -> {
            return JavaConversions.asJavaCollection((Iterable) tuple2._2);
        }).flatMap((v0) -> {
            return v0.stream();
        }).map(tuple22 -> {
            return ((BlockId) tuple22._1).toString();
        }).collect(Collectors.toList());
    }

    private List<Long> getPartitionSize(ShuffledRDDPartition shuffledRDDPartition, ShuffleHandle shuffleHandle) {
        return (List) JavaConversions.asJavaCollection(SparkEnv.get().mapOutputTracker().getMapSizesByExecutorId(shuffleHandle.shuffleId(), shuffledRDDPartition.idx(), shuffledRDDPartition.idx() + 1)).stream().map(tuple2 -> {
            return (Long) JavaConversions.seqAsJavaList((Seq) tuple2._2).stream().map(tuple2 -> {
                return (Long) tuple2._2;
            }).reduce(0L, (v0, v1) -> {
                return Long.sum(v0, v1);
            });
        }).collect(Collectors.toList());
    }
}
