/*
 * Decompiled with CFR 0.152.
 */
package com.networknt.eventuate.kafka.consumer;

import com.networknt.eventuate.kafka.consumer.OffsetTracker;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.BiConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Hands Kafka records to a handler and tracks which offsets can be committed.
 *
 * <p>Each record passed to {@link #process(ConsumerRecord)} is first registered with the
 * {@link OffsetTracker} as unprocessed and then given to the handler together with a
 * completion callback. Records whose callback reports success are parked in an internal
 * queue; {@link #offsetsToCommit()} drains that queue, marks the records as processed in the
 * tracker and returns whatever the tracker reports as committable.
 *
 * <p>NOTE(review): the completed-record buffer is a {@link BlockingQueue}, presumably because
 * handlers may invoke the completion callback from a different thread than the one calling
 * {@link #offsetsToCommit()} — confirm against the handler implementations. The
 * {@code OffsetTracker} itself is only touched from {@code process}, {@code offsetsToCommit}
 * and {@code noteOffsetsCommitted}.
 */
public class KafkaMessageProcessor {
    // Kept as an instance field keyed on the runtime class so subclasses log under their own name.
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private final String subscriberId;
    private final BiConsumer<ConsumerRecord<String, String>, BiConsumer<Void, Throwable>> handler;
    private final OffsetTracker offsetTracker = new OffsetTracker();
    // Records whose handler callback completed without an error, waiting to be noted as processed.
    private final BlockingQueue<ConsumerRecord<String, String>> processedRecords = new LinkedBlockingQueue<>();

    /**
     * Creates a processor for one subscriber.
     *
     * @param subscriberId identifier used only in log messages
     * @param handler      receives each record plus a callback; the callback's second argument
     *                     is non-null when handling failed
     */
    public KafkaMessageProcessor(String subscriberId, BiConsumer<ConsumerRecord<String, String>, BiConsumer<Void, Throwable>> handler) {
        this.subscriberId = subscriberId;
        this.handler = handler;
    }

    /**
     * Registers the record as unprocessed and passes it to the handler.
     *
     * <p>When the handler's callback reports a throwable, the error is logged and the record
     * is not queued, so this class never marks it as processed. When the callback reports
     * success, the record is queued for the next {@link #offsetsToCommit()} call.
     *
     * @param record the consumed record to handle
     */
    public void process(ConsumerRecord<String, String> record) {
        this.offsetTracker.noteUnprocessed(new TopicPartition(record.topic(), record.partition()), record.offset());
        this.handler.accept(record, (result, t) -> {
            if (t != null) {
                this.logger.error("Got exception: ", t);
            } else {
                this.logger.debug("Adding processed record to queue {} {}", this.subscriberId, record.offset());
                this.processedRecords.add(record);
            }
        });
    }

    /**
     * Drains all successfully handled records into the offset tracker and returns the offsets
     * the tracker considers ready to commit.
     *
     * @return the result of {@code OffsetTracker.offsetsToCommit()} after the drain
     */
    public Map<TopicPartition, OffsetAndMetadata> offsetsToCommit() {
        int count = 0;
        ConsumerRecord<String, String> record;
        // poll() is non-blocking: stop as soon as the queue is momentarily empty.
        while ((record = this.processedRecords.poll()) != null) {
            ++count;
            this.offsetTracker.noteProcessed(new TopicPartition(record.topic(), record.partition()), record.offset());
        }
        this.logger.trace("removed {} {} processed records from queue", this.subscriberId, count);
        return this.offsetTracker.offsetsToCommit();
    }

    /**
     * Forwards the committed offsets to the offset tracker.
     *
     * @param offsetsToCommit the offsets that were committed, passed through unchanged
     */
    public void noteOffsetsCommitted(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
        this.offsetTracker.noteOffsetsCommitted(offsetsToCommit);
    }

    /**
     * Returns the offset tracker used by this processor.
     *
     * @return the internal tracker instance itself, not a copy
     */
    public OffsetTracker getPending() {
        return this.offsetTracker;
    }
}

