package io.confluent.parallelconsumer.internal;

import io.confluent.csid.utils.StringUtils;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.state.WorkManager;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/parallelconsumer/internal/ConsumerOffsetCommitter.class */
/**
 * Commits offsets through the consumer manager, either synchronously or asynchronously depending on
 * the configured {@link ParallelConsumerOptions.CommitMode}.
 *
 * <p>One thread may {@link #claim() claim} ownership of this committer. When {@link #commit()} is
 * called on the owning thread the commit is performed directly; when it is called from any other
 * thread, a {@link CommitRequest} is queued and later serviced by {@link #maybeDoCommit()}. In sync
 * mode the requesting thread blocks until the matching {@link CommitResponse} arrives.
 *
 * @param <K> key type, passed through to the superclass
 * @param <V> value type, passed through to the superclass
 */
public class ConsumerOffsetCommitter<K, V> extends AbstractOffsetCommitter<K, V> implements OffsetCommitter {
    private static final Logger log = LoggerFactory.getLogger(ConsumerOffsetCommitter.class);

    /** Maximum number of extra polls of the response queue before {@link #commitAndWait()} gives up. */
    private static final int ARBITRARY_RETRY_LIMIT = 50;

    private final ParallelConsumerOptions.CommitMode commitMode;

    /**
     * Thread that called {@link #claim()}, if any. Volatile because it is written by the claiming
     * thread and read by whichever thread calls {@link #commit()}.
     */
    private volatile Optional<Thread> owningThread;

    /** Commit requests raised by non-owning threads, drained by {@link #maybeDoCommit()}. */
    private final Queue<CommitRequest> commitRequestQueue;

    /** Responses published by {@link #maybeDoCommit()} in sync mode, awaited by {@link #commitAndWait()}. */
    private final BlockingQueue<CommitResponse> commitResponseQueue;

    /**
     * A request for a commit, identified by a random UUID and stamped with its creation time.
     */
    public static final class CommitRequest {
        private final UUID id = UUID.randomUUID();
        private final long requestedAtMs = System.currentTimeMillis();

        /** @return the random identifier generated when this request was created */
        public UUID getId() {
            return this.id;
        }

        /** @return the {@link System#currentTimeMillis()} value captured when this request was created */
        public long getRequestedAtMs() {
            return this.requestedAtMs;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof CommitRequest)) {
                return false;
            }
            CommitRequest other = (CommitRequest) obj;
            if (getRequestedAtMs() != other.getRequestedAtMs()) {
                return false;
            }
            UUID mine = getId();
            UUID theirs = other.getId();
            return mine == null ? theirs == null : mine.equals(theirs);
        }

        @Override
        public int hashCode() {
            // Long.hashCode(v) is defined as (int) (v ^ (v >>> 32)), i.e. the same fold as before.
            int result = 59 + Long.hashCode(getRequestedAtMs());
            UUID mine = getId();
            return (result * 59) + (mine == null ? 43 : mine.hashCode());
        }

        @Override
        public String toString() {
            return "ConsumerOffsetCommitter.CommitRequest(id=" + getId() + ", requestedAtMs=" + getRequestedAtMs() + ")";
        }
    }

    /**
     * Signals that the commit triggered by a given {@link CommitRequest} has been performed.
     */
    public static final class CommitResponse {
        private final CommitRequest request;

        /**
         * @param commitRequest the request this response answers
         */
        public CommitResponse(CommitRequest commitRequest) {
            this.request = commitRequest;
        }

        /** @return the request this response answers */
        public CommitRequest getRequest() {
            return this.request;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof CommitResponse)) {
                return false;
            }
            CommitRequest mine = getRequest();
            CommitRequest theirs = ((CommitResponse) obj).getRequest();
            return mine == null ? theirs == null : mine.equals(theirs);
        }

        @Override
        public int hashCode() {
            CommitRequest mine = getRequest();
            return 59 + (mine == null ? 43 : mine.hashCode());
        }

        @Override
        public String toString() {
            return "ConsumerOffsetCommitter.CommitResponse(request=" + getRequest() + ")";
        }
    }

    /**
     * @param consumerManager         passed to the superclass
     * @param workManager             passed to the superclass
     * @param parallelConsumerOptions source of the commit mode
     * @throws IllegalArgumentException if the commit mode is {@code PERIODIC_TRANSACTIONAL_PRODUCER}
     */
    public ConsumerOffsetCommitter(ConsumerManager<K, V> consumerManager, WorkManager<K, V> workManager, ParallelConsumerOptions parallelConsumerOptions) {
        super(consumerManager, workManager);
        this.owningThread = Optional.empty();
        this.commitRequestQueue = new ConcurrentLinkedQueue<>();
        this.commitResponseQueue = new LinkedBlockingQueue<>();
        this.commitMode = parallelConsumerOptions.getCommitMode();
        if (this.commitMode.equals(ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER)) {
            throw unsupportedCommitMode();
        }
    }

    /**
     * Commits now if called on the owning thread; otherwise queues a request for the owner and, in
     * sync mode, blocks until that request has been answered.
     */
    public void commit() {
        if (isOwner()) {
            retrieveOffsetsAndCommit();
            return;
        }
        if (!isSync()) {
            log.debug("Async commit to be requested");
            requestCommitInternal();
        } else {
            log.debug("Sync commit");
            commitAndWait();
            log.debug("Finished waiting");
        }
    }

    /**
     * Sends the given offsets to the consumer manager using the configured commit mode.
     *
     * @param map                   offsets to commit; nothing is done when empty
     * @param consumerGroupMetadata not used by this implementation
     * @throws IllegalArgumentException if the commit mode is not one of the two consumer modes
     */
    @Override // io.confluent.parallelconsumer.internal.AbstractOffsetCommitter
    protected void commitOffsets(Map<TopicPartition, OffsetAndMetadata> map, ConsumerGroupMetadata consumerGroupMetadata) {
        if (map.isEmpty()) {
            log.trace("Nothing to commit");
            return;
        }
        switch (this.commitMode) {
            case PERIODIC_CONSUMER_SYNC:
                log.debug("Committing offsets Sync");
                this.consumerMgr.commitSync(map);
                return;
            case PERIODIC_CONSUMER_ASYNCHRONOUS:
                log.debug("Committing offsets Async");
                this.consumerMgr.commitAsync(map, (committed, exc) -> {
                    if (exc != null) {
                        log.error("Error committing offsets", exc);
                    }
                });
                return;
            default:
                throw unsupportedCommitMode();
        }
    }

    /** No post-commit work is needed for consumer-based commits. */
    @Override // io.confluent.parallelconsumer.internal.AbstractOffsetCommitter
    protected void postCommit() {
    }

    /** Builds the exception used whenever the configured commit mode cannot be handled by this class. */
    private IllegalArgumentException unsupportedCommitMode() {
        return new IllegalArgumentException("Cannot use " + this.commitMode + " when using " + getClass().getSimpleName());
    }

    /** @return {@code true} if the calling thread is the one that last called {@link #claim()} */
    private boolean isOwner() {
        return Thread.currentThread().equals(this.owningThread.orElse(null));
    }

    /**
     * Queues a commit request and blocks until the response carrying the same request id is taken
     * from the response queue.
     *
     * <p>Interrupts received while waiting do not abort the wait; the interrupt status is restored
     * before returning so callers can still observe it.
     *
     * <p>NOTE(review): responses whose id does not match are removed from the queue and dropped
     * here, so a response destined for another concurrent waiter would be lost — confirm whether
     * more than one thread can wait at a time.
     *
     * @throws InternalRuntimeError if a single poll times out, or the retry limit is exceeded
     */
    private void commitAndWait() {
        final CommitRequest request = requestCommitInternal();
        final Duration timeout = AbstractParallelEoSStreamProcessor.DEFAULT_TIMEOUT;

        boolean interrupted = false;
        try {
            boolean waitingOnCommitResponse = true;
            int attempts = 0;
            while (waitingOnCommitResponse) {
                if (attempts > ARBITRARY_RETRY_LIMIT) {
                    throw new InternalRuntimeError("Too many attempts taking commit responses");
                }
                // Count every pass, including interrupted ones, so the loop is always bounded.
                attempts++;
                try {
                    log.debug("Waiting on a commit response");
                    CommitResponse response = this.commitResponseQueue.poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
                    if (response == null) {
                        throw new InternalRuntimeError(StringUtils.msg("Timeout waiting for commit response {} to request {}", timeout, request));
                    }
                    // Keep draining until the response for our own request turns up.
                    waitingOnCommitResponse = !response.getRequest().getId().equals(request.getId());
                } catch (InterruptedException e) {
                    // Remember the interrupt but keep waiting; re-interrupting here would make
                    // every following poll fail immediately.
                    interrupted = true;
                    log.debug("Interrupted waiting for commit response", e);
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Queues a new commit request and calls {@code wakeup()} on the consumer manager
     * (presumably so the owning thread notices the request promptly — confirm against
     * {@code ConsumerManager}).
     *
     * @return the request that was queued
     */
    private CommitRequest requestCommitInternal() {
        CommitRequest commitRequest = new CommitRequest();
        this.commitRequestQueue.add(commitRequest);
        this.consumerMgr.wakeup();
        return commitRequest;
    }

    /**
     * Services at most one pending commit request: performs the commit and, in sync mode, publishes
     * a response for the waiting requester. Does nothing when no request is queued.
     */
    public void maybeDoCommit() {
        CommitRequest pending = this.commitRequestQueue.poll();
        if (pending == null) {
            return;
        }
        log.debug("Commit requested, performing...");
        retrieveOffsetsAndCommit();
        if (isSync()) {
            log.debug("Adding commit response to queue...");
            this.commitResponseQueue.add(new CommitResponse(pending));
        }
    }

    /** @return {@code true} when the configured commit mode is {@code PERIODIC_CONSUMER_SYNC} */
    public boolean isSync() {
        return this.commitMode.equals(ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_SYNC);
    }

    /** Records the calling thread as the owner, replacing any previous owner. */
    public void claim() {
        this.owningThread = Optional.of(Thread.currentThread());
    }
}
