package io.cdap.plugin.db.batch.source;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Metadata;
import io.cdap.cdap.api.annotation.MetadataProperty;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.batch.Input;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.plugin.ConnectionConfig;
import io.cdap.plugin.DBManager;
import io.cdap.plugin.DBRecord;
import io.cdap.plugin.FieldCase;
import io.cdap.plugin.StructuredRecordUtils;
import io.cdap.plugin.common.LineageRecorder;
import io.cdap.plugin.common.ReferenceBatchSource;
import io.cdap.plugin.common.ReferencePluginConfig;
import io.cdap.plugin.common.SourceInputFormatProvider;
import io.cdap.plugin.common.db.DBUtils;
import io.cdap.plugin.common.db.DriverCleanup;
import io.cdap.plugin.db.batch.TransactionIsolationLevel;
import io.cdap.plugin.db.common.DBBaseConfig;
import java.io.IOException;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Name("Database")
@Description("Reads from a database table(s) using a configurable SQL query. Outputs one record for each row returned by the query.")
@Metadata(properties = {@MetadataProperty(key = "connector", value = "Database")})
@Plugin(type = "batchsource")
/* loaded from: input_file:io/cdap/plugin/db/batch/source/DBSource.class */
public class DBSource extends ReferenceBatchSource<LongWritable, DBRecord, StructuredRecord> {
    /** Plugin name; same value as the {@code @Name} annotation on this class. */
    public static final String NAME = "Database";

    private static final Logger LOG = LoggerFactory.getLogger(DBSource.class);

    // Case-insensitive patterns used by removeConditionsClause to strip the $CONDITIONS placeholder
    // together with the keyword that joins it to the rest of the WHERE clause.

    /** Matches "$conditions and " / "$conditions or " (placeholder first). */
    private static final Pattern CONDITIONS_AND =
        Pattern.compile("\\$conditions (and|or)\\s+", Pattern.CASE_INSENSITIVE);

    /** Matches " and $conditions" / " or $conditions" (placeholder last). */
    private static final Pattern AND_CONDITIONS =
        Pattern.compile("\\s+(and|or) \\$conditions", Pattern.CASE_INSENSITIVE);

    /** Matches " where $conditions" (placeholder is the only predicate). */
    private static final Pattern WHERE_CONDITIONS =
        Pattern.compile("\\s+where \\$conditions", Pattern.CASE_INSENSITIVE);

    /** Plugin configuration supplied at construction time. */
    private final DBSourceConfig sourceConfig;

    /** Created in the constructor from the config's connection and JDBC plugin type; destroyed in destroy(). */
    private final DBManager dbManager;

    /** JDBC driver class; assigned in initialize() and handed to DBUtils.cleanup in destroy(). */
    private Class<? extends Driver> driverClass;

    /* loaded from: input_file:io/cdap/plugin/db/batch/source/DBSource$DBSourceConfig.class */
    public static class DBSourceConfig extends DBBaseConfig {
        // Property names as exposed to the pipeline configuration; also used to tag validation failures.
        public static final String IMPORT_QUERY = "importQuery";
        public static final String BOUNDING_QUERY = "boundingQuery";
        public static final String SPLIT_BY = "splitBy";
        public static final String NUM_SPLITS = "numSplits";
        public static final String SCHEMA = "schema";
        public static final String TRANSACTION_ISOLATION_LEVEL = "transactionIsolationLevel";
        public static final String PATTERN_TO_REPLACE = "patternToReplace";
        public static final String REPLACE_WITH = "replaceWith";
        public static final String FETCH_SIZE = "fetchSize";

        @Name(IMPORT_QUERY)
        @Description("The SELECT query to use to import data from the specified table. You can specify an arbitrary number of columns to import, or import all columns using *. The Query should contain the '$CONDITIONS' string unless numSplits is set to one. For example, 'SELECT * FROM table WHERE $CONDITIONS'. The '$CONDITIONS' string will be replaced by 'splitBy' field limits specified by the bounding query.")
        @Macro
        String importQuery;

        @Name(BOUNDING_QUERY)
        @Description("Bounding Query should return the min and max of the values of the 'splitBy' field. For example, 'SELECT MIN(id),MAX(id) FROM table'. This is required unless numSplits is set to one.")
        @Nullable
        @Macro
        String boundingQuery;

        @Name(SPLIT_BY)
        @Description("Field Name which will be used to generate splits. This is required unless numSplits is set to one.")
        @Nullable
        @Macro
        String splitBy;

        @Name(NUM_SPLITS)
        @Description("The number of splits to generate. If set to one, the boundingQuery is not needed, and no $CONDITIONS string needs to be specified in the importQuery. If not specified, the execution framework will pick a value.")
        @Nullable
        @Macro
        Integer numSplits;

        // Name refers to the constant (same value as the former literal) so the property name has one definition.
        @Name(TRANSACTION_ISOLATION_LEVEL)
        @Description("The transaction isolation level for queries run by this source. Defaults to TRANSACTION_SERIALIZABLE. See java.sql.Connection#setTransactionIsolation for more details. The Phoenix jdbc driver will throw an exception if the Phoenix database does not have transactions enabled and this setting is set to true. For drivers like that, this should be set to TRANSACTION_NONE.")
        @Nullable
        @Macro
        public String transactionIsolationLevel;

        @Name(SCHEMA)
        @Description("The schema of records output by the source. This will be used in place of whatever schema comes back from the query. This should only be used if there is a bug in your jdbc driver. For example, if a column is not correctly getting marked as nullable.")
        @Nullable
        String schema;

        @Name(PATTERN_TO_REPLACE)
        @Description("The pattern to replace in the field name in the table, it is typically used with the Replace With config. If Replace With is not set, the pattern will be removed in the field name.")
        @Nullable
        String patternToReplace;

        @Name(REPLACE_WITH)
        @Description("The string that will be replaced in the field name in the table, it must be used with the Pattern To Replace config.")
        @Nullable
        String replaceWith;

        @Name(FETCH_SIZE)
        @Description("The number of rows to fetch at a time per split. Larger fetch size can result in faster import, with the tradeoff of higher memory usage.")
        @Nullable
        @Macro
        Integer fetchSize;

        /**
         * Returns the configured import query after passing it through {@code cleanQuery}
         * (declared outside this class body).
         *
         * @return the cleaned import query; annotated nullable
         */
        @Nullable
        public String getImportQuery() {
            String cleanedImportQuery = cleanQuery(importQuery);
            return cleanedImportQuery;
        }

        /**
         * Returns the configured bounding query after passing it through {@code cleanQuery}
         * (declared outside this class body). The underlying field is optional.
         *
         * @return the cleaned bounding query
         */
        public String getBoundingQuery() {
            String cleanedBoundingQuery = cleanQuery(boundingQuery);
            return cleanedBoundingQuery;
        }

        /**
         * Validates the source configuration, recording every problem on the given collector
         * instead of throwing. Properties that still contain a macro are not checked.
         *
         * <p>Checks performed:
         * <ul>
         *   <li>numSplits, when set, is at least 1;</li>
         *   <li>transactionIsolationLevel, when set, is accepted by {@code TransactionIsolationLevel.validate};</li>
         *   <li>importQuery is present;</li>
         *   <li>unless numSplits is 1: importQuery contains '$CONDITIONS', and splitBy and
         *       boundingQuery are present;</li>
         *   <li>replaceWith is not set without patternToReplace;</li>
         *   <li>fetchSize, when set, is positive.</li>
         * </ul>
         *
         * @param failureCollector collector that receives the validation failures
         */
        public void validate(FailureCollector failureCollector) {
            // While numSplits is still a macro its value is unknown, so it may well resolve to 1.
            // The multi-split requirements are therefore skipped rather than reported as failures;
            // they are checked once validate runs with the resolved value.
            boolean numSplitsUnresolved = containsMacro(NUM_SPLITS);
            boolean singleSplit = false;
            if (!numSplitsUnresolved && numSplits != null) {
                if (numSplits < 1) {
                    failureCollector.addFailure("Number of Splits must be a positive number.", null)
                        .withConfigProperty(NUM_SPLITS);
                }
                singleSplit = numSplits == 1;
            }

            if (!containsMacro(TRANSACTION_ISOLATION_LEVEL) && transactionIsolationLevel != null) {
                TransactionIsolationLevel.validate(transactionIsolationLevel, failureCollector);
            }

            boolean importQueryIsMacro = containsMacro(IMPORT_QUERY);
            boolean importQueryMissing = Strings.isNullOrEmpty(importQuery);
            if (!importQueryIsMacro && importQueryMissing) {
                failureCollector.addFailure("Import Query must be specified.", null)
                    .withConfigProperty(IMPORT_QUERY);
            }

            if (!singleSplit && !numSplitsUnresolved) {
                if (!importQueryIsMacro && !importQueryMissing) {
                    // getImportQuery() is annotated @Nullable, so guard before dereferencing.
                    String cleanedQuery = getImportQuery();
                    if (cleanedQuery == null || !cleanedQuery.contains("$CONDITIONS")) {
                        failureCollector.addFailure(
                            "Invalid Import Query.",
                            String.format("Import Query %s must contain the string '$CONDITIONS'.", importQuery))
                            .withConfigProperty(IMPORT_QUERY);
                    }
                }
                if (!containsMacro(SPLIT_BY) && Strings.isNullOrEmpty(splitBy)) {
                    failureCollector.addFailure(
                        "Split-By Field Name must be specified if Number of Splits is not set to 1.", null)
                        .withConfigProperty(SPLIT_BY)
                        .withConfigProperty(NUM_SPLITS);
                }
                if (!containsMacro(BOUNDING_QUERY) && Strings.isNullOrEmpty(boundingQuery)) {
                    failureCollector.addFailure(
                        "Bounding Query must be specified if Number of Splits is not set to 1.", null)
                        .withConfigProperty(BOUNDING_QUERY)
                        .withConfigProperty(NUM_SPLITS);
                }
            }

            if (replaceWith != null && patternToReplace == null) {
                failureCollector.addFailure("Replace With is set but Pattern To Replace is not provided", null)
                    .withConfigProperty(REPLACE_WITH)
                    .withConfigProperty(PATTERN_TO_REPLACE);
            }

            if (!containsMacro(FETCH_SIZE) && fetchSize != null && fetchSize <= 0) {
                failureCollector.addFailure("Invalid fetch size.", "Fetch size must be a positive integer.")
                    .withConfigProperty(FETCH_SIZE);
            }
        }

        /**
         * Parses the optional user-supplied output schema.
         *
         * @param failureCollector collector that receives a failure when the schema JSON cannot be parsed
         * @return the parsed schema, or {@code null} when no schema is configured
         * @throws RuntimeException whatever {@code failureCollector.getOrThrowException()} raises after a
         *         parse failure has been recorded
         */
        @Nullable
        public Schema getSchema(FailureCollector failureCollector) {
            if (Strings.isNullOrEmpty(schema)) {
                return null;
            }
            try {
                return Schema.parseJson(schema);
            } catch (IOException e) {
                // Tie the failure to the schema property, as the other failures in this config
                // are tied to the property that caused them.
                failureCollector.addFailure(String.format("Invalid Schema : %s", e.getMessage()), null)
                    .withConfigProperty(SCHEMA);
                throw failureCollector.getOrThrowException();
            }
        }
    }

    /**
     * Creates the source from its plugin configuration and builds the {@code DBManager}
     * from the config's connection and JDBC plugin type.
     *
     * @param dBSourceConfig the plugin configuration
     */
    public DBSource(DBSourceConfig dBSourceConfig) {
        super(new ReferencePluginConfig(dBSourceConfig.getReferenceName()));
        sourceConfig = dBSourceConfig;
        dbManager = new DBManager(
            dBSourceConfig.getConnection(),
            dBSourceConfig.getJdbcPluginType());
    }

    /**
     * Validates the configuration at deployment time and sets the output schema when it can be
     * determined.
     *
     * <p>A configured schema is used as-is. Otherwise, when neither the JDBC plugin name nor the
     * import query is a macro, the schema is derived by running the import query through
     * {@code getSchema}. Driver or SQL errors during that derivation are recorded on the failure
     * collector rather than thrown from here.
     *
     * @param pipelineConfigurer configurer for the pipeline stage
     */
    @Override // io.cdap.plugin.common.ReferenceBatchSource
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        super.configurePipeline(pipelineConfigurer);

        FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
        sourceConfig.validate(collector);

        Schema configuredSchema = sourceConfig.getSchema(collector);
        boolean hasConfiguredSchema = configuredSchema != null;
        if (hasConfiguredSchema) {
            pipelineConfigurer.getStageConfigurer().setOutputSchema(configuredSchema);
        }

        // The driver plugin cannot be resolved while its name is a macro; only credentials are checked.
        if (sourceConfig.containsMacro("jdbcPluginName")) {
            dbManager.validateCredentials(collector);
            return;
        }

        Class<? extends Driver> jdbcDriverClass =
            dbManager.validateJDBCPluginPipeline(pipelineConfigurer, getJDBCPluginId(), collector);
        collector.getOrThrowException();

        boolean shouldDeriveSchema =
            !hasConfiguredSchema && !sourceConfig.containsMacro(DBSourceConfig.IMPORT_QUERY);
        if (!shouldDeriveSchema) {
            return;
        }

        try {
            pipelineConfigurer.getStageConfigurer().setOutputSchema(
                getSchema(jdbcDriverClass, sourceConfig.patternToReplace, sourceConfig.replaceWith));
        } catch (IllegalAccessException | InstantiationException driverError) {
            collector.addFailure(
                String.format("Failed to instantiate JDBC driver: %s", driverError.getMessage()), (String) null);
        } catch (SQLException sqlError) {
            collector.addFailure(
                String.format("Encountered SQL error while getting query schema: %s", sqlError.getMessage()),
                (String) null);
        }
    }

    /**
     * Validates the configuration, builds the Hadoop {@code Configuration} for
     * {@code DataDrivenETLDBInputFormat}, registers it as this stage's input and records lineage.
     *
     * @param batchSourceContext context of the run being prepared
     * @throws IllegalArgumentException if more than one split may be used and the import query
     *         does not contain '$CONDITIONS'
     */
    public void prepareRun(BatchSourceContext batchSourceContext) {
        FailureCollector collector = batchSourceContext.getFailureCollector();
        sourceConfig.validate(collector);
        collector.getOrThrowException();

        LOG.debug("pluginName = {}; connectionString = {}; importQuery = {}; boundingQuery = {}; transaction isolation level: {}",
            sourceConfig.getJdbcPluginName(),
            sourceConfig.getConnectionString(),
            sourceConfig.getImportQuery(),
            sourceConfig.getBoundingQuery(),
            sourceConfig.transactionIsolationLevel);

        Configuration hadoopConf = new Configuration();
        hadoopConf.clear();

        // Connection settings; the credential-less overload is used only when both user and password are absent.
        Class<?> jdbcDriverClass = batchSourceContext.loadPluginClass(getJDBCPluginId());
        String user = sourceConfig.getUser();
        String password = sourceConfig.getPassword();
        if (user != null || password != null) {
            DBConfiguration.configureDB(
                hadoopConf, jdbcDriverClass.getName(), sourceConfig.getConnectionString(), user, password);
        } else {
            DBConfiguration.configureDB(hadoopConf, jdbcDriverClass.getName(), sourceConfig.getConnectionString());
        }

        DataDrivenETLDBInputFormat.setInput(
            hadoopConf,
            DBRecord.class,
            sourceConfig.getImportQuery(),
            sourceConfig.getBoundingQuery(),
            sourceConfig.getEnableAutoCommit());

        if (sourceConfig.transactionIsolationLevel != null) {
            hadoopConf.set(TransactionIsolationLevel.CONF_KEY, sourceConfig.transactionIsolationLevel);
        }
        String connectionArguments = sourceConfig.getConnectionArguments();
        if (connectionArguments != null) {
            hadoopConf.set(DBUtils.CONNECTION_ARGUMENTS, connectionArguments);
        }

        // Anything other than exactly one split needs the $CONDITIONS placeholder and a split column.
        Integer requestedSplits = sourceConfig.numSplits;
        boolean singleSplit = requestedSplits != null && requestedSplits.intValue() == 1;
        if (!singleSplit) {
            if (!sourceConfig.getImportQuery().contains("$CONDITIONS")) {
                throw new IllegalArgumentException(
                    String.format("Import Query %s must contain the string '$CONDITIONS'.", sourceConfig.importQuery));
            }
            hadoopConf.set("mapreduce.jdbc.input.orderby", sourceConfig.splitBy);
        }
        if (requestedSplits != null) {
            hadoopConf.setInt("mapreduce.job.maps", requestedSplits.intValue());
        }

        // Optional settings are written only when configured.
        if (sourceConfig.schema != null) {
            hadoopConf.set(DBUtils.OVERRIDE_SCHEMA, sourceConfig.schema);
        }
        if (sourceConfig.patternToReplace != null) {
            hadoopConf.set(DBUtils.PATTERN_TO_REPLACE, sourceConfig.patternToReplace);
        }
        if (sourceConfig.replaceWith != null) {
            hadoopConf.set(DBUtils.REPLACE_WITH, sourceConfig.replaceWith);
        }
        if (sourceConfig.fetchSize != null) {
            hadoopConf.setInt(DBUtils.FETCH_SIZE, sourceConfig.fetchSize.intValue());
        }

        SourceInputFormatProvider inputFormatProvider = new SourceInputFormatProvider(
            (Class<? extends InputFormat>) DataDrivenETLDBInputFormat.class, hadoopConf);
        batchSourceContext.setInput(Input.of(sourceConfig.getReferenceName(), inputFormatProvider));
        emitLineage(batchSourceContext);
    }

    /**
     * Runs the superclass initialization, then loads the JDBC driver plugin class and keeps it
     * so that {@code destroy()} can pass it to {@code DBUtils.cleanup}.
     *
     * @param batchRuntimeContext runtime context used to load the plugin class
     * @throws Exception if the superclass initialization or plugin loading fails
     */
    public void initialize(BatchRuntimeContext batchRuntimeContext) throws Exception {
        super.initialize(batchRuntimeContext);
        String jdbcPluginId = getJDBCPluginId();
        driverClass = batchRuntimeContext.loadPluginClass(jdbcPluginId);
    }

    /**
     * Emits the record carried by the input value after converting its field names to the
     * configured column-name case.
     *
     * @param keyValue input pair; only the value is used
     * @param emitter receives the converted record
     * @throws Exception if the conversion or emit fails
     */
    public void transform(KeyValue<LongWritable, DBRecord> keyValue, Emitter<StructuredRecord> emitter) throws Exception {
        DBRecord dbRecord = keyValue.getValue();
        emitter.emit(
            StructuredRecordUtils.convertCase(
                dbRecord.getRecord(),
                FieldCase.toFieldCase(sourceConfig.getColumnNameCase())));
    }

    /**
     * Releases the driver class through {@code DBUtils.cleanup} and destroys the
     * {@code DBManager}. The manager is destroyed even if the driver cleanup throws.
     */
    public void destroy() {
        try {
            DBUtils.cleanup(driverClass);
        } finally {
            dbManager.destroy();
        }
    }

    /**
     * Builds the plugin id used to register and load the JDBC driver plugin,
     * in the form {@code source.<jdbcPluginType>.<jdbcPluginName>}.
     */
    private String getJDBCPluginId() {
        return String.format(
            "source.%s.%s",
            sourceConfig.getJdbcPluginType(),
            sourceConfig.getJdbcPluginName());
    }

    /**
     * Derives the output schema by executing the import query (limited to one row) and reading
     * the schema fields from the result set.
     *
     * <p>Any '$CONDITIONS' clause is stripped from the query first. The connection, statement and
     * result set are closed on every exit path, and the driver registration is released last.
     *
     * @param jdbcDriverClass JDBC driver class to make available before connecting
     * @param patternToReplace optional pattern forwarded to {@code DBUtils.getSchemaFields}
     * @param replaceWith optional replacement forwarded to {@code DBUtils.getSchemaFields}
     * @return a record schema named "outputSchema" built from the query's result columns
     * @throws IllegalAccessException if the driver cannot be loaded or registered
     * @throws InstantiationException if the driver class is null or cannot be instantiated
     * @throws SQLException if connecting or running the query fails
     */
    private Schema getSchema(Class<? extends Driver> jdbcDriverClass, @Nullable String patternToReplace, @Nullable String replaceWith) throws IllegalAccessException, SQLException, InstantiationException {
        DriverCleanup driverCleanup = loadPluginClassAndGetDriver(jdbcDriverClass);
        try (Connection connection = getConnection();
             Statement statement = connection.createStatement()) {
            // Only the result metadata is needed, so do not fetch more than a single row.
            statement.setMaxRows(1);

            // NOTE(review): this reads the raw importQuery field, not getImportQuery() — confirm that is intended.
            String query = sourceConfig.importQuery;
            if (query.contains("$CONDITIONS")) {
                query = removeConditionsClause(query);
            }

            try (ResultSet resultSet = statement.executeQuery(query)) {
                return Schema.recordOf(
                    "outputSchema",
                    DBUtils.getSchemaFields(resultSet, patternToReplace, replaceWith, null));
            }
        } finally {
            driverCleanup.destroy();
        }
    }

    /**
     * Removes the '$CONDITIONS' placeholder from a query together with the keyword that joins it
     * to the rest of the statement. Three case-insensitive replacements are applied in order:
     * "$conditions and|or ", then " and|or $conditions", then " where $conditions".
     *
     * @param str the query text
     * @return the query with every matched placeholder clause removed
     */
    @VisibleForTesting
    static String removeConditionsClause(String str) {
        String withoutLeadingPlaceholder = CONDITIONS_AND.matcher(str).replaceAll("");
        String withoutTrailingPlaceholder = AND_CONDITIONS.matcher(withoutLeadingPlaceholder).replaceAll("");
        return WHERE_CONDITIONS.matcher(withoutTrailingPlaceholder).replaceAll("");
    }

    /**
     * Makes the given JDBC driver available through {@code DBUtils.ensureJDBCDriverIsAvailable}
     * and returns the handle used to undo that later.
     *
     * @param cls the JDBC driver class, or {@code null} if it could not be loaded
     * @return the cleanup handle returned by {@code DBUtils}
     * @throws InstantiationException if {@code cls} is null or the driver cannot be instantiated
     * @throws IllegalAccessException if the driver cannot be accessed
     * @throws SQLException if registering the driver fails
     */
    private DriverCleanup loadPluginClassAndGetDriver(Class<? extends Driver> cls) throws IllegalAccessException, InstantiationException, SQLException {
        if (cls != null) {
            try {
                return DBUtils.ensureJDBCDriverIsAvailable(
                    cls,
                    sourceConfig.getConnectionString(),
                    sourceConfig.getJdbcPluginName(),
                    sourceConfig.getJdbcPluginType());
            } catch (IllegalAccessException | InstantiationException | SQLException driverError) {
                // Log here for context, then let the caller decide how to report it.
                LOG.error("Unable to load or register driver {}", cls, driverError);
                throw driverError;
            }
        }
        throw new InstantiationException(
            String.format("Unable to load JDBC driver class with plugin name %s", sourceConfig.getJdbcPluginName()));
    }

    /**
     * Opens a JDBC connection with the configured connection string, passing the connection
     * arguments, user and password built by {@code ConnectionConfig.getConnectionArguments}.
     *
     * @return a new connection; the caller is responsible for closing it
     * @throws SQLException if the connection cannot be established
     */
    private Connection getConnection() throws SQLException {
        return DriverManager.getConnection(
            sourceConfig.getConnectionString(),
            ConnectionConfig.getConnectionArguments(
                sourceConfig.getConnectionArguments(),
                sourceConfig.getUser(),
                sourceConfig.getPassword()));
    }

    /**
     * Registers the external dataset for lineage and, when a schema with fields is available,
     * records a read operation listing every field name.
     *
     * <p>The configured schema takes precedence; otherwise the context's output schema is used.
     *
     * @param context context of the run being prepared
     */
    private void emitLineage(BatchSourceContext context) {
        Schema configuredSchema = sourceConfig.getSchema(context.getFailureCollector());
        Schema lineageSchema = configuredSchema != null ? configuredSchema : context.getOutputSchema();

        LineageRecorder recorder = new LineageRecorder(context, sourceConfig.getReferenceName());
        // The dataset is registered even when no schema is known.
        recorder.createExternalDataset(lineageSchema);

        if (lineageSchema != null && lineageSchema.getFields() != null) {
            List<String> fieldNames = lineageSchema.getFields().stream()
                .map(field -> field.getName())
                .collect(Collectors.toList());
            recorder.recordRead("Read", "Read from DB.", fieldNames);
        }
    }

    // Bridge overload marked synthetic by the decompiler: it casts the untyped arguments and
    // delegates to transform(KeyValue<LongWritable, DBRecord>, Emitter<StructuredRecord>).
    public /* bridge */ /* synthetic */ void transform(Object obj, Emitter emitter) throws Exception {
        transform((KeyValue<LongWritable, DBRecord>) obj, (Emitter<StructuredRecord>) emitter);
    }
}
