package com.exasol.spark.s3;

import com.exasol.errorreporting.ExaError;
import com.exasol.spark.common.ExasolOptions;
import com.exasol.spark.common.ExasolValidationException;
import com.exasol.spark.common.Option;
import java.net.URI;
import java.net.URISyntaxException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.spark.sql.connector.write.BatchWrite;
import org.apache.spark.sql.connector.write.DataWriterFactory;
import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
import org.apache.spark.sql.connector.write.Write;
import org.apache.spark.sql.connector.write.WriterCommitMessage;
import software.amazon.awssdk.services.s3.model.S3Object;

/* loaded from: input_file:com/exasol/spark/s3/ExasolBatchWrite.class */
/**
 * A {@link BatchWrite} that forwards the Spark write lifecycle to a delegate batch write and, after the delegate
 * has committed, runs an {@code IMPORT} statement that loads the intermediate files into an Exasol table.
 *
 * <p>
 * The intermediate data is removed from S3 after the import attempt (successful or not) and when the job is
 * aborted.
 * </p>
 */
public class ExasolBatchWrite implements BatchWrite {
    private static final Logger LOGGER = Logger.getLogger(ExasolBatchWrite.class.getName());
    private final ExasolOptions options;
    private final BatchWrite delegate;

    /**
     * Builds the {@code IMPORT INTO ... FROM CSV} statement for the files found below the intermediate data path.
     *
     * <p>
     * NOTE(review): the decompiler reported that this class was originally {@code private}; it is kept
     * {@code public} here so that the visible interface of this file does not change.
     * </p>
     */
    public static class S3ImportQueryGenerator extends BaseQueryGenerator {
        /**
         * Creates a new query generator.
         *
         * @param exasolOptions user provided options, handed to the base generator
         */
        public S3ImportQueryGenerator(ExasolOptions exasolOptions) {
            super(exasolOptions);
        }

        /**
         * Assembles the import statement: header with the target table, the identifier part supplied by the base
         * class, one {@code FILE} entry per S3 object and the footer.
         *
         * @return complete import query
         */
        public String generateQuery() {
            return "IMPORT INTO " + this.options.getTable() + " FROM CSV\n" + getIdentifier() + getFiles() + getFooter();
        }

        /**
         * Lists the objects below the intermediate data path and renders one {@code FILE '<key>'} line per object.
         *
         * @return newline terminated {@code FILE} entries, empty when no object is listed
         */
        private String getFiles() {
            final URI pathURI = getPathURI(this.options.get(Option.INTERMEDIATE_DATA_PATH.key()));
            // The URI host is used as the bucket name; the path without its leading '/' is the key prefix.
            final String bucketName = pathURI.getHost();
            final String bucketKey = pathURI.getPath().substring(1);
            try (S3FileSystem s3FileSystem = S3FileSystem.fromOptions(this.options)) {
                final StringBuilder builder = new StringBuilder();
                for (final S3Object object : s3FileSystem.listObjects(bucketName, Optional.of(bucketKey))) {
                    builder.append("FILE '").append(object.key()).append("'\n");
                }
                return builder.toString();
            }
        }

        /**
         * Parses the given path into a {@link URI}.
         *
         * @param path intermediate data path
         * @return parsed URI
         * @throws ExasolValidationException if the path is not a syntactically valid URI
         */
        private URI getPathURI(String path) {
            try {
                return new URI(path);
            } catch (URISyntaxException e) {
                throw new ExasolValidationException(ExaError.messageBuilder("E-SEC-25").message("Provided path {{path}} cannot be converted to URI systax.", new Object[]{path}).mitigation("Please make sure the path is correct file system (hdfs, s3a, etc) path.", new Object[0]).toString(), e);
            }
        }

        /**
         * Returns the trailing clause of the import statement.
         *
         * @return footer clause; presumably skips a header row in each file (confirm against the writer)
         */
        private String getFooter() {
            return "SKIP = 1";
        }
    }

    /**
     * Creates a new batch write that wraps the batch obtained from the given write.
     *
     * @param exasolOptions user provided options
     * @param write         write whose {@link Write#toBatch()} result becomes the delegate
     */
    public ExasolBatchWrite(ExasolOptions exasolOptions, Write write) {
        this.options = exasolOptions;
        this.delegate = write.toBatch();
    }

    /**
     * Forwards to the delegate.
     *
     * @param physicalWriteInfo physical write information passed through unchanged
     * @return writer factory created by the delegate
     */
    public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo physicalWriteInfo) {
        return this.delegate.createBatchWriterFactory(physicalWriteInfo);
    }

    /**
     * Forwards to the delegate.
     *
     * @return the delegate's answer
     */
    public boolean useCommitCoordinator() {
        return this.delegate.useCommitCoordinator();
    }

    /**
     * Removes the intermediate data and then aborts the delegate.
     *
     * @param writerCommitMessageArr commit messages passed through to the delegate
     */
    public void abort(WriterCommitMessage[] writerCommitMessageArr) {
        LOGGER.info("Running abort stage of the job.");
        cleanup();
        this.delegate.abort(writerCommitMessageArr);
    }

    /**
     * Asks the S3 file system to delete the keys identified by the configured S3 bucket and the
     * {@code WRITE_S3_BUCKET_KEY} option value.
     */
    private void cleanup() {
        final String path = this.options.get(Option.INTERMEDIATE_DATA_PATH.key());
        LOGGER.info(() -> "Running cleanup process for directory '" + path + "'.");
        try (S3FileSystem s3FileSystem = S3FileSystem.fromOptions(this.options)) {
            s3FileSystem.deleteKeys(this.options.getS3Bucket(), this.options.get(Option.WRITE_S3_BUCKET_KEY.key()));
        }
    }

    /**
     * Commits the delegate first and then imports the intermediate data into the Exasol table.
     *
     * @param writerCommitMessageArr commit messages passed through to the delegate
     */
    public void commit(WriterCommitMessage[] writerCommitMessageArr) {
        LOGGER.info("Committing the file writing stage of the job.");
        this.delegate.commit(writerCommitMessageArr);
        importIntermediateDataIntoExasol();
    }

    /**
     * Generates and runs the import query, then logs the reported row count and the elapsed time. The elapsed time
     * covers both query generation and execution.
     */
    private void importIntermediateDataIntoExasol() {
        final long start = System.currentTimeMillis();
        final String table = this.options.getTable();
        final int rows = executeImportQuery(new S3ImportQueryGenerator(this.options).generateQuery());
        final long elapsed = System.currentTimeMillis() - start;
        LOGGER.info(() -> "Imported '" + rows + "' rows into the table '" + table + "' in '" + elapsed + "' millis.");
    }

    /**
     * Runs the import query in an explicitly committed transaction. The intermediate data is cleaned up afterwards
     * regardless of the outcome.
     *
     * @param query import query to execute
     * @return update count reported by the statement
     * @throws ExasolConnectionException if connecting, executing, committing or closing fails with an
     *                                   {@link SQLException}; the reported query has its credential lines removed
     */
    private int executeImportQuery(String query) {
        // Resources are closed in reverse order (statement, then connection) before catch/finally run.
        try (Connection connection = new ExasolConnectionFactory(this.options).getConnection();
                Statement statement = connection.createStatement()) {
            connection.setAutoCommit(false);
            final int rows = statement.executeUpdate(query);
            connection.commit();
            return rows;
        } catch (SQLException e) {
            throw new ExasolConnectionException(ExaError.messageBuilder("E-SEC-24").message("Failure running the import {{query}} query.", new Object[]{removeCredentialsFromQuery(query)}).mitigation("Please check that connection address, username and password are correct.", new Object[0]).toString(), e);
        } finally {
            cleanup();
        }
    }

    /**
     * Drops every line of the query that contains {@code IDENTIFIED BY} so that the query can be put into an error
     * message.
     *
     * @param query multi-line query
     * @return query without the lines containing {@code IDENTIFIED BY}, joined with {@code \n}
     */
    private static String removeCredentialsFromQuery(String query) {
        // Stream the String[] directly so that the element type is String (needed by contains and joining).
        return Stream.of(query.split("\n"))
                .filter(line -> !line.contains("IDENTIFIED BY"))
                .collect(Collectors.joining("\n"));
    }
}
