package ai.tripl.arc.extract;

import ai.tripl.arc.extract.KafkaExtractStage;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.TaskContext$;
import org.apache.spark.sql.Row;
import org.apache.spark.util.CollectionAccumulator;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.collection.Iterable$;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxesRunTime;

/* compiled from: KafkaExtract.scala */
/* loaded from: input_file:ai/tripl/arc/extract/KafkaExtractStage$$anonfun$liftedTree2$1$1.class */
public final class KafkaExtractStage$$anonfun$liftedTree2$1$1 extends AbstractFunction1<Iterator<Row>, Iterator<KafkaExtractStage.KafkaRecord>> implements Serializable {
    public static final long serialVersionUID = 0;
    private final CollectionAccumulator kafkaPartitionAccumulator$1;
    private final Properties commonProps$1;
    private final Map endOffsets$1;
    private final String stageGroupID$1;
    private final String stageTopic$1;
    private final long stageTimeout$1;
    private final boolean stageAutoCommit$1;

    public final Iterator<KafkaExtractStage.KafkaRecord> apply(Iterator<Row> iterator) {
        int partitionId = TaskContext$.MODULE$.getPartitionId();
        Properties properties = new Properties();
        properties.putAll(this.commonProps$1);
        properties.put("group.id", new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"", "-", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.stageGroupID$1, BoxesRunTime.boxToInteger(partitionId)})));
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
        TopicPartition topicPartition = new TopicPartition(this.stageTopic$1, partitionId);
        long longValue = ((Long) this.endOffsets$1.get(topicPartition)).longValue();
        try {
            kafkaConsumer.assign((Collection) JavaConverters$.MODULE$.seqAsJavaListConverter(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new TopicPartition[]{topicPartition}))).asJava());
            this.kafkaPartitionAccumulator$1.add(new KafkaPartition(topicPartition, kafkaConsumer.position(topicPartition), longValue));
            List allKafkaRecords$1 = getAllKafkaRecords$1(getKafkaRecord$1(kafkaConsumer, longValue), Nil$.MODULE$, kafkaConsumer, longValue);
            if (this.stageAutoCommit$1) {
                HashMap hashMap = new HashMap();
                hashMap.put(topicPartition, new OffsetAndMetadata(longValue));
                kafkaConsumer.commitSync(hashMap);
            }
            return allKafkaRecords$1.toIterator();
        } finally {
            kafkaConsumer.close();
        }
    }

    private final List getKafkaRecord$1(KafkaConsumer kafkaConsumer, long j) {
        return ((TraversableOnce) ((TraversableLike) ((TraversableLike) JavaConverters$.MODULE$.iterableAsScalaIterableConverter(kafkaConsumer.poll(Duration.ofMillis(this.stageTimeout$1)).records(this.stageTopic$1)).asScala()).filter(new KafkaExtractStage$$anonfun$liftedTree2$1$1$$anonfun$getKafkaRecord$1$1(this, j))).map(new KafkaExtractStage$$anonfun$liftedTree2$1$1$$anonfun$getKafkaRecord$1$2(this), Iterable$.MODULE$.canBuildFrom())).toList();
    }

    private final List getAllKafkaRecords$1(List list, List list2, KafkaConsumer kafkaConsumer, long j) {
        while (!Nil$.MODULE$.equals(list)) {
            List kafkaRecord$1 = getKafkaRecord$1(kafkaConsumer, j);
            list2 = list.$colon$colon$colon(list2);
            list = kafkaRecord$1;
        }
        return list2;
    }

    public KafkaExtractStage$$anonfun$liftedTree2$1$1(CollectionAccumulator collectionAccumulator, Properties properties, Map map, String str, String str2, long j, boolean z) {
        this.kafkaPartitionAccumulator$1 = collectionAccumulator;
        this.commonProps$1 = properties;
        this.endOffsets$1 = map;
        this.stageGroupID$1 = str;
        this.stageTopic$1 = str2;
        this.stageTimeout$1 = j;
        this.stageAutoCommit$1 = z;
    }
}
