package io.debezium.connector.jdbc;

import io.debezium.DebeziumException;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.dialect.DatabaseDialectResolver;
import io.debezium.util.Stopwatch;
import io.debezium.util.Strings;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.InternalSinkRecord;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.hibernate.SessionFactory;
import org.hibernate.StatelessSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/jdbc/JdbcSinkConnectorTask.class */
public class JdbcSinkConnectorTask extends SinkTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSinkConnectorTask.class);
    private static final Class[] EMPTY_CLASS_ARRAY = new Class[0];
    private SessionFactory sessionFactory;
    private JdbcChangeEventSink changeEventSink;
    private Throwable previousPutException;
    private boolean usePre380OriginalRecordAccess;
    private Method pre380OriginalRecordMethod;
    private final AtomicReference<State> state = new AtomicReference<>(State.STOPPED);
    private final ReentrantLock stateLock = new ReentrantLock();
    private final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap();

    /* loaded from: input_file:io/debezium/connector/jdbc/JdbcSinkConnectorTask$State.class */
    private enum State {
        RUNNING,
        STOPPED
    }

    public JdbcSinkConnectorTask() {
        this.usePre380OriginalRecordAccess = false;
        this.pre380OriginalRecordMethod = null;
        try {
            this.pre380OriginalRecordMethod = InternalSinkRecord.class.getMethod("originalRecord", EMPTY_CLASS_ARRAY);
            this.usePre380OriginalRecordAccess = true;
            LOGGER.info("Old InternalSinkRecord class found, will use reflection for calls");
        } catch (NoSuchMethodException | SecurityException e) {
            LOGGER.info("New InternalSinkRecord class found");
        }
    }

    public String version() {
        return Module.version();
    }

    public void start(Map<String, String> map) {
        this.stateLock.lock();
        try {
            if (!this.state.compareAndSet(State.STOPPED, State.RUNNING)) {
                LOGGER.info("Connector has already been started");
                this.stateLock.unlock();
                return;
            }
            this.previousPutException = null;
            JdbcSinkConnectorConfig jdbcSinkConnectorConfig = new JdbcSinkConnectorConfig(map);
            jdbcSinkConnectorConfig.validate();
            this.sessionFactory = jdbcSinkConnectorConfig.getHibernateConfiguration().buildSessionFactory();
            StatelessSession openStatelessSession = this.sessionFactory.openStatelessSession();
            DatabaseDialect resolve = DatabaseDialectResolver.resolve(jdbcSinkConnectorConfig, this.sessionFactory);
            this.changeEventSink = new JdbcChangeEventSink(jdbcSinkConnectorConfig, openStatelessSession, resolve, new RecordWriter(openStatelessSession, new QueryBinderResolver(), jdbcSinkConnectorConfig, resolve));
            this.stateLock.unlock();
        } catch (Throwable th) {
            this.stateLock.unlock();
            throw th;
        }
    }

    public void put(Collection<SinkRecord> collection) {
        Stopwatch reusable = Stopwatch.reusable();
        Stopwatch reusable2 = Stopwatch.reusable();
        Stopwatch reusable3 = Stopwatch.reusable();
        reusable.start();
        if (this.previousPutException != null) {
            LOGGER.error("JDBC sink connector failure", this.previousPutException);
            throw new ConnectException("JDBC sink connector failure", this.previousPutException);
        }
        LOGGER.debug("Received {} changes.", Integer.valueOf(collection.size()));
        try {
            reusable2.start();
            this.changeEventSink.execute(collection);
            reusable2.stop();
            reusable3.start();
            collection.forEach(this::markProcessed);
            reusable3.stop();
        } catch (Throwable th) {
            LOGGER.error("Failed to process record: {}", th.getMessage(), th);
            this.previousPutException = th;
            collection.forEach(this::markNotProcessed);
        }
        reusable.stop();
        LOGGER.trace("[PERF] Total put execution time {}", reusable.durations());
        LOGGER.trace("[PERF] Sink execute execution time {}", reusable2.durations());
        LOGGER.trace("[PERF] Mark processed execution time {}", reusable3.durations());
    }

    public void open(Collection<TopicPartition> collection) {
        if (LOGGER.isTraceEnabled()) {
            Iterator<TopicPartition> it = collection.iterator();
            while (it.hasNext()) {
                LOGGER.trace("Requested open TopicPartition request for '{}'", it.next());
            }
        }
    }

    public void close(Collection<TopicPartition> collection) {
        for (TopicPartition topicPartition : collection) {
            if (this.offsets.containsKey(topicPartition)) {
                LOGGER.trace("Requested close TopicPartition request for '{}'", topicPartition);
                this.offsets.remove(topicPartition);
            }
        }
    }

    public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> map) {
        LOGGER.debug("Flushing offsets: {}", this.offsets);
        flush(this.offsets);
        return this.offsets;
    }

    public void stop() {
        this.stateLock.lock();
        try {
            if (this.changeEventSink != null) {
                try {
                    this.changeEventSink.close();
                    if (this.sessionFactory == null || !this.sessionFactory.isOpen()) {
                        LOGGER.info("Session factory already closed");
                    } else {
                        LOGGER.info("Closing the session factory");
                        this.sessionFactory.close();
                    }
                } catch (Exception e) {
                    LOGGER.error("Failed to gracefully close resources.", e);
                }
            }
        } finally {
            if (this.previousPutException != null) {
                this.previousPutException = null;
            }
            if (this.changeEventSink != null) {
                this.changeEventSink = null;
            }
            this.stateLock.unlock();
        }
    }

    private void markProcessed(SinkRecord sinkRecord) {
        String originalTopicName = getOriginalTopicName(sinkRecord);
        if (Strings.isNullOrBlank(originalTopicName)) {
            return;
        }
        LOGGER.trace("Marking processed record for topic {}", originalTopicName);
        long originalKafkaOffset = getOriginalKafkaOffset(sinkRecord);
        OffsetAndMetadata put = this.offsets.put(new TopicPartition(originalTopicName, getOriginalKafkaPartition(sinkRecord).intValue()), new OffsetAndMetadata(originalKafkaOffset + 1));
        if (put == null) {
            LOGGER.trace("Advanced topic {} to offset {}.", originalTopicName, Long.valueOf(originalKafkaOffset));
        } else {
            LOGGER.trace("Updated topic {} from offset {} to {}.", new Object[]{originalTopicName, Long.valueOf(put.offset()), Long.valueOf(originalKafkaOffset)});
        }
    }

    private void markNotProcessed(SinkRecord sinkRecord) {
        String originalTopicName = getOriginalTopicName(sinkRecord);
        Integer originalKafkaPartition = getOriginalKafkaPartition(sinkRecord);
        long originalKafkaOffset = getOriginalKafkaOffset(sinkRecord);
        TopicPartition topicPartition = new TopicPartition(originalTopicName, originalKafkaPartition.intValue());
        if (this.offsets.containsKey(topicPartition)) {
            return;
        }
        LOGGER.debug("Rewinding topic {} offset to {}.", originalTopicName, Long.valueOf(originalKafkaOffset));
        this.offsets.put(topicPartition, new OffsetAndMetadata(originalKafkaOffset));
    }

    private String getOriginalTopicName(SinkRecord sinkRecord) {
        if (!(sinkRecord instanceof InternalSinkRecord)) {
            return null;
        }
        if (!this.usePre380OriginalRecordAccess) {
            return ((ConsumerRecord) ((InternalSinkRecord) sinkRecord).context().original()).topic();
        }
        try {
            return ((ConsumerRecord) this.pre380OriginalRecordMethod.invoke(sinkRecord, EMPTY_CLASS_ARRAY)).topic();
        } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
            throw new DebeziumException("Failed to access original record data", e);
        }
    }

    private Integer getOriginalKafkaPartition(SinkRecord sinkRecord) {
        try {
            return sinkRecord.originalKafkaPartition();
        } catch (NoSuchMethodError e) {
            return sinkRecord.kafkaPartition();
        }
    }

    private long getOriginalKafkaOffset(SinkRecord sinkRecord) {
        try {
            return sinkRecord.originalKafkaOffset();
        } catch (NoSuchMethodError e) {
            return sinkRecord.kafkaOffset();
        }
    }
}
