package za.co.absa.cobrix.spark.cobol.source.index;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.spark.SparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.SQLContext;
import org.slf4j.Logger;
import scala.Array$;
import scala.Function0;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import za.co.absa.cobrix.cobol.internal.Logging;
import za.co.absa.cobrix.cobol.reader.common.Constants$;
import za.co.absa.cobrix.cobol.reader.index.entry.SparseIndexEntry;
import za.co.absa.cobrix.spark.cobol.reader.Reader;
import za.co.absa.cobrix.spark.cobol.reader.VarLenReader;
import za.co.absa.cobrix.spark.cobol.source.SerializableConfiguration;
import za.co.absa.cobrix.spark.cobol.source.parameters.LocalityParameters;
import za.co.absa.cobrix.spark.cobol.source.streaming.FileStreamer;
import za.co.absa.cobrix.spark.cobol.source.streaming.FileStreamer$;
import za.co.absa.cobrix.spark.cobol.source.types.FileWithOrder;
import za.co.absa.cobrix.spark.cobol.utils.HDFSUtils$;
import za.co.absa.cobrix.spark.cobol.utils.SparkUtils$;

/* compiled from: IndexBuilder.scala */
/* loaded from: input_file:za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder$.class */
/**
 * Builds the sparse index that is used to split input files into Spark partitions.
 *
 * <p>This is the module class of the Scala {@code object IndexBuilder} (compiled from
 * {@code IndexBuilder.scala}); {@link #MODULE$} holds the singleton instance. {@link #buildIndex}
 * selects one of three strategies:
 * <ul>
 *   <li>variable-length reader with data locality: index entries are generated on executors and
 *       paired with the block locations of each entry's byte range;</li>
 *   <li>variable-length reader without data locality: index entries are generated on executors
 *       with no location preferences;</li>
 *   <li>any other reader: a single index entry per whole file.</li>
 * </ul>
 */
public final class IndexBuilder$ implements Logging {
    /** The singleton instance; assigned by the private constructor during static initialization. */
    public static IndexBuilder$ MODULE$;
    /** Backing field for the {@link Logging} trait's logger (transient: excluded from serialization). */
    private transient Logger za$co$absa$cobrix$cobol$internal$Logging$$log_;

    static {
        new IndexBuilder$();
    }

    /** Delegates to the {@link Logging} trait implementation. */
    public String logName() {
        return Logging.logName$(this);
    }

    /** Delegates to the {@link Logging} trait implementation. */
    public Logger logger() {
        return Logging.logger$(this);
    }

    /** Delegates to the {@link Logging} trait implementation; the message is supplied lazily. */
    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    /** Trait accessor for the logger backing field. */
    public Logger za$co$absa$cobrix$cobol$internal$Logging$$log_() {
        return this.za$co$absa$cobrix$cobol$internal$Logging$$log_;
    }

    /** Trait mutator for the logger backing field. */
    public void za$co$absa$cobrix$cobol$internal$Logging$$log__$eq(Logger logger) {
        this.za$co$absa$cobrix$cobol$internal$Logging$$log_ = logger;
    }

    /**
     * Builds the sparse index for the given files, choosing a strategy from the reader type and
     * the locality settings.
     *
     * <p>The locality-aware strategy is used only when all of the following hold:
     * <ul>
     *   <li>{@code reader} is a {@link VarLenReader} that reports index generation is needed;</li>
     *   <li>{@code localityParameters.improveLocality()} is set;</li>
     *   <li>the first file's file system is a {@link DistributedFileSystem}.</li>
     * </ul>
     * Any other {@link VarLenReader} goes through {@link #buildIndexForVarLenReader}. All remaining
     * readers get one entry per whole file.
     *
     * @param fileWithOrderArr   input files with their order; must be non-empty, because the first
     *                           file's path is used to resolve the file system
     * @param reader             the reader that will consume the index
     * @param sQLContext         SQL context providing the Spark context and Hadoop configuration
     * @param localityParameters locality switches ({@code improveLocality}, {@code optimizeAllocation})
     * @return an RDD of index entries, repartitioned and cached
     */
    public RDD<SparseIndexEntry> buildIndex(FileWithOrder[] fileWithOrderArr, Reader reader, SQLContext sQLContext, LocalityParameters localityParameters) {
        // The locality decision is based on the first file's file system only.
        FileSystem fileSystem = new Path(((FileWithOrder) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(fileWithOrderArr)).head()).filePath()).getFileSystem(sQLContext.sparkSession().sparkContext().hadoopConfiguration());
        if (reader instanceof VarLenReader) {
            VarLenReader varLenReader = (VarLenReader) reader;
            if (varLenReader.isIndexGenerationNeeded() && localityParameters.improveLocality() && isDataLocalitySupported(fileSystem)) {
                return buildIndexForVarLenReaderWithFullLocality(fileWithOrderArr, varLenReader, sQLContext, localityParameters.optimizeAllocation());
            }
            return buildIndexForVarLenReader(fileWithOrderArr, varLenReader, sQLContext);
        }
        return buildIndexForFullFiles(fileWithOrderArr, sQLContext);
    }

    /**
     * Generates index entries on executors and attaches block locations to each entry.
     *
     * <p>The steps are:
     * <ol>
     *   <li>Distribute the files with their own block locations as preferred locations
     *       ({@link #toRDDWithLocality}).</li>
     *   <li>Generate each file's index entries and look up the block locations of every entry's
     *       byte range.</li>
     *   <li>Collect the located entries to the driver and, optionally, rebalance them across active
     *       executors.</li>
     *   <li>Turn them back into an RDD that carries those location preferences.</li>
     * </ol>
     *
     * @param fileWithOrderArr input files with their order
     * @param varLenReader     reader used to generate each file's index
     * @param sQLContext       SQL context providing the Spark context and Hadoop configuration
     * @param z                when {@code true}, rebalance locations with {@link #optimizeDistribution}
     *                         ({@code localityParameters.optimizeAllocation()} when called from
     *                         {@link #buildIndex})
     * @return an RDD of index entries, repartitioned and cached
     */
    public RDD<SparseIndexEntry> buildIndexForVarLenReaderWithFullLocality(FileWithOrder[] fileWithOrderArr, VarLenReader varLenReader, SQLContext sQLContext, boolean z) {
        Configuration hadoopConfiguration = sQLContext.sparkContext().hadoopConfiguration();
        RDD<FileWithOrder> filesRdd = toRDDWithLocality(fileWithOrderArr, hadoopConfiguration, sQLContext);
        // Wrapped so the configuration can be shipped inside the executor-side closures below.
        SerializableConfiguration serializableConfiguration = new SerializableConfiguration(hadoopConfiguration);
        RDD locatedEntriesRdd = filesRdd.mapPartitions(iterator -> {
            return iterator.flatMap(fileWithOrder -> {
                ArrayBuffer<SparseIndexEntry> entries = MODULE$.generateIndexEntry(fileWithOrder, serializableConfiguration.value(), varLenReader);
                String filePath = fileWithOrder.filePath();
                FileSystem fileSystem = new Path(filePath).getFileSystem(serializableConfiguration.value());
                // Pair every entry with the block locations of its byte range.
                // A negative start offset is clamped to 0 for the lookup.
                return (ArrayBuffer) entries.map(sparseIndexEntry -> {
                    return new Tuple2(sparseIndexEntry, HDFSUtils$.MODULE$.getBlocksLocations(new Path(filePath), sparseIndexEntry.offsetFrom() >= 0 ? sparseIndexEntry.offsetFrom() : 0L, MODULE$.getBlockLengthByIndexEntry(sparseIndexEntry), fileSystem));
                }, ArrayBuffer$.MODULE$.canBuildFrom());
            });
        }, filesRdd.mapPartitions$default$2(), ClassTag$.MODULE$.apply(Tuple2.class));
        logger().info("Going to collect located indexes into driver.");
        Seq<Tuple2<SparseIndexEntry, Seq<String>>> distribution = z ? optimizeDistribution(Predef$.MODULE$.wrapRefArray((Object[]) locatedEntriesRdd.collect()), sQLContext.sparkContext()) : Predef$.MODULE$.wrapRefArray((Object[]) locatedEntriesRdd.collect());
        logger().info(new StringBuilder(34).append("Creating RDD for ").append(distribution.length()).append(" located indexes.").toString());
        logDebug(() -> {
            return "Preferred locations per index entry";
        });
        distribution.foreach(tuple2 -> {
            $anonfun$buildIndexForVarLenReaderWithFullLocality$5(tuple2);
            return BoxedUnit.UNIT;
        });
        // makeRDD over (entry, locations) pairs keeps the per-entry location preferences.
        return repartitionIndexes(sQLContext.sparkContext().makeRDD(distribution, ClassTag$.MODULE$.apply(SparseIndexEntry.class)));
    }

    /**
     * Generates index entries on executors without location preferences.
     *
     * <p>Files are parallelized into {@code fileWithOrderArr.length} slices. Each file's entries
     * come from {@link #generateIndexEntry}.
     *
     * @param fileWithOrderArr input files with their order
     * @param varLenReader     reader used to generate each file's index
     * @param sQLContext       SQL context providing the Spark context and Hadoop configuration
     * @return an RDD of index entries, repartitioned and cached
     */
    public RDD<SparseIndexEntry> buildIndexForVarLenReader(FileWithOrder[] fileWithOrderArr, VarLenReader varLenReader, SQLContext sQLContext) {
        RDD filesRdd = sQLContext.sparkContext().parallelize(Predef$.MODULE$.wrapRefArray(fileWithOrderArr), fileWithOrderArr.length, ClassTag$.MODULE$.apply(FileWithOrder.class));
        SerializableConfiguration serializableConfiguration = new SerializableConfiguration(sQLContext.sparkContext().hadoopConfiguration());
        // Cached before repartitionIndexes counts it, so index generation does not run twice.
        return repartitionIndexes(filesRdd.mapPartitions(iterator -> {
            return iterator.flatMap(fileWithOrder -> {
                return MODULE$.generateIndexEntry(fileWithOrder, serializableConfiguration.value(), varLenReader);
            });
        }, filesRdd.mapPartitions$default$2(), ClassTag$.MODULE$.apply(SparseIndexEntry.class)).cache());
    }

    /**
     * Builds a trivial index with a single entry per file, covering the whole file.
     *
     * <p>Each entry is {@code new SparseIndexEntry(0L, -1L, order, 0L)}. The {@code -1} offsetTo
     * is the same open-ended marker that {@link #generateIndexEntry} treats specially.
     *
     * @param fileWithOrderArr input files with their order
     * @param sQLContext       SQL context providing the Spark context
     * @return an RDD of index entries, repartitioned and cached
     */
    public RDD<SparseIndexEntry> buildIndexForFullFiles(FileWithOrder[] fileWithOrderArr, SQLContext sQLContext) {
        RDD filesRdd = sQLContext.sparkContext().parallelize(Predef$.MODULE$.wrapRefArray(fileWithOrderArr), fileWithOrderArr.length, ClassTag$.MODULE$.apply(FileWithOrder.class));
        return repartitionIndexes(filesRdd.mapPartitions(iterator -> {
            return iterator.flatMap(fileWithOrder -> {
                return ArrayBuffer$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new SparseIndexEntry[]{new SparseIndexEntry(0L, -1L, fileWithOrder.order(), 0L)}));
            });
        }, filesRdd.mapPartitions$default$2(), ClassTag$.MODULE$.apply(SparseIndexEntry.class)).cache());
    }

    /**
     * Generates the sparse index for a single file using the variable-length reader.
     *
     * <p>Reading starts at the reader's {@code fileStartOffset}. The byte limit depends on
     * {@code fileEndOffset}:
     * <ul>
     *   <li>When it is non-zero, the limit is the file length minus both offsets, clamped at 0.</li>
     *   <li>When it is zero, a limit of 0 is passed.</li>
     * </ul>
     * When the limit is positive, entries whose {@code offsetTo} is {@code -1} are closed at
     * {@code fileStartOffset + limit}, i.e. at file length minus {@code fileEndOffset}.
     *
     * @param fileWithOrder the file and its order
     * @param configuration Hadoop configuration used to resolve the file's file system
     * @param varLenReader  reader that generates the index from the file streams
     * @return the index entries for the file
     */
    public ArrayBuffer<SparseIndexEntry> generateIndexEntry(FileWithOrder fileWithOrder, Configuration configuration, VarLenReader varLenReader) {
        String filePath = fileWithOrder.filePath();
        Path path = new Path(filePath);
        int order = fileWithOrder.order();
        FileSystem fileSystem = path.getFileSystem(configuration);
        int fileStartOffset = varLenReader.getReaderProperties().fileStartOffset();
        // Single definite assignment keeps this effectively final for the lambda below.
        // NOTE(review): a clamped limit of 0 (file shorter than both offsets) is indistinguishable
        // from the "no end offset" case when passed to FileStreamer; confirm that is intended.
        final long maxBytesToRead;
        if (varLenReader.getReaderProperties().fileEndOffset() == 0) {
            maxBytesToRead = 0L;
        } else {
            long length = (fileSystem.getContentSummary(path).getLength() - varLenReader.getReaderProperties().fileEndOffset()) - fileStartOffset;
            maxBytesToRead = length < 0 ? 0L : length;
        }
        logger().info(new StringBuilder(38).append("Going to generate index for the file: ").append(filePath).toString());
        // The second streamer covers the same file with FileStreamer's default start offset and limit.
        ArrayBuffer<SparseIndexEntry> entries = varLenReader.generateIndex(new FileStreamer(filePath, fileSystem, fileStartOffset, maxBytesToRead), new FileStreamer(filePath, fileSystem, FileStreamer$.MODULE$.$lessinit$greater$default$3(), FileStreamer$.MODULE$.$lessinit$greater$default$4()), order, varLenReader.isRdwBigEndian());
        if (maxBytesToRead <= 0) {
            return entries;
        }
        return (ArrayBuffer) entries.map(sparseIndexEntry -> {
            if (sparseIndexEntry.offsetTo() != -1) {
                return sparseIndexEntry;
            }
            return sparseIndexEntry.copy(sparseIndexEntry.copy$default$1(), fileStartOffset + maxBytesToRead, sparseIndexEntry.copy$default$3(), sparseIndexEntry.copy$default$4());
        }, ArrayBuffer$.MODULE$.canBuildFrom());
    }

    /**
     * Returns the byte length used when looking up block locations for an index entry.
     *
     * <p>The length is computed in two steps:
     * <ol>
     *   <li>Take {@code offsetTo - offsetFrom} when positive. Otherwise (e.g. an open-ended
     *       {@code -1} offsetTo) use one {@code Constants.megabyte}.</li>
     *   <li>Return that value unchanged when it is below 10 megabytes. Otherwise subtract one
     *       megabyte from it.</li>
     * </ol>
     *
     * @param sparseIndexEntry the index entry
     * @return the length in bytes to query
     */
    public long getBlockLengthByIndexEntry(SparseIndexEntry sparseIndexEntry) {
        long span = sparseIndexEntry.offsetTo() - sparseIndexEntry.offsetFrom() > 0 ? sparseIndexEntry.offsetTo() - sparseIndexEntry.offsetFrom() : Constants$.MODULE$.megabyte();
        return span < 10 * ((long) Constants$.MODULE$.megabyte()) ? span : span - Constants$.MODULE$.megabyte();
    }

    /**
     * Rebalances located index entries across the currently active executors.
     *
     * @param seq          index entries paired with their preferred locations
     * @param sparkContext Spark context used to list active executors
     * @return the entries with locations reassigned by {@code LocationBalancer.balance}
     */
    public Seq<Tuple2<SparseIndexEntry, Seq<String>>> optimizeDistribution(Seq<Tuple2<SparseIndexEntry, Seq<String>>> seq, SparkContext sparkContext) {
        Seq<String> currentActiveExecutors = SparkUtils$.MODULE$.currentActiveExecutors(sparkContext);
        logger().info(new StringBuilder(62).append("Trying to balance ").append(seq.size()).append(" partitions among all available executors (").append(currentActiveExecutors).append(")").toString());
        return LocationBalancer$.MODULE$.balance(seq, currentActiveExecutors);
    }

    /**
     * Creates an RDD of the input files whose preferred locations are each file's block locations.
     *
     * @param fileWithOrderArr input files with their order
     * @param configuration    Hadoop configuration used to resolve each file's file system
     * @param sQLContext       SQL context providing the Spark context
     * @return an RDD of files with location preferences
     */
    public RDD<FileWithOrder> toRDDWithLocality(FileWithOrder[] fileWithOrderArr, Configuration configuration, SQLContext sQLContext) {
        // Resolve the file system from each path, as buildIndex and generateIndexEntry do.
        // FileSystem.get(configuration) always returns fs.defaultFS, which is the wrong file system
        // for fully qualified paths on another scheme or cluster (Hadoop rejects them as "Wrong FS").
        Seq seq = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(fileWithOrderArr)).map(fileWithOrder -> {
            Path path = new Path(fileWithOrder.filePath());
            return new Tuple2(fileWithOrder, HDFSUtils$.MODULE$.getBlocksLocations(path, path.getFileSystem(configuration)));
        }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Tuple2.class))))).toSeq();
        seq.foreach(tuple2 -> {
            $anonfun$toRDDWithLocality$2(tuple2);
            return BoxedUnit.UNIT;
        });
        return sQLContext.sparkContext().makeRDD(seq, ClassTag$.MODULE$.apply(FileWithOrder.class));
    }

    /**
     * Returns whether block-location lookups are supported for the given file system.
     *
     * @param fileSystem the file system to check
     * @return {@code true} only for an HDFS {@link DistributedFileSystem}
     */
    public boolean isDataLocalitySupported(FileSystem fileSystem) {
        return fileSystem instanceof DistributedFileSystem;
    }

    /**
     * Repartitions the index RDD and caches the result.
     *
     * <p>The partition count is the number of entries capped at {@code Constants.maxNumPartitions},
     * with a minimum of 1. Counting the entries triggers a Spark job.
     *
     * @param rdd the index entries
     * @return the repartitioned, cached index RDD
     */
    private RDD<SparseIndexEntry> repartitionIndexes(RDD<SparseIndexEntry> rdd) {
        long count = rdd.count();
        // Spark rejects a non-positive partition count, so an empty index still gets one partition.
        int numPartitions = (int) Math.max(1L, Math.min(count, Constants$.MODULE$.maxNumPartitions()));
        logger().info(new StringBuilder(47).append("Index elements count: ").append(count).append(", number of partitions = ").append(numPartitions).toString());
        return rdd.repartition(numPartitions, rdd.repartition$default$2(numPartitions)).cache();
    }

    /** Logs one located index entry at debug level. */
    public static final /* synthetic */ void $anonfun$buildIndexForVarLenReaderWithFullLocality$5(Tuple2 tuple2) {
        MODULE$.logDebug(() -> {
            return tuple2.toString();
        });
    }

    /** Logs one file with its block locations at debug level. */
    public static final /* synthetic */ void $anonfun$toRDDWithLocality$2(Tuple2 tuple2) {
        MODULE$.logDebug(() -> {
            return tuple2.toString();
        });
    }

    /** Publishes the singleton instance and initializes the {@link Logging} trait state. */
    private IndexBuilder$() {
        MODULE$ = this;
        Logging.$init$(this);
    }
}
