package io.amient.affinity.core.storage.kafka;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.BrokerNotAvailableException;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.runtime.BoxedUnit;
import scala.runtime.IntRef;

/* compiled from: KafkaStorage.scala */
/* loaded from: input_file:io/amient/affinity/core/storage/kafka/KafkaStorage$$anon$3.class */
/**
 * Consumer thread created by {@link KafkaStorage} (compiled from an anonymous
 * Scala {@code Thread} subclass).
 *
 * <p>On construction it creates a {@link KafkaConsumer} from the owning storage's
 * {@code consumerProps()}, assigns it the single partition identified by the
 * storage's topic and partition, and seeks to the beginning of that partition.
 *
 * <p>The consumer is created in the constructor and polled and closed in
 * {@link #run()}.
 */
public final class KafkaStorage$$anon$3 extends Thread {
    private final KafkaConsumer<ByteBuffer, ByteBuffer> kafkaConsumer;
    private final TopicPartition tp;
    private final List<TopicPartition> consumerPartitions;
    private final /* synthetic */ KafkaStorage $outer;

    /** @return the Kafka consumer owned by this thread */
    public KafkaConsumer<ByteBuffer, ByteBuffer> kafkaConsumer() {
        return this.kafkaConsumer;
    }

    /** @return the topic partition this thread reads from */
    public TopicPartition tp() {
        return this.tp;
    }

    /** @return a fixed-size, single-element list holding {@link #tp()} */
    public List<TopicPartition> consumerPartitions() {
        return this.consumerPartitions;
    }

    /**
     * Polls the assigned partition until this thread is interrupted.
     *
     * <p>Each outer iteration sets the storage's {@code consuming} flag and polls
     * while it stays set. Before every poll the storage's {@code consumerError}
     * reference is set to a {@link BrokerNotAvailableException}; it is reset to
     * {@code null} once the poll returns. When the storage is not {@code tailing}
     * and the per-poll counter is still zero after the records were handed to the
     * record handler, {@code consuming} is cleared. The thread then notifies
     * waiters on this object's monitor and waits on it before starting over.
     *
     * <p>The consumer is closed on every exit path: interruption (the method
     * returns normally) as well as any other throwable (which is propagated).
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        try {
            while (!isInterrupted()) {
                this.$outer.io$amient$affinity$core$storage$kafka$KafkaStorage$$consuming_$eq(true);
                while (this.$outer.io$amient$affinity$core$storage$kafka$KafkaStorage$$consuming()) {
                    if (isInterrupted()) {
                        throw new InterruptedException();
                    }
                    // Placeholder error that stays visible for as long as the poll has not returned.
                    // NOTE(review): if poll() throws, the placeholder is left in place and the
                    // throwable propagates out of run() - confirm this is the intended handling.
                    this.$outer.io$amient$affinity$core$storage$kafka$KafkaStorage$$consumerError().set(new BrokerNotAvailableException("Could not connect to Kafka"));
                    ConsumerRecords<ByteBuffer, ByteBuffer> records = kafkaConsumer().poll(500L);
                    this.$outer.io$amient$affinity$core$storage$kafka$KafkaStorage$$consumerError().set(null);
                    // Counter shared with the record handler; presumably incremented once per
                    // record - verify against KafkaStorage$$anon$3$$anonfun$run$1.
                    IntRef fetched = IntRef.create(0);
                    ((Iterator) JavaConverters$.MODULE$.asScalaIteratorConverter(records.iterator()).asScala()).foreach(new KafkaStorage$$anon$3$$anonfun$run$1(this, fetched));
                    if (!this.$outer.io$amient$affinity$core$storage$kafka$KafkaStorage$$tailing() && fetched.elem == 0) {
                        this.$outer.io$amient$affinity$core$storage$kafka$KafkaStorage$$consuming_$eq(false);
                    }
                }
                synchronized (this) {
                    // Wake whoever waits on this thread's monitor, then park until notified.
                    // NOTE(review): wait() is not guarded by a condition loop, so a spurious
                    // wakeup simply restarts consumption - confirm that is acceptable.
                    notify();
                    wait();
                }
            }
            // Interrupted at the loop head: fall through so the consumer is closed below.
        } catch (InterruptedException interrupted) {
            // Interruption is the shutdown signal for this thread; end quietly.
            return;
        } finally {
            kafkaConsumer().close();
        }
    }

    public /* synthetic */ KafkaStorage io$amient$affinity$core$storage$kafka$KafkaStorage$$anon$$$outer() {
        return this.$outer;
    }

    /**
     * Creates the consumer, assigns it the storage's topic partition and rewinds
     * it to the beginning of that partition.
     *
     * @param kafkaStorage the enclosing storage instance; must not be {@code null}
     * @throws NullPointerException if {@code kafkaStorage} is {@code null}
     */
    public KafkaStorage$$anon$3(KafkaStorage kafkaStorage) {
        if (kafkaStorage == null) {
            // Scala outer-reference null check; "throw null" raises a NullPointerException.
            throw null;
        }
        this.$outer = kafkaStorage;
        this.kafkaConsumer = new KafkaConsumer<>(kafkaStorage.consumerProps());
        this.tp = new TopicPartition(kafkaStorage.topic(), kafkaStorage.io$amient$affinity$core$storage$kafka$KafkaStorage$$partition);
        this.consumerPartitions = Arrays.asList(tp());
        kafkaConsumer().assign(consumerPartitions());
        kafkaConsumer().seekToBeginning(consumerPartitions());
    }
}
