package za.co.absa.cobrix.spark.cobol.source.index;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.SQLContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Array$;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import za.co.absa.cobrix.cobol.reader.common.Constants$;
import za.co.absa.cobrix.cobol.reader.index.entry.SparseIndexEntry;
import za.co.absa.cobrix.spark.cobol.reader.Reader;
import za.co.absa.cobrix.spark.cobol.reader.VarLenReader;
import za.co.absa.cobrix.spark.cobol.source.SerializableConfiguration;
import za.co.absa.cobrix.spark.cobol.source.parameters.LocalityParameters;
import za.co.absa.cobrix.spark.cobol.source.streaming.FileStreamer;
import za.co.absa.cobrix.spark.cobol.source.types.FileWithOrder;
import za.co.absa.cobrix.spark.cobol.utils.HDFSUtils$;
import za.co.absa.cobrix.spark.cobol.utils.SparkUtils$;

/* compiled from: IndexBuilder.scala */
/* loaded from: input_file:za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder$.class */
/**
 * Builds sparse indexes for variable-length files so that they can be split into Spark
 * partitions.
 *
 * <p>This is the decompiled singleton module of a Scala {@code object}. The static initializer
 * invokes the private constructor once, and the constructor publishes the instance in
 * {@link #MODULE$}.
 */
public final class IndexBuilder$ {
    /** The singleton instance; assigned by the private constructor. */
    public static IndexBuilder$ MODULE$;
    private final Logger logger;

    static {
        new IndexBuilder$();
    }

    private Logger logger() {
        return this.logger;
    }

    /**
     * Builds a sparse index RDD for the given files.
     *
     * <p>Only {@link VarLenReader} instances are indexed. The locality-aware path
     * ({@link #buildIndexForVarLenReaderWithFullLocality}) is taken when both of these hold:
     * <ul>
     *   <li>the reader reports {@code isIndexGenerationNeeded()};</li>
     *   <li>{@code localityParameters.improveLocality()} is set.</li>
     * </ul>
     * Otherwise the plain path ({@link #buildIndexForVarLenReader}) is used.
     *
     * @param fileWithOrderArr files to index; each supplies a file path and an order value that is
     *                         forwarded to {@code generateIndex}
     * @param reader the reader used to generate index entries
     * @param sQLContext provides the SparkContext and its Hadoop configuration
     * @param localityParameters locality flags ({@code improveLocality}, {@code optimizeAllocation})
     * @return the index entries, or {@code null} when {@code reader} is not a {@link VarLenReader}
     */
    public RDD<SparseIndexEntry> buildIndex(FileWithOrder[] fileWithOrderArr, Reader reader, SQLContext sQLContext, LocalityParameters localityParameters) {
        RDD<SparseIndexEntry> rdd;
        if (reader instanceof VarLenReader) {
            VarLenReader varLenReader = (VarLenReader) reader;
            rdd = (varLenReader.isIndexGenerationNeeded() && localityParameters.improveLocality()) ? buildIndexForVarLenReaderWithFullLocality(fileWithOrderArr, varLenReader, sQLContext, localityParameters.optimizeAllocation()) : buildIndexForVarLenReader(fileWithOrderArr, varLenReader, sQLContext);
        } else {
            // Non-variable-length readers get no index; callers must handle null.
            rdd = null;
        }
        return rdd;
    }

    /**
     * Generates index entries on executors and attaches block locations to every entry.
     *
     * <p>The steps are:
     * <ol>
     *   <li>Files are distributed with their whole-file block locations as preferred locations
     *       (see {@link #toRDDWithLocality}).</li>
     *   <li>Each task opens a {@link FileSystem} from the serialized Hadoop configuration and
     *       generates the index for each of its files.</li>
     *   <li>Each entry is paired with the locations that {@code HDFSUtils.getBlocksLocations}
     *       returns for a range starting at the entry's {@code offsetFrom} (clamped to 0).</li>
     *   <li>All pairs are collected to the driver, so the whole index must fit in driver
     *       memory.</li>
     *   <li>The pairs are turned back into an RDD via {@code makeRDD}, which uses each location
     *       list as that element's preferred locations.</li>
     * </ol>
     *
     * @param z when {@code true}, the collected pairs are rebalanced across the currently active
     *          executors via {@link #optimizeDistribution} before the RDD is created
     */
    private RDD<SparseIndexEntry> buildIndexForVarLenReaderWithFullLocality(FileWithOrder[] fileWithOrderArr, VarLenReader varLenReader, SQLContext sQLContext, boolean z) {
        Configuration hadoopConfiguration = sQLContext.sparkContext().hadoopConfiguration();
        RDD<FileWithOrder> rDDWithLocality = toRDDWithLocality(fileWithOrderArr, hadoopConfiguration, sQLContext);
        // Hadoop's Configuration is not serializable; wrap it so executors can rebuild a FileSystem.
        SerializableConfiguration serializableConfiguration = new SerializableConfiguration(hadoopConfiguration);
        RDD mapPartitions = rDDWithLocality.mapPartitions(iterator -> {
            FileSystem fileSystem = FileSystem.get(serializableConfiguration.value());
            return iterator.flatMap(fileWithOrder -> {
                String filePath = fileWithOrder.filePath();
                int order = fileWithOrder.order();
                MODULE$.logger().info(new StringBuilder(38).append("Going to generate index for the file: ").append(filePath).toString());
                // NOTE(review): the (0L, 0L) FileStreamer arguments presumably mean "start at offset 0,
                // no length limit" — confirm against FileStreamer.
                return (ArrayBuffer) varLenReader.generateIndex(new FileStreamer(filePath, fileSystem, 0L, 0L), order, varLenReader.isRdwBigEndian()).map(sparseIndexEntry -> {
                    return new Tuple2(sparseIndexEntry, HDFSUtils$.MODULE$.getBlocksLocations(new Path(filePath), sparseIndexEntry.offsetFrom() >= 0 ? sparseIndexEntry.offsetFrom() : 0L, MODULE$.getBlockLengthByIndexEntry(sparseIndexEntry), fileSystem));
                }, ArrayBuffer$.MODULE$.canBuildFrom());
            });
        }, rDDWithLocality.mapPartitions$default$2(), ClassTag$.MODULE$.apply(Tuple2.class));
        logger().info("Going to collect located indexes into driver.");
        Seq<Tuple2<SparseIndexEntry, Seq<String>>> optimizeDistribution = z ? optimizeDistribution(Predef$.MODULE$.wrapRefArray((Object[]) mapPartitions.collect()), sQLContext.sparkContext()) : Predef$.MODULE$.wrapRefArray((Object[]) mapPartitions.collect());
        logger().info(new StringBuilder(34).append("Creating RDD for ").append(optimizeDistribution.length()).append(" located indexes.").toString());
        if (logger().isDebugEnabled()) {
            logger().debug("Preferred locations per index entry");
            optimizeDistribution.foreach(tuple2 -> {
                $anonfun$buildIndexForVarLenReaderWithFullLocality$4(tuple2);
                return BoxedUnit.UNIT;
            });
        }
        return sQLContext.sparkContext().makeRDD(optimizeDistribution, ClassTag$.MODULE$.apply(SparseIndexEntry.class));
    }

    /**
     * Computes the length argument passed to {@code HDFSUtils.getBlocksLocations} for an entry.
     *
     * <p>The computation is:
     * <ul>
     *   <li>Start from {@code offsetTo}, or {@link Long#MAX_VALUE} when {@code offsetTo} is not
     *       positive (presumably an open-ended last entry — TODO confirm).</li>
     *   <li>If that value is at least 10 MB, subtract one megabyte. Presumably this lets
     *       locality be decided by the significant part of the chunk rather than its tail — TODO
     *       confirm.</li>
     * </ul>
     *
     * <p>NOTE(review): the absolute offset {@code offsetTo} is used as a length, not
     * {@code offsetTo - offsetFrom}. Verify this matches {@code getBlocksLocations}' contract.
     */
    private long getBlockLengthByIndexEntry(SparseIndexEntry sparseIndexEntry) {
        long offsetTo = sparseIndexEntry.offsetTo() > 0 ? sparseIndexEntry.offsetTo() : Long.MAX_VALUE;
        return offsetTo < 10 * ((long) Constants$.MODULE$.megabyte()) ? offsetTo : offsetTo - Constants$.MODULE$.megabyte();
    }

    /**
     * Rebalances (entry, locations) pairs across the currently active executors by delegating to
     * {@code LocationBalancer.balance}.
     */
    private Seq<Tuple2<SparseIndexEntry, Seq<String>>> optimizeDistribution(Seq<Tuple2<SparseIndexEntry, Seq<String>>> seq, SparkContext sparkContext) {
        Seq<String> currentActiveExecutors = SparkUtils$.MODULE$.currentActiveExecutors(sparkContext);
        logger().info(new StringBuilder(62).append("Trying to balance ").append(seq.size()).append(" partitions among all available executors (").append(currentActiveExecutors).append(")").toString());
        return LocationBalancer$.MODULE$.balance(seq, currentActiveExecutors);
    }

    /**
     * Creates an RDD of files whose preferred locations are each file's whole-file block locations.
     *
     * <p>Locations are resolved on the driver using a {@link FileSystem} obtained from
     * {@code configuration}.
     */
    private RDD<FileWithOrder> toRDDWithLocality(FileWithOrder[] fileWithOrderArr, Configuration configuration, SQLContext sQLContext) {
        FileSystem fileSystem = FileSystem.get(configuration);
        Seq seq = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(fileWithOrderArr)).map(fileWithOrder -> {
            return new Tuple2(fileWithOrder, HDFSUtils$.MODULE$.getBlocksLocations(new Path(fileWithOrder.filePath()), fileSystem));
        }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Tuple2.class))))).toSeq();
        // Guard like the other debug dump so tuple toString() is skipped when debug is off.
        if (logger().isDebugEnabled()) {
            seq.foreach(tuple2 -> {
                $anonfun$toRDDWithLocality$2(tuple2);
                return BoxedUnit.UNIT;
            });
        }
        return sQLContext.sparkContext().makeRDD(seq, ClassTag$.MODULE$.apply(FileWithOrder.class));
    }

    /**
     * Generates the index without locality information.
     *
     * <p>The steps are:
     * <ol>
     *   <li>Each file gets its own input partition.</li>
     *   <li>Entries are generated on executors and cached, so the {@code count()} below does not
     *       force a second generation pass.</li>
     *   <li>The result is repartitioned to {@code min(count, maxNumPartitions)} partitions (at
     *       least 1) and cached again.</li>
     * </ol>
     *
     * @return the cached, repartitioned index entries
     */
    public RDD<SparseIndexEntry> buildIndexForVarLenReader(FileWithOrder[] fileWithOrderArr, VarLenReader varLenReader, SQLContext sQLContext) {
        // parallelize() rejects a non-positive slice count, so keep at least one slice when no files are given.
        RDD parallelize = sQLContext.sparkContext().parallelize(Predef$.MODULE$.wrapRefArray(fileWithOrderArr), Math.max(1, fileWithOrderArr.length), ClassTag$.MODULE$.apply(FileWithOrder.class));
        SerializableConfiguration serializableConfiguration = new SerializableConfiguration(sQLContext.sparkContext().hadoopConfiguration());
        RDD cache = parallelize.mapPartitions(iterator -> {
            FileSystem fileSystem = FileSystem.get(serializableConfiguration.value());
            return iterator.flatMap(fileWithOrder -> {
                String filePath = fileWithOrder.filePath();
                int order = fileWithOrder.order();
                MODULE$.logger().info(new StringBuilder(38).append("Going to generate index for the file: ").append(filePath).toString());
                return varLenReader.generateIndex(new FileStreamer(filePath, fileSystem, 0L, 0L), order, varLenReader.isRdwBigEndian());
            });
        }, parallelize.mapPartitions$default$2(), ClassTag$.MODULE$.apply(SparseIndexEntry.class)).cache();
        long count = cache.count();
        // repartition() -> coalesce() requires numPartitions > 0; an empty index would otherwise throw.
        int min = Math.max(1, (int) Math.min(count, Constants$.MODULE$.maxNumPartitions()));
        logger().warn(new StringBuilder(47).append("Index elements count: ").append(count).append(", number of partitions = ").append(min).toString());
        return cache.repartition(min, cache.repartition$default$2(min)).cache();
    }

    /** Debug-logs one (index entry, preferred locations) pair. */
    public static final /* synthetic */ void $anonfun$buildIndexForVarLenReaderWithFullLocality$4(Tuple2 tuple2) {
        MODULE$.logger().debug(tuple2.toString());
    }

    /** Debug-logs one (file, preferred locations) pair. */
    public static final /* synthetic */ void $anonfun$toRDDWithLocality$2(Tuple2 tuple2) {
        MODULE$.logger().debug(tuple2.toString());
    }

    /** Publishes the singleton and creates the logger; invoked once by the static initializer. */
    private IndexBuilder$() {
        MODULE$ = this;
        this.logger = LoggerFactory.getLogger(getClass());
    }
}
