package com.networknt.kafka.consumer;

import com.networknt.exception.FrameworkException;
import com.networknt.kafka.common.KafkaConsumerConfig;
import com.networknt.kafka.entity.ConsumerRecord;
import com.networknt.status.Status;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Vector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/networknt/kafka/consumer/KafkaConsumerReadTask.class */
/**
 * State for a single consumer read request that is serviced in one or more partial reads.
 *
 * <p>Each call to {@link #doPartialRead()} pulls records from the parent
 * {@link KafkaConsumerState} into an internal buffer. The task completes, and the callback is
 * invoked, once the request timeout has elapsed, the minimum response size has been exceeded,
 * or adding the next record would reach the maximum response size.
 *
 * @param <KafkaKeyT> key type as seen by the parent consumer state
 * @param <KafkaValueT> value type as seen by the parent consumer state
 * @param <ClientKeyT> key type of the records delivered to the callback
 * @param <ClientValueT> value type of the records delivered to the callback
 */
class KafkaConsumerReadTask<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerReadTask.class);
    private static final String UNEXPECTED_CONSUMER_READ_EXCEPTION = "ERR12205";

    private final KafkaConsumerState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> parent;
    private final Duration requestTimeout;
    private final int responseMinBytes;
    private final long maxResponseBytes;
    private final ConsumerReadCallback<ClientKeyT, ClientValueT> callback;
    private boolean finished;
    // Created eagerly so that a successful completion never hands a null list to the callback,
    // even when finish() is invoked before the first partial read.
    private final List<ConsumerRecord<ClientKeyT, ClientValueT>> messages = new Vector<>();
    private final Instant started;
    private long bytesConsumed = 0;
    private boolean exceededMinResponseBytes = false;
    private boolean exceededMaxResponseBytes = false;
    private final Clock clock = Clock.systemUTC();

    /**
     * Creates a read task and records its start time.
     *
     * @param kafkaConsumerState consumer state the records are read from; its instance config
     *     may override the request wait time and the minimum response size
     * @param duration requested timeout; when zero or negative the configured default is used,
     *     otherwise the smaller of this value and the configured default
     * @param j requested maximum response size in bytes; capped at
     *     {@code kafkaConsumerConfig.getRequestMaxBytes()}
     * @param consumerReadCallback callback invoked exactly when the task finishes
     * @param kafkaConsumerConfig global consumer configuration supplying default limits
     */
    public KafkaConsumerReadTask(KafkaConsumerState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> kafkaConsumerState, Duration duration, long j, ConsumerReadCallback<ClientKeyT, ClientValueT> consumerReadCallback, KafkaConsumerConfig kafkaConsumerConfig) {
        this.parent = kafkaConsumerState;
        this.maxResponseBytes = Math.min(j, kafkaConsumerConfig.getRequestMaxBytes());

        // The per-instance wait time, when set, takes precedence over the global request timeout.
        Duration defaultTimeout = kafkaConsumerState.getConsumerInstanceConfig().getRequestWaitMs() != null
                ? Duration.ofMillis(kafkaConsumerState.getConsumerInstanceConfig().getRequestWaitMs().intValue())
                : Duration.ofMillis(kafkaConsumerConfig.getRequestTimeoutMs());
        this.requestTimeout = (duration.isNegative() || duration.isZero())
                ? defaultTimeout
                : Collections.min(Arrays.asList(duration, defaultTimeout));

        int configuredMinBytes = kafkaConsumerState.getConsumerInstanceConfig().getResponseMinBytes() != null
                ? kafkaConsumerState.getConsumerInstanceConfig().getResponseMinBytes().intValue()
                : kafkaConsumerConfig.getFetchMinBytes();
        // A negative minimum is mapped to Integer.MAX_VALUE, which effectively disables the
        // "minimum bytes exceeded" completion condition.
        this.responseMinBytes = configuredMinBytes < 0 ? Integer.MAX_VALUE : configuredMinBytes;

        this.callback = consumerReadCallback;
        this.finished = false;
        this.started = this.clock.instant();
    }

    /**
     * Performs one round of reading and finishes the task if a completion condition is met.
     *
     * <p>The task finishes when the request timeout has elapsed since construction, or when
     * either the minimum or the maximum response size threshold has been tripped. Any exception
     * thrown while reading is logged and reported to the callback as a
     * {@link FrameworkException} with status {@code ERR12205}.
     */
    public void doPartialRead() {
        try {
            addRecords();
            log.trace("KafkaConsumerReadTask exiting read with id={} messages={} bytes={}, backing off if not complete", this, this.messages.size(), this.bytesConsumed);
            boolean requestTimedOut = Duration.between(this.started, this.clock.instant()).compareTo(this.requestTimeout) >= 0;
            if (requestTimedOut || this.exceededMaxResponseBytes || this.exceededMinResponseBytes) {
                log.trace("Finishing KafkaConsumerReadTask id={} requestTimedOut={} exceededMaxResponseBytes={} exceededMinResponseBytes={}", this, requestTimedOut, this.exceededMaxResponseBytes, this.exceededMinResponseBytes);
                finish();
            }
        } catch (Exception e) {
            // Log the root cause before handing control to the callback so the failure is
            // recorded ahead of whatever the callback does.
            log.error("Unexpected exception in consumer read task", e);
            finish(new FrameworkException(new Status(UNEXPECTED_CONSUMER_READ_EXCEPTION, new Object[]{this})));
        }
    }

    /**
     * Reports whether the task has finished.
     *
     * @return {@code true} once the completion callback has been invoked
     */
    public boolean isDone() {
        return this.finished;
    }

    /**
     * Moves records from the parent into the buffer in two phases.
     *
     * <p>The first phase reads while {@code parent.hasNext()} holds and neither size threshold
     * has been tripped. The second phase keeps reading while {@code parent.hasNextCached()}
     * holds (presumably records the parent already has buffered - confirm against
     * KafkaConsumerState) and stops only at the maximum size threshold.
     */
    private void addRecords() {
        while (!this.exceededMinResponseBytes && !this.exceededMaxResponseBytes && this.parent.hasNext()) {
            synchronized (this.parent) {
                // Re-check under the lock: the unsynchronized loop condition may be stale.
                if (this.parent.hasNext()) {
                    maybeAddRecord();
                }
            }
        }
        while (!this.exceededMaxResponseBytes && this.parent.hasNextCached()) {
            synchronized (this.parent) {
                if (this.parent.hasNextCached()) {
                    maybeAddRecord();
                }
            }
        }
    }

    /**
     * Appends the parent's next record unless doing so would reach the maximum response size.
     *
     * <p>The record is only peeked first; it is consumed from the parent via {@code next()}
     * after it has been accepted, so a rejected record stays available for a later read.
     */
    private void maybeAddRecord() {
        ConsumerRecordAndSize<ClientKeyT, ClientValueT> recordAndSize = this.parent.createConsumerRecord(this.parent.peek());
        long size = recordAndSize.getSize();
        if (this.bytesConsumed + size >= this.maxResponseBytes) {
            this.exceededMaxResponseBytes = true;
            return;
        }
        this.messages.add(recordAndSize.getRecord());
        this.parent.next();
        this.bytesConsumed += size;
        if (this.bytesConsumed > this.responseMinBytes) {
            this.exceededMinResponseBytes = true;
        }
    }

    /**
     * Completes the task successfully, delivering the records buffered so far to the callback.
     */
    public void finish() {
        finish(null);
    }

    /**
     * Invokes the completion callback and marks the task as done.
     *
     * <p>On success the buffered records are passed with a {@code null} exception; on failure
     * {@code null} records are passed together with the exception. Anything thrown by the
     * callback is logged and suppressed, and the task is marked done regardless.
     *
     * @param frameworkException the failure to report, or {@code null} for a successful read
     */
    private void finish(FrameworkException frameworkException) {
        log.trace("Finishing KafkaConsumerReadTask id={}", this, frameworkException);
        try {
            this.callback.onCompletion(frameworkException == null ? this.messages : null, frameworkException);
        } catch (Throwable th) {
            log.error("Consumer read callback threw an unhandled exception ", th);
        }
        this.finished = true;
    }
}
