package io.cdap.plugin.db.source;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.data.batch.Input;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorCodeType;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.api.plugin.PluginConfig;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageConfigurer;
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
import io.cdap.cdap.internal.io.SchemaTypeAdapter;
import io.cdap.plugin.common.LineageRecorder;
import io.cdap.plugin.common.ReferenceBatchSource;
import io.cdap.plugin.common.ReferencePluginConfig;
import io.cdap.plugin.common.SourceInputFormatProvider;
import io.cdap.plugin.db.CommonSchemaReader;
import io.cdap.plugin.db.ConnectionConfig;
import io.cdap.plugin.db.ConnectionConfigAccessor;
import io.cdap.plugin.db.DBConfig;
import io.cdap.plugin.db.DBRecord;
import io.cdap.plugin.db.SchemaReader;
import io.cdap.plugin.db.TransactionIsolationLevel;
import io.cdap.plugin.db.config.DatabaseSourceConfig;
import io.cdap.plugin.util.DBUtils;
import io.cdap.plugin.util.DriverCleanup;
import java.io.IOException;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/cdap/plugin/db/source/AbstractDBSource.class */
public abstract class AbstractDBSource<T extends PluginConfig & DatabaseSourceConfig> extends ReferenceBatchSource<LongWritable, DBRecord, StructuredRecord> {
    /** Class-wide logger. */
    private static final Logger LOG = LoggerFactory.getLogger(AbstractDBSource.class);

    /** Used to serialise a {@link Schema} to JSON when no schema is configured explicitly. */
    private static final SchemaTypeAdapter SCHEMA_TYPE_ADAPTER = new SchemaTypeAdapter();

    // The three patterns below are applied, in this order, by removeConditionsClause to strip the
    // "$CONDITIONS" placeholder (matched case-insensitively) from an import query.
    private static final Pattern CONDITIONS_AND =
        Pattern.compile("\\$conditions (and|or)\\s+", Pattern.CASE_INSENSITIVE);
    private static final Pattern AND_CONDITIONS =
        Pattern.compile("\\s+(and|or) \\$conditions", Pattern.CASE_INSENSITIVE);
    private static final Pattern WHERE_CONDITIONS =
        Pattern.compile("\\s+where \\$conditions", Pattern.CASE_INSENSITIVE);

    /** Plugin configuration supplied at construction time. */
    protected final T sourceConfig;

    /** JDBC driver class; assigned in {@code initialize} and passed to {@code DBUtils.cleanup} in {@code destroy}. */
    protected Class<? extends Driver> driverClass;

    /* loaded from: input_file:io/cdap/plugin/db/source/AbstractDBSource$DBSourceConfig.class */
    public static abstract class DBSourceConfig extends DBConfig implements DatabaseSourceConfig {
        // Names of the plugin configuration properties. Where this class declares a field for the
        // property, the value equals the literal used in that field's @Name annotation.
        public static final String IMPORT_QUERY = "importQuery";
        public static final String BOUNDING_QUERY = "boundingQuery";
        public static final String SPLIT_BY = "splitBy";
        public static final String NUM_SPLITS = "numSplits";
        public static final String SCHEMA = "schema";
        public static final String TRANSACTION_ISOLATION_LEVEL = "transactionIsolationLevel";
        public static final String FETCH_SIZE = "fetchSize";

        /**
         * Raw import query as configured by the user (macro-enabled). Read it through
         * {@link #getImportQuery()}, which passes it through {@code cleanQuery} first.
         */
        @Name("importQuery")
        @Description("The SELECT query to use to import data from the specified table. You can specify an arbitrary number of columns to import, or import all columns using *. The Query should contain the '$CONDITIONS' string unless numSplits is set to one. For example, 'SELECT * FROM table WHERE $CONDITIONS'. The '$CONDITIONS' string will be replaced by 'splitBy' field limits specified by the bounding query.")
        @Macro
        public String importQuery;

        /** Raw bounding query (optional, macro-enabled); exposed via {@link #getBoundingQuery()}. */
        @Name("boundingQuery")
        @Description("Bounding Query should return the min and max of the values of the 'splitBy' field. For example, 'SELECT MIN(id),MAX(id) FROM table'. This is required unless numSplits is set to one.")
        @Nullable
        @Macro
        public String boundingQuery;

        /** Name of the field used to generate splits (optional, macro-enabled). */
        @Name("splitBy")
        @Description("Field Name which will be used to generate splits. This is required unless numSplits is set to one.")
        @Nullable
        @Macro
        public String splitBy;

        /** Requested number of splits (optional, macro-enabled); {@code validate} rejects values below 1. */
        @Name("numSplits")
        @Description("The number of splits to generate. If set to one, the boundingQuery is not needed, and no $CONDITIONS string needs to be specified in the importQuery. If not specified, the execution framework will pick a value.")
        @Nullable
        @Macro
        public Integer numSplits;

        /** Optional output schema as a JSON string; parsed by {@link #getSchema()}. Not macro-enabled. */
        @Name("schema")
        @Description("The schema of records output by the source. This will be used in place of whatever schema comes back from the query. This should only be used if there is a bug in your jdbc driver. For example, if a column is not correctly getting marked as nullable.")
        @Nullable
        public String schema;

        /** Optional fetch size (macro-enabled); unlike the other fields it is private and read via {@link #getFetchSize()}. */
        @Name("fetchSize")
        @Description("The number of rows to fetch at a time per split. Larger fetch size can result in faster import, with the tradeoff of higher memory usage.")
        @Nullable
        @Macro
        private Integer fetchSize;

        /** Returns the configured import query after it has been passed through {@code cleanQuery}. */
        @Override
        public String getImportQuery() {
            return cleanQuery(importQuery);
        }

        /** Returns the configured bounding query after it has been passed through {@code cleanQuery}. */
        @Override
        public String getBoundingQuery() {
            return cleanQuery(boundingQuery);
        }

        /** Returns the configured number of splits, or {@code null} when none was configured. */
        @Override
        public Integer getNumSplits() {
            return numSplits;
        }

        /** Returns the configured split-by field name, or {@code null} when none was configured. */
        @Override
        public String getSplitBy() {
            return splitBy;
        }

        /**
         * Validates the source-specific properties and reports every problem to the collector.
         *
         * <p>Checks performed, in order:
         * <ol>
         *   <li>{@code numSplits}, when set and not a macro, must be at least 1;</li>
         *   <li>the transaction isolation level, when set, is delegated to
         *       {@link TransactionIsolationLevel#validate};</li>
         *   <li>{@code importQuery} must be present unless it is a macro;</li>
         *   <li>unless {@code numSplits} is exactly 1 or is a macro: the import query must contain
         *       {@code $CONDITIONS}, and {@code splitBy} and {@code boundingQuery} must be present
         *       (each check is skipped when the respective property is a macro).</li>
         * </ol>
         *
         * @param failureCollector collector that receives the validation failures; this method never
         *                         throws for invalid configuration itself
         */
        @Override
        public void validate(FailureCollector failureCollector) {
            boolean numSplitsIsMacro = containsMacro(NUM_SPLITS);
            boolean hasOneSplit = false;
            if (!numSplitsIsMacro && numSplits != null) {
                if (numSplits < 1) {
                    failureCollector.addFailure(String.format("Invalid value for numSplits '%d'. Must be at least 1.", numSplits), (String) null).withConfigProperty(NUM_SPLITS);
                }
                hasOneSplit = numSplits == 1;
            }

            if (getTransactionIsolationLevel() != null) {
                TransactionIsolationLevel.validate(getTransactionIsolationLevel(), failureCollector);
            }

            boolean importQueryIsMacro = containsMacro(IMPORT_QUERY);
            if (!importQueryIsMacro && Strings.isNullOrEmpty(importQuery)) {
                failureCollector.addFailure("Import Query must be specified.", (String) null).withConfigProperty(IMPORT_QUERY);
            }

            // The remaining checks only apply when more than one split may be generated and the
            // number of splits is known at this point.
            if (hasOneSplit || numSplitsIsMacro) {
                return;
            }

            if (!importQueryIsMacro) {
                // The query may be missing here (already reported above), so guard against a null
                // result instead of dereferencing it unconditionally.
                String cleanedImportQuery = getImportQuery();
                if (cleanedImportQuery == null || !cleanedImportQuery.contains("$CONDITIONS")) {
                    failureCollector.addFailure("Invalid Import Query.", String.format("Import Query %s must contain the string '$CONDITIONS'.", importQuery)).withConfigProperty(IMPORT_QUERY);
                }
            }

            if (!containsMacro(SPLIT_BY) && Strings.isNullOrEmpty(splitBy)) {
                failureCollector.addFailure("Split-By Field Name must be specified if Number of Splits is not set to 1.", (String) null).withConfigProperty(SPLIT_BY).withConfigProperty(NUM_SPLITS);
            }

            if (!containsMacro(BOUNDING_QUERY) && Strings.isNullOrEmpty(boundingQuery)) {
                failureCollector.addFailure("Bounding Query must be specified if Number of Splits is not set to 1.", (String) null).withConfigProperty(BOUNDING_QUERY).withConfigProperty(NUM_SPLITS);
            }
        }

        /**
         * Compares the given actual schema against the schema configured on this plugin and reports
         * mismatches to the collector.
         *
         * @param schema           schema to check the configured schema against
         * @param failureCollector collector that receives the validation failures
         */
        @Override
        public void validateSchema(Schema schema, FailureCollector failureCollector) {
            Schema configuredSchema = getSchema();
            validateSchema(schema, configuredSchema, failureCollector);
        }

        /**
         * Checks that every field of the configured schema exists in the actual schema with the same
         * type and logical type; nullability is ignored on both sides.
         *
         * @param schema           actual schema the configured fields are looked up in
         * @param schema2          configured schema; a failure is reported when it is {@code null}
         * @param failureCollector collector that receives one failure per missing or mismatching field
         */
        @VisibleForTesting
        static void validateSchema(Schema schema, Schema schema2, FailureCollector failureCollector) {
            if (schema2 == null) {
                failureCollector.addFailure("Schema should not be null or empty.", (String) null).withConfigProperty("schema");
                return;
            }
            for (Schema.Field configuredField : schema2.getFields()) {
                String fieldName = configuredField.getName();
                Schema.Field actualField = schema.getField(fieldName);
                if (actualField == null) {
                    failureCollector.addFailure(String.format("Schema field '%s' is not present in actual record", fieldName), (String) null).withOutputSchemaField(fieldName);
                    continue;
                }

                // Compare the underlying types only; a nullable wrapper on either side is not a mismatch.
                Schema actualType = actualField.getSchema().isNullable()
                    ? actualField.getSchema().getNonNullable() : actualField.getSchema();
                Schema configuredType = configuredField.getSchema().isNullable()
                    ? configuredField.getSchema().getNonNullable() : configuredField.getSchema();

                boolean sameType = actualType.getType() == configuredType.getType()
                    && actualType.getLogicalType() == configuredType.getLogicalType();
                if (!sameType) {
                    failureCollector.addFailure(String.format("Schema field '%s' has type '%s' but found '%s'.", fieldName, configuredType.getDisplayName(), actualType.getDisplayName()), (String) null).withOutputSchemaField(fieldName);
                }
            }
        }

        /**
         * Parses the configured schema JSON.
         *
         * @return the parsed schema, or {@code null} when no schema string is configured
         * @throws IllegalArgumentException if the schema string cannot be parsed
         */
        @Override
        @Nullable
        public Schema getSchema() {
            if (Strings.isNullOrEmpty(schema)) {
                return null;
            }
            try {
                return Schema.parseJson(schema);
            } catch (IOException e) {
                String message = String.format("Unable to parse schema '%s'. Reason: %s", schema, e.getMessage());
                throw new IllegalArgumentException(message, e);
            }
        }

        /**
         * Tells whether all properties checked here are free of macros.
         *
         * @return {@code false} if any of host, port, user, password, database or import query
         *         contains a macro; {@code true} otherwise
         */
        @Override
        public boolean canConnect() {
            boolean anyMacro = containsMacro(ConnectionConfig.HOST)
                || containsMacro(ConnectionConfig.PORT)
                || containsMacro(ConnectionConfig.USER)
                || containsMacro(ConnectionConfig.PASSWORD)
                || containsMacro("database")
                || containsMacro("importQuery");
            return !anyMacro;
        }

        /** Returns the configured fetch size, or {@code null} when none was configured. */
        @Override
        public Integer getFetchSize() {
            return fetchSize;
        }
    }

    /* loaded from: input_file:io/cdap/plugin/db/source/AbstractDBSource$GetSchemaRequest.class */
    /**
     * Plain holder of connection settings plus a query. All members are public fields; the class
     * declares no behaviour and is not referenced elsewhere in this file.
     */
    public static class GetSchemaRequest {

        @Nullable
        public String host;

        // NOTE(review): @Nullable has no effect on a primitive int, which can never be null —
        // confirm whether Integer was intended before changing the field type.
        @Nullable
        public int port;

        @Nullable
        public String database;

        @Nullable
        public String connectionString;

        @Nullable
        public String connectionArguments;

        @Nullable
        public String user;

        @Nullable
        public String password;
        // The only field that is not annotated @Nullable.
        public String query;

        @Nullable
        public String jdbcPluginName;
    }

    /**
     * Creates the source, handing the configuration's reference name to the parent class and
     * keeping the configuration for later use.
     *
     * @param t plugin configuration
     */
    public AbstractDBSource(T t) {
        super(new ReferencePluginConfig(t.getReferenceName()));
        sourceConfig = t;
    }

    /**
     * Validates the configuration at deployment time and sets the stage's output schema.
     *
     * <p>An explicitly configured schema wins. Otherwise, unless the JDBC plugin name is a macro or
     * the configuration reports that it cannot connect, the schema is read from the database; any
     * error raised while doing so is reported to the failure collector rather than thrown.
     *
     * @param pipelineConfigurer configurer of the pipeline being deployed
     */
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        super.configurePipeline(pipelineConfigurer);
        DBUtils.validateJDBCPluginPipeline(pipelineConfigurer, sourceConfig, getJDBCPluginId());

        StageConfigurer configurer = pipelineConfigurer.getStageConfigurer();
        FailureCollector collector = configurer.getFailureCollector();
        sourceConfig.validate(collector);

        if (sourceConfig.getSchema() != null) {
            configurer.setOutputSchema(sourceConfig.getSchema());
            return;
        }
        if (sourceConfig.containsMacro(ConnectionConfig.JDBC_PLUGIN_NAME)) {
            return;
        }

        // The driver class is resolved before the connectivity check, exactly as before.
        Class<? extends Driver> jdbcDriver = DBUtils.getDriverClass(pipelineConfigurer, sourceConfig, ConnectionConfig.JDBC_PLUGIN_TYPE);
        if (!sourceConfig.canConnect()) {
            return;
        }

        try {
            configurer.setOutputSchema(getSchema(jdbcDriver));
        } catch (IllegalAccessException | InstantiationException driverError) {
            collector.addFailure("Unable to instantiate JDBC driver: " + driverError.getMessage(), (String) null)
                .withStacktrace(driverError.getStackTrace());
        } catch (SQLException sqlError) {
            String message = String.format("SQL error while getting query schema: Error: %s, SQLState: %s, ErrorCode: %s",
                sqlError.getMessage(), sqlError.getSQLState(), sqlError.getErrorCode());
            collector.addFailure(message, (String) null).withStacktrace(sqlError.getStackTrace());
        } catch (Exception otherError) {
            collector.addFailure(otherError.getMessage(), (String) null).withStacktrace(otherError.getStackTrace());
        }
    }

    /**
     * Makes the given JDBC driver available, reads the schema of the import query from the
     * database and releases the driver again.
     *
     * @param cls JDBC driver class to use
     * @return schema derived from the import query
     * @throws IllegalAccessException if the driver cannot be made available
     * @throws InstantiationException if the driver class is {@code null} or cannot be instantiated
     * @throws SQLException           if connecting or querying fails
     */
    public Schema getSchema(Class<? extends Driver> cls) throws IllegalAccessException, SQLException, InstantiationException {
        try {
            DriverCleanup driverCleanup = loadPluginClassAndGetDriver(cls);
            try {
                return getSchema();
            } finally {
                // Runs exactly once on both the success and the failure path.
                driverCleanup.destroy();
            }
        } catch (Exception e) {
            LOG.error("Exception while performing getSchema", e);
            throw e;
        }
    }

    /**
     * Opens a connection, runs the configured init queries and derives the schema of the import
     * query. The connection is always closed before this method returns or throws.
     *
     * @return schema derived from the import query
     * @throws SQLException if connecting, running an init query or reading the schema fails
     */
    public Schema getSchema() throws SQLException {
        try (Connection connection = getConnection()) {
            executeInitQueries(connection, sourceConfig.getInitQueries());
            return loadSchemaFromDB(connection, sourceConfig.getImportQuery());
        }
    }

    /**
     * Executes the query (with any {@code $CONDITIONS} clause removed) limited to one row and
     * builds a record schema named {@code outputSchema} from the fields the schema reader returns.
     *
     * @param connection open connection to run the query on; not closed by this method
     * @param str        import query, possibly containing {@code $CONDITIONS}
     * @return schema built from the query result
     * @throws SQLException if the statement cannot be created or executed
     */
    private Schema loadSchemaFromDB(Connection connection, String str) throws SQLException {
        String query = str.contains("$CONDITIONS") ? removeConditionsClause(str) : str;
        // Closing the statement also closes the ResultSet it produced, so neither is leaked.
        try (Statement statement = connection.createStatement()) {
            statement.setMaxRows(1);
            return Schema.recordOf("outputSchema", getSchemaReader().getSchemaFields(statement.executeQuery(query)));
        }
    }

    /**
     * Strips the {@code $CONDITIONS} placeholder from a query by applying three replacements in
     * sequence: the placeholder followed by AND/OR, AND/OR followed by the placeholder, and finally
     * a {@code where $CONDITIONS} clause.
     *
     * @param str query text
     * @return the query with every match of the three patterns removed
     */
    @VisibleForTesting
    static String removeConditionsClause(String str) {
        String withoutLeading = CONDITIONS_AND.matcher(str).replaceAll("");
        String withoutTrailing = AND_CONDITIONS.matcher(withoutLeading).replaceAll("");
        return WHERE_CONDITIONS.matcher(withoutTrailing).replaceAll("");
    }

    /**
     * Makes the JDBC driver available, connects with the configured connection string and
     * arguments, runs the init queries and derives the schema of the import query.
     *
     * <p>A {@link SQLException} raised while connecting or querying is converted into the exception
     * produced by {@code ErrorUtils.getProgramFailureException}; the cause handed over is a fresh
     * {@code SQLException} carrying only the message, SQL state and error code of the original.
     * The driver cleanup handle is destroyed in every case.
     *
     * @param cls JDBC driver class to use
     * @return schema derived from the import query
     * @throws SQLException           declared for driver registration failures
     * @throws IllegalAccessException if the driver cannot be made available
     * @throws InstantiationException if the driver cannot be instantiated
     */
    private Schema loadSchemaFromDB(Class<? extends Driver> cls) throws SQLException, IllegalAccessException, InstantiationException {
        String connectionString = sourceConfig.getConnectionString();
        DriverCleanup driverCleanup = DBUtils.ensureJDBCDriverIsAvailable(cls, connectionString, sourceConfig.getJdbcPluginName());
        try {
            // Built inside the try so the driver is cleaned up even if reading the arguments fails.
            Properties connectionProperties = new Properties();
            connectionProperties.putAll(sourceConfig.getConnectionArguments());

            try (Connection connection = DriverManager.getConnection(connectionString, connectionProperties)) {
                executeInitQueries(connection, sourceConfig.getInitQueries());
                return loadSchemaFromDB(connection, sourceConfig.getImportQuery());
            } catch (SQLException e) {
                String errorMessage = String.format("SQL Exception occurred: [Message='%s', SQLState='%s', ErrorCode='%s'].", e.getMessage(), e.getSQLState(), e.getErrorCode());
                String errorMessageWithDetails = String.format("Error occurred while trying to get schema from database.Error message: '%s'. Error code: '%s'. SQLState: '%s'", e.getMessage(), e.getErrorCode(), e.getSQLState());
                String externalDocumentationLink = getExternalDocumentationLink();
                if (!Strings.isNullOrEmpty(externalDocumentationLink)) {
                    if (!errorMessage.endsWith(".")) {
                        errorMessage = errorMessage + ".";
                    }
                    errorMessage = String.format("%s For more details, see %s", errorMessage, externalDocumentationLink);
                }
                throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessageWithDetails, ErrorType.USER, false, ErrorCodeType.SQLSTATE, e.getSQLState(), externalDocumentationLink, new SQLException(e.getMessage(), e.getSQLState(), e.getErrorCode()));
            }
        } finally {
            driverCleanup.destroy();
        }
    }

    /**
     * Executes each init query in list order, each on its own statement that is closed right after
     * use. Execution stops at the first query that fails.
     *
     * @param connection open connection to run the queries on; not closed by this method
     * @param list       queries to execute
     * @throws SQLException if creating, executing or closing a statement fails
     */
    private void executeInitQueries(Connection connection, List<String> list) throws SQLException {
        for (String initQuery : list) {
            try (Statement statement = connection.createStatement()) {
                statement.execute(initQuery);
            }
        }
    }

    /**
     * Supplies the reader used to turn a query result into schema fields. This implementation
     * returns a new {@link CommonSchemaReader} on every call; being protected, it can be overridden.
     */
    protected SchemaReader getSchemaReader() {
        return new CommonSchemaReader();
    }

    /**
     * Class name passed to {@code ErrorDetailsProviderSpec} in {@code prepareRun}. This
     * implementation returns {@code null}, in which case {@code prepareRun} registers no provider.
     */
    protected String getErrorDetailsProviderClassName() {
        return null;
    }

    /**
     * Ensures the given JDBC driver is available for the connection string produced by
     * {@link #createConnectionString()}.
     *
     * @param cls JDBC driver class, may be {@code null}
     * @return handle whose {@code destroy()} the caller invokes when done with the driver
     * @throws InstantiationException if {@code cls} is {@code null} or the driver cannot be instantiated
     * @throws IllegalAccessException if the driver cannot be accessed
     * @throws SQLException           if the driver cannot be registered
     */
    private DriverCleanup loadPluginClassAndGetDriver(Class<? extends Driver> cls) throws IllegalAccessException, InstantiationException, SQLException {
        if (cls != null) {
            try {
                return DBUtils.ensureJDBCDriverIsAvailable(cls, createConnectionString(), sourceConfig.getJdbcPluginName());
            } catch (IllegalAccessException | InstantiationException | SQLException e) {
                // Log here for context, then let the caller decide how to report it.
                LOG.error("Unable to load or register driver {}", cls, e);
                throw e;
            }
        }
        String message = String.format("Unable to load Driver class with plugin type %s and plugin name %s", ConnectionConfig.JDBC_PLUGIN_TYPE, sourceConfig.getJdbcPluginName());
        throw new InstantiationException(message);
    }

    /**
     * Opens a new connection using {@link #createConnectionString()} and the configured connection
     * arguments. The caller is responsible for closing it.
     *
     * @return newly opened connection
     * @throws SQLException if the connection cannot be established
     */
    private Connection getConnection() throws SQLException {
        String url = createConnectionString();
        Properties connectionArguments = new Properties();
        connectionArguments.putAll(sourceConfig.getConnectionArguments());
        return DriverManager.getConnection(url, connectionArguments);
    }

    /**
     * Prepares a pipeline run: validates the configuration, loads the schema from the database,
     * builds the input format configuration, records lineage and registers the input.
     *
     * @param batchSourceContext context of the run being prepared
     * @throws Exception if validation fails or any of the preparation steps throws
     */
    public void prepareRun(BatchSourceContext batchSourceContext) throws Exception {
        FailureCollector collector = batchSourceContext.getFailureCollector();
        sourceConfig.validate(collector);
        collector.getOrThrowException();

        LOG.debug("pluginType = {}; pluginName = {}; connectionString = {}; importQuery = {}; boundingQuery = {};",
            ConnectionConfig.JDBC_PLUGIN_TYPE,
            sourceConfig.getJdbcPluginName(),
            sourceConfig.getConnectionString(),
            sourceConfig.getImportQuery(),
            sourceConfig.getBoundingQuery());

        Class<? extends Driver> jdbcDriver = batchSourceContext.loadPluginClass(getJDBCPluginId());
        Schema databaseSchema = loadSchemaFromDB(jdbcDriver);
        ConnectionConfigAccessor accessor = getConnectionConfigAccessor(jdbcDriver.getName(), databaseSchema, collector);

        // Lineage uses the configured schema when there is one, otherwise the schema read from the database.
        LineageRecorder recorder = getLineageRecorder(batchSourceContext);
        Schema lineageSchema = sourceConfig.getSchema() != null ? sourceConfig.getSchema() : databaseSchema;
        recorder.createExternalDataset(lineageSchema);
        if (lineageSchema != null && lineageSchema.getFields() != null) {
            List<String> fieldNames = lineageSchema.getFields().stream()
                .map(Schema.Field::getName)
                .collect(Collectors.toList());
            recorder.recordRead("Read", "Read from database plugin", fieldNames);
        }

        if (!Strings.isNullOrEmpty(getErrorDetailsProviderClassName())) {
            batchSourceContext.setErrorDetailsProvider(new ErrorDetailsProviderSpec(getErrorDetailsProviderClassName()));
        }

        SourceInputFormatProvider inputFormatProvider =
            new SourceInputFormatProvider(DataDrivenETLDBInputFormat.class, accessor.getConfiguration());
        batchSourceContext.setInput(Input.of(sourceConfig.getReferenceName(), inputFormatProvider));
    }

    /**
     * Builds the configuration consumed by the database input format: connection settings, fetch
     * size, queries, isolation level, connection arguments, init queries, split settings and the
     * output schema.
     *
     * @param str              JDBC driver class name
     * @param schema           schema read from the database; validated against the configured
     *                         schema when one is set, otherwise stored as the output schema
     * @param failureCollector collector used for schema validation failures
     * @return populated accessor
     * @throws IOException              declared by the signature
     * @throws IllegalArgumentException if more than one split may be used and the import query lacks
     *                                  {@code $CONDITIONS}
     */
    public ConnectionConfigAccessor getConnectionConfigAccessor(String str, Schema schema, FailureCollector failureCollector) throws IOException {
        ConnectionConfigAccessor accessor = new ConnectionConfigAccessor();

        boolean noCredentials = sourceConfig.getUser() == null && sourceConfig.getPassword() == null;
        if (!noCredentials) {
            DBConfiguration.configureDB(accessor.getConfiguration(), str, sourceConfig.getConnectionString(), sourceConfig.getUser(), sourceConfig.getPassword());
        } else {
            DBConfiguration.configureDB(accessor.getConfiguration(), str, sourceConfig.getConnectionString());
        }

        if (sourceConfig.getFetchSize() != null) {
            accessor.setFetchSize(sourceConfig.getFetchSize());
        }
        DataDrivenETLDBInputFormat.setInput(accessor.getConfiguration(), getDBRecordType(), sourceConfig.getImportQuery(), sourceConfig.getBoundingQuery(), false);
        if (sourceConfig.getTransactionIsolationLevel() != null) {
            accessor.setTransactionIsolationLevel(sourceConfig.getTransactionIsolationLevel());
        }
        accessor.setConnectionArguments(sourceConfig.getConnectionArguments());
        accessor.setInitQueries(sourceConfig.getInitQueries());

        // Anything other than exactly one split requires the $CONDITIONS placeholder and an order-by column.
        boolean exactlyOneSplit = sourceConfig.getNumSplits() != null && sourceConfig.getNumSplits() == 1;
        if (!exactlyOneSplit) {
            if (!sourceConfig.getImportQuery().contains("$CONDITIONS")) {
                throw new IllegalArgumentException(String.format("Import Query %s must contain the string '$CONDITIONS'.", sourceConfig.getImportQuery()));
            }
            accessor.getConfiguration().set("mapreduce.jdbc.input.orderby", sourceConfig.getSplitBy());
        }
        if (sourceConfig.getNumSplits() != null) {
            accessor.getConfiguration().setInt("mapreduce.job.maps", sourceConfig.getNumSplits());
        }

        if (sourceConfig.getSchema() == null) {
            accessor.setSchema(SCHEMA_TYPE_ADAPTER.toJson(schema));
        } else {
            sourceConfig.validateSchema(schema, failureCollector);
            failureCollector.getOrThrowException();
            accessor.setSchema(sourceConfig.getSchema().toString());
        }
        return accessor;
    }

    /**
     * Creates the lineage recorder for this source, keyed by the configured reference name.
     *
     * @param batchSourceContext context of the current run
     * @return new lineage recorder
     */
    protected LineageRecorder getLineageRecorder(BatchSourceContext batchSourceContext) {
        String referenceName = sourceConfig.getReferenceName();
        return new LineageRecorder(batchSourceContext, referenceName);
    }

    /**
     * Record class handed to {@code DataDrivenETLDBInputFormat.setInput} by
     * {@code getConnectionConfigAccessor}. This implementation returns {@link DBRecord}.
     */
    protected Class<? extends DBWritable> getDBRecordType() {
        return DBRecord.class;
    }

    /**
     * Documentation link that is appended to the error message and passed along when a schema
     * lookup fails with a {@code SQLException}. This implementation returns {@code null}, in which
     * case no link is appended.
     */
    protected String getExternalDocumentationLink() {
        return null;
    }

    /**
     * Initialises the source at runtime and loads the JDBC driver plugin class into
     * {@code driverClass}.
     *
     * @param batchRuntimeContext runtime context of the current run
     * @throws Exception if the parent initialisation fails
     */
    public void initialize(BatchRuntimeContext batchRuntimeContext) throws Exception {
        super.initialize(batchRuntimeContext);
        driverClass = batchRuntimeContext.loadPluginClass(getJDBCPluginId());
    }

    /**
     * Emits the structured record carried by the value of the given key/value pair; the key is
     * not used.
     *
     * @param keyValue pair whose value holds the record to emit
     * @param emitter  receiver of the record
     * @throws Exception declared by the signature
     */
    public void transform(KeyValue<LongWritable, DBRecord> keyValue, Emitter<StructuredRecord> emitter) throws Exception {
        DBRecord dbRecord = keyValue.getValue();
        emitter.emit(dbRecord.getRecord());
    }

    /** Passes the driver class loaded in {@code initialize} to {@code DBUtils.cleanup}. */
    public void destroy() {
        DBUtils.cleanup(driverClass);
    }

    /**
     * Builds the plugin id used for the JDBC driver plugin.
     *
     * @return {@code "source.<plugin type>.<plugin name>"}
     */
    private String getJDBCPluginId() {
        String pluginName = sourceConfig.getJdbcPluginName();
        return String.format("%s.%s.%s", "source", ConnectionConfig.JDBC_PLUGIN_TYPE, pluginName);
    }

    /**
     * Produces the JDBC connection string. In this class it is used when registering the driver in
     * {@code loadPluginClassAndGetDriver} and when opening a connection in {@code getConnection}.
     *
     * @return connection string passed to {@code DriverManager.getConnection}
     */
    protected abstract String createConnectionString();

    // Bridge overload, marked "bridge"/"synthetic" in its own modifiers: it only casts its
    // arguments and delegates to the typed transform(KeyValue, Emitter) above.
    // NOTE(review): such methods are normally generated by the compiler rather than written by
    // hand; confirm whether this explicit declaration is needed (or even compiles) in source form.
    public /* bridge */ /* synthetic */ void transform(Object obj, Emitter emitter) throws Exception {
        transform((KeyValue<LongWritable, DBRecord>) obj, (Emitter<StructuredRecord>) emitter);
    }
}
