package com.acxiom.kafka.drivers;

import com.acxiom.pipeline.PipelineExecution;
import com.acxiom.pipeline.drivers.DriverSetup;
import com.acxiom.pipeline.drivers.StreamingDataParser;
import com.acxiom.pipeline.utils.CommonParameters;
import com.acxiom.pipeline.utils.CommonStreamingParameters;
import com.acxiom.pipeline.utils.DriverUtils$;
import com.acxiom.pipeline.utils.ReflectionUtils$;
import com.acxiom.pipeline.utils.StreamingUtils$;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.Logger;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.dstream.InputDStream;
import org.apache.spark.streaming.kafka010.CanCommitOffsets;
import org.apache.spark.streaming.kafka010.ConsumerStrategies$;
import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.KafkaUtils$;
import org.apache.spark.streaming.kafka010.LocationStrategies$;
import org.apache.spark.streaming.kafka010.OffsetRange;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.immutable.$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayOps;
import scala.runtime.BoxedUnit;

/* compiled from: KafkaPipelineDriver.scala */
/* loaded from: input_file:com/acxiom/kafka/drivers/KafkaPipelineDriver$.class */
/**
 * Singleton driver (decompiled form of the Scala object {@code KafkaPipelineDriver}) that
 * subscribes to one or more Kafka topics through a Spark direct stream and hands every
 * received RDD to the execution plan supplied by the configured {@link DriverSetup}.
 *
 * <p>Kafka auto-commit is disabled in the consumer configuration built by {@link #main};
 * offsets are committed explicitly through {@link CanCommitOffsets#commitAsync} from the
 * callback passed to {@code DriverUtils.processExecutionPlan}.
 */
public final class KafkaPipelineDriver$ {
    /** The single instance; assigned by the private constructor during static initialization. */
    public static KafkaPipelineDriver$ MODULE$;
    private final Logger logger;

    static {
        new KafkaPipelineDriver$();
    }

    private Logger logger() {
        return this.logger;
    }

    /**
     * Starts the streaming driver.
     *
     * <p>The keys {@code driverSetupClass}, {@code topics} and {@code kafkaNodes} are handed to
     * {@code DriverUtils.extractParameters} as the required-parameter list (presumably validated
     * there — confirm against DriverUtils). Optional keys read here and their defaults:
     * {@code duration-type} ("seconds"), {@code duration} ("10") and {@code groupId}
     * ("default_stream_listener").
     *
     * @param strArr command line arguments to be parsed into the parameter map
     * @throws IllegalStateException if the DriverSetup does not provide an execution plan
     */
    public void main(String[] strArr) {
        Map extractParameters = DriverUtils$.MODULE$.extractParameters(strArr, new Some(new $colon.colon("driverSetupClass", new $colon.colon("topics", new $colon.colon("kafkaNodes", Nil$.MODULE$)))));
        CommonParameters parseCommonParameters = DriverUtils$.MODULE$.parseCommonParameters(extractParameters);
        CommonStreamingParameters parseCommonStreamingParameters = StreamingUtils$.MODULE$.parseCommonStreamingParameters(extractParameters);
        // The DriverSetup implementation is loaded reflectively and receives the full parameter map.
        DriverSetup driverSetup = (DriverSetup) ReflectionUtils$.MODULE$.loadClass(parseCommonParameters.initializationClass(), new Some(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("parameters"), extractParameters)}))), ReflectionUtils$.MODULE$.loadClass$default$3());
        // "topics" is a comma separated list of topic names.
        String[] split = ((String) extractParameters.apply("topics")).split(",");
        logger().info("Listening for Kafka messages using topics: " + String.join(",", split));
        if (driverSetup.executionPlan().isEmpty()) {
            throw new IllegalStateException("Unable to obtain valid execution plan. Please check the DriverSetup class: " + parseCommonParameters.initializationClass());
        }
        List list = (List) driverSetup.executionPlan().get();
        // The Spark session is taken from the pipeline context of the first execution in the plan.
        SparkSession sparkSession = (SparkSession) ((PipelineExecution) list.head()).pipelineContext().sparkSession().get();
        StreamingContext createStreamingContext = StreamingUtils$.MODULE$.createStreamingContext(sparkSession.sparkContext(), new Some((String) extractParameters.getOrElse("duration-type", () -> {
            return "seconds";
        })), new Some((String) extractParameters.getOrElse("duration", () -> {
            return "10";
        })));
        // Consumer configuration: String keys/values, start from the earliest offset when no
        // committed offset exists, and no auto-commit (offsets are committed in the batch handler).
        InputDStream createDirectStream = KafkaUtils$.MODULE$.createDirectStream(createStreamingContext, LocationStrategies$.MODULE$.PreferConsistent(), ConsumerStrategies$.MODULE$.Subscribe(Predef$.MODULE$.wrapRefArray(split), Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("bootstrap.servers"), (String) extractParameters.apply("kafkaNodes")),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("key.deserializer"), StringDeserializer.class),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("value.deserializer"), StringDeserializer.class),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("group.id"), (String) extractParameters.getOrElse("groupId", () -> {
                return "default_stream_listener";
            })),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("auto.offset.reset"), "earliest"),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("enable.auto.commit"), Predef$.MODULE$.boolean2Boolean(false))
        }))));
        // The Kafka parser is offered to generateStreamingDataParsers and is also the fallback
        // used by the batch handler when no parser is selected for an RDD.
        KafkaStreamingDataParser kafkaStreamingDataParser = new KafkaStreamingDataParser();
        List generateStreamingDataParsers = StreamingUtils$.MODULE$.generateStreamingDataParsers(extractParameters, new Some(new $colon.colon(kafkaStreamingDataParser, Nil$.MODULE$)));
        createDirectStream.foreachRDD(rdd -> {
            $anonfun$main$4(parseCommonStreamingParameters, generateStreamingDataParsers, kafkaStreamingDataParser, sparkSession, driverSetup, list, createDirectStream, parseCommonParameters, rdd);
            return BoxedUnit.UNIT;
        });
        createStreamingContext.start();
        StreamingUtils$.MODULE$.setTerminationState(createStreamingContext, extractParameters);
        logger().info("Shutting down Kafka Pipeline Driver");
    }

    /**
     * Per-batch handler invoked from the {@code foreachRDD} registration in {@link #main}.
     *
     * <p>Skips the batch when the RDD is empty unless {@code processEmptyRDD} is set. Otherwise
     * it captures the batch's offset ranges, parses the RDD into a {@link Dataset}, and runs the
     * execution plan. The callback passed to {@code processExecutionPlan} commits the captured
     * offsets back to Kafka asynchronously.
     *
     * @param commonStreamingParameters streaming options; only {@code processEmptyRDD} is read here
     * @param list                      candidate streaming data parsers
     * @param kafkaStreamingDataParser  fallback parser when none of the candidates is selected
     * @param sparkSession              session handed to the parser
     * @param driverSetup               driver setup passed through to the execution plan processing
     * @param list2                     the execution plan
     * @param inputDStream              the Kafka direct stream; cast to {@link CanCommitOffsets} to commit
     * @param commonParameters          supplies {@code terminateAfterFailures} and {@code maxRetryAttempts}
     * @param rdd                       the batch RDD; cast to {@link HasOffsetRanges} to read its offsets
     */
    public static final /* synthetic */ void $anonfun$main$4(CommonStreamingParameters commonStreamingParameters, List list, KafkaStreamingDataParser kafkaStreamingDataParser, SparkSession sparkSession, DriverSetup driverSetup, List list2, InputDStream inputDStream, CommonParameters commonParameters, RDD rdd) {
        if (commonStreamingParameters.processEmptyRDD() || !rdd.isEmpty()) {
            MODULE$.logger().debug("RDD received");
            // Offsets are read from the original Kafka RDD before it is parsed into a Dataset.
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd).offsetRanges();
            Dataset parseRDD = ((StreamingDataParser) StreamingUtils$.MODULE$.getStreamingParser(rdd, list).getOrElse(() -> {
                return kafkaStreamingDataParser;
            })).parseRDD(rdd, sparkSession);
            // Use the same "," separator as the commit message below so individual ranges stay readable.
            MODULE$.logger().debug("Processing offsets " + new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(offsetRanges)).mkString(","));
            DriverUtils$.MODULE$.processExecutionPlan(driverSetup, list2, new Some(parseRDD), () -> {
                ((CanCommitOffsets) inputDStream).commitAsync(offsetRanges);
                MODULE$.logger().debug("Committing Kafka offsets " + new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(offsetRanges)).mkString(","));
            }, commonParameters.terminateAfterFailures(), 1, commonParameters.maxRetryAttempts());
        }
    }

    private KafkaPipelineDriver$() {
        // Publish the singleton before initializing the logger, mirroring Scala object initialization order.
        MODULE$ = this;
        this.logger = Logger.getLogger(getClass());
    }
}
