package org.apache.kafka.connect.runtime;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Min;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.ConnectMetrics;
import org.apache.kafka.connect.runtime.SubmittedRecords;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.ErrorReporter;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.LoggingContext;
import org.apache.kafka.connect.util.TopicAdmin;
import org.apache.kafka.connect.util.TopicCreationGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.class */
public class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ExactlyOnceWorkerSourceTask.class);
    private boolean transactionOpen;
    private final LinkedHashMap<SourceRecord, RecordMetadata> committableRecords;
    private final TransactionBoundaryManager transactionBoundaryManager;
    private final TransactionMetricsGroup transactionMetrics;
    private final Runnable preProducerCheck;
    private final Runnable postProducerCheck;

    /* loaded from: input_file:org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask$TransactionBoundaryManager.class */
    private abstract class TransactionBoundaryManager {
        private TransactionBoundaryManager() {
        }

        protected boolean shouldCommitTransactionForRecord(SourceRecord sourceRecord) {
            return false;
        }

        protected boolean shouldCommitTransactionForBatch(long j) {
            return false;
        }

        protected boolean shouldCommitFinalTransaction() {
            return false;
        }

        protected void initialize() {
        }

        public void maybeCommitTransactionForRecord(SourceRecord sourceRecord) {
            maybeCommitTransaction(shouldCommitTransactionForRecord(sourceRecord));
        }

        public void maybeCommitTransactionForBatch() {
            maybeCommitTransaction(shouldCommitTransactionForBatch(ExactlyOnceWorkerSourceTask.this.time.milliseconds()));
        }

        public void maybeCommitFinalTransaction() {
            maybeCommitTransaction(shouldCommitFinalTransaction());
        }

        private void maybeCommitTransaction(boolean z) {
            if (z) {
                LoggingContext forOffsets = LoggingContext.forOffsets(ExactlyOnceWorkerSourceTask.this.id);
                Throwable th = null;
                try {
                    ExactlyOnceWorkerSourceTask.this.commitTransaction();
                    if (forOffsets != null) {
                        if (0 == 0) {
                            forOffsets.close();
                            return;
                        }
                        try {
                            forOffsets.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                } catch (Throwable th3) {
                    if (forOffsets != null) {
                        if (0 != 0) {
                            try {
                                forOffsets.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            forOffsets.close();
                        }
                    }
                    throw th3;
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask$TransactionMetricsGroup.class */
    public static class TransactionMetricsGroup implements AutoCloseable {
        private final Sensor transactionSize;
        private int size;
        private final ConnectMetrics.MetricGroup metricGroup;

        public TransactionMetricsGroup(ConnectorTaskId connectorTaskId, ConnectMetrics connectMetrics) {
            ConnectMetricsRegistry registry = connectMetrics.registry();
            this.metricGroup = connectMetrics.group(registry.sourceTaskGroupName(), registry.connectorTagName(), connectorTaskId.connector(), registry.taskTagName(), Integer.toString(connectorTaskId.task()));
            this.transactionSize = this.metricGroup.sensor("transaction-size");
            this.transactionSize.add(this.metricGroup.metricName(registry.transactionSizeAvg), new Avg());
            this.transactionSize.add(this.metricGroup.metricName(registry.transactionSizeMin), new Min());
            this.transactionSize.add(this.metricGroup.metricName(registry.transactionSizeMax), new Max());
        }

        @Override // java.lang.AutoCloseable
        public void close() {
            this.metricGroup.close();
        }

        void addRecord() {
            this.size++;
        }

        void abortTransaction() {
            this.size = 0;
        }

        void commitTransaction() {
            this.transactionSize.record(this.size);
            this.size = 0;
        }

        protected ConnectMetrics.MetricGroup metricGroup() {
            return this.metricGroup;
        }
    }

    public ExactlyOnceWorkerSourceTask(ConnectorTaskId connectorTaskId, SourceTask sourceTask, TaskStatus.Listener listener, TargetState targetState, Converter converter, Converter converter2, HeaderConverter headerConverter, TransformationChain<SourceRecord> transformationChain, Producer<byte[], byte[]> producer, TopicAdmin topicAdmin, Map<String, TopicCreationGroup> map, CloseableOffsetStorageReader closeableOffsetStorageReader, OffsetStorageWriter offsetStorageWriter, ConnectorOffsetBackingStore connectorOffsetBackingStore, WorkerConfig workerConfig, ClusterConfigState clusterConfigState, ConnectMetrics connectMetrics, ErrorHandlingMetrics errorHandlingMetrics, ClassLoader classLoader, Time time, RetryWithToleranceOperator retryWithToleranceOperator, StatusBackingStore statusBackingStore, SourceConnectorConfig sourceConnectorConfig, Executor executor, Runnable runnable, Runnable runnable2, Supplier<List<ErrorReporter>> supplier) {
        super(connectorTaskId, sourceTask, listener, targetState, converter, converter2, headerConverter, transformationChain, new WorkerSourceTaskContext(closeableOffsetStorageReader, connectorTaskId, clusterConfigState, buildTransactionContext(sourceConnectorConfig)), producer, topicAdmin, map, closeableOffsetStorageReader, offsetStorageWriter, connectorOffsetBackingStore, workerConfig, connectMetrics, errorHandlingMetrics, classLoader, time, retryWithToleranceOperator, statusBackingStore, executor, supplier);
        this.transactionOpen = false;
        this.committableRecords = new LinkedHashMap<>();
        this.preProducerCheck = runnable;
        this.postProducerCheck = runnable2;
        this.transactionBoundaryManager = buildTransactionManager(workerConfig, sourceConnectorConfig, this.sourceTaskContext.transactionContext());
        this.transactionMetrics = new TransactionMetricsGroup(connectorTaskId, connectMetrics);
    }

    private static WorkerTransactionContext buildTransactionContext(SourceConnectorConfig sourceConnectorConfig) {
        if (SourceTask.TransactionBoundary.CONNECTOR.equals(sourceConnectorConfig.transactionBoundary())) {
            return new WorkerTransactionContext();
        }
        return null;
    }

    @Override // org.apache.kafka.connect.runtime.AbstractWorkerSourceTask
    protected void prepareToInitializeTask() {
        this.preProducerCheck.run();
        if (isStopping()) {
            return;
        }
        this.producer.initTransactions();
        this.postProducerCheck.run();
    }

    @Override // org.apache.kafka.connect.runtime.AbstractWorkerSourceTask
    protected void prepareToEnterSendLoop() {
        this.transactionBoundaryManager.initialize();
    }

    @Override // org.apache.kafka.connect.runtime.AbstractWorkerSourceTask
    protected void beginSendIteration() {
    }

    @Override // org.apache.kafka.connect.runtime.AbstractWorkerSourceTask
    protected void prepareToPollTask() {
    }

    @Override // org.apache.kafka.connect.runtime.AbstractWorkerSourceTask
    protected void recordDropped(SourceRecord sourceRecord) {
        synchronized (this.committableRecords) {
            this.committableRecords.put(sourceRecord, null);
        }
        this.transactionBoundaryManager.maybeCommitTransactionForRecord(sourceRecord);
    }

    @Override // org.apache.kafka.connect.runtime.AbstractWorkerSourceTask
    protected Optional<SubmittedRecords.SubmittedRecord> prepareToSendRecord(SourceRecord sourceRecord, ProducerRecord<byte[], byte[]> producerRecord) {
        if (this.offsetStore.primaryOffsetsTopic().equals(producerRecord.topic())) {
            throw new ConnectException("Source tasks may not produce to their own offsets topics when exactly-once support is enabled");
        }
        maybeBeginTransaction();
        return Optional.empty();
    }

    @Override // org.apache.kafka.connect.runtime.AbstractWorkerSourceTask
    protected void recordDispatched(SourceRecord sourceRecord) {
        this.offsetWriter.offset(sourceRecord.sourcePartition(), sourceRecord.sourceOffset());
        this.transactionMetrics.addRecord();
        this.transactionBoundaryManager.maybeCommitTransactionForRecord(sourceRecord);
    }

    @Override // org.apache.kafka.connect.runtime.AbstractWorkerSourceTask
    protected void batchDispatched() {
        this.transactionBoundaryManager.maybeCommitTransactionForBatch();
    }

    @Override // org.apache.kafka.connect.runtime.AbstractWorkerSourceTask
    protected void recordSent(SourceRecord sourceRecord, ProducerRecord<byte[], byte[]> producerRecord, RecordMetadata recordMetadata) {
        synchronized (this.committableRecords) {
            this.committableRecords.put(sourceRecord, recordMetadata);
        }
    }

    @Override // org.apache.kafka.connect.runtime.AbstractWorkerSourceTask
    protected void producerSendFailed(boolean z, ProducerRecord<byte[], byte[]> producerRecord, SourceRecord sourceRecord, Exception exc) {
        if (z) {
            throw maybeWrapProducerSendException("Unrecoverable exception trying to send", exc);
        }
    }

    @Override // org.apache.kafka.connect.runtime.AbstractWorkerSourceTask
    protected void finalOffsetCommit(boolean z) {
        if (z) {
            log.debug("Skipping final offset commit as task has failed");
        } else {
            this.transactionBoundaryManager.maybeCommitFinalTransaction();
        }
    }

    @Override // org.apache.kafka.connect.runtime.AbstractWorkerSourceTask, org.apache.kafka.connect.runtime.WorkerTask
    public void removeMetrics() {
        Utils.closeQuietly(this.transactionMetrics, "source task transaction metrics tracker");
        super.removeMetrics();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.kafka.connect.runtime.WorkerTask
    public void onPause() {
        super.onPause();
        this.transactionBoundaryManager.maybeCommitFinalTransaction();
    }

    private void maybeBeginTransaction() {
        if (this.transactionOpen) {
            return;
        }
        this.producer.beginTransaction();
        this.transactionOpen = true;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void commitTransaction() {
        log.debug("{} Committing offsets", this);
        long milliseconds = this.time.milliseconds();
        AtomicReference atomicReference = new AtomicReference();
        boolean z = false;
        try {
            z = this.offsetWriter.beginFlush();
        } catch (Throwable th) {
            atomicReference.compareAndSet(null, th);
        }
        if (atomicReference.get() == null && !this.transactionOpen && !z) {
            long milliseconds2 = this.time.milliseconds() - milliseconds;
            recordCommitSuccess(milliseconds2);
            log.debug("{} Finished commitOffsets successfully in {} ms", this, Long.valueOf(milliseconds2));
            commitSourceTask();
            return;
        }
        maybeBeginTransaction();
        if (z) {
            this.offsetWriter.doFlush((th2, r8) -> {
                if (th2 == null) {
                    log.trace("{} Finished flushing offsets to storage", this);
                } else {
                    log.error("{} Failed to flush offsets to storage: ", this, th2);
                    atomicReference.compareAndSet(null, th2);
                }
            });
        }
        if (((Throwable) atomicReference.get()) == null) {
            try {
                this.producer.commitTransaction();
            } catch (Throwable th3) {
                log.error("{} Failed to commit producer transaction", this, th3);
                atomicReference.compareAndSet(null, th3);
            }
            this.transactionOpen = false;
        }
        Throwable th4 = (Throwable) atomicReference.get();
        if (th4 != null) {
            recordCommitFailure(this.time.milliseconds() - milliseconds, null);
            this.offsetWriter.cancelFlush();
            throw maybeWrapProducerSendException("Failed to flush offsets and/or records for task " + this.id, th4);
        }
        this.transactionMetrics.commitTransaction();
        long milliseconds3 = this.time.milliseconds() - milliseconds;
        recordCommitSuccess(milliseconds3);
        log.debug("{} Finished commitOffsets successfully in {} ms", this, Long.valueOf(milliseconds3));
        synchronized (this.committableRecords) {
            this.committableRecords.forEach(this::commitTaskRecord);
            this.committableRecords.clear();
        }
        commitSourceTask();
    }

    private RuntimeException maybeWrapProducerSendException(String str, Throwable th) {
        return isPossibleTransactionTimeoutError(th) ? wrapTransactionTimeoutError(th) : new ConnectException(str, th);
    }

    private static boolean isPossibleTransactionTimeoutError(Throwable th) {
        return (th instanceof InvalidProducerEpochException) || (th.getCause() instanceof InvalidProducerEpochException);
    }

    private ConnectException wrapTransactionTimeoutError(Throwable th) {
        return new ConnectException("The task " + this.id + " was unable to finish writing records to Kafka before its producer transaction expired. It may be necessary to reconfigure this connector in order for it to run healthily with exactly-once support. Options for this include: tune the connector's producer configuration for higher throughput, increase the transaction timeout for the connector's producers, decrease the offset commit interval (if using interval-based transaction boundaries), or use the 'poll' transaction boundary (if the connector is not already configured to use it).", th);
    }

    public String toString() {
        return "ExactlyOnceWorkerSourceTask{id=" + this.id + '}';
    }

    private TransactionBoundaryManager buildTransactionManager(WorkerConfig workerConfig, SourceConnectorConfig sourceConnectorConfig, final WorkerTransactionContext workerTransactionContext) {
        SourceTask.TransactionBoundary transactionBoundary = sourceConnectorConfig.transactionBoundary();
        switch (transactionBoundary) {
            case POLL:
                return new TransactionBoundaryManager() { // from class: org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask.1
                    @Override // org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask.TransactionBoundaryManager
                    protected boolean shouldCommitTransactionForBatch(long j) {
                        return true;
                    }

                    @Override // org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask.TransactionBoundaryManager
                    protected boolean shouldCommitFinalTransaction() {
                        return true;
                    }
                };
            case INTERVAL:
                final long longValue = ((Long) Optional.ofNullable(sourceConnectorConfig.transactionBoundaryInterval()).orElse(Long.valueOf(workerConfig.offsetCommitInterval()))).longValue();
                return new TransactionBoundaryManager() { // from class: org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask.2
                    private final long commitInterval;
                    private long lastCommit;

                    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
                    {
                        super();
                        this.commitInterval = longValue;
                    }

                    @Override // org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask.TransactionBoundaryManager
                    public void initialize() {
                        this.lastCommit = ExactlyOnceWorkerSourceTask.this.time.milliseconds();
                    }

                    @Override // org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask.TransactionBoundaryManager
                    protected boolean shouldCommitTransactionForBatch(long j) {
                        if (ExactlyOnceWorkerSourceTask.this.time.milliseconds() < this.lastCommit + this.commitInterval) {
                            return false;
                        }
                        this.lastCommit = ExactlyOnceWorkerSourceTask.this.time.milliseconds();
                        return true;
                    }

                    @Override // org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask.TransactionBoundaryManager
                    protected boolean shouldCommitFinalTransaction() {
                        return true;
                    }
                };
            case CONNECTOR:
                Objects.requireNonNull(workerTransactionContext, "Transaction context must be provided when using connector-defined transaction boundaries");
                return new TransactionBoundaryManager() { // from class: org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask.3
                    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
                    {
                        super();
                    }

                    @Override // org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask.TransactionBoundaryManager
                    protected boolean shouldCommitFinalTransaction() {
                        return shouldCommitTransactionForBatch(ExactlyOnceWorkerSourceTask.this.time.milliseconds());
                    }

                    @Override // org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask.TransactionBoundaryManager
                    protected boolean shouldCommitTransactionForBatch(long j) {
                        if (!workerTransactionContext.shouldAbortBatch()) {
                            return workerTransactionContext.shouldCommitBatch();
                        }
                        ExactlyOnceWorkerSourceTask.log.info("Aborting transaction for batch as requested by connector");
                        maybeAbortTransaction();
                        return true;
                    }

                    @Override // org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask.TransactionBoundaryManager
                    protected boolean shouldCommitTransactionForRecord(SourceRecord sourceRecord) {
                        if (!workerTransactionContext.shouldAbortOn(sourceRecord)) {
                            return workerTransactionContext.shouldCommitOn(sourceRecord);
                        }
                        ExactlyOnceWorkerSourceTask.log.info("Aborting transaction for record on topic {} as requested by connector", sourceRecord.topic());
                        ExactlyOnceWorkerSourceTask.log.trace("Last record in aborted transaction: {}", sourceRecord);
                        maybeAbortTransaction();
                        return true;
                    }

                    private void maybeAbortTransaction() {
                        if (!ExactlyOnceWorkerSourceTask.this.transactionOpen) {
                            ExactlyOnceWorkerSourceTask.log.warn("Ignoring request by task to abort transaction as the current transaction is empty");
                            return;
                        }
                        ExactlyOnceWorkerSourceTask.this.producer.abortTransaction();
                        ExactlyOnceWorkerSourceTask.this.transactionMetrics.abortTransaction();
                        ExactlyOnceWorkerSourceTask.this.transactionOpen = false;
                    }
                };
            default:
                throw new IllegalArgumentException("Unrecognized transaction boundary: " + transactionBoundary);
        }
    }

    TransactionMetricsGroup transactionMetricsGroup() {
        return this.transactionMetrics;
    }
}
