/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.jdbc;

import io.debezium.DebeziumException;
import io.debezium.connector.jdbc.JdbcChangeEventSink;
import io.debezium.connector.jdbc.JdbcSinkConnectorConfig;
import io.debezium.connector.jdbc.Module;
import io.debezium.connector.jdbc.QueryBinderResolver;
import io.debezium.connector.jdbc.RecordWriter;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.dialect.DatabaseDialectResolver;
import io.debezium.util.Stopwatch;
import io.debezium.util.Strings;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.InternalSinkRecord;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.hibernate.SessionFactory;
import org.hibernate.SharedSessionContract;
import org.hibernate.StatelessSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JdbcSinkConnectorTask
extends SinkTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSinkConnectorTask.class);
    private static final Class[] EMPTY_CLASS_ARRAY = new Class[0];
    private SessionFactory sessionFactory;
    private final AtomicReference<State> state = new AtomicReference<State>(State.STOPPED);
    private final ReentrantLock stateLock = new ReentrantLock();
    private JdbcChangeEventSink changeEventSink;
    private final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
    private Throwable previousPutException;
    private boolean usePre380OriginalRecordAccess = false;
    private Method pre380OriginalRecordMethod = null;

    public JdbcSinkConnectorTask() {
        try {
            this.pre380OriginalRecordMethod = InternalSinkRecord.class.getMethod("originalRecord", EMPTY_CLASS_ARRAY);
            this.usePre380OriginalRecordAccess = true;
            LOGGER.info("Old InternalSinkRecord class found, will use reflection for calls");
        }
        catch (NoSuchMethodException | SecurityException e) {
            LOGGER.info("New InternalSinkRecord class found");
        }
    }

    public String version() {
        return Module.version();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void start(Map<String, String> props) {
        this.stateLock.lock();
        try {
            if (!this.state.compareAndSet(State.STOPPED, State.RUNNING)) {
                LOGGER.info("Connector has already been started");
                return;
            }
            this.previousPutException = null;
            JdbcSinkConnectorConfig config = new JdbcSinkConnectorConfig(props);
            config.validate();
            this.sessionFactory = config.getHibernateConfiguration().buildSessionFactory();
            StatelessSession session = this.sessionFactory.openStatelessSession();
            DatabaseDialect databaseDialect = DatabaseDialectResolver.resolve(config, this.sessionFactory);
            QueryBinderResolver queryBinderResolver = new QueryBinderResolver();
            RecordWriter recordWriter = new RecordWriter((SharedSessionContract)session, queryBinderResolver, config, databaseDialect);
            this.changeEventSink = new JdbcChangeEventSink(config, session, databaseDialect, recordWriter);
        }
        finally {
            this.stateLock.unlock();
        }
    }

    public void put(Collection<SinkRecord> records) {
        Stopwatch putStopWatch = Stopwatch.reusable();
        Stopwatch executeStopWatch = Stopwatch.reusable();
        Stopwatch markProcessedStopWatch = Stopwatch.reusable();
        putStopWatch.start();
        if (this.previousPutException != null) {
            LOGGER.error("JDBC sink connector failure", this.previousPutException);
            throw new ConnectException("JDBC sink connector failure", this.previousPutException);
        }
        LOGGER.debug("Received {} changes.", (Object)records.size());
        try {
            executeStopWatch.start();
            this.changeEventSink.execute(records);
            executeStopWatch.stop();
            markProcessedStopWatch.start();
            records.forEach(this::markProcessed);
            markProcessedStopWatch.stop();
        }
        catch (Throwable throwable) {
            LOGGER.error("Failed to process record: {}", (Object)throwable.getMessage(), (Object)throwable);
            this.previousPutException = throwable;
            records.forEach(this::markNotProcessed);
        }
        putStopWatch.stop();
        LOGGER.trace("[PERF] Total put execution time {}", (Object)putStopWatch.durations());
        LOGGER.trace("[PERF] Sink execute execution time {}", (Object)executeStopWatch.durations());
        LOGGER.trace("[PERF] Mark processed execution time {}", (Object)markProcessedStopWatch.durations());
    }

    public void open(Collection<TopicPartition> partitions) {
        if (LOGGER.isTraceEnabled()) {
            for (TopicPartition partition : partitions) {
                LOGGER.trace("Requested open TopicPartition request for '{}'", (Object)partition);
            }
        }
    }

    public void close(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions) {
            if (!this.offsets.containsKey(partition)) continue;
            LOGGER.trace("Requested close TopicPartition request for '{}'", (Object)partition);
            this.offsets.remove(partition);
        }
    }

    public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        LOGGER.debug("Flushing offsets: {}", this.offsets);
        this.flush(this.offsets);
        return this.offsets;
    }

    public void stop() {
        block8: {
            this.stateLock.lock();
            try {
                if (this.changeEventSink == null) break block8;
                try {
                    this.changeEventSink.close();
                    if (this.sessionFactory != null && this.sessionFactory.isOpen()) {
                        LOGGER.info("Closing the session factory");
                        this.sessionFactory.close();
                        break block8;
                    }
                    LOGGER.info("Session factory already closed");
                }
                catch (Exception e) {
                    LOGGER.error("Failed to gracefully close resources.", (Throwable)e);
                }
            }
            finally {
                if (this.previousPutException != null) {
                    this.previousPutException = null;
                }
                if (this.changeEventSink != null) {
                    this.changeEventSink = null;
                }
                this.stateLock.unlock();
            }
        }
    }

    private void markProcessed(SinkRecord record) {
        String topicName = this.getOriginalTopicName(record);
        if (Strings.isNullOrBlank((String)topicName)) {
            return;
        }
        LOGGER.trace("Marking processed record for topic {}", (Object)topicName);
        long kafkaOffset = this.getOriginalKafkaOffset(record);
        TopicPartition topicPartition = new TopicPartition(topicName, this.getOriginalKafkaPartition(record).intValue());
        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(kafkaOffset + 1L);
        OffsetAndMetadata existing = this.offsets.put(topicPartition, offsetAndMetadata);
        if (existing == null) {
            LOGGER.trace("Advanced topic {} to offset {}.", (Object)topicName, (Object)kafkaOffset);
        } else {
            LOGGER.trace("Updated topic {} from offset {} to {}.", new Object[]{topicName, existing.offset(), kafkaOffset});
        }
    }

    private void markNotProcessed(SinkRecord record) {
        String topicName = this.getOriginalTopicName(record);
        Integer kafkaPartition = this.getOriginalKafkaPartition(record);
        long kafkaOffset = this.getOriginalKafkaOffset(record);
        TopicPartition topicPartition = new TopicPartition(topicName, kafkaPartition.intValue());
        if (!this.offsets.containsKey(topicPartition)) {
            LOGGER.debug("Rewinding topic {} offset to {}.", (Object)topicName, (Object)kafkaOffset);
            OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(kafkaOffset);
            this.offsets.put(topicPartition, offsetAndMetadata);
        }
    }

    private String getOriginalTopicName(SinkRecord record) {
        if (record instanceof InternalSinkRecord) {
            if (this.usePre380OriginalRecordAccess) {
                try {
                    return ((ConsumerRecord)this.pre380OriginalRecordMethod.invoke((Object)record, (Object[])EMPTY_CLASS_ARRAY)).topic();
                }
                catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
                    throw new DebeziumException("Failed to access original record data", (Throwable)e);
                }
            }
            return ((ConsumerRecord)((InternalSinkRecord)record).context().original()).topic();
        }
        return null;
    }

    private Integer getOriginalKafkaPartition(SinkRecord record) {
        try {
            return record.originalKafkaPartition();
        }
        catch (NoSuchMethodError e) {
            return record.kafkaPartition();
        }
    }

    private long getOriginalKafkaOffset(SinkRecord record) {
        try {
            return record.originalKafkaOffset();
        }
        catch (NoSuchMethodError e) {
            return record.kafkaOffset();
        }
    }

    private static enum State {
        RUNNING,
        STOPPED;

    }
}

