package io.qbeast.spark.delta.writer;

import io.qbeast.core.model.CubeId;
import io.qbeast.core.model.DataWriter;
import io.qbeast.core.model.IndexStatus;
import io.qbeast.core.model.QTableID;
import io.qbeast.core.model.QbeastBlock;
import io.qbeast.core.model.TableChanges;
import io.qbeast.spark.index.QbeastColumns;
import io.qbeast.spark.index.QbeastColumns$;
import java.net.URI;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$;
import org.apache.spark.sql.delta.DeltaStatsCollectionUtils;
import org.apache.spark.sql.delta.actions.AddFile;
import org.apache.spark.sql.delta.actions.FileAction;
import org.apache.spark.sql.delta.stats.DeltaFileStatistics;
import org.apache.spark.sql.delta.stats.DeltaJobStatisticsTracker;
import org.apache.spark.sql.execution.datasources.BasicWriteTaskStats;
import org.apache.spark.sql.execution.datasources.OutputWriterFactory;
import org.apache.spark.sql.execution.datasources.WriteJobStatsTracker;
import org.apache.spark.sql.execution.datasources.WriteTaskStats;
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat;
import org.apache.spark.sql.functions$;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.util.SerializableConfiguration;
import scala.MatchError;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.collection.mutable.ArrayOps;
import scala.collection.mutable.Builder;
import scala.collection.parallel.immutable.ParVector;
import scala.collection.parallel.immutable.ParVector$;
import scala.math.package$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;
import scala.runtime.ObjectRef;

/* compiled from: SparkDeltaDataWriter.scala */
/* loaded from: input_file:io/qbeast/spark/delta/writer/SparkDeltaDataWriter$.class */
/**
 * Writer/compactor of qbeast-indexed data for Delta tables (decompiled from
 * {@code SparkDeltaDataWriter.scala}).
 *
 * <p>This is the Scala {@code object} module class: its single instance is stored in
 * {@link #MODULE$} by the private constructor, which the static initializer invokes. Besides
 * {@code DataWriter} it mixes in {@code DeltaStatsCollectionUtils}, whose trait initializer is run
 * from the constructor.
 */
public final class SparkDeltaDataWriter$ implements DataWriter<Dataset<Row>, StructType, FileAction>, DeltaStatsCollectionUtils {
    /** The singleton module instance, assigned by the private constructor. */
    public static SparkDeltaDataWriter$ MODULE$;

    static {
        new SparkDeltaDataWriter$();
    }

    /**
     * Forwards to the {@code DeltaStatsCollectionUtils} trait implementation.
     *
     * <p>An unqualified {@code getDeltaOptionalTrackers(...)} call from inside this method binds to
     * this very method and recurses until {@link StackOverflowError}. The interface-qualified
     * {@code super} call instead dispatches to the trait's concrete method (Scala 2.12+ compiles
     * concrete trait methods to interface default methods).
     *
     * @param dataset data whose statistics the tracker should collect
     * @param sparkSession active session
     * @param qTableID target table identifier
     * @return the Delta statistics tracker, if the trait provides one
     */
    @Override // org.apache.spark.sql.delta.DeltaStatsCollectionUtils
    public Option<DeltaJobStatisticsTracker> getDeltaOptionalTrackers(Dataset<Row> dataset, SparkSession sparkSession, QTableID qTableID) {
        return DeltaStatsCollectionUtils.super.getDeltaOptionalTrackers(dataset, sparkSession, qTableID);
    }

    /**
     * Writes the rows of {@code dataset} as Parquet through {@link BlockWriter} tasks and returns
     * the resulting file actions.
     *
     * <p>The rows are repartitioned by the qbeast cube column, each partition is passed to
     * {@code BlockWriter.writeRow}, and the resulting {@code (AddFile, TaskStats)} pairs are
     * collected on the driver. The per-task statistics are then aggregated and fed to the job-level
     * stats trackers. When a Delta statistics tracker is present, each {@code AddFile}'s
     * {@code stats} field is replaced with the statistics that tracker recorded for the file's path.
     *
     * @param qTableID identifier of the target table; its {@code id()} is handed to the BlockWriter
     * @param structType schema used to prepare the Parquet output writer
     * @param dataset rows to write; expected to contain the qbeast helper columns (the cube column
     *     is used for repartitioning)
     * @param tableChanges index changes forwarded to the BlockWriter
     * @return the {@code AddFile} actions produced by the write tasks
     */
    public Seq<FileAction> write(QTableID qTableID, StructType structType, Dataset<Row> dataset, TableChanges tableChanges) {
        SparkSession sparkSession = dataset.sparkSession();
        // Seed the job from the session's Hadoop configuration so user FS settings are honored.
        Job job = Job.getInstance(sparkSession.sparkContext().hadoopConfiguration());
        OutputWriterFactory prepareWrite = new ParquetFileFormat().prepareWrite(sparkSession, job, Predef$.MODULE$.Map().empty(), structType);
        SerializableConfiguration serializableConfiguration = new SerializableConfiguration(job.getConfiguration());
        scala.collection.Seq<WriteJobStatsTracker> statsTrackers = StatsTracker$.MODULE$.getStatsTrackers();
        QbeastColumns apply = QbeastColumns$.MODULE$.apply(dataset);
        // The Delta stats tracker is built over the dataset with qbeast's internal columns removed.
        Option<DeltaJobStatisticsTracker> deltaOptionalTrackers = getDeltaOptionalTrackers(dataset.selectExpr((scala.collection.Seq) ((TraversableLike) dataset.schema().map(structField -> {
            return structField.name();
        }, Seq$.MODULE$.canBuildFrom())).filterNot(str -> {
            return BoxesRunTime.boxToBoolean($anonfun$write$2(str));
        })), sparkSession, qTableID);
        BlockWriter blockWriter = new BlockWriter(qTableID.id(), structType, dataset.schema(), prepareWrite, serializableConfiguration, (scala.collection.Seq) statsTrackers.$plus$plus(Option$.MODULE$.option2Iterable(deltaOptionalTrackers), Seq$.MODULE$.canBuildFrom()), apply, tableChanges);
        Predef$ predef$ = Predef$.MODULE$;
        // Co-locate the rows of each cube in one partition before handing partitions to BlockWriter.
        RDD execute = dataset.repartition(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col(QbeastColumns$.MODULE$.cubeColumnName())})).queryExecution().executedPlan().execute();
        IndexedSeq indexedSeq = new ArrayOps.ofRef(predef$.refArrayOps((Object[]) execute.mapPartitions(iterator -> {
            return blockWriter.writeRow(iterator);
        }, execute.mapPartitions$default$2(), ClassTag$.MODULE$.apply(Tuple2.class)).collect())).toIndexedSeq();
        // Split the collected (AddFile, TaskStats) pairs.
        IndexedSeq indexedSeq2 = (IndexedSeq) indexedSeq.map(tuple2 -> {
            return (AddFile) tuple2._1();
        }, IndexedSeq$.MODULE$.canBuildFrom());
        IndexedSeq indexedSeq3 = (IndexedSeq) indexedSeq.map(tuple22 -> {
            return (TaskStats) tuple22._2();
        }, IndexedSeq$.MODULE$.canBuildFrom());
        // Accumulators filled by $anonfun$write$6:
        //   create  -> DeltaFileStatistics entries (fed to the Delta tracker)
        //   create2 -> BasicWriteTaskStats entries (fed to the generic stats trackers)
        //   create3 -> latest task end time seen
        ObjectRef create = ObjectRef.create(Nil$.MODULE$);
        ObjectRef create2 = ObjectRef.create(Nil$.MODULE$);
        LongRef create3 = LongRef.create(0L);
        indexedSeq3.foreach(taskStats -> {
            $anonfun$write$6(create, create2, create3, taskStats);
            return BoxedUnit.UNIT;
        });
        statsTrackers.foreach(writeJobStatsTracker -> {
            $anonfun$write$9(create2, create3, writeJobStatsTracker);
            return BoxedUnit.UNIT;
        });
        deltaOptionalTrackers.foreach(deltaJobStatisticsTracker -> {
            $anonfun$write$10(create, create3, deltaJobStatisticsTracker);
            return BoxedUnit.UNIT;
        });
        // Replace each AddFile's stats with the Delta tracker's recorded stats for that path, if a
        // tracker exists. NOTE(review): recordedStats().apply throws if the path was not recorded.
        return (IndexedSeq) indexedSeq2.map(addFile -> {
            return addFile.copy(addFile.copy$default$1(), addFile.copy$default$2(), addFile.copy$default$3(), addFile.copy$default$4(), addFile.copy$default$5(), (String) deltaOptionalTrackers.map(deltaJobStatisticsTracker2 -> {
                return (String) deltaJobStatisticsTracker2.recordedStats().apply(new Path(new URI(addFile.path())).toString());
            }).getOrElse(() -> {
                return addFile.stats();
            }), addFile.copy$default$7(), addFile.copy$default$8());
        }, IndexedSeq$.MODULE$.canBuildFrom());
    }

    /**
     * Groups each cube's blocks into batches to be compacted together.
     *
     * <p>For every cube, only blocks whose size is at least
     * {@code MIN_COMPACTION_FILE_SIZE_IN_BYTES} are kept, and cubes left with no blocks are dropped.
     * The remaining blocks are split, in their original order, into consecutive groups. A new group
     * is started whenever adding the next block would push the running size above
     * {@code MAX_COMPACTION_FILE_SIZE_IN_BYTES}.
     *
     * <p>NOTE(review): if the first kept block of a cube already exceeds the maximum, an empty group
     * is emitted before it. {@code compact} ignores groups with at most one block, so this is
     * harmless there.
     *
     * @param seq pairs of cube id and that cube's blocks
     * @return pairs of cube id and one group of blocks; a cube may appear several times
     */
    public Seq<Tuple2<CubeId, Seq<QbeastBlock>>> groupFilesToCompact(Seq<Tuple2<CubeId, Seq<QbeastBlock>>> seq) {
        return (Seq) ((Seq) ((TraversableLike) seq.map(tuple2 -> {
            if (tuple2 != null) {
                return new Tuple2((CubeId) tuple2._1(), ((Seq) tuple2._2()).filter(qbeastBlock -> {
                    return BoxesRunTime.boxToBoolean($anonfun$groupFilesToCompact$2(qbeastBlock));
                }));
            }
            throw new MatchError(tuple2);
        }, scala.collection.immutable.Seq$.MODULE$.canBuildFrom())).filter(tuple22 -> {
            return BoxesRunTime.boxToBoolean($anonfun$groupFilesToCompact$3(tuple22));
        })).flatMap(tuple23 -> {
            if (tuple23 == null) {
                throw new MatchError(tuple23);
            }
            CubeId cubeId = (CubeId) tuple23._1();
            Seq seq2 = (Seq) tuple23._2();
            // newBuilder collects finished groups; newBuilder2 is the group being filled;
            // create tracks the byte size of the current group.
            Builder newBuilder = Seq$.MODULE$.newBuilder();
            Builder newBuilder2 = Seq$.MODULE$.newBuilder();
            LongRef create = LongRef.create(0L);
            seq2.foreach(qbeastBlock -> {
                $anonfun$groupFilesToCompact$5(create, newBuilder, newBuilder2, qbeastBlock);
                return BoxedUnit.UNIT;
            });
            // Flush the last (non-empty, since seq2 is non-empty) group.
            newBuilder.$plus$eq(newBuilder2.result());
            return (scala.collection.Seq) ((TraversableLike) newBuilder.result()).map(seq3 -> {
                return new Tuple2(cubeId, seq3.toIndexedSeq());
            }, Seq$.MODULE$.canBuildFrom());
        }, scala.collection.immutable.Seq$.MODULE$.canBuildFrom());
    }

    /**
     * Compacts the files of each cube by rewriting each group of small files as new output.
     *
     * <p>The cubes' files are grouped with {@link #groupFilesToCompact}, and the groups are
     * processed in parallel on the driver through a {@code ParVector}. For every group with more
     * than one block, the group's Parquet files are read, coalesced into a single partition, and
     * written by a {@link Compactor}. The {@code FileAction}s returned by the Compactor tasks are
     * collected.
     *
     * @param qTableID table identifier; block paths are resolved relative to {@code qTableID.id()}
     * @param structType schema used to prepare the Parquet output writer
     * @param indexStatus index status whose cube statuses supply the files to compact
     * @param tableChanges index changes forwarded to each Compactor
     * @return the file actions produced by all compaction tasks
     */
    public Seq<FileAction> compact(QTableID qTableID, StructType structType, IndexStatus indexStatus, TableChanges tableChanges) {
        SparkSession active = SparkSession$.MODULE$.active();
        ParVector parVector = new ParVector(groupFilesToCompact(indexStatus.cubesStatuses().mapValues(cubeStatus -> {
            return cubeStatus.files();
        }).toIndexedSeq()).toVector());
        // Seed the job from the session's Hadoop configuration, as write() does; a bare
        // Job.getInstance() would start from default settings and drop session-level FS options.
        Job job = Job.getInstance(active.sparkContext().hadoopConfiguration());
        OutputWriterFactory prepareWrite = new ParquetFileFormat().prepareWrite(active, job, Predef$.MODULE$.Map().empty(), structType);
        SerializableConfiguration serializableConfiguration = new SerializableConfiguration(job.getConfiguration());
        return ((ParVector) parVector.flatMap(tuple2 -> {
            scala.collection.Seq indexedSeq;
            if (tuple2 != null) {
                CubeId cubeId = (CubeId) tuple2._1();
                Seq seq = (Seq) tuple2._2();
                if (cubeId != null) {
                    if (seq.size() <= 1) {
                        // Nothing to merge for empty or single-block groups.
                        indexedSeq = (scala.collection.Seq) Nil$.MODULE$;
                    } else {
                        Seq seq2 = (Seq) seq.map(qbeastBlock -> {
                            return new Path(qTableID.id(), qbeastBlock.path()).toString();
                        }, scala.collection.immutable.Seq$.MODULE$.canBuildFrom());
                        Compactor compactor = new Compactor(qTableID, prepareWrite, serializableConfiguration, structType, cubeId, seq.toIndexedSeq(), tableChanges);
                        Predef$ predef$ = Predef$.MODULE$;
                        // Single partition so one Compactor task sees the whole group.
                        RDD execute = active.read().format("parquet").load(seq2).repartition(1).queryExecution().executedPlan().execute();
                        indexedSeq = new ArrayOps.ofRef(predef$.refArrayOps((Object[]) execute.mapPartitions(iterator -> {
                            return compactor.writeBlock(iterator);
                        }, execute.mapPartitions$default$2(), ClassTag$.MODULE$.apply(FileAction.class)).collect())).toIndexedSeq();
                    }
                    return indexedSeq;
                }
            }
            throw new MatchError(tuple2);
        }, ParVector$.MODULE$.canBuildFrom())).seq();
    }

    /** Returns true when {@code str} names one of qbeast's internal columns. */
    public static final /* synthetic */ boolean $anonfun$write$2(String str) {
        return QbeastColumns$.MODULE$.contains(str);
    }

    /** Returns true for Delta per-file statistics. */
    public static final /* synthetic */ boolean $anonfun$write$7(WriteTaskStats writeTaskStats) {
        return writeTaskStats instanceof DeltaFileStatistics;
    }

    /** Returns true for Spark's basic write-task statistics. */
    public static final /* synthetic */ boolean $anonfun$write$8(WriteTaskStats writeTaskStats) {
        return writeTaskStats instanceof BasicWriteTaskStats;
    }

    /**
     * Folds one task's statistics into the accumulators: appends its DeltaFileStatistics to
     * {@code objectRef}, its BasicWriteTaskStats to {@code objectRef2}, and keeps the maximum end
     * time in {@code longRef}.
     */
    public static final /* synthetic */ void $anonfun$write$6(ObjectRef objectRef, ObjectRef objectRef2, LongRef longRef, TaskStats taskStats) {
        objectRef.elem = (scala.collection.Seq) ((scala.collection.Seq) objectRef.elem).$plus$plus((GenTraversableOnce) taskStats.writeTaskStats().filter(writeTaskStats -> {
            return BoxesRunTime.boxToBoolean($anonfun$write$7(writeTaskStats));
        }), Seq$.MODULE$.canBuildFrom());
        objectRef2.elem = (scala.collection.Seq) ((scala.collection.Seq) objectRef2.elem).$plus$plus((GenTraversableOnce) taskStats.writeTaskStats().filter(writeTaskStats2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$write$8(writeTaskStats2));
        }), Seq$.MODULE$.canBuildFrom());
        longRef.elem = package$.MODULE$.max(longRef.elem, taskStats.endTime());
    }

    /** Passes the accumulated basic write stats and job end time to a generic stats tracker. */
    public static final /* synthetic */ void $anonfun$write$9(ObjectRef objectRef, LongRef longRef, WriteJobStatsTracker writeJobStatsTracker) {
        writeJobStatsTracker.processStats((scala.collection.Seq) objectRef.elem, longRef.elem);
    }

    /** Passes the accumulated Delta file stats and job end time to the Delta stats tracker. */
    public static final /* synthetic */ void $anonfun$write$10(ObjectRef objectRef, LongRef longRef, DeltaJobStatisticsTracker deltaJobStatisticsTracker) {
        deltaJobStatisticsTracker.processStats((scala.collection.Seq) objectRef.elem, longRef.elem);
    }

    /** Keeps blocks whose size is at least the configured minimum compaction size. */
    public static final /* synthetic */ boolean $anonfun$groupFilesToCompact$2(QbeastBlock qbeastBlock) {
        return qbeastBlock.size() >= ((long) org.apache.spark.qbeast.config.package$.MODULE$.MIN_COMPACTION_FILE_SIZE_IN_BYTES());
    }

    /** Keeps cube entries that still have at least one block. */
    public static final /* synthetic */ boolean $anonfun$groupFilesToCompact$3(Tuple2 tuple2) {
        return ((TraversableOnce) tuple2._2()).nonEmpty();
    }

    /**
     * Adds {@code qbeastBlock} to the current group ({@code builder2}). First, if the block would
     * push the group's running size ({@code longRef}) above the configured maximum, the current
     * group is moved into {@code builder} and a new group is started.
     */
    public static final /* synthetic */ void $anonfun$groupFilesToCompact$5(LongRef longRef, Builder builder, Builder builder2, QbeastBlock qbeastBlock) {
        if (qbeastBlock.size() + longRef.elem > org.apache.spark.qbeast.config.package$.MODULE$.MAX_COMPACTION_FILE_SIZE_IN_BYTES()) {
            builder.$plus$eq(builder2.result());
            builder2.clear();
            longRef.elem = 0L;
        }
        builder2.$plus$eq(qbeastBlock);
        longRef.elem += qbeastBlock.size();
    }

    /** Registers the module instance and runs the DeltaStatsCollectionUtils trait initializer. */
    private SparkDeltaDataWriter$() {
        MODULE$ = this;
        DeltaStatsCollectionUtils.$init$(this);
    }
}
