package org.datacleaner.spark.functions;

import java.io.File;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Locale;
import org.apache.metamodel.util.FileResource;
import org.apache.metamodel.util.HdfsResource;
import org.apache.metamodel.util.Resource;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.datacleaner.api.AnalyzerResult;
import org.datacleaner.api.AnalyzerResultFuture;
import org.datacleaner.api.HasAnalyzerResult;
import org.datacleaner.api.InputRow;
import org.datacleaner.configuration.DataCleanerConfiguration;
import org.datacleaner.connection.CsvDatastore;
import org.datacleaner.connection.JsonDatastore;
import org.datacleaner.connection.ResourceDatastore;
import org.datacleaner.connection.UpdateableDatastore;
import org.datacleaner.descriptors.ConfiguredPropertyDescriptor;
import org.datacleaner.extension.output.CreateCsvFileAnalyzer;
import org.datacleaner.job.AnalysisJob;
import org.datacleaner.job.builder.AnalysisJobBuilder;
import org.datacleaner.job.builder.ComponentBuilder;
import org.datacleaner.job.runner.ActiveOutputDataStream;
import org.datacleaner.job.runner.ConsumeRowHandler;
import org.datacleaner.job.runner.RowProcessingConsumer;
import org.datacleaner.lifecycle.LifeCycleHelper;
import org.datacleaner.spark.NamedAnalyzerResult;
import org.datacleaner.spark.SparkJobContext;
import org.datacleaner.spark.utils.HdfsHelper;
import org.datacleaner.util.HadoopResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/* loaded from: input_file:org/datacleaner/spark/functions/RowProcessingFunction.class */
/**
 * Spark function that pushes the {@link InputRow}s of a single partition through a DataCleaner
 * {@link AnalysisJob} and emits the resulting analyzer results keyed by component key.
 *
 * <p>The partition-indexed variant ({@link #call(Integer, Iterator)}) first rewrites file/HDFS
 * based output resources and datastores so that each partition writes to its own
 * {@code part-NNNNN} target instead of a shared file.
 */
public final class RowProcessingFunction implements Function2<Integer, Iterator<InputRow>, Iterator<Tuple2<String, NamedAnalyzerResult>>>, PairFlatMapFunction<Iterator<InputRow>, String, NamedAnalyzerResult> {
    private static final Logger logger = LoggerFactory.getLogger(RowProcessingFunction.class);
    private static final long serialVersionUID = 1L;

    private final SparkJobContext _sparkJobContext;

    public RowProcessingFunction(final SparkJobContext sparkJobContext) {
        this._sparkJobContext = sparkJobContext;
    }

    /**
     * Processes a partition using the job from the context as-is (no per-partition output rewriting).
     *
     * @param it the rows of the partition
     * @return the (component key, result) pairs produced by the partition
     */
    public Iterable<Tuple2<String, NamedAnalyzerResult>> call(final Iterator<InputRow> it) throws Exception {
        logger.info("call(Iterator) invoked");
        final List<Tuple2<String, NamedAnalyzerResult>> results =
                executePartition(it, _sparkJobContext.getAnalysisJob());
        logger.info("call(Iterator) finished, returning {} results", results.size());
        return results;
    }

    /**
     * Processes a partition after redirecting outputs to partition-specific targets.
     *
     * @param num the partition index, used to name the {@code part-NNNNN} outputs
     * @param it the rows of the partition
     * @return an iterator over the (component key, result) pairs produced by the partition
     */
    public Iterator<Tuple2<String, NamedAnalyzerResult>> call(final Integer num, final Iterator<InputRow> it)
            throws Exception {
        logger.info("call({}, Iterator) invoked", num);
        final AnalysisJobBuilder analysisJobBuilder = _sparkJobContext.getAnalysisJobBuilder();
        configureComponentsBeforeBuilding(analysisJobBuilder, num.intValue());
        final List<Tuple2<String, NamedAnalyzerResult>> results =
                executePartition(it, analysisJobBuilder.toAnalysisJob());
        logger.info("call({}, Iterator) finished, returning {} results", num, results.size());
        return results.iterator();
    }

    /**
     * Rewrites datastore/resource properties of every component (recursively, including
     * output-data-stream child jobs) so that this partition writes to its own target.
     */
    private void configureComponentsBeforeBuilding(final AnalysisJobBuilder analysisJobBuilder,
            final int partitionNumber) {
        for (final ComponentBuilder componentBuilder : analysisJobBuilder.getComponentBuilders()) {
            replaceDatastoreProperties(componentBuilder, partitionNumber);
            replaceResourceProperties(componentBuilder, partitionNumber);

            // All partitions append into the same logical CSV output; only partition 0 writes the header.
            if (componentBuilder.getComponentInstance() instanceof CreateCsvFileAnalyzer && partitionNumber > 0) {
                componentBuilder.setConfiguredProperty("Include header", false);
            }
        }

        for (final AnalysisJobBuilder childJobBuilder : analysisJobBuilder
                .getConsumedOutputDataStreamsJobBuilders()) {
            configureComponentsBeforeBuilding(childJobBuilder, partitionNumber);
        }
    }

    /** Replaces resource-backed {@link UpdateableDatastore} properties with partition-specific copies. */
    private void replaceDatastoreProperties(final ComponentBuilder componentBuilder, final int partitionNumber) {
        for (final ConfiguredPropertyDescriptor descriptor : componentBuilder.getDescriptor()
                .getConfiguredPropertiesByType(UpdateableDatastore.class, false)) {
            final Object value = componentBuilder.getConfiguredProperty(descriptor);
            if (!(value instanceof ResourceDatastore)) {
                continue;
            }
            final ResourceDatastore datastore = (ResourceDatastore) value;
            final Resource replacementResource = createReplacementResource(datastore.getResource(), partitionNumber);
            if (replacementResource != null) {
                componentBuilder.setConfiguredProperty(descriptor,
                        createReplacementDatastore(datastore, replacementResource));
            }
        }
    }

    /** Replaces {@link Resource} properties with partition-specific resources where supported. */
    private void replaceResourceProperties(final ComponentBuilder componentBuilder, final int partitionNumber) {
        for (final ConfiguredPropertyDescriptor descriptor : componentBuilder.getDescriptor()
                .getConfiguredPropertiesByType(Resource.class, false)) {
            final Resource resource = (Resource) componentBuilder.getConfiguredProperty(descriptor);
            final Resource replacementResource = createReplacementResource(resource, partitionNumber);
            if (replacementResource != null) {
                componentBuilder.setConfiguredProperty(descriptor, replacementResource);
            }
        }
    }

    /**
     * Computes the partition-specific variant of a resource.
     *
     * @return a {@code <path>/part-NNNNN} resource for HDFS/Hadoop and directory-style file targets;
     *         the original resource if it is an existing plain file; {@code null} for unsupported
     *         resource types (including {@code null})
     */
    private Resource createReplacementResource(final Resource resource, final int partitionNumber) {
        // Locale.ROOT: %d is otherwise localized and could produce non-ASCII digits in file names.
        final String partitionSuffix = "/part-" + String.format(Locale.ROOT, "%05d", partitionNumber);

        if (resource instanceof HdfsResource || resource instanceof HadoopResource) {
            final URI uri = URI.create(resource.getQualifiedPath() + partitionSuffix);
            return HdfsHelper.createHelper().getResourceToUse(uri);
        }
        if (!(resource instanceof FileResource)) {
            return null;
        }

        final File file = ((FileResource) resource).getFile();
        if (file.isFile()) {
            // An existing plain file cannot become a directory of part files; keep it unchanged.
            return resource;
        }
        // mkdirs() also returns false if another partition created the directory concurrently,
        // so only warn when the directory really is missing afterwards.
        if (!file.exists() && !file.mkdirs() && !file.isDirectory()) {
            logger.warn("Could not create directory for partitioned output: {}", file);
        }
        return new FileResource(resource.getQualifiedPath() + partitionSuffix);
    }

    /**
     * Creates a copy of {@code datastore} pointing at {@code replacementResource}.
     *
     * @return the copy for CSV/JSON datastores; the unchanged original for any other type
     */
    private ResourceDatastore createReplacementDatastore(final ResourceDatastore datastore,
            final Resource replacementResource) {
        final String name = datastore.getName();
        if (datastore instanceof CsvDatastore) {
            return new CsvDatastore(name, replacementResource, ((CsvDatastore) datastore).getCsvConfiguration());
        }
        if (datastore instanceof JsonDatastore) {
            return new JsonDatastore(name, replacementResource, ((JsonDatastore) datastore).getSchemaBuilder());
        }
        logger.warn("Could not replace datastore '{}' because it is of an unsupported type: {}", name,
                datastore.getClass().getSimpleName());
        return datastore;
    }

    /**
     * Runs every row of the partition through the job, then collects, awaits and returns the
     * analyzer results and closes the row-processing components.
     */
    private List<Tuple2<String, NamedAnalyzerResult>> executePartition(final Iterator<InputRow> it,
            final AnalysisJob analysisJob) {
        _sparkJobContext.triggerOnPartitionProcessingStart();

        final DataCleanerConfiguration configuration = _sparkJobContext.getConfiguration();
        final ConsumeRowHandler.Configuration handlerConfiguration = new ConsumeRowHandler.Configuration();
        handlerConfiguration.includeAnalyzers = true;
        handlerConfiguration.includeNonDistributedTasks = false;
        final ConsumeRowHandler consumeRowHandler =
                new ConsumeRowHandler(analysisJob, configuration, handlerConfiguration);

        final boolean debugEnabled = logger.isDebugEnabled();
        while (it.hasNext()) {
            final InputRow row = it.next();
            consumeRowHandler.consumeRow(row);
            if (debugEnabled) {
                logger.debug("Consumed row no. {}", row.getId());
            }
        }
        logger.info("Row processing complete - continuing to fetching results");

        final List<Tuple2<String, NamedAnalyzerResult>> analyzerResults =
                getAnalyzerResults(consumeRowHandler.getConsumers());
        awaitFutureResults(analyzerResults);

        // NOTE(review): components are only closed on the success path; if row processing throws,
        // they are left open. Consider try/finally with success=false once failure semantics are agreed.
        final LifeCycleHelper lifeCycleHelper = new LifeCycleHelper(configuration, analysisJob, false);
        for (final RowProcessingConsumer consumer : consumeRowHandler.getConsumers()) {
            lifeCycleHelper.close(consumer.getComponentJob().getDescriptor(), consumer.getComponent(), true);
        }

        _sparkJobContext.triggerOnPartitionProcessingEnd();
        return analyzerResults;
    }

    /** Replaces, in place, every {@link AnalyzerResultFuture} in {@code results} by its awaited value. */
    private static void awaitFutureResults(final List<Tuple2<String, NamedAnalyzerResult>> results) {
        final ListIterator<Tuple2<String, NamedAnalyzerResult>> iterator = results.listIterator();
        while (iterator.hasNext()) {
            final Tuple2<String, NamedAnalyzerResult> entry = iterator.next();
            final NamedAnalyzerResult namedResult = entry._2();
            final AnalyzerResult result = namedResult.getAnalyzerResult();
            if (result instanceof AnalyzerResultFuture) {
                final AnalyzerResult awaited = ((AnalyzerResultFuture<?>) result).get();
                iterator.set(new Tuple2<>(entry._1(), new NamedAnalyzerResult(namedResult.getName(), awaited)));
            }
        }
    }

    /**
     * Collects results from result-producing consumers, recursing into the consumers of their
     * active output data streams.
     */
    private List<Tuple2<String, NamedAnalyzerResult>> getAnalyzerResults(
            final Collection<RowProcessingConsumer> consumers) {
        final List<Tuple2<String, NamedAnalyzerResult>> results = new ArrayList<>();
        for (final RowProcessingConsumer consumer : consumers) {
            if (consumer.isResultProducer()) {
                final AnalyzerResult result = ((HasAnalyzerResult<?>) consumer.getComponent()).getResult();
                final String componentKey = _sparkJobContext.getComponentKey(consumer.getComponentJob());
                results.add(new Tuple2<>(componentKey, new NamedAnalyzerResult(componentKey, result)));
            }
            for (final ActiveOutputDataStream outputDataStream : consumer.getActiveOutputDataStreams()) {
                results.addAll(getAnalyzerResults(outputDataStream.getPublisher().getConsumers()));
            }
        }
        return results;
    }
}
