package ai.tripl.arc.extract;

import ai.tripl.arc.api.API;
import ai.tripl.arc.extract.KafkaExtractStage;
import ai.tripl.arc.util.log.logger.Logger;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.TaskContext$;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$implicits$;
import org.apache.spark.util.CollectionAccumulator;
import org.apache.spark.util.LongAccumulator;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.Tuple17;
import scala.Tuple2;
import scala.collection.Iterable$;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.MapLike;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.$colon;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.math.Ordering$String$;
import scala.math.package$;
import scala.reflect.ClassTag$;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.TypeTags;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

/* compiled from: KafkaExtract.scala */
/* loaded from: input_file:ai/tripl/arc/extract/KafkaExtractStage$.class */
/**
 * Scala companion object ("module") of the {@code KafkaExtractStage} case class, recovered from the
 * compiled {@code KafkaExtractStage$.class} (source file KafkaExtract.scala).
 *
 * <p>Provides the stage's {@link #execute} logic, which reads a Kafka topic either as a Spark
 * structured stream or as a bounded batch, registers the result as a temporary view and records
 * metrics in the stage's {@code stageDetail}; the case-class {@code apply}/{@code unapply} helpers;
 * and {@link #splitN}, used to spread a {@code maxRecords} budget across Kafka partitions.
 *
 * <p>NOTE(review): this is decompiler output, so several constructs mirror the Scala bytecode rather
 * than hand-written Java. Any fix made here should also be made in the Scala source.
 */
public final class KafkaExtractStage$ implements Serializable {
    /** The singleton instance, assigned by the private constructor. */
    public static KafkaExtractStage$ MODULE$;

    /** Prefix of the {@code userData} key under which batch reads publish their partition offsets. */
    private final String KAFKA_EXTRACT_OFFSET_KEY;

    static {
        new KafkaExtractStage$();
    }

    /** Returns the {@code userData} key prefix ({@code "kafkaExtractOffset"}). */
    public String KAFKA_EXTRACT_OFFSET_KEY() {
        return this.KAFKA_EXTRACT_OFFSET_KEY;
    }

    /**
     * Runs the Kafka extract stage and registers the result as the stage's {@code outputView}.
     *
     * <p>In streaming mode the topic is subscribed to through Spark's {@code kafka} source. Otherwise
     * the end offset of every partition is captured up front, and each Kafka partition is read by one
     * Spark task up to that end offset (or up to its share of {@code maxRecords}, if set).
     *
     * <p>The result is repartitioned according to {@code partitionBy}/{@code numPartitions}. For a
     * batch read with {@code persist} enabled or {@code autoCommit} disabled, the dataset is persisted
     * and counted. Record/byte metrics and per-partition offsets are then written to
     * {@code stageDetail}, and the list of {@code KafkaPartition}s is stored in {@code userData} under
     * {@code "kafkaExtractOffset_<outputView>"}. With {@code strict}, a mismatch between the row count
     * and the summed offset span fails the stage.
     *
     * <p>Failures are thrown as the {@code KafkaExtractStage$$anon$N} exception classes defined
     * elsewhere: topic lookup ({@code $1}), batch read setup ({@code $2}), strict count mismatch
     * ({@code $3}).
     *
     * @param kafkaExtractStage stage configuration
     * @param sparkSession active Spark session
     * @param logger stage logger (not used by this method)
     * @param aRCContext job context: streaming flag, view mutability, storage level and user data
     * @return the extracted dataset wrapped in {@code Option}
     */
    public Option<Dataset<Row>> execute(KafkaExtractStage kafkaExtractStage, SparkSession sparkSession, Logger logger, API.ARCContext aRCContext) {
        Dataset liftedTree2$1;
        Dataset repartition;
        // Accumulators updated by the read tasks: records read, key+value bytes read, and the
        // KafkaPartition (start position / end offset) handled by each task.
        LongAccumulator longAccumulator = sparkSession.sparkContext().longAccumulator();
        LongAccumulator longAccumulator2 = sparkSession.sparkContext().longAccumulator();
        CollectionAccumulator collectionAccumulator = sparkSession.sparkContext().collectionAccumulator();
        if (aRCContext.isStreaming()) {
            liftedTree2$1 = sparkSession.readStream().format("kafka").option("kafka.bootstrap.servers", kafkaExtractStage.bootstrapServers()).option("subscribe", kafkaExtractStage.topic()).options(kafkaExtractStage.params()).load();
        } else {
            Properties properties = new Properties();
            addCommonProps(kafkaExtractStage, properties);
            properties.put("group.id", kafkaExtractStage.groupID());
            // End offset of every partition, captured once so that the batch read is bounded.
            Map liftedTree1$1 = liftedTree1$1(properties, kafkaExtractStage);
            // Optional per-partition record budget: maxRecords split into near-equal shares.
            Option map = kafkaExtractStage.maxRecords().map(obj -> {
                return $anonfun$execute$2(liftedTree1$1, BoxesRunTime.unboxToInt(obj));
            });
            int maxPollRecords = kafkaExtractStage.maxPollRecords();
            String groupID = kafkaExtractStage.groupID();
            liftedTree2$1 = liftedTree2$1(sparkSession, liftedTree1$1, kafkaExtractStage.timeout(), kafkaExtractStage.topic(), longAccumulator, longAccumulator2, kafkaExtractStage, groupID, maxPollRecords, map, collectionAccumulator, kafkaExtractStage.autoCommit());
        }
        Dataset dataset = liftedTree2$1;
        List<String> partitionBy = kafkaExtractStage.partitionBy();
        if (Nil$.MODULE$.equals(partitionBy)) {
            Option numPartitions = kafkaExtractStage.numPartitions();
            if (numPartitions instanceof Some) {
                repartition = dataset.repartition(BoxesRunTime.unboxToInt(((Some) numPartitions).value()));
            } else {
                if (!None$.MODULE$.equals(numPartitions)) {
                    throw new MatchError(numPartitions);
                }
                repartition = dataset;
            }
        } else {
            // Repartition by the named columns, optionally also fixing the partition count.
            List list = (List) partitionBy.map(str -> {
                return dataset.apply(str);
            }, List$.MODULE$.canBuildFrom());
            Option numPartitions2 = kafkaExtractStage.numPartitions();
            if (numPartitions2 instanceof Some) {
                repartition = dataset.repartition(BoxesRunTime.unboxToInt(((Some) numPartitions2).value()), list);
            } else {
                if (!None$.MODULE$.equals(numPartitions2)) {
                    throw new MatchError(numPartitions2);
                }
                repartition = dataset.repartition(list);
            }
        }
        Dataset dataset2 = repartition;
        if (aRCContext.immutableViews()) {
            dataset2.createTempView(kafkaExtractStage.outputView());
        } else {
            dataset2.createOrReplaceTempView(kafkaExtractStage.outputView());
        }
        if (!dataset2.isStreaming()) {
            kafkaExtractStage.stageDetail().put("outputColumns", Integer.valueOf(dataset2.schema().length()));
            kafkaExtractStage.stageDetail().put("numPartitions", Integer.valueOf(dataset2.rdd().partitions().length));
        }
        // NOTE(review): with autoCommit=true and persist=false nothing below runs, so the `strict`
        // check is skipped and no offsets are published to userData. The read tasks also commit
        // offsets every time they execute, so evaluating an unpersisted dataset more than once may
        // read a different offset range each time -- confirm this is intended.
        if ((kafkaExtractStage.persist() || !kafkaExtractStage.autoCommit()) && !dataset2.isStreaming()) {
            // Persist before count() so that the read runs once and the accumulators are populated.
            dataset2.persist(aRCContext.storageLevel());
            long count = dataset2.count();
            kafkaExtractStage.stageDetail().put("records", Long.valueOf(count));
            HashMap hashMap = new HashMap();
            hashMap.put("recordsRead", Long.valueOf(Predef$.MODULE$.Long2long(longAccumulator.value())));
            hashMap.put("bytesRead", Long.valueOf(Predef$.MODULE$.Long2long(longAccumulator2.value())));
            kafkaExtractStage.stageDetail().put("inputMetrics", hashMap);
            List list2 = ((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(collectionAccumulator.value()).asScala()).toList();
            // Published as "kafkaExtractOffset_<outputView>" for later stages to consume.
            aRCContext.userData().put(new StringBuilder(1).append(KAFKA_EXTRACT_OFFSET_KEY()).append("_").append(kafkaExtractStage.outputView()).toString(), list2);
            HashMap hashMap2 = new HashMap();
            list2.foreach(kafkaPartition -> {
                HashMap hashMap3 = new HashMap();
                hashMap3.put("startOffset", BoxesRunTime.boxToLong(kafkaPartition.position()));
                hashMap3.put("endOffset", BoxesRunTime.boxToLong(kafkaPartition.endOffset()));
                return (HashMap) hashMap2.put(BoxesRunTime.boxToInteger(kafkaPartition.topicPartition().partition()), hashMap3);
            });
            kafkaExtractStage.stageDetail().put("partitionsOffsets", hashMap2);
            if (kafkaExtractStage.strict()) {
                // Expected rows = sum over partitions of (endOffset - start position).
                // NOTE(review): this assumes contiguous offsets; compacted topics or transaction
                // markers would make the offset span exceed the record count -- confirm.
                long unboxToLong = BoxesRunTime.unboxToLong(list2.foldLeft(BoxesRunTime.boxToLong(0L), (obj2, kafkaPartition2) -> {
                    return BoxesRunTime.boxToLong($anonfun$execute$9(BoxesRunTime.unboxToLong(obj2), kafkaPartition2));
                }));
                if (unboxToLong != count) {
                    throw new KafkaExtractStage$$anon$3(kafkaExtractStage, unboxToLong, count);
                }
            }
        }
        return Option$.MODULE$.apply(dataset2);
    }

    /**
     * Splits {@code seq} into {@code i} contiguous chunks whose sizes differ by at most one.
     *
     * <p>Chunk {@code k} spans indices {@code [k*len/i, (k+1)*len/i)} (integer division), so the
     * chunks cover {@code seq} exactly once, in order.
     *
     * @param seq the sequence to split
     * @param i number of chunks; must be positive (0 divides by zero)
     * @return the {@code i} chunks, in order
     */
    public Seq<Seq<Object>> splitN(Seq<Object> seq, int i) {
        int length = seq.length();
        // Multiply in long: k * length overflows int for large inputs (e.g. maxRecords near
        // Int.MaxValue spread over several partitions), which yields wrong or negative boundaries.
        // The quotient is <= length, so narrowing back to int is safe.
        return snip$1(seq, (IndexedSeq) RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), i).map(i2 -> {
            return (int) ((((long) i2) * length) / i);
        }, IndexedSeq$.MODULE$.canBuildFrom()), (Seq) Nil$.MODULE$);
    }

    /**
     * Fills {@code properties} with the consumer settings shared by the offset lookup and the
     * per-partition readers.
     *
     * <p>Keys and values are read as raw {@code byte[]}, auto-commit is disabled, a consumer with no
     * committed offset starts from the earliest offset, and the request/session/fetch/heartbeat
     * timeouts are derived from the stage {@code timeout} (milliseconds). Entries from
     * {@code params} are applied last and therefore override any of these defaults.
     */
    private void addCommonProps(KafkaExtractStage kafkaExtractStage, Properties properties) {
        properties.put("bootstrap.servers", kafkaExtractStage.bootstrapServers());
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("enable.auto.commit", "false");
        properties.put("auto.offset.reset", "earliest");
        properties.put("request.timeout.ms", Long.toString(kafkaExtractStage.timeout()));
        properties.put("session.timeout.ms", Long.toString(Math.min(10000L, kafkaExtractStage.timeout() - 1)));
        properties.put("fetch.max.wait.ms", Long.toString(Math.min(500L, kafkaExtractStage.timeout() - 1)));
        properties.put("heartbeat.interval.ms", Long.toString(Math.min(3000L, kafkaExtractStage.timeout() - 2)));
        kafkaExtractStage.params().foreach(tuple2 -> {
            if (tuple2 != null) {
                return properties.put((String) tuple2._1(), (String) tuple2._2());
            }
            throw new MatchError(tuple2);
        });
    }

    /** Case-class factory: builds a {@code KafkaExtractStage} from its seventeen fields, in declaration order. */
    public KafkaExtractStage apply(KafkaExtract kafkaExtract, Option<String> option, String str, Option<String> option2, String str2, String str3, String str4, String str5, int i, Option<Object> option3, long j, boolean z, scala.collection.immutable.Map<String, String> map, boolean z2, Option<Object> option4, List<String> list, boolean z3) {
        return new KafkaExtractStage(kafkaExtract, option, str, option2, str2, str3, str4, str5, i, option3, j, z, map, z2, option4, list, z3);
    }

    /** Case-class extractor: returns the stage's fields as a {@code Tuple17}, or {@code None} for {@code null}. */
    public Option<Tuple17<KafkaExtract, Option<String>, String, Option<String>, String, String, String, String, Object, Option<Object>, Object, Object, scala.collection.immutable.Map<String, String>, Object, Option<Object>, List<String>, Object>> unapply(KafkaExtractStage kafkaExtractStage) {
        return kafkaExtractStage == null ? None$.MODULE$ : new Some(new Tuple17(kafkaExtractStage.m2plugin(), kafkaExtractStage.id(), kafkaExtractStage.name(), kafkaExtractStage.description(), kafkaExtractStage.outputView(), kafkaExtractStage.topic(), kafkaExtractStage.bootstrapServers(), kafkaExtractStage.groupID(), BoxesRunTime.boxToInteger(kafkaExtractStage.maxPollRecords()), kafkaExtractStage.maxRecords(), BoxesRunTime.boxToLong(kafkaExtractStage.timeout()), BoxesRunTime.boxToBoolean(kafkaExtractStage.autoCommit()), kafkaExtractStage.params(), BoxesRunTime.boxToBoolean(kafkaExtractStage.persist()), kafkaExtractStage.numPartitions(), kafkaExtractStage.partitionBy(), BoxesRunTime.boxToBoolean(kafkaExtractStage.strict())));
    }

    /** Keeps the singleton unique across Java deserialization. */
    private Object readResolve() {
        return MODULE$;
    }

    /**
     * Looks up the end offset of every partition of the stage's topic.
     *
     * <p>A short-lived consumer built from {@code properties} checks that the topic exists, then
     * queries {@code endOffsets} for all of its partitions. Any exception is wrapped in
     * {@code KafkaExtractStage$$anon$1} together with the stage.
     *
     * @return map from {@link TopicPartition} to end offset ({@code Long})
     */
    private static final /* synthetic */ Map liftedTree1$1(Properties properties, KafkaExtractStage kafkaExtractStage) {
        try {
            // try-with-resources closes the consumer on every path, including the topic-not-found
            // throw and a failing listTopics(). A close() failure is then attached as a suppressed
            // exception instead of hiding the primary error.
            try (KafkaConsumer kafkaConsumer = new KafkaConsumer(properties)) {
                if (!((MapLike) JavaConverters$.MODULE$.mapAsScalaMapConverter(kafkaConsumer.listTopics()).asScala()).contains(kafkaExtractStage.topic())) {
                    throw new Exception(new StringBuilder(61).append("topic '").append(kafkaExtractStage.topic()).append("' not found in Kafka cluster with bootstrapServers '").append(kafkaExtractStage.bootstrapServers()).append("'.").toString());
                }
                return kafkaConsumer.endOffsets((java.util.List) JavaConverters$.MODULE$.bufferAsJavaListConverter((Buffer) ((TraversableLike) JavaConverters$.MODULE$.asScalaBufferConverter(kafkaConsumer.partitionsFor(kafkaExtractStage.topic())).asScala()).map(partitionInfo -> {
                    return new TopicPartition(kafkaExtractStage.topic(), partitionInfo.partition());
                }, Buffer$.MODULE$.canBuildFrom())).asJava());
            }
        } catch (Exception e) {
            throw new KafkaExtractStage$$anon$1(e, kafkaExtractStage);
        }
    }

    /** Length of one budget chunk, as a long. */
    public static final /* synthetic */ long $anonfun$execute$3(Seq seq) {
        return seq.length();
    }

    /**
     * Splits the range {@code 1..i} ({@code i} = maxRecords) into {@code map.size()} chunks and
     * returns each chunk's length. Element {@code k} is the record budget of Kafka partition {@code k}.
     */
    public static final /* synthetic */ Seq $anonfun$execute$2(Map map, int i) {
        return (Seq) MODULE$.splitN(RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), i), map.size()).map(seq -> {
            return BoxesRunTime.boxToLong($anonfun$execute$3(seq));
        }, Seq$.MODULE$.canBuildFrom());
    }

    /** True when the record's offset is below the end offset {@code j}. */
    public static final /* synthetic */ boolean $anonfun$execute$5(long j, ConsumerRecord consumerRecord) {
        return consumerRecord.offset() < j;
    }

    /**
     * Performs a single {@code poll} (waiting at most {@code j2} ms) and converts the returned records
     * of topic {@code str} whose offset is below {@code j} into {@code KafkaRecord}s.
     *
     * <p>Each kept record increments {@code longAccumulator} by one, and {@code longAccumulator2} by its
     * key length plus value length in bytes (a null key or value counts as 0).
     */
    private static final List getKafkaRecord$1(KafkaConsumer kafkaConsumer, long j, long j2, String str, LongAccumulator longAccumulator, LongAccumulator longAccumulator2) {
        return ((TraversableOnce) ((TraversableLike) ((TraversableLike) JavaConverters$.MODULE$.iterableAsScalaIterableConverter(kafkaConsumer.poll(Duration.ofMillis(j2)).records(str)).asScala()).filter(consumerRecord -> {
            return BoxesRunTime.boxToBoolean($anonfun$execute$5(j, consumerRecord));
        })).map(consumerRecord2 -> {
            longAccumulator.add(1L);
            longAccumulator2.add((consumerRecord2.key() != null ? ((byte[]) consumerRecord2.key()).length : 0) + (consumerRecord2.value() != null ? ((byte[]) consumerRecord2.value()).length : 0));
            return new KafkaExtractStage.KafkaRecord((byte[]) consumerRecord2.key(), (byte[]) consumerRecord2.value(), consumerRecord2.topic(), consumerRecord2.partition(), consumerRecord2.offset(), consumerRecord2.timestamp(), consumerRecord2.timestampType().id);
        }, Iterable$.MODULE$.canBuildFrom())).toList();
    }

    /**
     * Keeps polling until a poll yields no records below end offset {@code j}, and returns every
     * record read, in poll order.
     *
     * <p>{@code list} is the batch from the previous poll and {@code list2} is the accumulator; each
     * batch is appended after the records already accumulated. Reading stops at the first empty
     * batch, which may also happen when a poll times out before {@code j} is reached. The
     * {@code strict} check in {@code execute} is what detects such a short read.
     *
     * <p>NOTE(review): {@code :::} copies the accumulator on every iteration, so the total cost is
     * roughly (number of polls) x (records read).
     */
    private final List getAllKafkaRecords$1(KafkaConsumer kafkaConsumer, long j, List list, List list2, long j2, String str, LongAccumulator longAccumulator, LongAccumulator longAccumulator2) {
        while (!Nil$.MODULE$.equals(list)) {
            List kafkaRecord$1 = getKafkaRecord$1(kafkaConsumer, j, j2, str, longAccumulator, longAccumulator2);
            // Scala `list2 ::: list`: accumulated records first, then the previous batch.
            list2 = list.$colon$colon$colon(list2);
            list = kafkaRecord$1;
        }
        return list2;
    }

    /**
     * Reads the topic as a bounded batch {@code DataFrame} of {@code KafkaRecord} rows.
     *
     * <p>An empty RDD is repartitioned to {@code map.size()} partitions, and Spark partition id
     * {@code k} reads Kafka partition {@code k}. This assumes the topic's partitions are numbered
     * {@code 0..n-1} (TODO confirm). Each task:
     * <ul>
     *   <li>uses its own consumer group {@code "<str2>-<k>"};</li>
     *   <li>starts at that group's current position;</li>
     *   <li>reads up to the end offset captured in {@code map}, capped by the partition's budget when
     *       {@code option} is set;</li>
     *   <li>adds a {@code KafkaPartition} (start position, end offset) to {@code collectionAccumulator};</li>
     *   <li>when {@code z} is true, synchronously commits that end offset for its group.</li>
     * </ul>
     * Any exception raised while building the read is wrapped in {@code KafkaExtractStage$$anon$2}.
     *
     * @param map end offset per {@link TopicPartition}, as returned by {@code liftedTree1$1}
     * @param j poll timeout in milliseconds
     * @param str topic name
     * @param str2 base consumer group id
     * @param i value for {@code max.poll.records}
     * @param option optional per-partition record budgets, indexed by partition id
     * @param z whether to commit the end offset after reading (autoCommit)
     */
    private final /* synthetic */ Dataset liftedTree2$1(SparkSession sparkSession, Map map, long j, String str, LongAccumulator longAccumulator, LongAccumulator longAccumulator2, KafkaExtractStage kafkaExtractStage, String str2, int i, Option option, CollectionAccumulator collectionAccumulator, boolean z) {
        try {
            SparkSession$implicits$ implicits = sparkSession.implicits();
            RDD repartition = sparkSession.sparkContext().parallelize(Nil$.MODULE$, sparkSession.sparkContext().parallelize$default$2(), ClassTag$.MODULE$.apply(String.class)).repartition(map.size(), Ordering$String$.MODULE$);
            RDD mapPartitions = repartition.mapPartitions(iterator -> {
                long longValue;
                int partitionId = TaskContext$.MODULE$.getPartitionId();
                Properties properties = new Properties();
                MODULE$.addCommonProps(kafkaExtractStage, properties);
                properties.put("group.id", new StringBuilder(1).append(str2).append("-").append(partitionId).toString());
                properties.put("max.poll.records", Integer.toString(i));
                KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
                try {
                    TopicPartition topicPartition = new TopicPartition(str, partitionId);
                    kafkaConsumer.assign((Collection) JavaConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(topicPartition, Nil$.MODULE$)).asJava());
                    long position = kafkaConsumer.position(topicPartition);
                    // End offset: the captured partition end, optionally capped at position + budget.
                    if (option instanceof Some) {
                        longValue = Math.min(position + BoxesRunTime.unboxToLong(((Seq) ((Some) option).value()).apply(partitionId)), ((Long) map.get(topicPartition)).longValue());
                    } else {
                        if (!None$.MODULE$.equals(option)) {
                            throw new MatchError(option);
                        }
                        longValue = ((Long) map.get(topicPartition)).longValue();
                    }
                    long j2 = longValue;
                    List allKafkaRecords$1 = this.getAllKafkaRecords$1(kafkaConsumer, j2, getKafkaRecord$1(kafkaConsumer, j2, j, str, longAccumulator, longAccumulator2), Nil$.MODULE$, j, str, longAccumulator, longAccumulator2);
                    collectionAccumulator.add(new KafkaPartition(topicPartition, position, j2));
                    if (z) {
                        // NOTE(review): commits the target end offset even when fewer records were
                        // actually read (e.g. a poll timed out) -- confirm this is intended.
                        HashMap hashMap = new HashMap();
                        hashMap.put(topicPartition, new OffsetAndMetadata(j2));
                        kafkaConsumer.commitSync(hashMap);
                    }
                    return allKafkaRecords$1.toIterator();
                } finally {
                    kafkaConsumer.close();
                }
            }, repartition.mapPartitions$default$2(), ClassTag$.MODULE$.apply(KafkaExtractStage.KafkaRecord.class));
            SparkSession$implicits$ implicits2 = sparkSession.implicits();
            TypeTags universe = scala.reflect.runtime.package$.MODULE$.universe();
            return implicits.rddToDatasetHolder(mapPartitions, implicits2.newProductEncoder(universe.TypeTag().apply(scala.reflect.runtime.package$.MODULE$.universe().runtimeMirror(getClass().getClassLoader()), new TypeCreator() { // from class: ai.tripl.arc.extract.KafkaExtractStage$$typecreator6$1
                public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                    mirror.universe();
                    return mirror.staticClass("ai.tripl.arc.extract.KafkaExtractStage.KafkaRecord").asType().toTypeConstructor();
                }
            }))).toDF();
        } catch (Exception e) {
            throw new KafkaExtractStage$$anon$2(e, kafkaExtractStage);
        }
    }

    /** Adds one partition's offset span ({@code endOffset - position}) to the running total {@code j}. */
    public static final /* synthetic */ long $anonfun$execute$9(long j, KafkaPartition kafkaPartition) {
        return j + (kafkaPartition.endOffset() - kafkaPartition.position());
    }

    /**
     * Cuts {@code seq} into consecutive chunks and appends them to {@code seq3}. Each chunk's size is
     * the difference between two successive entries of the ascending boundary list {@code seq2}.
     *
     * @param seq remaining elements to split
     * @param seq2 remaining boundaries; the first is where {@code seq} currently starts
     * @param seq3 chunks produced so far
     * @return {@code seq3} followed by one chunk per adjacent pair of boundaries in {@code seq2}
     */
    private final Seq snip$1(Seq seq, Seq seq2, Seq seq3) {
        while (seq2.length() >= 2) {
            int from = BoxesRunTime.unboxToInt(seq2.head());
            int to = BoxesRunTime.unboxToInt(((IterableLike) seq2.tail()).head());
            int chunkSize = to - from;
            seq3 = (Seq) seq3.$colon$plus(seq.take(chunkSize), Seq$.MODULE$.canBuildFrom());
            seq = (Seq) seq.drop(chunkSize);
            seq2 = (Seq) seq2.tail();
        }
        return seq3;
    }

    /** Registers this instance as {@link #MODULE$} and initialises the offset key prefix. */
    private KafkaExtractStage$() {
        MODULE$ = this;
        this.KAFKA_EXTRACT_OFFSET_KEY = "kafkaExtractOffset";
    }
}
