package ai.tripl.arc.extract;

import ai.tripl.arc.api.API;
import ai.tripl.arc.util.log.logger.Logger;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$implicits$;
import org.apache.spark.util.CollectionAccumulator;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Serializable;
import scala.Some;
import scala.Tuple14;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.TypeTags;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: KafkaExtract.scala */
/* loaded from: input_file:ai/tripl/arc/extract/KafkaExtractStage$.class */
/**
 * Singleton companion of {@code KafkaExtractStage}: runs the stage
 * ({@link #execute}) and provides the case-class style {@link #apply} /
 * {@link #unapply} pair.
 *
 * <p>This source is decompiler output. The {@code KafkaExtractStage$$anonfun$...}
 * and {@code KafkaExtractStage$$anon$...} classes referenced below are defined in
 * other class files, so comments about what they do are marked as assumptions.
 */
public final class KafkaExtractStage$ implements Serializable {
    /**
     * The single instance. It is initialised in the declaration: a {@code final}
     * field cannot legally be assigned from the constructor in Java source.
     */
    public static final KafkaExtractStage$ MODULE$ = new KafkaExtractStage$();

    /**
     * Runs the extract and registers the result as a temporary view named
     * {@code kafkaExtractStage.outputView()}.
     *
     * <p>When {@code aRCContext.isStreaming()} is true the data comes from Spark's
     * {@code kafka} streaming source. Otherwise the topic's end offsets are looked
     * up with a short-lived consumer and the dataset is built by
     * {@link #liftedTree2$1}.
     *
     * <p>For a non-streaming result the method writes {@code outputColumns} and
     * {@code numPartitions} into {@code stageDetail()}. If additionally
     * {@code persist()} is true or {@code autoCommit()} is false, it persists and
     * counts the dataset, stores {@code kafkaExtractOffsets} in
     * {@code aRCContext.userData()}, writes {@code partitionsOffsets} and
     * {@code records} into {@code stageDetail()}, and throws
     * {@code KafkaExtractStage$$anon$3} when the value folded from the accumulator
     * differs from the count.
     *
     * @param kafkaExtractStage the stage configuration
     * @param sparkSession      the session used to read and to create the accumulator
     * @param logger            not used by this method
     * @param aRCContext        supplies the streaming flag, view mutability, storage
     *                          level and the user-data map
     * @return the resulting dataset wrapped with {@code Option.apply}
     */
    public Option<Dataset<Row>> execute(KafkaExtractStage kafkaExtractStage, SparkSession sparkSession, Logger logger, API.ARCContext aRCContext) {
        CollectionAccumulator collectionAccumulator = sparkSession.sparkContext().collectionAccumulator();
        Dataset source;
        if (aRCContext.isStreaming()) {
            source = sparkSession.readStream().format("kafka").option("kafka.bootstrap.servers", kafkaExtractStage.bootstrapServers()).option("subscribe", kafkaExtractStage.topic()).load();
        } else {
            // Consumer settings shared by the offset lookup and the partition readers.
            Properties properties = new Properties();
            properties.put("bootstrap.servers", kafkaExtractStage.bootstrapServers());
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            properties.put("enable.auto.commit", "false");
            properties.put("auto.offset.reset", "earliest");
            // The remaining timeouts are capped so they stay strictly below the stage timeout.
            properties.put("request.timeout.ms", BoxesRunTime.boxToLong(kafkaExtractStage.timeout()).toString());
            properties.put("session.timeout.ms", BoxesRunTime.boxToLong(Math.min(10000L, kafkaExtractStage.timeout() - 1)).toString());
            properties.put("fetch.max.wait.ms", BoxesRunTime.boxToLong(Math.min(500L, kafkaExtractStage.timeout() - 1)).toString());
            properties.put("heartbeat.interval.ms", BoxesRunTime.boxToLong(Math.min(3000L, kafkaExtractStage.timeout() - 2)).toString());

            // Only the offset lookup gets the group id up front; the readers receive it as an argument.
            Properties offsetLookupProperties = new Properties();
            offsetLookupProperties.putAll(properties);
            offsetLookupProperties.put("group.id", kafkaExtractStage.groupID());
            Map endOffsets = liftedTree1$1(kafkaExtractStage, offsetLookupProperties);

            // Kept from the original: the value is read but never used in this method.
            kafkaExtractStage.maxPollRecords();

            Dataset extracted = liftedTree2$1(kafkaExtractStage, sparkSession, collectionAccumulator, properties, endOffsets, kafkaExtractStage.groupID(), kafkaExtractStage.topic(), kafkaExtractStage.timeout(), kafkaExtractStage.autoCommit());
            if (!kafkaExtractStage.autoCommit()) {
                // NOTE(review): the closure body is not visible here; the HashMap handed to it
                // is not read again in this method - confirm whether it is meant to be used.
                ((IterableLike) JavaConverters$.MODULE$.mapAsScalaMapConverter(endOffsets).asScala()).foreach(new KafkaExtractStage$$anonfun$4(new HashMap()));
            }
            source = extracted;
        }

        Dataset output = applyPartitioning(kafkaExtractStage, source);

        // Immutable views refuse to overwrite an existing view of the same name.
        if (aRCContext.immutableViews()) {
            output.createTempView(kafkaExtractStage.outputView());
        } else {
            output.createOrReplaceTempView(kafkaExtractStage.outputView());
        }

        if (!output.isStreaming()) {
            kafkaExtractStage.stageDetail().put("outputColumns", Integer.valueOf(output.schema().length()));
            kafkaExtractStage.stageDetail().put("numPartitions", Integer.valueOf(output.rdd().partitions().length));
        }

        if ((kafkaExtractStage.persist() || !kafkaExtractStage.autoCommit()) && !output.isStreaming()) {
            output.persist(aRCContext.storageLevel());
            // The count runs before the accumulator is read; presumably the partition
            // function fills the accumulator while the dataset is evaluated - TODO confirm.
            long count = output.count();
            List offsets = ((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(collectionAccumulator.value()).asScala()).toList();
            aRCContext.userData().put("kafkaExtractOffsets", offsets);
            HashMap partitionsOffsets = new HashMap();
            offsets.foreach(new KafkaExtractStage$$anonfun$execute$1(partitionsOffsets));
            kafkaExtractStage.stageDetail().put("partitionsOffsets", partitionsOffsets);
            // Presumably the fold sums the records reported per partition - TODO confirm.
            long expected = BoxesRunTime.unboxToLong(offsets.foldLeft(BoxesRunTime.boxToLong(0L), new KafkaExtractStage$$anonfun$6()));
            kafkaExtractStage.stageDetail().put("records", Long.valueOf(count));
            if (expected != count) {
                throw new KafkaExtractStage$$anon$3(kafkaExtractStage, count, expected);
            }
        }
        return Option$.MODULE$.apply(output);
    }

    /**
     * Applies the stage's {@code partitionBy()} / {@code numPartitions()} settings.
     *
     * <p>With an empty {@code partitionBy()} the dataset is repartitioned to the
     * requested number of partitions, or returned untouched when none is set.
     * Otherwise it is repartitioned by the mapped columns, optionally with the
     * requested number of partitions.
     *
     * @throws MatchError if {@code numPartitions()} is neither {@code Some} nor {@code None}
     */
    private Dataset applyPartitioning(KafkaExtractStage kafkaExtractStage, Dataset dataset) {
        List<String> partitionBy = kafkaExtractStage.partitionBy();
        Option<Object> numPartitions = kafkaExtractStage.numPartitions();
        if (Nil$.MODULE$.equals(partitionBy)) {
            if (numPartitions instanceof Some) {
                return dataset.repartition(BoxesRunTime.unboxToInt(numPartitions.get()));
            }
            if (!None$.MODULE$.equals(numPartitions)) {
                throw new MatchError(numPartitions);
            }
            return dataset;
        }
        List partitionColumns = (List) partitionBy.map(new KafkaExtractStage$$anonfun$5(dataset), List$.MODULE$.canBuildFrom());
        if (numPartitions instanceof Some) {
            return dataset.repartition(BoxesRunTime.unboxToInt(numPartitions.get()), partitionColumns);
        }
        if (!None$.MODULE$.equals(numPartitions)) {
            throw new MatchError(numPartitions);
        }
        return dataset.repartition(partitionColumns);
    }

    /** Factory that forwards every argument, in order, to the {@code KafkaExtractStage} constructor. */
    public KafkaExtractStage apply(KafkaExtract kafkaExtract, String str, Option<String> option, String str2, String str3, String str4, String str5, int i, long j, boolean z, scala.collection.immutable.Map<String, String> map, boolean z2, Option<Object> option2, List<String> list) {
        return new KafkaExtractStage(kafkaExtract, str, option, str2, str3, str4, str5, i, j, z, map, z2, option2, list);
    }

    /**
     * Extractor counterpart of {@link #apply}.
     *
     * @return {@code None} for a {@code null} stage, otherwise {@code Some} of a
     *         14-tuple holding the stage's fields (primitives boxed)
     */
    public Option<Tuple14<KafkaExtract, String, Option<String>, String, String, String, String, Object, Object, Object, scala.collection.immutable.Map<String, String>, Object, Option<Object>, List<String>>> unapply(KafkaExtractStage kafkaExtractStage) {
        return kafkaExtractStage == null ? None$.MODULE$ : new Some(new Tuple14(kafkaExtractStage.m4plugin(), kafkaExtractStage.name(), kafkaExtractStage.description(), kafkaExtractStage.outputView(), kafkaExtractStage.topic(), kafkaExtractStage.bootstrapServers(), kafkaExtractStage.groupID(), BoxesRunTime.boxToInteger(kafkaExtractStage.maxPollRecords()), BoxesRunTime.boxToLong(kafkaExtractStage.timeout()), BoxesRunTime.boxToBoolean(kafkaExtractStage.autoCommit()), kafkaExtractStage.params(), BoxesRunTime.boxToBoolean(kafkaExtractStage.persist()), kafkaExtractStage.numPartitions(), kafkaExtractStage.partitionBy()));
    }

    /** Keeps the singleton unique across Java deserialisation. */
    private Object readResolve() {
        return MODULE$;
    }

    /**
     * Looks up the end offsets of the stage's topic using a consumer that is
     * closed before returning.
     *
     * @param kafkaExtractStage supplies the topic name
     * @param properties        consumer configuration
     * @return the map returned by {@code KafkaConsumer.endOffsets}
     * @throws KafkaExtractStage$$anon$1 wrapping any {@link Exception} raised here
     */
    private final Map liftedTree1$1(KafkaExtractStage kafkaExtractStage, Properties properties) {
        try {
            KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
            try {
                // partitionsFor(topic) is mapped through $$anonfun$3 to build the argument for endOffsets.
                return kafkaConsumer.endOffsets((java.util.List) JavaConverters$.MODULE$.bufferAsJavaListConverter((Buffer) ((TraversableLike) JavaConverters$.MODULE$.asScalaBufferConverter(kafkaConsumer.partitionsFor(kafkaExtractStage.topic())).asScala()).map(new KafkaExtractStage$$anonfun$3(kafkaExtractStage), Buffer$.MODULE$.canBuildFrom())).asJava());
            } finally {
                kafkaConsumer.close();
            }
        } catch (Exception e) {
            throw new KafkaExtractStage$$anon$1(kafkaExtractStage, e);
        }
    }

    /**
     * Builds the batch dataset: an empty DataFrame is repartitioned to
     * {@code map.size()} partitions and {@code mapPartitions} is applied with a
     * function that receives the accumulator, consumer properties, end offsets,
     * group id, topic, timeout and auto-commit flag. The result is encoded as
     * {@code KafkaRecord} and converted with {@code toDF()}.
     *
     * @throws KafkaExtractStage$$anon$2 wrapping any {@link Exception} raised here
     */
    private final Dataset liftedTree2$1(KafkaExtractStage kafkaExtractStage, SparkSession sparkSession, CollectionAccumulator collectionAccumulator, Properties properties, Map map, String str, String str2, long j, boolean z) {
        try {
            Dataset repartition = sparkSession.sqlContext().emptyDataFrame().repartition(map.size());
            KafkaExtractStage$$anonfun$liftedTree2$1$1 kafkaExtractStage$$anonfun$liftedTree2$1$1 = new KafkaExtractStage$$anonfun$liftedTree2$1$1(collectionAccumulator, properties, map, str, str2, j, z);
            SparkSession$implicits$ implicits = sparkSession.implicits();
            TypeTags universe = package$.MODULE$.universe();
            return repartition.mapPartitions(kafkaExtractStage$$anonfun$liftedTree2$1$1, implicits.newProductEncoder(universe.TypeTag().apply(package$.MODULE$.universe().runtimeMirror(getClass().getClassLoader()), new TypeCreator() { // from class: ai.tripl.arc.extract.KafkaExtractStage$$typecreator4$1
                public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                    mirror.universe();
                    return mirror.staticClass("ai.tripl.arc.extract.KafkaExtractStage.KafkaRecord").asType().toTypeConstructor();
                }
            }))).toDF();
        } catch (Exception e) {
            throw new KafkaExtractStage$$anon$2(kafkaExtractStage, e);
        }
    }

    /** Private so that {@link #MODULE$} stays the only instance. */
    private KafkaExtractStage$() {
    }
}
