package io.confluent.parallelconsumer.state;

import io.confluent.csid.utils.KafkaUtils;
import io.confluent.parallelconsumer.internal.CountingCRLinkedList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/parallelconsumer/state/WorkMailBoxManager.class */
/**
 * Buffers polled {@link ConsumerRecords} batches and hands them out one {@link ConsumerRecord} at a time.
 *
 * <p>Records move through three stages:
 * <ol>
 *   <li>{@code workInbox} - whole batches handed in through {@link #registerWork(ConsumerRecords)};</li>
 *   <li>{@code internalBatchMailQueue} - batches drained out of the inbox;</li>
 *   <li>{@code internalFlattenedMailQueue} - the individual records of those batches, in the order they were
 *       iterated.</li>
 * </ol>
 *
 * <p>Locking: {@code workInbox} and {@code sharedBoxNestedRecordCount} are guarded by the {@code workInbox}
 * monitor; the two internal queues are guarded by this instance's monitor. Whenever both are needed, the
 * instance monitor is taken first and the {@code workInbox} monitor second.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class WorkMailBoxManager<K, V> {
    private static final Logger log = LoggerFactory.getLogger(WorkMailBoxManager.class);

    /** Sum of {@code count()} over the batches currently in {@code workInbox}; guarded by the {@code workInbox} monitor. */
    private int sharedBoxNestedRecordCount;

    /** Batches handed in by {@link #registerWork(ConsumerRecords)} that have not been drained yet. */
    private final LinkedBlockingQueue<ConsumerRecords<K, V>> workInbox = new LinkedBlockingQueue<>();

    /** Batches drained from {@code workInbox}, waiting to be flattened; guarded by this instance's monitor. */
    private final CountingCRLinkedList<K, V> internalBatchMailQueue = new CountingCRLinkedList<>();

    /** Individual records ready to be polled; guarded by this instance's monitor. */
    private final Queue<ConsumerRecord<K, V>> internalFlattenedMailQueue = new LinkedList<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns how many records are currently buffered across all three stages.
     *
     * <p>The value is the sum of the shared inbox's nested record count, the batch queue's
     * {@code getNestedCount()} and the flattened queue's size. Both monitors are held while reading so that a
     * concurrent flatten cannot move records between stages halfway through the sum (which would otherwise let
     * the same records be counted twice, or not at all).
     *
     * @return the number of buffered records
     */
    public synchronized Integer getWorkQueuedInMailboxCount() {
        // Same lock order as flattenBatchQueue(): instance monitor first, then the inbox monitor.
        synchronized (this.workInbox) {
            return this.sharedBoxNestedRecordCount
                    + this.internalBatchMailQueue.getNestedCount()
                    + this.internalFlattenedMailQueue.size();
        }
    }

    /**
     * Adds a batch of records to the shared inbox and bumps the nested record count by the batch's
     * {@code count()}.
     *
     * @param consumerRecords the batch to enqueue
     */
    public void registerWork(ConsumerRecords<K, V> consumerRecords) {
        // The counter and the queue are updated together so they never disagree for a reader holding the lock.
        synchronized (this.workInbox) {
            this.sharedBoxNestedRecordCount += consumerRecords.count();
            this.workInbox.add(consumerRecords);
        }
    }

    /**
     * Moves every batch from the shared inbox into the internal batch queue and resets the nested record
     * count to zero.
     */
    private void drainSharedMailbox() {
        synchronized (this.workInbox) {
            this.workInbox.drainTo(this.internalBatchMailQueue);
            this.sharedBoxNestedRecordCount = 0;
        }
    }

    /**
     * Drains the shared inbox, then unpacks every batch in the internal batch queue into the flattened queue.
     * The batch queue is empty when this method returns normally.
     */
    private synchronized void flattenBatchQueue() {
        drainSharedMailbox();
        while (!this.internalBatchMailQueue.isEmpty()) {
            ConsumerRecords<K, V> batch = this.internalBatchMailQueue.poll();
            log.debug("Flattening {} records", batch.count());
            for (ConsumerRecord<K, V> consumerRecord : batch) {
                this.internalFlattenedMailQueue.add(consumerRecord);
            }
        }
    }

    /**
     * Discards every buffered record that belongs to one of the given partitions.
     *
     * <p>Everything pending is flattened first, so records still sitting in the shared inbox or the batch
     * queue are filtered as well.
     *
     * @param collection the partitions whose buffered records should be dropped
     */
    public synchronized void onPartitionsRemoved(Collection<TopicPartition> collection) {
        log.debug("Removing stale work from inbox queues");
        flattenBatchQueue();
        this.internalFlattenedMailQueue.removeIf(
                consumerRecord -> collection.contains(KafkaUtils.toTP(consumerRecord)));
    }

    /**
     * Reports whether the flattened queue is empty. This does not drain or flatten anything, so batches that
     * are still in the shared inbox are not taken into account.
     *
     * @return {@code true} if the flattened queue holds no records
     */
    public synchronized boolean internalFlattenedMailQueueIsEmpty() {
        return this.internalFlattenedMailQueue.isEmpty();
    }

    /**
     * Removes and returns the next record from the flattened queue, flattening pending batches first.
     *
     * @return the next record, or {@code null} if none is available after flattening
     */
    public synchronized ConsumerRecord<K, V> internalFlattenedMailQueuePoll() {
        // NOTE(review): within this class the batch queue is only filled and fully emptied inside
        // flattenBatchQueue(), so this guard looks like it is always true and every poll drains the shared
        // inbox. Confirm whether the flattened queue was meant to be tested instead; behaviour is kept as is.
        if (this.internalBatchMailQueue.isEmpty()) {
            flattenBatchQueue();
        }
        return this.internalFlattenedMailQueue.poll();
    }

    /**
     * Returns the number of records in the flattened queue. Synchronized like the other accessors of that
     * queue, because the backing {@link LinkedList} is not safe to read while another thread modifies it.
     *
     * @return the flattened queue's size
     */
    public synchronized int internalFlattenedMailQueueSize() {
        return this.internalFlattenedMailQueue.size();
    }
}
