package dev.vality.kafka.common.loader;

import dev.vality.kafka.common.exception.RetryableException;
import dev.vality.kafka.common.exception.SkippedException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.backoff.BackOffExecution;
import org.springframework.util.backoff.ExponentialBackOff;

/* loaded from: input_file:dev/vality/kafka/common/loader/PreloadListenerImpl.class */
/**
 * {@link PreloadListener} implementation that replays a single topic partition from its
 * beginning up to the end position observed when the preload started, passing every record
 * value to a caller-supplied handler.
 *
 * <p>A {@link RetryableException} thrown by the handler causes the consumer to be rewound to the
 * position it had before the failed poll, so the whole polled batch is delivered again after a
 * back-off pause. A {@link SkippedException} is logged and the record is skipped.
 *
 * @param <K> record key type
 * @param <T> record value type
 */
public class PreloadListenerImpl<K, T> implements PreloadListener<K, T> {
    private static final Logger log = LoggerFactory.getLogger(PreloadListenerImpl.class);

    /** Number of consecutive retries allowed when no explicit value is supplied. */
    private static final int DEFAULT_MAX_ATTEMPT = 3;

    /** Consecutive failed attempts of the preload currently running on the calling thread. */
    private final ThreadLocal<Integer> countAttempt = ThreadLocal.withInitial(() -> 0);
    private final int maxAttempt;
    private final ExponentialBackOff backOff;

    /** Creates a listener with three retry attempts and a default {@link ExponentialBackOff}. */
    public PreloadListenerImpl() {
        this(DEFAULT_MAX_ATTEMPT, new ExponentialBackOff());
    }

    /**
     * Creates a listener with an explicit retry policy.
     *
     * @param maxAttempt maximum number of consecutive retries after a {@link RetryableException}
     * @param backOff    back-off policy that supplies the pause before each retry
     */
    public PreloadListenerImpl(int maxAttempt, ExponentialBackOff backOff) {
        this.maxAttempt = maxAttempt;
        this.backOff = backOff;
    }

    /**
     * Assigns the given partition to {@code consumer} and feeds every record value from the
     * beginning of the partition to {@code handler}, until the consumer position passes the last
     * offset that existed when this method was called.
     *
     * @param consumer  Kafka consumer used for reading; it is re-assigned to the single partition
     * @param topic     topic name
     * @param partition partition number within the topic
     * @param handler   callback invoked with the value of every record read
     * @throws RuntimeException if the handler keeps throwing {@link RetryableException} after
     *                          {@code maxAttempt} retries, or the back-off policy is exhausted;
     *                          any other exception from the handler or consumer is logged and
     *                          rethrown unchanged
     */
    @Override // dev.vality.kafka.common.loader.PreloadListener
    public void preloadToLastOffsetInPartition(Consumer<K, T> consumer, String topic, int partition,
                                               java.util.function.Consumer<T> handler) {
        TopicPartition topicPartition = new TopicPartition(topic, partition);
        List<TopicPartition> partitions = Collections.singletonList(topicPartition);
        consumer.assign(partitions);
        consumer.seekToEnd(partitions);
        long endPosition = consumer.position(topicPartition);
        if (endPosition <= 0) {
            // Nothing has ever been written to the partition.
            return;
        }
        long lastOffset = endPosition - 1;
        consumer.seekToBeginning(partitions);
        long currentPosition = consumer.position(topicPartition);
        // Start from a clean counter: a previous preload on this thread may have ended with the
        // retry budget exhausted.
        BackOffExecution backOffExecution = resetBackoff();
        try {
            while (currentPosition <= lastOffset) {
                ConsumerRecords<K, T> records = consumer.poll(Duration.ofSeconds(1L));
                try {
                    for (ConsumerRecord<K, T> consumerRecord : records) {
                        safeHandle(handler, consumerRecord);
                    }
                    currentPosition = consumer.position(topicPartition);
                    backOffExecution = resetBackoff();
                } catch (RetryableException e) {
                    log.error("PreloadListenerImpl preloadToLastOffsetInPartition RetryableException e: ", e);
                    if (this.countAttempt.get() >= this.maxAttempt) {
                        throw new RuntimeException("PreloadListenerImpl cant retry!", e);
                    }
                    long delay = backOffExecution.nextBackOff();
                    if (delay == BackOffExecution.STOP) {
                        // The back-off policy refuses further attempts; sleeping on STOP (-1)
                        // would raise an unrelated IllegalArgumentException.
                        throw new RuntimeException("PreloadListenerImpl cant retry!", e);
                    }
                    // Rewind so the batch that failed is polled and handled again.
                    consumer.seek(topicPartition, currentPosition);
                    this.countAttempt.set(this.countAttempt.get() + 1);
                    waitBackoff(delay);
                } catch (Exception e) {
                    log.error("PreloadListenerImpl preloadToLastOffsetInPartition critical exception e: ", e);
                    throw e;
                }
            }
        } finally {
            // Do not leave per-call state behind on (possibly pooled) threads.
            this.countAttempt.remove();
        }
    }

    /**
     * Sleeps for the given back-off delay. If the thread is interrupted while sleeping, the
     * interrupt flag is restored and the method returns early.
     */
    private void waitBackoff(long delayMillis) {
        try {
            Thread.sleep(delayMillis);
        } catch (InterruptedException e) {
            log.error("PreloadListenerImpl InterruptedException when wait retry e: ", e);
            Thread.currentThread().interrupt();
        }
    }

    /** Clears the attempt counter of the calling thread and starts a fresh back-off sequence. */
    private BackOffExecution resetBackoff() {
        this.countAttempt.set(0);
        return this.backOff.start();
    }

    /**
     * Passes the record value to the handler. A {@link SkippedException} is logged and swallowed
     * so that the record is skipped; every other exception, including {@link RetryableException},
     * propagates to the caller, which logs it.
     */
    private void safeHandle(java.util.function.Consumer<T> handler, ConsumerRecord<K, T> consumerRecord) {
        try {
            handler.accept(consumerRecord.value());
        } catch (SkippedException e) {
            log.error("PreloadListenerImpl preloadToLastOffsetInPartition SkippedException e: ", e);
        }
    }
}
