package io.debezium.connector.jdbc;

import io.debezium.connector.jdbc.JdbcSinkConnectorConfig;
import io.debezium.connector.jdbc.SinkRecordDescriptor;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.naming.TableNamingStrategy;
import io.debezium.connector.jdbc.relational.TableDescriptor;
import io.debezium.connector.jdbc.relational.TableId;
import io.debezium.pipeline.sink.spi.ChangeEventSink;
import io.debezium.util.Strings;
import java.sql.SQLException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.hibernate.StatelessSession;
import org.hibernate.Transaction;
import org.hibernate.dialect.DatabaseVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/jdbc/JdbcChangeEventSink.class */
/**
 * A {@link ChangeEventSink} that applies batches of Kafka Connect {@link SinkRecord}s to a
 * relational database through a Hibernate {@link StatelessSession} and a {@link DatabaseDialect}.
 *
 * <p>Within one call to {@link #execute(Collection)} records are grouped per destination table into
 * two kinds of buffers: one for inserts/upserts/updates and one for deletes. Whenever the kind of
 * operation for a table changes, the pending buffer of the other kind is flushed first so that
 * statements reach the database in the same relative order in which the records were received.
 */
public class JdbcChangeEventSink implements ChangeEventSink {
    private static final Logger LOGGER = LoggerFactory.getLogger(JdbcChangeEventSink.class);
    public static final String SCHEMA_CHANGE_VALUE = "SchemaChangeValue";
    public static final String DETECT_SCHEMA_CHANGE_RECORD_MSG = "Schema change records are not supported by JDBC connector. Adjust `topics` or `topics.regex` to exclude schema change topic.";
    private final JdbcSinkConnectorConfig config;
    private final DatabaseDialect dialect;
    private final StatelessSession session;
    private final TableNamingStrategy tableNamingStrategy;
    private final RecordWriter recordWriter;

    /**
     * Creates the sink and logs the database version reported by the dialect.
     *
     * @param jdbcSinkConnectorConfig connector configuration; also supplies the table naming strategy
     * @param statelessSession session used for DDL, truncate statements and table metadata lookups
     * @param databaseDialect dialect used to build SQL statements and inspect tables
     * @param recordWriter writer that receives each flushed batch together with its SQL statement
     */
    public JdbcChangeEventSink(JdbcSinkConnectorConfig jdbcSinkConnectorConfig, StatelessSession statelessSession, DatabaseDialect databaseDialect, RecordWriter recordWriter) {
        this.config = jdbcSinkConnectorConfig;
        this.tableNamingStrategy = jdbcSinkConnectorConfig.getTableNamingStrategy();
        this.dialect = databaseDialect;
        this.session = statelessSession;
        this.recordWriter = recordWriter;
        final DatabaseVersion version = this.dialect.getVersion();
        LOGGER.info("Database version {}.{}.{}", version.getMajor(), version.getMinor(), version.getMicro());
    }

    /**
     * Processes a batch of sink records.
     *
     * <p>Records without a resolvable table name and tombstone records are skipped. Truncate records
     * flush all pending buffers and are executed immediately (when enabled); they are never buffered
     * as data changes. All remaining records are buffered per table and flushed either when the
     * buffer hands back a batch, when the operation kind for the table changes, or at the end of
     * the call.
     *
     * @param collection the records to write
     * @throws DataException if a record is a schema change event
     * @throws ConnectException if a record cannot be converted or written
     */
    @Override
    public void execute(Collection<SinkRecord> collection) {
        final Map<TableId, RecordBuffer> updateBufferByTable = new HashMap<>();
        final Map<TableId, RecordBuffer> deleteBufferByTable = new HashMap<>();

        for (SinkRecord sinkRecord : collection) {
            LOGGER.trace("Processing {}", sinkRecord);
            validate(sinkRecord);

            final Optional<TableId> optionalTableId = getTableId(sinkRecord);
            if (optionalTableId.isEmpty()) {
                LOGGER.warn("Ignored to write record from topic '{}' partition '{}' offset '{}'. No resolvable table name", sinkRecord.topic(), sinkRecord.kafkaPartition(), sinkRecord.kafkaOffset());
                continue;
            }

            final SinkRecordDescriptor descriptor = buildRecordSinkDescriptor(sinkRecord);
            final TableId tableId = optionalTableId.get();

            if (descriptor.isTombstone()) {
                LOGGER.debug("Skipping tombstone record {}", descriptor);
                continue;
            }

            if (descriptor.isTruncate()) {
                // A truncate record carries no row data: handle it here and never let it reach the
                // data buffers, otherwise it would later be written as an insert/upsert/update.
                handleTruncate(tableId, descriptor, updateBufferByTable, deleteBufferByTable);
                continue;
            }

            if (descriptor.isDelete()) {
                if (!this.config.isDeleteEnabled()) {
                    LOGGER.debug("Deletes are not enabled, skipping delete for topic '{}'", descriptor.getTopicName());
                    continue;
                }
                // Pending updates for this table must hit the database before the delete,
                // otherwise a later flush could re-create a row that was meant to be removed.
                flushPending(tableId, updateBufferByTable);
                bufferRecord(tableId, descriptor, deleteBufferByTable);
            } else {
                // Pending deletes for this table must hit the database before the update,
                // otherwise the final flush would delete a row that was re-inserted afterwards.
                flushPending(tableId, deleteBufferByTable);
                bufferRecord(tableId, descriptor, updateBufferByTable);
            }
        }

        flushBuffers(updateBufferByTable);
        flushBuffers(deleteBufferByTable);
    }

    /**
     * Executes a truncate for the given table if truncates are enabled; otherwise only logs.
     * All pending buffers are flushed first so that the truncate also affects buffered events.
     */
    private void handleTruncate(TableId tableId, SinkRecordDescriptor descriptor, Map<TableId, RecordBuffer> updateBufferByTable, Map<TableId, RecordBuffer> deleteBufferByTable) {
        if (!this.config.isTruncateEnabled()) {
            LOGGER.debug("Truncates are not enabled, skipping truncate for topic '{}'", descriptor.getTopicName());
            return;
        }
        flushBuffers(updateBufferByTable);
        flushBuffers(deleteBufferByTable);
        try {
            final TableDescriptor table = checkAndApplyTableChangesIfNeeded(tableId, descriptor);
            writeTruncate(this.dialect.getTruncateStatement(table));
        } catch (SQLException e) {
            throw new ConnectException("Failed to process a sink record", e);
        }
    }

    /** Adds the record to the table's buffer (creating it on demand) and writes whatever batch the buffer hands back. */
    private void bufferRecord(TableId tableId, SinkRecordDescriptor descriptor, Map<TableId, RecordBuffer> bufferByTable) {
        final RecordBuffer buffer = bufferByTable.computeIfAbsent(tableId, id -> new RecordBuffer(this.config));
        flushBuffer(tableId, buffer.add(descriptor));
    }

    /** Flushes the table's buffer if one exists and currently holds records. */
    private void flushPending(TableId tableId, Map<TableId, RecordBuffer> bufferByTable) {
        final RecordBuffer buffer = bufferByTable.get(tableId);
        if (buffer != null && !buffer.isEmpty()) {
            flushBuffer(tableId, buffer.flush());
        }
    }

    /**
     * Rejects schema change records, which this sink cannot apply.
     *
     * @throws DataException if the record is a schema change event
     */
    private void validate(SinkRecord sinkRecord) {
        if (isSchemaChange(sinkRecord)) {
            LOGGER.error(DETECT_SCHEMA_CHANGE_RECORD_MSG);
            throw new DataException(DETECT_SCHEMA_CHANGE_RECORD_MSG);
        }
    }

    /** Returns {@code true} when the record's value schema name contains {@link #SCHEMA_CHANGE_VALUE}. */
    private static boolean isSchemaChange(SinkRecord sinkRecord) {
        return sinkRecord.valueSchema() != null
                && !Strings.isNullOrEmpty(sinkRecord.valueSchema().name())
                && sinkRecord.valueSchema().name().contains(SCHEMA_CHANGE_VALUE);
    }

    /**
     * Builds the descriptor for a sink record using the configured primary key mode, primary key
     * fields and field filter.
     *
     * @throws ConnectException wrapping any failure raised while building the descriptor
     */
    private SinkRecordDescriptor buildRecordSinkDescriptor(SinkRecord sinkRecord) {
        try {
            return SinkRecordDescriptor.builder()
                    .withPrimaryKeyMode(this.config.getPrimaryKeyMode())
                    .withPrimaryKeyFields(this.config.getPrimaryKeyFields())
                    .withFieldFilters(this.config.getFieldsFilter())
                    .withSinkRecord(sinkRecord)
                    .withDialect(this.dialect)
                    .build();
        } catch (Exception e) {
            throw new ConnectException("Failed to process a sink record", e);
        }
    }

    /** Flushes every buffer in the map and writes the returned batches. */
    private void flushBuffers(Map<TableId, RecordBuffer> map) {
        map.forEach((tableId, recordBuffer) -> flushBuffer(tableId, recordBuffer.flush()));
    }

    /**
     * Writes a batch of records for one table; an empty batch is a no-op.
     *
     * <p>The first record of the batch is used both to create/alter the table if needed and to
     * choose the SQL statement for the whole batch.
     *
     * @throws ConnectException wrapping any failure raised while preparing or writing the batch
     */
    private void flushBuffer(TableId tableId, List<SinkRecordDescriptor> list) {
        if (list.isEmpty()) {
            return;
        }
        LOGGER.debug("Flushing records in JDBC Writer for table: {}", tableId.getTableName());
        try {
            final SinkRecordDescriptor first = list.get(0);
            final TableDescriptor table = checkAndApplyTableChangesIfNeeded(tableId, first);
            this.recordWriter.write(list, getSqlStatement(table, first));
        } catch (Exception e) {
            throw new ConnectException("Failed to process a sink record", e);
        }
    }

    /** Resolves the destination table for a record; empty when the naming strategy returns {@code null}. */
    private Optional<TableId> getTableId(SinkRecord sinkRecord) {
        final String tableName = this.tableNamingStrategy.resolveTableName(this.config, sinkRecord);
        if (tableName == null) {
            return Optional.empty();
        }
        return Optional.of(this.dialect.getTableId(tableName));
    }

    /** Closes the underlying session if it is still open. */
    @Override
    public void close() {
        if (this.session != null && this.session.isOpen()) {
            LOGGER.info("Closing session.");
            this.session.close();
        } else {
            LOGGER.info("Session already closed.");
        }
    }

    /**
     * Makes sure the destination table exists and contains the record's fields.
     *
     * <p>An existing table is altered if needed. A missing table is created; if creation fails,
     * an alter is attempted as a fallback.
     *
     * @return the descriptor of the table after any change
     * @throws SQLException if the table can neither be created nor altered
     */
    private TableDescriptor checkAndApplyTableChangesIfNeeded(TableId tableId, SinkRecordDescriptor sinkRecordDescriptor) throws SQLException {
        if (!hasTable(tableId)) {
            try {
                return createTable(tableId, sinkRecordDescriptor);
            } catch (SQLException createFailure) {
                LOGGER.warn("Table creation failed for '{}', attempting to alter the table", tableId.toFullIdentiferString(), createFailure);
                // fall through to the alter attempt below
            }
        }
        try {
            return alterTableIfNeeded(tableId, sinkRecordDescriptor);
        } catch (SQLException alterFailure) {
            LOGGER.error("Failed to alter the table '{}'.", tableId.toFullIdentiferString(), alterFailure);
            throw alterFailure;
        }
    }

    /** Asks the dialect whether the table exists, using the session's connection. */
    private boolean hasTable(TableId tableId) {
        return this.session.doReturningWork(connection -> this.dialect.tableExists(connection, tableId));
    }

    /** Reads the table's descriptor through the dialect, using the session's connection. */
    private TableDescriptor readTable(TableId tableId) {
        return this.session.doReturningWork(connection -> this.dialect.readTable(connection, tableId));
    }

    /**
     * Creates the table for the given record and returns its descriptor.
     *
     * @throws SQLException if schema evolution is disabled
     */
    private TableDescriptor createTable(TableId tableId, SinkRecordDescriptor sinkRecordDescriptor) throws SQLException {
        LOGGER.debug("Attempting to create table '{}'.", tableId.toFullIdentiferString());
        if (JdbcSinkConnectorConfig.SchemaEvolutionMode.NONE.equals(this.config.getSchemaEvolutionMode())) {
            LOGGER.warn("Table '{}' cannot be created because schema evolution is disabled.", tableId.toFullIdentiferString());
            throw new SQLException("Cannot create table " + tableId.toFullIdentiferString() + " because schema evolution is disabled");
        }
        executeInTransaction(this.dialect.getCreateTableStatement(sinkRecordDescriptor, tableId));
        // Read the metadata only after the transaction is finished, so that a failure here
        // never triggers a rollback on an already committed transaction.
        return readTable(tableId);
    }

    /**
     * Adds the record's fields that are missing from the table, if any.
     *
     * @return the table descriptor, re-read after an alter or as-is when nothing is missing
     * @throws SQLException if the table does not exist, a missing field is mandatory without a
     *         default value, or schema evolution is disabled
     */
    private TableDescriptor alterTableIfNeeded(TableId tableId, SinkRecordDescriptor sinkRecordDescriptor) throws SQLException {
        LOGGER.debug("Attempting to alter table '{}'.", tableId.toFullIdentiferString());
        if (!hasTable(tableId)) {
            LOGGER.error("Table '{}' does not exist and cannot be altered.", tableId.toFullIdentiferString());
            throw new SQLException("Could not find table: " + tableId.toFullIdentiferString());
        }

        final TableDescriptor table = readTable(tableId);
        final Set<String> missingFields = this.dialect.resolveMissingFields(sinkRecordDescriptor, table);
        if (missingFields.isEmpty()) {
            return table;
        }

        LOGGER.debug("The follow fields are missing in the table: {}", missingFields);
        for (String missingFieldName : missingFields) {
            final SinkRecordDescriptor.FieldDescriptor fieldDescriptor = sinkRecordDescriptor.getFields().get(missingFieldName);
            // A mandatory column without a default cannot be added to a table that may already hold rows.
            if (!fieldDescriptor.getSchema().isOptional() && fieldDescriptor.getSchema().defaultValue() == null) {
                throw new SQLException(String.format("Cannot ALTER table '%s' because field '%s' is not optional but has no default value", tableId.toFullIdentiferString(), fieldDescriptor.getName()));
            }
        }

        if (JdbcSinkConnectorConfig.SchemaEvolutionMode.NONE.equals(this.config.getSchemaEvolutionMode())) {
            LOGGER.warn("Table '{}' cannot be altered because schema evolution is disabled.", tableId.toFullIdentiferString());
            throw new SQLException("Cannot alter table " + tableId.toFullIdentiferString() + " because schema evolution is disabled");
        }

        executeInTransaction(this.dialect.getAlterTableStatement(table, sinkRecordDescriptor, missingFields));
        // Read the metadata only after the transaction is finished (see createTable).
        return readTable(tableId);
    }

    /**
     * Chooses the SQL statement for a record: a delete statement for delete records, otherwise the
     * statement matching the configured insert mode.
     *
     * @throws ConnectException if upsert mode is configured but the record has no key fields
     * @throws DataException if the insert mode is not handled
     */
    private String getSqlStatement(TableDescriptor tableDescriptor, SinkRecordDescriptor sinkRecordDescriptor) {
        if (sinkRecordDescriptor.isDelete()) {
            return this.dialect.getDeleteStatement(tableDescriptor, sinkRecordDescriptor);
        }
        switch (this.config.getInsertMode()) {
            case INSERT:
                return this.dialect.getInsertStatement(tableDescriptor, sinkRecordDescriptor);
            case UPSERT:
                if (sinkRecordDescriptor.getKeyFieldNames().isEmpty()) {
                    throw new ConnectException("Cannot write to table " + tableDescriptor.getId().getTableName() + " with no key fields defined.");
                }
                return this.dialect.getUpsertStatement(tableDescriptor, sinkRecordDescriptor);
            case UPDATE:
                return this.dialect.getUpdateStatement(tableDescriptor, sinkRecordDescriptor);
            default:
                throw new DataException(String.format("Unable to get SQL statement for %s", sinkRecordDescriptor));
        }
    }

    /** Executes the given truncate statement in its own transaction. */
    private void writeTruncate(String str) throws SQLException {
        executeInTransaction(str);
    }

    /**
     * Runs a single native SQL statement inside a new transaction, committing on success and
     * rolling back on failure. The original failure is always the exception that propagates.
     */
    private void executeInTransaction(String sql) {
        final Transaction transaction = this.session.beginTransaction();
        try {
            LOGGER.trace("SQL: {}", sql);
            this.session.createNativeQuery(sql, Object.class).executeUpdate();
            transaction.commit();
        } catch (RuntimeException e) {
            rollbackAfterFailure(transaction, e);
            throw e;
        }
    }

    /** Rolls the transaction back; a rollback failure is attached to {@code failure} instead of replacing it. */
    private static void rollbackAfterFailure(Transaction transaction, RuntimeException failure) {
        try {
            transaction.rollback();
        } catch (RuntimeException rollbackFailure) {
            // Guard against self-suppression, which Throwable#addSuppressed rejects.
            if (rollbackFailure != failure) {
                failure.addSuppressed(rollbackFailure);
            }
        }
    }
}
