package com.networknt.eventuate.kafka.consumer;

import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.BiConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/networknt/eventuate/kafka/consumer/KafkaMessageProcessor.class */
/**
 * Hands consumed Kafka records to a handler and keeps track of which offsets
 * are outstanding so the caller can ask what is ready to be committed.
 *
 * <p>Each record passed to {@link #process(ConsumerRecord)} is first registered
 * with the {@link OffsetTracker} as unprocessed. When the handler signals
 * successful completion through its callback, the record is placed on an
 * internal queue; {@link #offsetsToCommit()} later drains that queue, marks the
 * drained records as processed in the tracker and returns the tracker's view of
 * the offsets to commit.
 *
 * <p>NOTE(review): the completion callback presumably may be invoked on a
 * different thread than the one calling {@link #offsetsToCommit()}, which would
 * explain the use of a {@link BlockingQueue} as the hand-off point — confirm
 * against the handler implementations.
 */
public class KafkaMessageProcessor {
    /** Identifier of the subscriber; only used to tag log messages in this class. */
    private final String subscriberId;

    /**
     * Record handler. Receives the record plus a completion callback that must be
     * invoked with a {@code null} throwable on success or the failure otherwise.
     */
    private final BiConsumer<ConsumerRecord<String, String>, BiConsumer<Void, Throwable>> handler;

    // Instance logger bound to the runtime class, so subclasses log under their own name.
    private final Logger logger = LoggerFactory.getLogger(getClass());

    private final OffsetTracker offsetTracker = new OffsetTracker();

    /** Records whose handler reported success and that still await {@code noteProcessed}. */
    private final BlockingQueue<ConsumerRecord<String, String>> processedRecords = new LinkedBlockingQueue<>();

    /**
     * Creates a processor for the given subscriber.
     *
     * @param str        the subscriber id, used in log output
     * @param biConsumer the handler invoked for every record passed to
     *                   {@link #process(ConsumerRecord)}
     */
    public KafkaMessageProcessor(String str, BiConsumer<ConsumerRecord<String, String>, BiConsumer<Void, Throwable>> biConsumer) {
        this.subscriberId = str;
        this.handler = biConsumer;
    }

    /**
     * Registers the record as unprocessed and dispatches it to the handler.
     *
     * <p>If the handler's callback reports a failure, the error is logged and the
     * record is not queued, so it is never reported to the tracker as processed.
     *
     * @param consumerRecord the record to process
     */
    public void process(ConsumerRecord<String, String> consumerRecord) {
        TopicPartition partition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
        this.offsetTracker.noteUnprocessed(partition, consumerRecord.offset());

        this.handler.accept(consumerRecord, (ignored, error) -> {
            if (error != null) {
                // Deliberately not queued: the offset stays registered as unprocessed.
                this.logger.error("Got exception: ", error);
                return;
            }
            this.logger.debug("Adding processed record to queue {} {}", this.subscriberId, consumerRecord.offset());
            this.processedRecords.add(consumerRecord);
        });
    }

    /**
     * Drains every successfully handled record from the internal queue, marks each
     * one as processed in the offset tracker and returns the offsets the tracker
     * reports as ready to commit.
     *
     * @return the tracker's offsets to commit, keyed by topic partition
     */
    public Map<TopicPartition, OffsetAndMetadata> offsetsToCommit() {
        int drained = 0;
        ConsumerRecord<String, String> processed;
        // poll() is non-blocking and returns null once the queue is empty.
        while ((processed = this.processedRecords.poll()) != null) {
            drained++;
            this.offsetTracker.noteProcessed(new TopicPartition(processed.topic(), processed.partition()), processed.offset());
        }
        this.logger.trace("removed {} {} processed records from queue", this.subscriberId, drained);
        return this.offsetTracker.offsetsToCommit();
    }

    /**
     * Forwards the committed offsets to the offset tracker.
     *
     * @param map the offsets that were committed, keyed by topic partition
     */
    public void noteOffsetsCommitted(Map<TopicPartition, OffsetAndMetadata> map) {
        this.offsetTracker.noteOffsetsCommitted(map);
    }

    /**
     * Returns the offset tracker used by this processor.
     *
     * @return the internal tracker instance itself, not a copy
     */
    public OffsetTracker getPending() {
        return this.offsetTracker;
    }
}
