/*
 * Decompiled with CFR 0.152.
 */
package com.exasol.spark.s3;

import com.exasol.errorreporting.ExaError;
import com.exasol.spark.common.ExasolOptions;
import com.exasol.spark.common.ExasolValidationException;
import com.exasol.spark.common.Option;
import com.exasol.spark.s3.AbstractImportExportQueryGenerator;
import com.exasol.spark.s3.S3FileSystem;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Optional;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.spark.sql.connector.write.BatchWrite;
import org.apache.spark.sql.connector.write.DataWriterFactory;
import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
import org.apache.spark.sql.connector.write.Write;
import org.apache.spark.sql.connector.write.WriterCommitMessage;
import software.amazon.awssdk.services.s3.model.S3Object;

public class ExasolBatchWrite
implements BatchWrite {
    private static final Logger LOGGER = Logger.getLogger(ExasolBatchWrite.class.getName());
    private final ExasolOptions options;
    private final BatchWrite delegate;

    public ExasolBatchWrite(ExasolOptions options, Write delegate) {
        this.options = options;
        this.delegate = delegate.toBatch();
    }

    public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
        return this.delegate.createBatchWriterFactory(info);
    }

    public boolean useCommitCoordinator() {
        return this.delegate.useCommitCoordinator();
    }

    public void abort(WriterCommitMessage[] messages) {
        LOGGER.info("Running abort stage of the job.");
        this.cleanup();
        this.delegate.abort(messages);
    }

    private void cleanup() {
        String intermediateLocation = this.options.get(Option.INTERMEDIATE_DATA_PATH.key());
        LOGGER.info(() -> "Running cleanup process for directory '" + intermediateLocation + "'.");
        try (S3FileSystem s3FileSystem = S3FileSystem.fromOptions(this.options);){
            s3FileSystem.deleteKeys(this.options.getS3Bucket(), this.options.get(Option.WRITE_S3_BUCKET_KEY.key()));
        }
    }

    public void commit(WriterCommitMessage[] messages) {
        LOGGER.info("Committing the file writing stage of the job.");
        this.delegate.commit(messages);
        this.importIntermediateDataIntoExasol();
    }

    private void importIntermediateDataIntoExasol() {
        long start = System.currentTimeMillis();
        String table = this.options.getTable();
        String query = new S3ImportQueryGenerator(this.options).generateQuery();
        int rows = this.executeImportQuery(query);
        long time = System.currentTimeMillis() - start;
        LOGGER.info(() -> "Imported '" + rows + "' rows into the table '" + table + "' in '" + time + "' millis.");
    }

    /*
     * Exception decompiling
     */
    private int executeImportQuery(String query) {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 3 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    private static String removeCredentialsFromQuery(String input) {
        return Stream.of(input.split("\n")).filter(s -> !s.contains("IDENTIFIED BY")).collect(Collectors.joining("\n"));
    }

    private static class S3ImportQueryGenerator
    extends AbstractImportExportQueryGenerator {
        public S3ImportQueryGenerator(ExasolOptions options) {
            super(options);
        }

        public String generateQuery() {
            String table = this.options.getTable();
            return "IMPORT INTO " + table + " FROM CSV\n" + this.getIdentifier() + this.getFiles() + this.getFooter();
        }

        private String getFiles() {
            String path = this.options.get(Option.INTERMEDIATE_DATA_PATH.key());
            URI pathURI = this.getPathURI(path);
            String bucketName = pathURI.getHost();
            String bucketKey = pathURI.getPath().substring(1);
            try (S3FileSystem s3FileSystem = S3FileSystem.fromOptions(this.options);){
                List<S3Object> objects = s3FileSystem.listObjects(bucketName, Optional.of(bucketKey));
                StringBuilder builder = new StringBuilder();
                for (S3Object object : objects) {
                    builder.append("FILE '").append(object.key()).append("'\n");
                }
                String string = builder.toString();
                return string;
            }
        }

        private URI getPathURI(String path) {
            try {
                return new URI(path);
            }
            catch (URISyntaxException exception) {
                throw new ExasolValidationException(ExaError.messageBuilder((String)"E-SEC-25").message("Provided path {{path}} cannot be converted to URI systax.", new Object[]{path}).mitigation("Please make sure the path is correct file system (hdfs, s3a, etc) path.", new Object[0]).toString(), (Throwable)exception);
            }
        }

        private String getFooter() {
            return "SKIP = 1";
        }
    }
}

