package io.confluent.parallelconsumer;

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/parallelconsumer/ConsumerOffsetCommitter.class */
/**
 * Commits offsets through the consumer, either synchronously or asynchronously depending on the
 * configured {@link ParallelConsumerOptions.CommitMode}.
 *
 * <p>A thread registers itself as the owner via {@link #claim()}. When the owner calls
 * {@link #commit()} the commit is executed immediately on that thread. Any other thread only
 * <em>requests</em> a commit (and wakes the consumer up); the request is honoured the next time
 * {@link #maybeDoCommit()} runs. In sync mode the requesting thread additionally blocks until a
 * commit has been signalled through {@link #postCommit()}.
 *
 * <p>NOTE(review): presumably the owner is the thread that polls the consumer and also the one
 * calling {@link #maybeDoCommit()} - confirm against the callers.
 */
public class ConsumerOffsetCommitter<K, V> extends AbstractOffsetCommitter<K, V> implements OffsetCommitter {
    private static final Logger log = LoggerFactory.getLogger(ConsumerOffsetCommitter.class);

    private final ParallelConsumerOptions.CommitMode commitMode;

    /** Fair lock guarding the request / perform / signal hand-off between threads. */
    private final ReentrantLock commitLock = new ReentrantLock(true);

    /**
     * Signalled after each non-direct sync commit. Deliberately a single shared instance: creating a
     * new condition per waiter would leave earlier waiters parked on a condition nobody signals.
     */
    private final Condition commitPerformed = this.commitLock.newCondition();

    /** Number of commits signalled so far; waiters compare against a snapshot to detect progress. */
    private final AtomicLong commitCount = new AtomicLong(0L);

    /**
     * Set when a commit has been requested.
     * NOTE(review): nothing in this class ever clears this flag, so once set every call to
     * {@link #maybeDoCommit()} attempts a commit - confirm whether that is intended.
     */
    private final AtomicBoolean commitRequested = new AtomicBoolean(false);

    /** Thread registered through {@link #claim()}, or {@code null}; volatile as it is read by other threads. */
    private volatile Thread owningThread;

    /** True only while the owner is committing directly from {@link #commit()}; suppresses signalling. */
    private boolean direct;

    /**
     * @param consumerManager         passed through to the superclass
     * @param workManager             passed through to the superclass
     * @param parallelConsumerOptions supplies the commit mode
     * @throws IllegalArgumentException if the commit mode is {@code TRANSACTIONAL_PRODUCER}
     */
    public ConsumerOffsetCommitter(ConsumerManager<K, V> consumerManager, WorkManager<K, V> workManager, ParallelConsumerOptions parallelConsumerOptions) {
        super(consumerManager, workManager);
        this.commitMode = parallelConsumerOptions.getCommitMode();
        if (this.commitMode.equals(ParallelConsumerOptions.CommitMode.TRANSACTIONAL_PRODUCER)) {
            throw unsupportedCommitMode();
        }
    }

    /**
     * Commits now when called by the owning thread; otherwise requests a commit and, in sync mode,
     * blocks until one has been signalled.
     */
    public void commit() {
        if (isOwner()) {
            // Restore the previous value afterwards: leaving the flag set would make every later
            // postCommit() skip signalling and leave sync waiters blocked forever.
            boolean previousDirect = this.direct;
            this.direct = true;
            try {
                retrieveOffsetsAndCommit();
            } finally {
                this.direct = previousDirect;
            }
        } else if (!isSync()) {
            log.debug("Async commit to be requested");
            requestCommitInternal();
        } else {
            log.debug("Sync commit");
            commitAndWaitForCondition();
            log.debug("Finished waiting");
        }
    }

    /**
     * Sends the given offsets through the consumer using the configured commit mode. Does nothing
     * when the map is empty.
     *
     * @param map                   offsets to commit, per partition
     * @param consumerGroupMetadata unused by this implementation
     * @throws IllegalArgumentException if the commit mode is not one of the consumer modes
     */
    @Override // io.confluent.parallelconsumer.AbstractOffsetCommitter
    protected void commitOffsets(Map<TopicPartition, OffsetAndMetadata> map, ConsumerGroupMetadata consumerGroupMetadata) {
        if (map.isEmpty()) {
            log.trace("Nothing to commit");
            return;
        }
        switch (this.commitMode) {
            case CONSUMER_SYNC:
                log.debug("Committing offsets Sync");
                this.consumerMgr.commitSync(map);
                return;
            case CONSUMER_ASYNCHRONOUS:
                log.debug("Committing offsets Async");
                this.consumerMgr.commitAsync(map, (committedOffsets, exception) -> {
                    if (exception != null) {
                        log.error("Error committing offsets", exception);
                    }
                });
                return;
            default:
                throw unsupportedCommitMode();
        }
    }

    /**
     * Wakes up threads blocked in a sync {@link #commit()}, unless the commit was performed directly
     * by the owner or the mode is not sync.
     */
    @Override // io.confluent.parallelconsumer.AbstractOffsetCommitter
    protected void postCommit() {
        if (this.direct || !isSync()) {
            return;
        }
        signalCommitPerformed();
    }

    /** @return true when the calling thread is the one registered through {@link #claim()} */
    private boolean isOwner() {
        // Thread does not override equals(), so identity comparison is equivalent and null-safe.
        return Thread.currentThread() == this.owningThread;
    }

    /**
     * Bumps the commit counter and wakes all waiters.
     *
     * @throws IllegalStateException if the calling thread does not hold the commit lock
     */
    private void signalCommitPerformed() {
        log.debug("Starting Signaling commit finished");
        if (!this.commitLock.isHeldByCurrentThread()) {
            throw new IllegalStateException("Commit lock must be held by the current thread to signal a performed commit");
        }
        // The lock is already held (checked above), so no further acquisition is needed before signalling.
        this.commitCount.incrementAndGet();
        log.debug("Signaling commit finished");
        this.commitPerformed.signalAll();
        log.debug("Finished Signaling commit finished");
    }

    /**
     * Requests a commit and blocks until the commit counter advances. Interrupts do not abort the
     * wait; the interrupt status is re-asserted once the wait has finished.
     */
    private void commitAndWaitForCondition() {
        boolean interrupted = false;
        this.commitLock.lock();
        try {
            long countBeforeRequest = this.commitCount.get();
            requestCommitInternal();
            this.consumerMgr.wakeup();
            // The counter only changes under the lock, so it can only advance while we are parked in await().
            while (countBeforeRequest == this.commitCount.get()) {
                log.debug("Requesting commit again");
                requestCommitInternal();
                try {
                    log.debug("Waiting on commit");
                    this.commitPerformed.await();
                } catch (InterruptedException e) {
                    log.debug("Interrupted waiting for commit condition", e);
                    // Re-asserting right here would make the next await() throw immediately and spin.
                    interrupted = true;
                }
            }
            log.debug("Signaled");
        } finally {
            this.commitLock.unlock();
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Flags that a commit is wanted and wakes the consumer up. */
    private void requestCommitInternal() {
        this.commitLock.lock();
        try {
            this.commitRequested.set(true);
            this.consumerMgr.wakeup();
        } finally {
            this.commitLock.unlock();
        }
    }

    /** Performs a commit, under the commit lock, if one has been requested. */
    public void maybeDoCommit() {
        this.commitLock.lock();
        try {
            if (this.commitRequested.get()) {
                retrieveOffsetsAndCommit();
            }
        } finally {
            this.commitLock.unlock();
        }
    }

    /** @return true when the configured commit mode is {@code CONSUMER_SYNC} */
    public boolean isSync() {
        return this.commitMode.equals(ParallelConsumerOptions.CommitMode.CONSUMER_SYNC);
    }

    /** Registers the calling thread as the owner, i.e. the thread allowed to commit directly. */
    public void claim() {
        this.owningThread = Thread.currentThread();
    }

    /** Builds the exception thrown for commit modes this committer cannot handle. */
    private IllegalArgumentException unsupportedCommitMode() {
        return new IllegalArgumentException("Cannot use " + this.commitMode + " when using " + getClass().getSimpleName());
    }
}
