package org.datacleaner.spark;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.metamodel.csv.CsvConfiguration;
import org.apache.spark.Accumulator;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.datacleaner.api.AnalyzerResult;
import org.datacleaner.api.InputRow;
import org.datacleaner.connection.CsvDatastore;
import org.datacleaner.connection.Datastore;
import org.datacleaner.job.AnalysisJob;
import org.datacleaner.job.ComponentJob;
import org.datacleaner.job.runner.AnalysisResultFuture;
import org.datacleaner.job.runner.AnalysisRunner;
import org.datacleaner.spark.functions.AnalyzerResultReduceFunction;
import org.datacleaner.spark.functions.CsvParserFunction;
import org.datacleaner.spark.functions.ExtractAnalyzerResultFunction;
import org.datacleaner.spark.functions.RowProcessingFunction;
import org.datacleaner.spark.functions.ValuesToInputRowFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/* loaded from: input_file:org/datacleaner/spark/SparkAnalysisRunner.class */
public class SparkAnalysisRunner implements AnalysisRunner {
    private static final Logger logger;
    private final SparkJobContext _sparkJobContext;
    private final JavaSparkContext _sparkContext;
    static final /* synthetic */ boolean $assertionsDisabled;

    public SparkAnalysisRunner(JavaSparkContext javaSparkContext, SparkJobContext sparkJobContext) {
        this._sparkContext = javaSparkContext;
        this._sparkJobContext = sparkJobContext;
    }

    public void run() {
        run(this._sparkJobContext.getAnalysisJob());
    }

    public AnalysisResultFuture run(AnalysisJob analysisJob) {
        JavaPairRDD mapPartitionsToPair;
        if (!$assertionsDisabled && analysisJob != this._sparkJobContext.getAnalysisJob()) {
            throw new AssertionError();
        }
        JavaRDD<InputRow> openSourceDatastore = openSourceDatastore(this._sparkJobContext.getAnalysisJob().getDatastore());
        if (isDistributable(analysisJob)) {
            logger.info("Running the job in distributed mode");
            mapPartitionsToPair = openSourceDatastore.mapPartitionsToPair(new RowProcessingFunction(this._sparkJobContext)).reduceByKey(new AnalyzerResultReduceFunction(this._sparkJobContext));
        } else {
            logger.warn("Running the job in non-distributed mode");
            mapPartitionsToPair = openSourceDatastore.coalesce(1).mapPartitionsToPair(new RowProcessingFunction(this._sparkJobContext));
        }
        List<Tuple2> collect = mapPartitionsToPair.mapValues(new ExtractAnalyzerResultFunction()).collect();
        logger.info("Finished! Number of AnalyzerResult objects: {}", Integer.valueOf(collect.size()));
        for (Tuple2 tuple2 : collect) {
            logger.info("AnalyzerResult: " + ((String) tuple2._1) + "->" + ((AnalyzerResult) tuple2._2));
        }
        for (Map.Entry<String, Accumulator<Integer>> entry : this._sparkJobContext.getAccumulators().entrySet()) {
            logger.info("Accumulator: {} -> {}", entry.getKey(), entry.getValue().value());
        }
        return new SparkAnalysisResultFuture(collect);
    }

    private boolean isDistributable(AnalysisJob analysisJob) {
        Iterator<ComponentJob> it = this._sparkJobContext.getComponentList().iterator();
        while (it.hasNext()) {
            if (!it.next().getDescriptor().isDistributable()) {
                return false;
            }
        }
        return true;
    }

    private JavaRDD<InputRow> openSourceDatastore(Datastore datastore) {
        if (!(datastore instanceof CsvDatastore)) {
            throw new UnsupportedOperationException("Unsupported datastore type or configuration: " + datastore);
        }
        CsvDatastore csvDatastore = (CsvDatastore) datastore;
        String qualifiedPath = csvDatastore.getResource().getQualifiedPath();
        CsvConfiguration csvConfiguration = csvDatastore.getCsvConfiguration();
        JavaPairRDD zipWithIndex = this._sparkContext.textFile(qualifiedPath).map(new CsvParserFunction(csvConfiguration)).zipWithIndex();
        if (csvConfiguration.getColumnNameLineNumber() != 0) {
            zipWithIndex = zipWithIndex.filter(new SkipHeaderLineFunction(csvConfiguration.getColumnNameLineNumber()));
        }
        return zipWithIndex.map(new ValuesToInputRowFunction(this._sparkJobContext));
    }

    static {
        $assertionsDisabled = !SparkAnalysisRunner.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(SparkAnalysisRunner.class);
    }
}
