package io.confluent.parallelconsumer;

import io.confluent.csid.utils.StringUtils;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.ConcurrentModificationException;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.TransactionManager;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/parallelconsumer/ProducerManager.class */
public class ProducerManager<K, V> extends AbstractOffsetCommitter<K, V> implements OffsetCommitter {
    private static final Logger log = LoggerFactory.getLogger(ProducerManager.class);
    protected final Producer<K, V> producer;
    private final ParallelConsumerOptions options;
    private final boolean producerIsConfiguredForTransactions;
    private ReentrantReadWriteLock producerTransactionLock;
    private Field txManagerField;
    private Method txManagerMethodIsCompleting;
    private Method txManagerMethodIsReady;

    public ProducerManager(Producer<K, V> producer, ConsumerManager<K, V> consumerManager, WorkManager<K, V> workManager, ParallelConsumerOptions parallelConsumerOptions) {
        super(consumerManager, workManager);
        this.producer = producer;
        this.options = parallelConsumerOptions;
        this.producerIsConfiguredForTransactions = setupReflection();
        initProducer();
    }

    private void initProducer() {
        this.producerTransactionLock = new ReentrantReadWriteLock(true);
        if (!this.options.isUsingTransactionalProducer()) {
            if (this.producerIsConfiguredForTransactions) {
                throw new IllegalArgumentException("Using non-transactional producer option, but Producer has a transaction ID - the Producer must not have a transaction ID for this option. This is because having such an ID forces the Producer into transactional mode - i.e. you cannot use it without using transactions.");
            }
        } else {
            if (!this.producerIsConfiguredForTransactions) {
                throw new IllegalArgumentException("Using transactional option, yet Producer doesn't have a transaction ID - Producer needs a transaction id");
            }
            try {
                log.debug("Initialising producer transaction session...");
                this.producer.initTransactions();
                this.producer.beginTransaction();
            } catch (KafkaException e) {
                log.error("Make sure your producer is setup for transactions - specifically make sure it's {} is set.", "transactional.id", e);
                throw e;
            }
        }
    }

    private boolean getProducerIsTransactional() {
        if (this.producer instanceof MockProducer) {
            return this.options.isUsingTransactionalProducer();
        }
        TransactionManager transactionManager = getTransactionManager();
        if (transactionManager == null) {
            return false;
        }
        return transactionManager.isTransactional();
    }

    private TransactionManager getTransactionManager() {
        if (this.txManagerField == null) {
            return null;
        }
        return (TransactionManager) this.txManagerField.get(this.producer);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RecordMetadata produceMessage(ProducerRecord<K, V> producerRecord) {
        Callback callback = (recordMetadata, exc) -> {
            if (exc != null) {
                log.error("Error producing result message", exc);
                throw new RuntimeException("Error producing result message", exc);
            }
        };
        ReentrantReadWriteLock.ReadLock readLock = this.producerTransactionLock.readLock();
        readLock.lock();
        try {
            Future send = this.producer.send(producerRecord, callback);
            readLock.unlock();
            try {
                return (RecordMetadata) send.get();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } catch (Throwable th) {
            readLock.unlock();
            throw th;
        }
    }

    @Override // io.confluent.parallelconsumer.AbstractOffsetCommitter
    protected void preAcquireWork() {
        acquireCommitLock();
    }

    @Override // io.confluent.parallelconsumer.AbstractOffsetCommitter
    protected void postCommit() {
        if (this.producerTransactionLock.getWriteHoldCount() > 1) {
            throw new ConcurrentModificationException("Lock held too many times, won't be released problem and will cause deadlock");
        }
        releaseCommitLock();
    }

    @Override // io.confluent.parallelconsumer.AbstractOffsetCommitter
    protected void commitOffsets(Map<TopicPartition, OffsetAndMetadata> map, ConsumerGroupMetadata consumerGroupMetadata) {
        log.debug("Transactional offset commit starting");
        if (!this.options.isUsingTransactionalProducer()) {
            throw new IllegalStateException("Bug: cannot use if not using transactional producer");
        }
        this.producer.sendOffsetsToTransaction(map, consumerGroupMetadata);
        boolean z = false;
        int i = 0;
        Exception exc = null;
        while (!z) {
            if (i > 200) {
                String msg = StringUtils.msg("Retired too many times ({} > limit of {}), giving up. See error above.", Integer.valueOf(i), Integer.valueOf(OffsetSimultaneousEncoder.LARGE_INPUT_MAP_SIZE_THRESHOLD));
                log.error(msg, exc);
                throw new RuntimeException(msg, exc);
            }
            try {
                if (this.producer instanceof MockProducer) {
                    synchronized (this.producer) {
                        this.producer.commitTransaction();
                        this.producer.beginTransaction();
                    }
                } else {
                    ensureLockHeld();
                    if (i > 0) {
                        if (isTransactionCompleting()) {
                            this.producer.commitTransaction();
                        }
                        if (isTransactionReady()) {
                            this.producer.beginTransaction();
                        }
                        if (exc != null ? !exc.getMessage().contains("Invalid transition attempted from state READY to state COMMITTING_TRANSACTION") : true) {
                            log.error("Was already ready - tx completed between interrupt and retry");
                        }
                    } else {
                        this.producer.commitTransaction();
                        this.producer.beginTransaction();
                    }
                }
                z = true;
                if (i > 0) {
                    log.warn("Commit success, but took {} tries.", Integer.valueOf(i));
                }
            } catch (Exception e) {
                log.warn("Commit exception, will retry, have tried {} times (see KafkaProducer#commit)", Integer.valueOf(i), e);
                exc = e;
                i++;
            }
        }
    }

    private boolean setupReflection() {
        if (!(this.producer instanceof KafkaProducer)) {
            if (this.producer instanceof MockProducer) {
                return this.options.isUsingTransactionalProducer();
            }
            return false;
        }
        this.txManagerField = this.producer.getClass().getDeclaredField("transactionManager");
        this.txManagerField.setAccessible(true);
        boolean producerIsTransactional = getProducerIsTransactional();
        if (producerIsTransactional) {
            TransactionManager transactionManager = getTransactionManager();
            this.txManagerMethodIsCompleting = transactionManager.getClass().getDeclaredMethod("isCompleting", new Class[0]);
            this.txManagerMethodIsCompleting.setAccessible(true);
            this.txManagerMethodIsReady = transactionManager.getClass().getDeclaredMethod("isReady", new Class[0]);
            this.txManagerMethodIsReady.setAccessible(true);
        }
        return producerIsTransactional;
    }

    private boolean isTransactionCompleting() {
        if (this.producer instanceof MockProducer) {
            return false;
        }
        return ((Boolean) this.txManagerMethodIsCompleting.invoke(getTransactionManager(), new Object[0])).booleanValue();
    }

    private boolean isTransactionReady() {
        if (this.producer instanceof MockProducer) {
            return true;
        }
        return ((Boolean) this.txManagerMethodIsReady.invoke(getTransactionManager(), new Object[0])).booleanValue();
    }

    public void close(Duration duration) {
        log.debug("Closing producer, assuming no more in flight...");
        if (this.options.isUsingTransactionalProducer() && !isTransactionReady()) {
            acquireCommitLock();
            try {
                this.producer.abortTransaction();
            } finally {
                releaseCommitLock();
            }
        }
        this.producer.close(duration);
    }

    private void acquireCommitLock() {
        if (this.producerTransactionLock.getWriteHoldCount() > 0) {
            throw new ConcurrentModificationException("Lock already held");
        }
        ReentrantReadWriteLock.WriteLock writeLock = this.producerTransactionLock.writeLock();
        if (this.producerTransactionLock.isWriteLocked() && !this.producerTransactionLock.isWriteLockedByCurrentThread()) {
            throw new ConcurrentModificationException(getClass().getSimpleName() + " is not safe for multi-threaded access");
        }
        writeLock.lock();
    }

    private void releaseCommitLock() {
        log.trace("Release commit lock");
        ReentrantReadWriteLock.WriteLock writeLock = this.producerTransactionLock.writeLock();
        if (!this.producerTransactionLock.isWriteLockedByCurrentThread()) {
            throw new IllegalStateException("Not held be me");
        }
        writeLock.unlock();
    }

    private void ensureLockHeld() {
        if (!this.producerTransactionLock.isWriteLockedByCurrentThread()) {
            throw new IllegalStateException("Expected commit lock to be held");
        }
    }

    public boolean isTransactionInProgress() {
        return this.producerTransactionLock.isWriteLocked();
    }
}
