package ai.tripl.arc.execute;

import ai.tripl.arc.api.API;
import ai.tripl.arc.util.log.logger.Logger;
import java.util.HashMap;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions$;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.Tuple7;
import scala.collection.immutable.Map;
import scala.collection.mutable.ArrayOps;

/* compiled from: KafkaCommitExecute.scala */
/* loaded from: input_file:ai/tripl/arc/execute/KafkaCommitExecuteStage$.class */
/**
 * Java view of the Scala companion object of {@link KafkaCommitExecuteStage}.
 *
 * <p>Holds the singleton {@link #MODULE$}, the {@code apply}/{@code unapply}
 * pair for the stage, and {@link #execute}, which commits Kafka offsets
 * derived from the stage's input view.
 */
public final class KafkaCommitExecuteStage$ implements Serializable {
    /** Singleton instance; assigned by the private constructor during class initialisation. */
    public static KafkaCommitExecuteStage$ MODULE$;

    static {
        // The constructor publishes the new instance through MODULE$.
        new KafkaCommitExecuteStage$();
    }

    /**
     * Commits, for every (topic, partition) pair found in the stage's input
     * view, the offset following the highest {@code offset} value seen.
     *
     * <p>The input view is read with {@code sparkSession.table(...)} and must
     * expose {@code topic}, {@code partition} and {@code offset} columns. The
     * view is grouped by topic and partition, ordered by the same columns and
     * truncated to the first 10000 groups; groups beyond that limit are not
     * committed. Each commit is performed by a short-lived consumer whose
     * {@code group.id} is {@code <groupID>-<partition>}.
     *
     * <p>The committed offsets are stored in the stage detail under the key
     * {@code "offsets"}, keyed by that same group id string.
     *
     * @param kafkaCommitExecuteStage stage supplying the input view name, bootstrap servers and group id
     * @param sparkSession            session used to resolve the input view
     * @param logger                  not used by this method
     * @param aRCContext              not used by this method
     * @return always {@code None}
     * @throws KafkaCommitExecuteStage$$anon$1 wrapping any {@link Exception} raised while
     *         aggregating the view or committing offsets
     */
    public Option<Dataset<Row>> execute(KafkaCommitExecuteStage kafkaCommitExecuteStage, SparkSession sparkSession, Logger logger, API.ARCContext aRCContext) {
        Dataset<Row> table = sparkSession.table(kafkaCommitExecuteStage.inputView());
        HashMap<String, Long> committedOffsets = new HashMap<>();
        try {
            Column topicColumn = table.apply("topic");
            Column partitionColumn = table.apply("partition");
            Dataset<Row> maxOffsets = table
                    .groupBy(topicColumn, partitionColumn)
                    .agg(functions$.MODULE$.max(table.apply("offset")))
                    .orderBy(topicColumn, partitionColumn)
                    .limit(10000);

            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", kafkaCommitExecuteStage.bootstrapServers());
            properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("enable.auto.commit", "false");

            for (Object element : (Object[]) maxOffsets.collect()) {
                Row row = (Row) element;
                String topic = row.getString(0);
                int partition = row.getInt(1);
                // Kafka expects the committed position to be the next offset to
                // consume, hence the maximum observed offset plus one.
                long nextOffset = row.getLong(2) + 1;

                // NOTE(review): the group id is built from the partition only, so
                // two topics with the same partition number share both the consumer
                // group and the key in committedOffsets — confirm this is intended.
                String groupId = kafkaCommitExecuteStage.groupID() + "-" + partition;
                properties.setProperty("group.id", groupId);

                // try-with-resources closes the consumer even when commitSync throws
                // and keeps the commit failure as the primary exception.
                try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
                    HashMap<TopicPartition, OffsetAndMetadata> commit = new HashMap<>();
                    commit.put(new TopicPartition(topic, partition), new OffsetAndMetadata(nextOffset));
                    consumer.commitSync(commit);

                    committedOffsets.put(groupId, nextOffset);
                    // Registered after every successful commit (not once after the
                    // loop) so that a later failure still leaves the offsets
                    // committed so far in the stage detail, and an empty input
                    // leaves the stage detail untouched.
                    kafkaCommitExecuteStage.stageDetail().put("offsets", committedOffsets);
                }
            }
            return None$.MODULE$;
        } catch (Exception e) {
            throw new KafkaCommitExecuteStage$$anon$1(e, kafkaCommitExecuteStage);
        }
    }

    /**
     * Builds a {@link KafkaCommitExecuteStage} by forwarding all arguments,
     * in order, to its constructor.
     *
     * <p>NOTE(review): the positions appear to correspond to the tuple
     * produced by {@link #unapply} (plugin, name, description, inputView,
     * bootstrapServers, groupID, params) — confirm against the constructor.
     *
     * @return the newly constructed stage
     */
    public KafkaCommitExecuteStage apply(KafkaCommitExecute kafkaCommitExecute, String str, Option<String> option, String str2, String str3, String str4, Map<String, String> map) {
        return new KafkaCommitExecuteStage(kafkaCommitExecute, str, option, str2, str3, str4, map);
    }

    /**
     * Decomposes a stage into a 7-tuple of its plugin, name, description,
     * input view, bootstrap servers, group id and params.
     *
     * @param kafkaCommitExecuteStage stage to decompose; may be {@code null}
     * @return {@code None} when the stage is {@code null}, otherwise {@code Some} of the tuple
     */
    public Option<Tuple7<KafkaCommitExecute, String, Option<String>, String, String, String, Map<String, String>>> unapply(KafkaCommitExecuteStage kafkaCommitExecuteStage) {
        if (kafkaCommitExecuteStage == null) {
            return None$.MODULE$;
        }
        return new Some(new Tuple7(kafkaCommitExecuteStage.m0plugin(), kafkaCommitExecuteStage.name(), kafkaCommitExecuteStage.description(), kafkaCommitExecuteStage.inputView(), kafkaCommitExecuteStage.bootstrapServers(), kafkaCommitExecuteStage.groupID(), kafkaCommitExecuteStage.params()));
    }

    /** Resolves any deserialised copy back to the singleton {@link #MODULE$}. */
    private Object readResolve() {
        return MODULE$;
    }

    /** Private constructor; stores the created instance in {@link #MODULE$}. */
    private KafkaCommitExecuteStage$() {
        MODULE$ = this;
    }
}
