package io.debezium.connector.jdbc;

import io.debezium.pipeline.sink.spi.ChangeEventSink;
import io.debezium.util.Strings;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.runtime.InternalSinkRecord;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/jdbc/JdbcSinkConnectorTask.class */
/**
 * Sink task that hands each received {@link SinkRecord} to a {@link ChangeEventSink} and keeps
 * track, per topic partition, of the offset that should be reported back from
 * {@link #preCommit(Map)}.
 *
 * <p>A failure while processing a record is not thrown from the {@link #put(Collection)} call in
 * which it happens. It is remembered and thrown, wrapped in a {@link ConnectException}, by the
 * next {@code put} call; in the meantime a commit is requested through the task context.
 */
public class JdbcSinkConnectorTask extends SinkTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSinkConnectorTask.class);

    /** Fragment of a value schema name that marks a record as a schema change record. */
    public static final String SCHEMA_CHANGE_VALUE = "SchemaChangeValue";

    /** Message logged and thrown when a schema change record is received. */
    public static final String DETECT_SCHEMA_CHANGE_RECORD_MSG = "Schema change records are not supported by JDBC connector. Adjust `topics` or `topics.regex` to exclude schema change topic.";

    private final AtomicReference<State> state = new AtomicReference<>(State.STOPPED);
    private final ReentrantLock stateLock = new ReentrantLock();

    /** Offsets to report from {@link #preCommit(Map)}, keyed by topic partition. */
    private final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

    private ChangeEventSink changeEventSink;

    /** Failure captured by {@link #put(Collection)}; rethrown by the following {@code put}. */
    private Throwable previousPutException;

    /** Lifecycle state of this task. */
    private enum State {
        RUNNING,
        STOPPED
    }

    /**
     * Returns the version reported by {@code Module.version()}.
     *
     * @return the version string
     */
    public String version() {
        return Module.version();
    }

    /**
     * Starts the task: builds and validates the connector configuration and creates the change
     * event sink. Calling this while the task is already running only logs a message.
     *
     * @param map the task configuration properties
     */
    public void start(Map<String, String> map) {
        this.stateLock.lock();
        try {
            if (!this.state.compareAndSet(State.STOPPED, State.RUNNING)) {
                LOGGER.info("Connector has already been started");
                return;
            }
            // A fresh start must not inherit a failure captured before.
            this.previousPutException = null;
            try {
                JdbcSinkConnectorConfig config = new JdbcSinkConnectorConfig(map);
                config.validate();
                this.changeEventSink = new JdbcChangeEventSink(config);
            } catch (RuntimeException | Error e) {
                // Start-up did not complete, so do not leave the task marked as running;
                // otherwise a later start() would be rejected as "already started".
                this.state.set(State.STOPPED);
                throw e;
            }
        } finally {
            this.stateLock.unlock();
        }
    }

    /**
     * Processes a batch of records in iteration order.
     *
     * <p>On the first record that fails, that record and all records after it in the batch are
     * marked as not processed, the failure is stored, and a commit is requested. The stored
     * failure is thrown by the next invocation of this method.
     *
     * @param collection the records to process
     * @throws ConnectException if a previous invocation captured a failure
     */
    public void put(Collection<SinkRecord> collection) {
        if (this.previousPutException != null) {
            throw new ConnectException("JDBC sink connector failure", this.previousPutException);
        }
        LOGGER.debug("Received {} changes.", collection.size());
        // An explicit iterator is used because the failure path consumes the rest of the batch.
        Iterator<SinkRecord> records = collection.iterator();
        while (records.hasNext()) {
            SinkRecord record = records.next();
            LOGGER.trace("Received {}", record);
            try {
                validate(record);
                this.changeEventSink.execute(record);
                markProcessed(record);
            } catch (Throwable failure) {
                // Stash the record that failed.
                markNotProcessed(record);
                LOGGER.error("Failed to process record: {}", failure.getMessage(), failure);
                this.previousPutException = failure;
                // Stash everything that follows it; this drains the iterator.
                markNotProcessed(records);
                break;
            }
        }
    }

    /**
     * Rejects schema change records.
     *
     * @param sinkRecord the record to check
     * @throws DataException if the record is a schema change record
     */
    private void validate(SinkRecord sinkRecord) {
        if (isSchemaChange(sinkRecord)) {
            LOGGER.error(DETECT_SCHEMA_CHANGE_RECORD_MSG);
            throw new DataException(DETECT_SCHEMA_CHANGE_RECORD_MSG);
        }
    }

    /**
     * Tells whether the record's value schema has a non-empty name that contains
     * {@link #SCHEMA_CHANGE_VALUE}.
     *
     * @param sinkRecord the record to inspect
     * @return {@code true} if the value schema name contains {@link #SCHEMA_CHANGE_VALUE}
     */
    private static boolean isSchemaChange(SinkRecord sinkRecord) {
        if (sinkRecord.valueSchema() == null) {
            return false;
        }
        String schemaName = sinkRecord.valueSchema().name();
        // The schema name is the haystack and the marker is the needle: a qualified name
        // ending in "SchemaChangeValue" must match.
        return !Strings.isNullOrEmpty(schemaName) && schemaName.contains(SCHEMA_CHANGE_VALUE);
    }

    /**
     * Flushes and returns the offsets tracked by this task.
     *
     * @param map the offsets supplied by the caller; not used, the task reports its own
     * @return the task's tracked offsets (the internal map itself, not a copy)
     */
    public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> map) {
        LOGGER.debug("Flushing offsets: {}", this.offsets);
        flush(this.offsets);
        return this.offsets;
    }

    /**
     * Closes the change event sink, if one was created, and marks the task as stopped.
     * A failure while closing is logged and not propagated.
     */
    public void stop() {
        this.stateLock.lock();
        try {
            if (this.changeEventSink != null) {
                try {
                    this.changeEventSink.close();
                } catch (Exception e) {
                    LOGGER.error("Failed to gracefully close resources.", e);
                }
            }
        } finally {
            // Pair with the compareAndSet in start() so the task can be started again.
            this.state.set(State.STOPPED);
            this.stateLock.unlock();
        }
    }

    /**
     * Records the offset following the given record for its original topic partition.
     * Nothing is recorded when the original topic name cannot be determined.
     *
     * @param sinkRecord the record that was processed successfully
     */
    private void markProcessed(SinkRecord sinkRecord) {
        String topicName = getOriginalTopicName(sinkRecord);
        if (Strings.isNullOrBlank(topicName)) {
            return;
        }
        LOGGER.trace("Marking processed record for topic {}", topicName);
        TopicPartition topicPartition = new TopicPartition(topicName, sinkRecord.kafkaPartition());
        // Stored offset is the record's offset plus one.
        OffsetAndMetadata previous = this.offsets.put(topicPartition, new OffsetAndMetadata(sinkRecord.kafkaOffset() + 1));
        if (previous == null) {
            LOGGER.trace("Advanced topic {} to offset {}.", topicName, sinkRecord.kafkaOffset());
        } else {
            LOGGER.trace("Updated topic {} from offset {} to {}.", topicName, previous.offset(), sinkRecord.kafkaOffset());
        }
    }

    /**
     * Marks every remaining record of the iterator as not processed, then requests a commit
     * from the task context.
     *
     * @param it iterator positioned after the record that failed
     */
    private void markNotProcessed(Iterator<SinkRecord> it) {
        while (it.hasNext()) {
            markNotProcessed(it.next());
        }
        this.context.requestCommit();
    }

    /**
     * Records the given record's own offset for its topic partition, unless an offset is
     * already tracked for that partition.
     *
     * @param sinkRecord the record that was not processed
     */
    private void markNotProcessed(SinkRecord sinkRecord) {
        // Use the same topic key as markProcessed so the containsKey guard sees offsets that
        // were recorded for processed records; fall back to the record's topic otherwise.
        String originalTopicName = getOriginalTopicName(sinkRecord);
        String topicName = Strings.isNullOrBlank(originalTopicName) ? sinkRecord.topic() : originalTopicName;
        TopicPartition topicPartition = new TopicPartition(topicName, sinkRecord.kafkaPartition());
        if (this.offsets.containsKey(topicPartition)) {
            // An earlier record of this partition already fixed the offset to report.
            return;
        }
        LOGGER.debug("Rewinding topic {} offset to {}.", topicName, sinkRecord.kafkaOffset());
        this.offsets.put(topicPartition, new OffsetAndMetadata(sinkRecord.kafkaOffset()));
    }

    /**
     * Returns the topic of the original record wrapped by an {@link InternalSinkRecord}.
     *
     * @param sinkRecord the record to inspect
     * @return the original record's topic, or {@code null} if the record is not an
     *         {@link InternalSinkRecord}
     */
    private String getOriginalTopicName(SinkRecord sinkRecord) {
        if (sinkRecord instanceof InternalSinkRecord) {
            return ((InternalSinkRecord) sinkRecord).originalRecord().topic();
        }
        return null;
    }
}
