package ai.chronon.spark.streaming;

import ai.chronon.online.DataStream;
import ai.chronon.online.StreamBuilder;
import ai.chronon.online.TopicInfo;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryListener;
import scala.Predef$;
import scala.collection.immutable.Map;
import scala.runtime.ScalaRunTime$;

/* compiled from: KafkaStreamBuilder.scala */
/* loaded from: input_file:ai/chronon/spark/streaming/KafkaStreamBuilder$.class */
/**
 * Singleton {@link StreamBuilder} that creates a {@link DataStream} backed by Spark's
 * {@code "kafka"} structured-streaming source.
 *
 * <p>The only instance is {@link #MODULE$}; the constructor is private.
 */
public final class KafkaStreamBuilder$ implements StreamBuilder {
    public static final KafkaStreamBuilder$ MODULE$ = new KafkaStreamBuilder$();

    /**
     * Builds a Kafka-backed stream for the given topic.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>resolve the bootstrap address from {@code topicInfo.params()};</li>
     *   <li>call {@code TopicChecker$.topicShouldExist} with the topic name and that address
     *       (presumably fails when the topic is missing; verify against TopicChecker);</li>
     *   <li>register a listener on the session that prints query lifecycle events;</li>
     *   <li>open a {@code "kafka"} read stream subscribed to the topic, keeping only the
     *       {@code value} column, and wrap it in a {@link DataStream} together with the
     *       result of {@code TopicChecker$.getPartitions}.</li>
     * </ol>
     *
     * <p>Every invocation registers a new listener on {@code sparkSession.streams()}; nothing
     * in this method removes it.
     *
     * @param topicInfo    topic name plus connection parameters ({@code bootstrap}, or
     *                     {@code host} with an optional {@code port})
     * @param sparkSession session used to register the listener and to open the read stream
     * @param map          not read by this method
     * @return a {@link DataStream} built from the selected {@code value} column, the value
     *         returned by {@code getPartitions}, and {@code topicInfo}
     */
    public DataStream from(TopicInfo topicInfo, SparkSession sparkSession, Map<String, String> map) {
        String topicName = topicInfo.name();
        String bootstrapServers = resolveBootstrap(topicInfo.params());

        TopicChecker$.MODULE$.topicShouldExist(topicName, bootstrapServers);
        sparkSession.streams().addListener(new QueryEventPrinter());

        // NOTE(review): "enable.auto.commit" is passed without the "kafka." prefix; Spark's
        // Kafka source may simply ignore it -- confirm against the Kafka integration guide.
        return new DataStream(
                sparkSession.readStream()
                        .format("kafka")
                        .option("kafka.bootstrap.servers", bootstrapServers)
                        .option("subscribe", topicInfo.name())
                        .option("enable.auto.commit", "true")
                        .load()
                        .selectExpr(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"value"})),
                TopicChecker$.MODULE$.getPartitions(topicInfo.name(), bootstrapServers),
                topicInfo);
    }

    /**
     * Returns the {@code bootstrap} entry when present; otherwise the {@code host} entry
     * followed by {@code ":" + port} if a {@code port} entry exists. The fallback is only
     * evaluated when {@code bootstrap} is absent, and {@code params.apply("host")} is used
     * for the host lookup, so a missing {@code host} surfaces whatever {@code apply} throws.
     */
    // Raw Scala collection types are kept as the decompiled source has them.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static String resolveBootstrap(Map params) {
        return (String) params.getOrElse("bootstrap", () -> {
            String host = (String) params.apply("host");
            Object portSuffix = params.get("port").map(port -> ":" + port).getOrElse(() -> "");
            return host + portSuffix;
        });
    }

    /** Prints a line to standard output (via Scala's {@code Predef.println}) per query event. */
    private static final class QueryEventPrinter extends StreamingQueryListener {
        public void onQueryStarted(StreamingQueryListener.QueryStartedEvent queryStartedEvent) {
            Predef$.MODULE$.println("Query started: " + queryStartedEvent.id());
        }

        public void onQueryTerminated(StreamingQueryListener.QueryTerminatedEvent queryTerminatedEvent) {
            Predef$.MODULE$.println("Query terminated: " + queryTerminatedEvent.id());
        }

        public void onQueryProgress(StreamingQueryListener.QueryProgressEvent queryProgressEvent) {
            Predef$.MODULE$.println("Query made progress: " + queryProgressEvent.progress());
        }
    }

    private KafkaStreamBuilder$() {
    }
}
