package org.datacleaner.spark;

import java.util.Collections;
import java.util.List;
import org.apache.metamodel.csv.CsvConfiguration;
import org.apache.metamodel.util.Resource;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.datacleaner.api.AnalyzerResult;
import org.datacleaner.api.InputRow;
import org.datacleaner.connection.CsvDatastore;
import org.datacleaner.connection.Datastore;
import org.datacleaner.connection.JsonDatastore;
import org.datacleaner.job.AnalysisJob;
import org.datacleaner.job.runner.AnalysisResultFuture;
import org.datacleaner.job.runner.AnalysisRunner;
import org.datacleaner.spark.functions.AnalyzerResultReduceFunction;
import org.datacleaner.spark.functions.CsvParserFunction;
import org.datacleaner.spark.functions.ExtractAnalyzerResultFunction;
import org.datacleaner.spark.functions.JsonParserFunction;
import org.datacleaner.spark.functions.RowProcessingFunction;
import org.datacleaner.spark.functions.TuplesToTuplesFunction;
import org.datacleaner.spark.functions.ValuesToInputRowFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/* loaded from: input_file:org/datacleaner/spark/SparkAnalysisRunner.class */
public class SparkAnalysisRunner implements AnalysisRunner {
    private static final Logger logger;
    private final SparkJobContext _sparkJobContext;
    private final JavaSparkContext _sparkContext;
    private final Integer _minPartitions;
    static final /* synthetic */ boolean $assertionsDisabled;

    public SparkAnalysisRunner(JavaSparkContext javaSparkContext, SparkJobContext sparkJobContext) {
        this(javaSparkContext, sparkJobContext, null);
    }

    public SparkAnalysisRunner(JavaSparkContext javaSparkContext, SparkJobContext sparkJobContext, Integer num) {
        this._sparkContext = javaSparkContext;
        this._sparkJobContext = sparkJobContext;
        if (num == null) {
            this._minPartitions = null;
        } else if (num.intValue() > 0) {
            this._minPartitions = num;
        } else {
            logger.warn("Minimum number of partitions needs to be a positive number, but specified: {}. Disregarding the value and inferring the number of partitions automatically", num);
            this._minPartitions = null;
        }
    }

    public AnalysisResultFuture run(AnalysisJob analysisJob) {
        return run();
    }

    public AnalysisResultFuture run() {
        JavaPairRDD mapPartitionsToPair;
        this._sparkJobContext.triggerOnJobStart();
        JavaRDD<InputRow> openSourceDatastore = openSourceDatastore(this._sparkJobContext.getAnalysisJob().getDatastore());
        if (this._sparkJobContext.getAnalysisJobBuilder().isDistributable()) {
            logger.info("Running the job in distributed mode");
            JavaRDD mapPartitionsWithIndex = openSourceDatastore.mapPartitionsWithIndex(new RowProcessingFunction(this._sparkJobContext), true);
            if (this._sparkJobContext.isResultEnabled()) {
                mapPartitionsToPair = mapPartitionsWithIndex.mapPartitionsToPair(new TuplesToTuplesFunction(), true).reduceByKey(new AnalyzerResultReduceFunction(this._sparkJobContext));
            } else {
                mapPartitionsWithIndex.count();
                mapPartitionsToPair = null;
            }
        } else {
            logger.warn("Running the job in non-distributed mode");
            mapPartitionsToPair = openSourceDatastore.coalesce(1).mapPartitionsToPair(new RowProcessingFunction(this._sparkJobContext));
            if (!this._sparkJobContext.isResultEnabled()) {
                mapPartitionsToPair.count();
            }
        }
        if (!this._sparkJobContext.isResultEnabled()) {
            return new SparkAnalysisResultFuture(Collections.emptyList(), this._sparkJobContext);
        }
        List<Tuple2> collect = mapPartitionsToPair.mapValues(new ExtractAnalyzerResultFunction()).collect();
        logger.info("Finished! Number of AnalyzerResult objects: {}", Integer.valueOf(collect.size()));
        for (Tuple2 tuple2 : collect) {
            logger.info("AnalyzerResult (" + ((String) tuple2._1) + "):\n\n" + ((AnalyzerResult) tuple2._2) + "\n");
        }
        this._sparkJobContext.triggerOnJobEnd();
        return new SparkAnalysisResultFuture(collect, this._sparkJobContext);
    }

    private JavaRDD<InputRow> openSourceDatastore(Datastore datastore) {
        if (!(datastore instanceof CsvDatastore)) {
            if (!(datastore instanceof JsonDatastore)) {
                throw new UnsupportedOperationException("Unsupported datastore type or configuration: " + datastore);
            }
            JsonDatastore jsonDatastore = (JsonDatastore) datastore;
            String qualifiedPath = jsonDatastore.getResource().getQualifiedPath();
            return (this._minPartitions != null ? this._sparkContext.textFile(qualifiedPath, this._minPartitions.intValue()) : this._sparkContext.textFile(qualifiedPath)).map(new JsonParserFunction(jsonDatastore)).zipWithIndex().map(new ValuesToInputRowFunction(this._sparkJobContext));
        }
        CsvDatastore csvDatastore = (CsvDatastore) datastore;
        Resource resource = csvDatastore.getResource();
        if (!$assertionsDisabled && resource == null) {
            throw new AssertionError();
        }
        String qualifiedPath2 = resource.getQualifiedPath();
        CsvConfiguration csvConfiguration = csvDatastore.getCsvConfiguration();
        JavaPairRDD zipWithIndex = (this._minPartitions != null ? this._sparkContext.textFile(qualifiedPath2, this._minPartitions.intValue()) : this._sparkContext.textFile(qualifiedPath2)).map(new CsvParserFunction(csvConfiguration)).zipWithIndex();
        if (csvConfiguration.getColumnNameLineNumber() != 0) {
            zipWithIndex = zipWithIndex.filter(new SkipHeaderLineFunction(csvConfiguration.getColumnNameLineNumber()));
        }
        return zipWithIndex.map(new ValuesToInputRowFunction(this._sparkJobContext));
    }

    static {
        $assertionsDisabled = !SparkAnalysisRunner.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(SparkAnalysisRunner.class);
    }
}
