/*
 * Decompiled with CFR 0.152.
 */
package com.exasol.spark.s3;

import com.exasol.errorreporting.ExaError;
import com.exasol.spark.common.ExasolOptions;
import com.exasol.spark.common.ExasolValidationException;
import com.exasol.spark.common.FilterConverter;
import com.exasol.spark.common.SelectStatementGenerator;
import com.exasol.spark.common.StatementGeneratorFactory;
import com.exasol.spark.s3.AbstractImportExportQueryGenerator;
import com.exasol.spark.s3.ExasolConnectionFactory;
import com.exasol.spark.s3.S3CleanupListener;
import com.exasol.sql.expression.BooleanExpression;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Locale;
import java.util.Optional;
import java.util.UUID;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.spark.scheduler.SparkListenerInterface;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.connector.read.SupportsPushDownFilters;
import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat;
import org.apache.spark.sql.execution.datasources.v2.csv.CSVTable;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import scala.Option;
import scala.collection.Iterable;
import scala.collection.JavaConverters;
import scala.collection.immutable.List;

public class ExasolS3ScanBuilder
implements ScanBuilder,
SupportsPushDownFilters,
SupportsPushDownRequiredColumns {
    private static final Logger LOGGER = Logger.getLogger(ExasolS3ScanBuilder.class.getName());
    private final ExasolOptions options;
    private final CaseInsensitiveStringMap properties;
    private StructType schema;
    private Filter[] pushedFilters;

    public ExasolS3ScanBuilder(ExasolOptions options, StructType schema, CaseInsensitiveStringMap properties) {
        this.options = options;
        this.schema = schema;
        this.properties = properties;
        this.pushedFilters = new Filter[0];
    }

    public Filter[] pushFilters(Filter[] filters) {
        java.util.List<Filter> unsupportedFilters = this.getUnsupportedFilters(filters);
        ArrayList<Filter> supportedFilters = new ArrayList<Filter>(Arrays.asList(filters));
        supportedFilters.removeAll(unsupportedFilters);
        this.pushedFilters = supportedFilters.toArray(new Filter[0]);
        return unsupportedFilters.toArray(new Filter[0]);
    }

    private java.util.List<Filter> getUnsupportedFilters(Filter[] filters) {
        FilterConverter filterConverter = new FilterConverter();
        return Arrays.asList(filters).stream().filter(f -> !filterConverter.isFilterSupported(f)).collect(Collectors.toList());
    }

    public Filter[] pushedFilters() {
        return this.pushedFilters;
    }

    public void pruneColumns(StructType schema) {
        this.schema = schema;
    }

    public Scan build() {
        SparkSession sparkSession = SparkSession.active();
        String bucket = this.options.getS3Bucket();
        String bucketKey = this.generateRandomBucketKey(sparkSession);
        LOGGER.info(() -> "Using S3 bucket '" + bucket + "' with folder '" + bucketKey + "' for scan job data.");
        this.addSparkCleanupJobListener(sparkSession, bucketKey);
        this.prepareIntermediateData(bucketKey);
        return new CSVTable("", sparkSession, this.properties, this.getCSVFiles(bucket, bucketKey), Option.apply((Object)this.schema), CSVFileFormat.class).newScanBuilder(this.getUpdatedMapWithCSVOptions(this.properties)).build();
    }

    private String generateRandomBucketKey(SparkSession sparkSession) {
        return UUID.randomUUID() + "-" + sparkSession.sparkContext().applicationId();
    }

    private void addSparkCleanupJobListener(SparkSession spark, String bucketKey) {
        spark.sparkContext().addSparkListener((SparkListenerInterface)new S3CleanupListener(this.options, bucketKey));
    }

    private List<String> getCSVFiles(String bucket, String bucketKey) {
        String path = "s3a://" + Paths.get(bucket, bucketKey, "*.csv").toString();
        return ((Iterable)JavaConverters.collectionAsScalaIterableConverter(Arrays.asList(path)).asScala()).toList();
    }

    private CaseInsensitiveStringMap getUpdatedMapWithCSVOptions(CaseInsensitiveStringMap map) {
        HashMap<String, String> updatedMap = new HashMap<String, String>(map.asCaseSensitiveMap());
        updatedMap.put("header", "true");
        updatedMap.put("delimiter", ",");
        return new CaseInsensitiveStringMap(updatedMap);
    }

    protected String getScanQuery() {
        Optional predicate = new FilterConverter().convert(this.pushedFilters);
        SelectStatementGenerator stmtGenerator = StatementGeneratorFactory.selectFrom((String)this.getTableOrQuery()).columns(this.getColumnNames());
        if (predicate.isPresent()) {
            stmtGenerator.where((BooleanExpression)predicate.get());
        }
        return stmtGenerator.render();
    }

    private String getTableOrQuery() {
        if (this.options.hasTable()) {
            return this.options.getTable();
        }
        return "(" + this.options.getQuery() + ")";
    }

    private String[] getColumnNames() {
        return (String[])Stream.of(this.schema.fields()).map(StructField::name).toArray(String[]::new);
    }

    private void prepareIntermediateData(String bucketKey) {
        String selectQuery = this.getScanQuery();
        LOGGER.info(() -> "Preparing data for query '" + selectQuery + "'.");
        String exportQuery = new S3ExportQueryGenerator(this.options, bucketKey).generateQuery(selectQuery);
        new S3DataExporter(this.options, bucketKey).exportData(exportQuery);
    }

    private static String removeIdentifiedByPart(String input) {
        return Stream.of(input.split("\n")).filter(s -> !s.contains("IDENTIFIED BY")).collect(Collectors.joining("\n"));
    }

    private static class S3DataExporter {
        private final ExasolOptions options;
        private final String bucket;
        private final String bucketKey;

        public S3DataExporter(ExasolOptions options, String bucketKey) {
            this.options = options;
            this.bucket = options.getS3Bucket();
            this.bucketKey = bucketKey;
        }

        /*
         * Enabled aggressive exception aggregation
         */
        public int exportData(String exportQuery) {
            ExasolConnectionFactory connectionFactory = new ExasolConnectionFactory(this.options);
            try (Connection connection = connectionFactory.getConnection();){
                int n;
                block14: {
                    Statement statement = connection.createStatement();
                    try {
                        int numberOfExportedRows = statement.executeUpdate(exportQuery);
                        LOGGER.info(() -> "Exported '" + numberOfExportedRows + "' rows into '" + this.bucket + "/" + this.bucketKey + "'.");
                        n = numberOfExportedRows;
                        if (statement == null) break block14;
                    }
                    catch (Throwable throwable) {
                        if (statement != null) {
                            try {
                                statement.close();
                            }
                            catch (Throwable throwable2) {
                                throwable.addSuppressed(throwable2);
                            }
                        }
                        throw throwable;
                    }
                    statement.close();
                }
                return n;
            }
            catch (SQLException exception) {
                throw new ExasolValidationException(ExaError.messageBuilder((String)"E-SEC-22").message("Failed to run export query {{exportQuery}} into S3 location {{s3Path}}.", new Object[0]).parameter("exportQuery", (Object)ExasolS3ScanBuilder.removeIdentifiedByPart(exportQuery)).parameter("s3Path", (Object)(this.bucket + "/" + this.bucketKey)).mitigation("Please ensure that query and table name are correct and satisfy SQL syntax requirements.", new Object[0]).toString(), (Throwable)exception);
            }
        }
    }

    private static class S3ExportQueryGenerator
    extends AbstractImportExportQueryGenerator {
        private final String bucketKey;
        private final int numberOfFiles;

        public S3ExportQueryGenerator(ExasolOptions options, String bucketKey) {
            super(options);
            this.bucketKey = bucketKey;
            this.numberOfFiles = options.getNumberOfPartitions();
        }

        public String generateQuery(String baseQuery) {
            return "EXPORT (\n" + baseQuery + "\n) INTO CSV\n" + this.getIdentifier() + this.getFiles() + this.getFooter();
        }

        private String getFiles() {
            StringBuilder builder = new StringBuilder();
            String prefix = "FILE '" + this.bucketKey + "/";
            for (int fileIndex = 1; fileIndex <= this.numberOfFiles; ++fileIndex) {
                builder.append(prefix).append(String.format("part-%03d", fileIndex)).append(".csv'\n");
            }
            return builder.toString();
        }

        private String getFooter() {
            return "WITH COLUMN NAMES\nBOOLEAN = 'true/false'";
        }
    }
}

