/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.comet.execution.shuffle;

import java.io.File;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import org.apache.comet.CometConf$;
import org.apache.comet.CometExecIterator;
import org.apache.comet.serde.OperatorOuterClass;
import org.apache.comet.serde.PartitioningOuterClass;
import org.apache.comet.serde.QueryPlanSerde$;
import org.apache.spark.Partition;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkEnv$;
import org.apache.spark.TaskContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.scheduler.MapStatus;
import org.apache.spark.scheduler.MapStatus$;
import org.apache.spark.shuffle.IndexShuffleBlockResolver;
import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
import org.apache.spark.shuffle.ShuffleWriteProcessor;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning;
import org.apache.spark.sql.catalyst.plans.physical.Partitioning;
import org.apache.spark.sql.catalyst.plans.physical.SinglePartition$;
import org.apache.spark.sql.comet.CometExec$;
import org.apache.spark.sql.comet.CometMetricNode;
import org.apache.spark.sql.comet.CometMetricNode$;
import org.apache.spark.sql.comet.shims.ShimCometShuffleWriteProcessor;
import org.apache.spark.sql.execution.metric.SQLMetric;
import org.apache.spark.sql.execution.metric.SQLShuffleWriteMetricsReporter;
import org.apache.spark.sql.execution.metric.SQLShuffleWriteMetricsReporter$;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import scala.Array$;
import scala.Function1;
import scala.Option$;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.Iterable;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.collection.immutable.MapLike;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;

@ScalaSignature(bytes="\u0006\u0001\u0005md\u0001B\u0006\r\u0001mA\u0001b\n\u0001\u0003\u0002\u0003\u0006I\u0001\u000b\u0005\te\u0001\u0011\t\u0011)A\u0005g!Aq\t\u0001B\u0001B\u0003%\u0001\n\u0003\u0005[\u0001\t\u0005\t\u0015!\u0003\\\u0011\u0015y\u0006\u0001\"\u0001a\u0011\u001d9\u0007A1A\u0005\n!Da!\u001b\u0001!\u0002\u0013Y\u0006\"\u00026\u0001\t#Z\u0007\"B;\u0001\t\u00032\bbBA&\u0001\u0011\u0005\u0011Q\n\u0002\u001b\u0007>lW\r^*ik\u001a4G.Z,sSR,\u0007K]8dKN\u001cxN\u001d\u0006\u0003\u001b9\tqa\u001d5vM\u001adWM\u0003\u0002\u0010!\u0005IQ\r_3dkRLwN\u001c\u0006\u0003#I\tQaY8nKRT!a\u0005\u000b\u0002\u0007M\fHN\u0003\u0002\u0016-\u0005)1\u000f]1sW*\u0011q\u0003G\u0001\u0007CB\f7\r[3\u000b\u0003e\t1a\u001c:h\u0007\u0001\u00192\u0001\u0001\u000f\"!\tir$D\u0001\u001f\u0015\tiA#\u0003\u0002!=\t)2\u000b[;gM2,wK]5uKB\u0013xnY3tg>\u0014\bC\u0001\u0012&\u001b\u0005\u0019#B\u0001\u0013\u0011\u0003\u0015\u0019\b.[7t\u0013\t13E\u0001\u0010TQ&l7i\\7fiNCWO\u001a4mK^\u0013\u0018\u000e^3Qe>\u001cWm]:pe\u0006\u0011r.\u001e;qkR\u0004\u0016M\u001d;ji&|g.\u001b8h!\tI\u0003'D\u0001+\u0015\tYC&\u0001\u0005qQf\u001c\u0018nY1m\u0015\tic&A\u0003qY\u0006t7O\u0003\u00020%\u0005A1-\u0019;bYf\u001cH/\u0003\u00022U\ta\u0001+\u0019:uSRLwN\\5oO\u0006\u0001r.\u001e;qkR\fE\u000f\u001e:jEV$Xm\u001d\t\u0004iy\neBA\u001b<\u001d\t1\u0014(D\u00018\u0015\tA$$\u0001\u0004=e>|GOP\u0005\u0002u\u0005)1oY1mC&\u0011A(P\u0001\ba\u0006\u001c7.Y4f\u0015\u0005Q\u0014BA A\u0005\r\u0019V-\u001d\u0006\u0003yu\u0002\"AQ#\u000e\u0003\rS!\u0001\u0012\u0018\u0002\u0017\u0015D\bO]3tg&|gn]\u0005\u0003\r\u000e\u0013\u0011\"\u0011;ue&\u0014W\u000f^3\u0002\u000f5,GO]5dgB!\u0011*\u0014)T\u001d\tQ5\n\u0005\u00027{%\u0011A*P\u0001\u0007!J,G-\u001a4\n\u00059{%aA'ba*\u0011A*\u0010\t\u0003\u0013FK!AU(\u0003\rM#(/\u001b8h!\t!\u0006,D\u0001V\u0015\t1v+\u0001\u0004nKR\u0014\u0018n\u0019\u0006\u0003\u001fII!!W+\u0003\u0013M\u000bF*T3ue&\u001c\u0017\u0001\u00038v[B\u000b'\u000f^:\u0011\u0005qkV\"A\u001f\n\u0005yk$aA%oi\u00061A(\u001b8jiz\"R!Y2eK\u001a\u0004\"A\u0019\u0001\u000e\u00031AQaJ\u0003A\u0002!BQAM\u0003A\u0002MBQaR\u0003A\u0002!CQAW\u0003A\u0002m\u000bQb\u0014$G'\u0016#v\fT#O\u000fRCU#A.\u0002\u001d=3eiU#U?2+ej\u0012+IA\u0005)2M]3bi\u0016lU\r\u001e:jGN\u0014V\r]8si\u0016\u0014HC\u00017p!\tiR.\u0003\u0002o=\tY2\u000b[;gM2,wK]5uK6+GO]5dgJ+\u0007o\u001c:uKJDQ\u0001\u001d\u0005A\u0002E\fqaY8oi\u0016DH\u000f\u0005\u0002sg6\tA#\u0003\u0002u)\tYA+Y:l\u0007>tG/\u001a=u\u0003\u00159(/\u001b;f))9X0!\b\u0002<\u0005\u0015\u0013\u0011\n\t\u0003qnl\u0011!\u001f\u0006\u0003uR\t\u0011b]2iK\u0012,H.\u001a:\n\u0005qL(!C'baN#\u0018\r^;t\u0011\u0015q\u0018\u00021\u0001\u0000\u0003\u0019Ig\u000e];ugB\"\u0011\u0011AA\u0006!\u0015!\u00141AA\u0004\u0013\r\t)\u0001\u0011\u0002\t\u0013R,'/\u0019;peB!\u0011\u0011BA\u0006\u0019\u0001!1\"!\u0004~\u0003\u0003\u0005\tQ!\u0001\u0002\u0010\t\u0019q\fJ\u001b\u0012\t\u0005E\u0011q\u0003\t\u00049\u0006M\u0011bAA\u000b{\t9aj\u001c;iS:<\u0007c\u0001/\u0002\u001a%\u0019\u00111D\u001f\u0003\u0007\u0005s\u0017\u0010C\u0004\u0002 %\u0001\r!!\t\u0002\u0007\u0011,\u0007\u000f\r\u0005\u0002$\u0005-\u0012\u0011GA\u001c!%\u0011\u0018QEA\u0015\u0003_\t)$C\u0002\u0002(Q\u0011\u0011c\u00155vM\u001adW\rR3qK:$WM\\2z!\u0011\tI!a\u000b\u0005\u0019\u00055\u0012QDA\u0001\u0002\u0003\u0015\t!a\u0004\u0003\u0007}#c\u0007\u0005\u0003\u0002\n\u0005EB\u0001DA\u001a\u0003;\t\t\u0011!A\u0003\u0002\u0005=!aA0%oA!\u0011\u0011BA\u001c\t1\tI$!\b\u0002\u0002\u0003\u0005)\u0011AA\b\u0005\ryF\u0005\u000f\u0005\b\u0003{I\u0001\u0019AA \u0003\u0015i\u0017\r]%e!\ra\u0016\u0011I\u0005\u0004\u0003\u0007j$\u0001\u0002'p]\u001eDa!a\u0012\n\u0001\u0004Y\u0016\u0001C7ba&sG-\u001a=\t\u000bAL\u0001\u0019A9\u0002\u001b\u001d,GOT1uSZ,\u0007\u000b\\1o)\u0019\ty%a\u001d\u0002xA!\u0011\u0011KA7\u001d\u0011\t\u0019&a\u001a\u000f\t\u0005U\u0013\u0011\r\b\u0005\u0003/\nyF\u0004\u0003\u0002Z\u0005ucb\u0001\u001c\u0002\\%\t\u0011$\u0003\u0002\u00181%\u0011\u0011CF\u0005\u0005\u0003G\n)'A\u0003tKJ$WM\u0003\u0002\u0012-%!\u0011\u0011NA6\u0003Iy\u0005/\u001a:bi>\u0014x*\u001e;fe\u000ec\u0017m]:\u000b\t\u0005\r\u0014QM\u0005\u0005\u0003_\n\tH\u0001\u0005Pa\u0016\u0014\u0018\r^8s\u0015\u0011\tI'a\u001b\t\r\u0005U$\u00021\u0001Q\u0003!!\u0017\r^1GS2,\u0007BBA=\u0015\u0001\u0007\u0001+A\u0005j]\u0012,\u0007PR5mK\u0002")
public class CometShuffleWriteProcessor
extends ShuffleWriteProcessor
implements ShimCometShuffleWriteProcessor {
    private final Partitioning outputPartitioning;
    private final Seq<Attribute> outputAttributes;
    private final Map<String, SQLMetric> metrics;
    private final int numParts;
    private final int OFFSET_LENGTH;

    @Override
    public MapStatus write(RDD<?> rdd, ShuffleDependency<?, ?, ?> dep, long mapId, TaskContext context, Partition partition) {
        return ShimCometShuffleWriteProcessor.write$(this, rdd, dep, mapId, context, partition);
    }

    private int OFFSET_LENGTH() {
        return this.OFFSET_LENGTH;
    }

    public ShuffleWriteMetricsReporter createMetricsReporter(TaskContext context) {
        return new SQLShuffleWriteMetricsReporter((ShuffleWriteMetricsReporter)context.taskMetrics().shuffleWriteMetrics(), this.metrics);
    }

    @Override
    public MapStatus write(Iterator<?> inputs, ShuffleDependency<?, ?, ?> dep, long mapId, int mapIndex, TaskContext context) {
        ShuffleWriteMetricsReporter metricsReporter = this.createMetricsReporter(context);
        IndexShuffleBlockResolver shuffleBlockResolver = (IndexShuffleBlockResolver)SparkEnv$.MODULE$.get().shuffleManager().shuffleBlockResolver();
        File dataFile = shuffleBlockResolver.getDataFile(dep.shuffleId(), mapId);
        File indexFile = shuffleBlockResolver.getIndexFile(dep.shuffleId(), mapId, shuffleBlockResolver.getIndexFile$default$3());
        String tempDataFilename = dataFile.getPath().replace(".data", ".data.tmp");
        String tempIndexFilename = indexFile.getPath().replace(".index", ".index.tmp");
        Path tempDataFilePath = Paths.get(tempDataFilename, new String[0]);
        Path tempIndexFilePath = Paths.get(tempIndexFilename, new String[0]);
        OperatorOuterClass.Operator nativePlan = this.getNativePlan(tempDataFilename, tempIndexFilename);
        Seq detailedMetrics = (Seq)new .colon.colon((Object)"elapsed_compute", (List)new .colon.colon((Object)"encode_time", (List)new .colon.colon((Object)"repart_time", (List)new .colon.colon((Object)"mempool_time", (List)new .colon.colon((Object)"input_batches", (List)new .colon.colon((Object)"spill_count", (List)new .colon.colon((Object)"spilled_bytes", (List)Nil$.MODULE$)))))));
        Map nativeSQLMetrics = ((MapLike)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"output_rows"), this.metrics.apply((Object)SQLShuffleWriteMetricsReporter$.MODULE$.SHUFFLE_RECORDS_WRITTEN())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"data_size"), this.metrics.apply((Object)"dataSize")), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"write_time"), this.metrics.apply((Object)SQLShuffleWriteMetricsReporter$.MODULE$.SHUFFLE_WRITE_TIME()))}))).$plus$plus((GenTraversableOnce)this.metrics.filterKeys((Function1 & Serializable & scala.Serializable)elem -> BoxesRunTime.boxToBoolean((boolean)detailedMetrics.contains(elem))));
        CometMetricNode nativeMetrics = CometMetricNode$.MODULE$.apply((Map<String, SQLMetric>)nativeSQLMetrics);
        Iterator newInputs = inputs.map((Function1 & Serializable & scala.Serializable)x$10 -> x$10._2());
        CometExecIterator cometIter = CometExec$.MODULE$.getCometIterator((Seq<Iterator<ColumnarBatch>>)((Seq)new .colon.colon((Object)newInputs, (List)Nil$.MODULE$)), this.outputAttributes.length(), nativePlan, nativeMetrics, this.numParts, context.partitionId());
        while (cometIter.hasNext()) {
            cometIter.next();
        }
        LongRef offset = LongRef.create((long)0L);
        long[] partitionLengths = (long[])new ArrayOps.ofByte(Predef$.MODULE$.byteArrayOps(Files.readAllBytes(tempIndexFilePath))).grouped(this.OFFSET_LENGTH()).drop(1).map((Function1 & Serializable & scala.Serializable)indexBytes -> BoxesRunTime.boxToLong((long)CometShuffleWriteProcessor.$anonfun$write$3(offset, indexBytes))).toArray(ClassTag$.MODULE$.Long());
        metricsReporter.incBytesWritten(Files.size(tempDataFilePath));
        shuffleBlockResolver.writeMetadataFileAndCommit(dep.shuffleId(), mapId, partitionLengths, (long[])Array$.MODULE$.empty(ClassTag$.MODULE$.Long()), tempDataFilePath.toFile());
        return MapStatus$.MODULE$.apply(SparkEnv$.MODULE$.get().blockManager().shuffleServerId(), partitionLengths, mapId);
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    public OperatorOuterClass.Operator getNativePlan(String dataFile, String indexFile) {
        OperatorOuterClass.ShuffleWriter.Builder builder;
        OperatorOuterClass.ShuffleWriter.Builder builder2;
        OperatorOuterClass.Scan.Builder scanBuilder = OperatorOuterClass.Scan.newBuilder().setSource("ShuffleWriterInput");
        OperatorOuterClass.Operator.Builder opBuilder = OperatorOuterClass.Operator.newBuilder();
        Seq scanTypes = (Seq)this.outputAttributes.flatten((Function1 & Serializable & scala.Serializable)attr -> Option$.MODULE$.option2Iterable(QueryPlanSerde$.MODULE$.serializeDataType(attr.dataType())));
        if (scanTypes.length() != this.outputAttributes.length()) throw new UnsupportedOperationException(new StringBuilder(62).append(this.outputAttributes).append(" contains unsupported data types for CometShuffleExchangeExec.").toString());
        scanBuilder.addAllFields((java.lang.Iterable)JavaConverters$.MODULE$.asJavaIterableConverter((Iterable)scanTypes).asJava());
        OperatorOuterClass.ShuffleWriter.Builder shuffleWriterBuilder = OperatorOuterClass.ShuffleWriter.newBuilder();
        shuffleWriterBuilder.setOutputDataFile(dataFile);
        shuffleWriterBuilder.setOutputIndexFile(indexFile);
        shuffleWriterBuilder.setEnableFastEncoding(BoxesRunTime.unboxToBoolean((Object)CometConf$.MODULE$.COMET_SHUFFLE_ENABLE_FAST_ENCODING().get()));
        if (SparkEnv$.MODULE$.get().conf().getBoolean("spark.shuffle.compress", true)) {
            OperatorOuterClass.CompressionCodec compressionCodec;
            String string = CometConf$.MODULE$.COMET_EXEC_SHUFFLE_COMPRESSION_CODEC().get();
            if ("zstd".equals(string)) {
                compressionCodec = OperatorOuterClass.CompressionCodec.Zstd;
            } else if ("lz4".equals(string)) {
                compressionCodec = OperatorOuterClass.CompressionCodec.Lz4;
            } else {
                if (!"snappy".equals(string)) throw new UnsupportedOperationException(new StringBuilder(15).append("invalid codec: ").append(string).toString());
                compressionCodec = OperatorOuterClass.CompressionCodec.Snappy;
            }
            OperatorOuterClass.CompressionCodec codec = compressionCodec;
            builder2 = shuffleWriterBuilder.setCodec(codec);
        } else {
            builder2 = shuffleWriterBuilder.setCodec(OperatorOuterClass.CompressionCodec.None);
        }
        shuffleWriterBuilder.setCompressionLevel(BoxesRunTime.unboxToInt((Object)CometConf$.MODULE$.COMET_EXEC_SHUFFLE_COMPRESSION_ZSTD_LEVEL().get()));
        Partitioning partitioning = this.outputPartitioning;
        if (partitioning instanceof HashPartitioning) {
            HashPartitioning hashPartitioning = (HashPartitioning)this.outputPartitioning;
            PartitioningOuterClass.HashRepartition.Builder partitioning2 = PartitioningOuterClass.HashRepartition.newBuilder();
            partitioning2.setNumPartitions(this.outputPartitioning.numPartitions());
            Seq partitionExprs = (Seq)hashPartitioning.expressions().flatMap((Function1 & Serializable & scala.Serializable)e -> Option$.MODULE$.option2Iterable(QueryPlanSerde$.MODULE$.exprToProto((Expression)e, $this.outputAttributes, QueryPlanSerde$.MODULE$.exprToProto$default$3())), Seq$.MODULE$.canBuildFrom());
            if (partitionExprs.length() != hashPartitioning.expressions().length()) {
                throw new UnsupportedOperationException(new StringBuilder(31).append("Partitioning ").append(hashPartitioning).append(" is not supported.").toString());
            }
            partitioning2.addAllHashExpression((java.lang.Iterable)JavaConverters$.MODULE$.asJavaIterableConverter((Iterable)partitionExprs).asJava());
            PartitioningOuterClass.Partitioning.Builder partitioningBuilder = PartitioningOuterClass.Partitioning.newBuilder();
            builder = shuffleWriterBuilder.setPartitioning(partitioningBuilder.setHashPartition(partitioning2).build());
        } else {
            if (!SinglePartition$.MODULE$.equals(partitioning)) throw new UnsupportedOperationException(new StringBuilder(31).append("Partitioning ").append(this.outputPartitioning).append(" is not supported.").toString());
            PartitioningOuterClass.SinglePartition.Builder partitioning3 = PartitioningOuterClass.SinglePartition.newBuilder();
            PartitioningOuterClass.Partitioning.Builder partitioningBuilder = PartitioningOuterClass.Partitioning.newBuilder();
            builder = shuffleWriterBuilder.setPartitioning(partitioningBuilder.setSinglePartition(partitioning3).build());
        }
        OperatorOuterClass.Operator.Builder shuffleWriterOpBuilder = OperatorOuterClass.Operator.newBuilder();
        return shuffleWriterOpBuilder.setShuffleWriter(shuffleWriterBuilder).addChildren(opBuilder.setScan(scanBuilder).build()).build();
    }

    public static final /* synthetic */ long $anonfun$write$3(LongRef offset$1, byte[] indexBytes) {
        long partitionOffset = ByteBuffer.wrap(indexBytes).order(ByteOrder.LITTLE_ENDIAN).getLong();
        long partitionLength = partitionOffset - offset$1.elem;
        offset$1.elem = partitionOffset;
        return partitionLength;
    }

    public CometShuffleWriteProcessor(Partitioning outputPartitioning, Seq<Attribute> outputAttributes, Map<String, SQLMetric> metrics, int numParts) {
        this.outputPartitioning = outputPartitioning;
        this.outputAttributes = outputAttributes;
        this.metrics = metrics;
        this.numParts = numParts;
        ShimCometShuffleWriteProcessor.$init$(this);
        this.OFFSET_LENGTH = 8;
    }
}

