package io.debezium.connector.jdbc;

import io.debezium.connector.jdbc.JdbcSinkConnectorConfig;
import io.debezium.connector.jdbc.SinkRecordDescriptor;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.dialect.DatabaseDialectResolver;
import io.debezium.connector.jdbc.relational.TableDescriptor;
import io.debezium.connector.jdbc.relational.TableId;
import io.debezium.pipeline.sink.spi.ChangeEventSink;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.hibernate.SessionFactory;
import org.hibernate.StatelessSession;
import org.hibernate.Transaction;
import org.hibernate.dialect.DatabaseVersion;
import org.hibernate.query.NativeQuery;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/jdbc/JdbcChangeEventSink.class */
/**
 * {@link ChangeEventSink} that writes Kafka Connect sink records to a relational database
 * through a single Hibernate {@link StatelessSession}.
 *
 * <p>For every record the sink resolves the destination table from the dialect, creates or
 * alters that table when required, and then executes the DELETE, INSERT, UPSERT or UPDATE
 * statement produced by the {@link DatabaseDialect}. Each statement runs in its own
 * transaction, which is rolled back if the statement or the commit fails.
 */
public class JdbcChangeEventSink implements ChangeEventSink {
    private static final Logger LOGGER = LoggerFactory.getLogger(JdbcChangeEventSink.class);
    private final JdbcSinkConnectorConfig config;
    private final SessionFactory sessionFactory;
    private final DatabaseDialect dialect;
    private final StatelessSession session;

    /**
     * Builds the Hibernate session factory, resolves the database dialect, opens the stateless
     * session used for all subsequent work and logs the database version.
     *
     * <p>If anything fails after the session factory has been built, the session (if opened) and
     * the session factory are closed before the failure is rethrown, so a failed start-up does
     * not leave them open.
     *
     * @param jdbcSinkConnectorConfig the connector configuration
     */
    public JdbcChangeEventSink(JdbcSinkConnectorConfig jdbcSinkConnectorConfig) {
        this.config = jdbcSinkConnectorConfig;
        this.sessionFactory = jdbcSinkConnectorConfig.getHibernateConfiguration().buildSessionFactory();
        DatabaseDialect resolvedDialect;
        StatelessSession openedSession = null;
        try {
            resolvedDialect = DatabaseDialectResolver.resolve(jdbcSinkConnectorConfig, this.sessionFactory);
            openedSession = this.sessionFactory.openStatelessSession();
            DatabaseVersion version = resolvedDialect.getVersion();
            LOGGER.info("Database version {}.{}.{}", version.getMajor(), version.getMinor(), version.getMicro());
        } catch (RuntimeException e) {
            releaseAfterFailedStartup(openedSession, this.sessionFactory, e);
            throw e;
        }
        this.dialect = resolvedDialect;
        this.session = openedSession;
    }

    /**
     * Processes one sink record: builds its descriptor, makes sure the destination table matches
     * the record, and writes the record.
     *
     * @param sinkRecord the record to write
     * @throws ConnectException wrapping any exception raised while processing the record
     */
    @Override
    public void execute(SinkRecord sinkRecord) {
        try {
            SinkRecordDescriptor descriptor = SinkRecordDescriptor.builder()
                    .withPrimaryKeyMode(this.config.getPrimaryKeyMode())
                    .withPrimaryKeyFields(this.config.getPrimaryKeyFields())
                    .withSinkRecord(sinkRecord)
                    .withDialect(this.dialect)
                    .build();
            TableDescriptor table = checkAndApplyTableChangesIfNeeded(sinkRecord, descriptor);
            write(table, descriptor);
        } catch (Exception e) {
            throw new ConnectException("Failed to process a sink record", e);
        }
    }

    /**
     * Closes the session and then the session factory, skipping whichever is already closed.
     *
     * <p>The session factory is closed in a {@code finally} block so that a failure while closing
     * the session does not prevent the factory from being closed.
     *
     * @throws Exception if closing the session or the session factory fails
     */
    @Override
    public void close() throws Exception {
        try {
            if (this.session != null && this.session.isOpen()) {
                LOGGER.info("Closing session.");
                this.session.close();
            } else {
                LOGGER.info("Session already closed.");
            }
        } finally {
            if (this.sessionFactory != null && this.sessionFactory.isOpen()) {
                LOGGER.info("Closing the session factory");
                this.sessionFactory.close();
            } else {
                LOGGER.info("Session factory already closed");
            }
        }
    }

    /**
     * Ensures the destination table exists and is aligned with the record.
     *
     * <p>An existing table is altered if needed. A missing table is created; if creation fails
     * with a {@link SQLException}, an alter is attempted instead.
     *
     * @return the descriptor of the destination table
     * @throws SQLException if the table can neither be created nor altered
     */
    private TableDescriptor checkAndApplyTableChangesIfNeeded(SinkRecord sinkRecord, SinkRecordDescriptor sinkRecordDescriptor) throws SQLException {
        TableId tableId = this.dialect.getTableIdFromTopic(sinkRecord);
        if (hasTable(tableId)) {
            return alterTableOrLogFailure(tableId, sinkRecordDescriptor);
        }
        try {
            return createTable(tableId, sinkRecordDescriptor);
        } catch (SQLException createFailure) {
            LOGGER.warn("Table creation failed for '{}', attempting to alter the table", tableId.toFullIdentiferString(), createFailure);
            return alterTableOrLogFailure(tableId, sinkRecordDescriptor);
        }
    }

    /** Alters the table if needed, logging an error before rethrowing any {@link SQLException}. */
    private TableDescriptor alterTableOrLogFailure(TableId tableId, SinkRecordDescriptor sinkRecordDescriptor) throws SQLException {
        try {
            return alterTableIfNeeded(tableId, sinkRecordDescriptor);
        } catch (SQLException alterFailure) {
            LOGGER.error("Failed to alter the table '{}'.", tableId.toFullIdentiferString(), alterFailure);
            throw alterFailure;
        }
    }

    /** Asks the dialect, on the session's connection, whether the table exists. */
    private boolean hasTable(TableId tableId) {
        return this.session.doReturningWork(connection -> this.dialect.tableExists(connection, tableId));
    }

    /** Reads the table's descriptor through the dialect, on the session's connection. */
    private TableDescriptor readTable(TableId tableId) {
        return this.session.doReturningWork(connection -> this.dialect.readTable(connection, tableId));
    }

    /**
     * Creates the table for the record and returns its descriptor as read back from the database.
     *
     * @throws SQLException if schema evolution mode is {@code NONE}
     */
    private TableDescriptor createTable(TableId tableId, SinkRecordDescriptor sinkRecordDescriptor) throws SQLException {
        LOGGER.debug("Attempting to create table '{}'.", tableId.toFullIdentiferString());
        if (JdbcSinkConnectorConfig.SchemaEvolutionMode.NONE.equals(this.config.getSchemaEvolutionMode())) {
            LOGGER.warn("Table '{}' cannot be created because schema evolution is disabled.", tableId.toFullIdentiferString());
            throw new SQLException("Cannot create table " + tableId.toFullIdentiferString() + " because schema evolution is disabled");
        }
        Transaction transaction = this.session.beginTransaction();
        try {
            String createTableStatement = this.dialect.getCreateTableStatement(sinkRecordDescriptor, tableId);
            LOGGER.trace("SQL: {}", createTableStatement);
            this.session.createNativeQuery(createTableStatement, Object.class).executeUpdate();
            transaction.commit();
            return readTable(tableId);
        } catch (Exception e) {
            rollbackAfterFailure(transaction, e);
            throw e;
        }
    }

    /**
     * Adds to the table any record fields it is missing and returns the resulting descriptor.
     *
     * <p>Returns the current descriptor unchanged when no fields are missing.
     *
     * @throws SQLException if the table does not exist, if a missing field is neither optional
     *                      nor has a default value, or if schema evolution mode is {@code NONE}
     */
    private TableDescriptor alterTableIfNeeded(TableId tableId, SinkRecordDescriptor sinkRecordDescriptor) throws SQLException {
        LOGGER.debug("Attempting to alter table '{}'.", tableId.toFullIdentiferString());
        if (!hasTable(tableId)) {
            LOGGER.error("Table '{}' does not exist and cannot be altered.", tableId.toFullIdentiferString());
            throw new SQLException("Could not find table: " + tableId.toFullIdentiferString());
        }
        TableDescriptor table = readTable(tableId);
        Set<String> missingFields = this.dialect.resolveMissingFields(sinkRecordDescriptor, table);
        if (missingFields.isEmpty()) {
            return table;
        }
        LOGGER.debug("The follow fields are missing in the table: {}", missingFields);
        for (String missingFieldName : missingFields) {
            SinkRecordDescriptor.FieldDescriptor fieldDescriptor = sinkRecordDescriptor.getFields().get(missingFieldName);
            if (!fieldDescriptor.getSchema().isOptional() && fieldDescriptor.getSchema().defaultValue() == null) {
                throw new SQLException(String.format("Cannot ALTER table '%s' because field '%s' is not optional but has no default value", tableId.toFullIdentiferString(), fieldDescriptor.getName()));
            }
        }
        if (JdbcSinkConnectorConfig.SchemaEvolutionMode.NONE.equals(this.config.getSchemaEvolutionMode())) {
            LOGGER.warn("Table '{}' cannot be altered because schema evolution is disabled.", tableId.toFullIdentiferString());
            throw new SQLException("Cannot alter table " + tableId.toFullIdentiferString() + " because schema evolution is disabled");
        }
        Transaction transaction = this.session.beginTransaction();
        try {
            String alterTableStatement = this.dialect.getAlterTableStatement(table, sinkRecordDescriptor, missingFields);
            LOGGER.trace("SQL: {}", alterTableStatement);
            this.session.createNativeQuery(alterTableStatement, Object.class).executeUpdate();
            transaction.commit();
            return readTable(tableId);
        } catch (Exception e) {
            rollbackAfterFailure(transaction, e);
            throw e;
        }
    }

    /**
     * Dispatches the record to the delete path or to the configured insert mode.
     *
     * @throws ConnectException in UPSERT mode when the record has no key fields
     */
    private void write(TableDescriptor tableDescriptor, SinkRecordDescriptor sinkRecordDescriptor) throws SQLException {
        if (sinkRecordDescriptor.isDelete()) {
            writeDelete(this.dialect.getDeleteStatement(tableDescriptor, sinkRecordDescriptor), sinkRecordDescriptor);
            return;
        }
        switch (this.config.getInsertMode()) {
            case INSERT:
                writeInsert(this.dialect.getInsertStatement(tableDescriptor, sinkRecordDescriptor), sinkRecordDescriptor);
                return;
            case UPSERT:
                if (sinkRecordDescriptor.getKeyFieldNames().isEmpty()) {
                    throw new ConnectException("Cannot write to table " + tableDescriptor.getId().getTableName() + " with no key fields defined.");
                }
                writeUpsert(this.dialect.getUpsertStatement(tableDescriptor, sinkRecordDescriptor), sinkRecordDescriptor);
                return;
            case UPDATE:
                writeUpdate(this.dialect.getUpdateStatement(tableDescriptor, sinkRecordDescriptor), sinkRecordDescriptor);
                return;
            default:
                // Any other insert mode writes nothing.
                return;
        }
    }

    /**
     * Executes the insert statement, binding key values first and non-key values after them.
     *
     * <p>The transaction is rolled back on any exception, not only on {@link SQLException}, so an
     * unchecked failure from the query or the commit does not leave the transaction open.
     *
     * @throws SQLException if the statement does not affect exactly one row
     */
    private void writeInsert(String str, SinkRecordDescriptor sinkRecordDescriptor) throws SQLException {
        Transaction transaction = this.session.beginTransaction();
        try {
            LOGGER.trace("SQL: {}", str);
            NativeQuery<?> query = this.session.createNativeQuery(str, Object.class);
            int nextIndex = bindKeyValuesToQuery(sinkRecordDescriptor, query, 1);
            bindNonKeyValuesToQuery(sinkRecordDescriptor, query, nextIndex);
            if (query.executeUpdate() != 1) {
                throw new SQLException("Failed to insert row from table");
            }
            transaction.commit();
        } catch (Exception e) {
            rollbackAfterFailure(transaction, e);
            throw e;
        }
    }

    /** Executes the upsert statement, binding key values first and non-key values after them. */
    private void writeUpsert(String str, SinkRecordDescriptor sinkRecordDescriptor) throws SQLException {
        Transaction transaction = this.session.beginTransaction();
        try {
            LOGGER.trace("SQL: {}", str);
            NativeQuery<?> query = this.session.createNativeQuery(str, Object.class);
            int nextIndex = bindKeyValuesToQuery(sinkRecordDescriptor, query, 1);
            bindNonKeyValuesToQuery(sinkRecordDescriptor, query, nextIndex);
            query.executeUpdate();
            transaction.commit();
        } catch (Exception e) {
            rollbackAfterFailure(transaction, e);
            throw e;
        }
    }

    /** Executes the update statement, binding non-key values first and key values after them. */
    private void writeUpdate(String str, SinkRecordDescriptor sinkRecordDescriptor) throws SQLException {
        Transaction transaction = this.session.beginTransaction();
        try {
            LOGGER.trace("SQL: {}", str);
            NativeQuery<?> query = this.session.createNativeQuery(str, Object.class);
            int nextIndex = bindNonKeyValuesToQuery(sinkRecordDescriptor, query, 1);
            bindKeyValuesToQuery(sinkRecordDescriptor, query, nextIndex);
            query.executeUpdate();
            transaction.commit();
        } catch (Exception e) {
            rollbackAfterFailure(transaction, e);
            throw e;
        }
    }

    /** Executes the delete statement with the key values bound; does nothing when deletes are disabled. */
    private void writeDelete(String str, SinkRecordDescriptor sinkRecordDescriptor) throws SQLException {
        if (!this.config.isDeleteEnabled()) {
            LOGGER.debug("Deletes are not enabled, skipping delete for topic '{}'", sinkRecordDescriptor.getTopicName());
            return;
        }
        Transaction transaction = this.session.beginTransaction();
        try {
            LOGGER.trace("SQL: {}", str);
            NativeQuery<?> query = this.session.createNativeQuery(str, Object.class);
            bindKeyValuesToQuery(sinkRecordDescriptor, query, 1);
            query.executeUpdate();
            transaction.commit();
        } catch (Exception e) {
            rollbackAfterFailure(transaction, e);
            throw e;
        }
    }

    /**
     * Binds the primary-key values to the query starting at position {@code i}.
     *
     * <p>In KAFKA mode the topic name, partition and offset are bound, in that order. In
     * RECORD_KEY and RECORD_VALUE modes the key fields are bound from the key struct, or nothing
     * is bound when that struct is {@code null}. Other modes bind nothing.
     *
     * @return the next free parameter position
     */
    private int bindKeyValuesToQuery(SinkRecordDescriptor sinkRecordDescriptor, NativeQuery<?> nativeQuery, int i) {
        switch (this.config.getPrimaryKeyMode()) {
            case KAFKA:
                nativeQuery.setParameter(i++, sinkRecordDescriptor.getTopicName());
                nativeQuery.setParameter(i++, sinkRecordDescriptor.getPartition());
                nativeQuery.setParameter(i++, Long.valueOf(sinkRecordDescriptor.getOffset()));
                break;
            case RECORD_KEY:
            case RECORD_VALUE:
                Struct keyStruct = sinkRecordDescriptor.getKeyStruct(this.config.getPrimaryKeyMode());
                if (keyStruct != null) {
                    i = bindFieldValuesToQuery(sinkRecordDescriptor, nativeQuery, i, keyStruct, sinkRecordDescriptor.getKeyFieldNames());
                }
                break;
        }
        return i;
    }

    /**
     * Binds the non-key field values, taken from the record's after struct, starting at position {@code i}.
     *
     * @return the next free parameter position
     */
    private int bindNonKeyValuesToQuery(SinkRecordDescriptor sinkRecordDescriptor, NativeQuery<?> nativeQuery, int i) {
        return bindFieldValuesToQuery(sinkRecordDescriptor, nativeQuery, i, sinkRecordDescriptor.getAfterStruct(), sinkRecordDescriptor.getNonKeyFieldNames());
    }

    /**
     * Binds the named fields of {@code struct} to the query through the dialect, advancing the
     * position by whatever {@code bindValue} returns for each field.
     *
     * @return the next free parameter position
     */
    private int bindFieldValuesToQuery(SinkRecordDescriptor sinkRecordDescriptor, NativeQuery<?> nativeQuery, int i, Struct struct, List<String> list) {
        for (String fieldName : list) {
            i += this.dialect.bindValue(sinkRecordDescriptor.getFields().get(fieldName), nativeQuery, i, struct.get(fieldName));
        }
        return i;
    }

    /**
     * Rolls the transaction back after {@code failure}. A runtime exception thrown by the
     * rollback is attached to {@code failure} as suppressed, so the caller can rethrow the
     * original cause instead of having it replaced by the rollback error.
     */
    private static void rollbackAfterFailure(Transaction transaction, Exception failure) {
        try {
            transaction.rollback();
        } catch (RuntimeException rollbackFailure) {
            failure.addSuppressed(rollbackFailure);
        }
    }

    /**
     * Closes the session (when one was opened) and the session factory after a constructor
     * failure. Runtime exceptions from either close are attached to {@code failure} as suppressed.
     */
    private static void releaseAfterFailedStartup(StatelessSession session, SessionFactory sessionFactory, RuntimeException failure) {
        if (session != null) {
            try {
                session.close();
            } catch (RuntimeException closeFailure) {
                failure.addSuppressed(closeFailure);
            }
        }
        try {
            sessionFactory.close();
        } catch (RuntimeException closeFailure) {
            failure.addSuppressed(closeFailure);
        }
    }
}
