package com.acxiom.aws.drivers;

import com.acxiom.pipeline.PipelineExecution;
import com.acxiom.pipeline.drivers.DriverSetup;
import com.acxiom.pipeline.utils.DriverUtils$;
import com.acxiom.pipeline.utils.ReflectionUtils$;
import com.acxiom.pipeline.utils.StreamingUtils$;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.kinesis.KinesisInputDStream;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ClassTag$;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

/* compiled from: KinesisPipelineDriver.scala */
/* loaded from: input_file:com/acxiom/aws/drivers/KinesisPipelineDriver$.class */
/**
 * Singleton entry point (Scala {@code object} compiled to a {@code $} class) that wires a
 * Kinesis stream into a Spark Streaming job and hands every micro-batch to the pipelines
 * supplied by a {@link DriverSetup} implementation.
 *
 * <p>Required parameters (enforced through {@code DriverUtils.extractParameters}):
 * {@code driverSetupClass}, {@code appName}, {@code streamName}, {@code endPointURL},
 * {@code regionName}, {@code awsAccessKey}, {@code awsAccessSecret}. Optional parameters read
 * here: {@code duration-type}, {@code duration}, {@code consumerStreams}.
 */
public final class KinesisPipelineDriver$ {
    // Initialised in the declaration: a static final field cannot legally be assigned from
    // inside an instance constructor in Java source.
    public static final KinesisPipelineDriver$ MODULE$ = new KinesisPipelineDriver$();
    private final Logger com$acxiom$aws$drivers$KinesisPipelineDriver$$logger;

    /**
     * Accessor for the driver logger. The mangled name is kept because it is public and the
     * compiler-generated closure classes may call it.
     *
     * @return the log4j logger created for this class
     */
    public Logger com$acxiom$aws$drivers$KinesisPipelineDriver$$logger() {
        return this.com$acxiom$aws$drivers$KinesisPipelineDriver$$logger;
    }

    /**
     * Starts the Kinesis streaming driver.
     *
     * <p>Steps: parse the arguments, reflectively load the {@link DriverSetup}, obtain its
     * execution plan and Spark session, look up the number of shards of the stream, create the
     * Kinesis DStreams, union them, register the per-RDD handler and start the streaming context.
     *
     * @param strArr command line arguments handed to {@code DriverUtils.extractParameters}
     * @throws IllegalStateException if the driver setup does not provide an execution plan
     */
    public void main(String[] strArr) {
        Map<String, Object> parameters = DriverUtils$.MODULE$.extractParameters(strArr, new Some(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"driverSetupClass", "appName", "streamName", "endPointURL", "regionName", "awsAccessKey", "awsAccessSecret"}))));
        String driverSetupClass = (String) parameters.apply("driverSetupClass");
        // The loaded class receives the parsed parameters under the key "parameters".
        DriverSetup driverSetup = (DriverSetup) ReflectionUtils$.MODULE$.loadClass(driverSetupClass, new Some(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("parameters"), parameters)}))), ReflectionUtils$.MODULE$.loadClass$default$3());
        if (driverSetup.executionPlan().isEmpty()) {
            throw new IllegalStateException("Unable to obtain valid execution plan. Please check the DriverSetup class: " + driverSetupClass);
        }
        List executionPlan = (List) driverSetup.executionPlan().get();
        // The Spark session is taken from the first execution of the plan.
        SparkSession sparkSession = (SparkSession) ((PipelineExecution) executionPlan.head()).pipelineContext().sparkSession().get();

        String awsAccessKey = (String) parameters.apply("awsAccessKey");
        String awsAccessSecret = (String) parameters.apply("awsAccessSecret");
        String appName = (String) parameters.apply("appName");

        // Defaults for "duration-type" and "duration" come from compiler-generated closures
        // defined outside this file.
        Duration batchDuration = StreamingUtils$.MODULE$.getDuration(new Some((String) parameters.getOrElse("duration-type", new KinesisPipelineDriver$$anonfun$2())), new Some((String) parameters.getOrElse("duration", new KinesisPipelineDriver$$anonfun$3())));
        StreamingContext streamingContext = StreamingUtils$.MODULE$.createStreamingContext(sparkSession.sparkContext(), new Some(batchDuration));

        int shardCount = countShards(parameters, awsAccessKey, awsAccessSecret);
        com$acxiom$aws$drivers$KinesisPipelineDriver$$logger().info("Number of Kinesis shards is : " + shardCount);

        int consumerStreams = resolveConsumerStreams(parameters, shardCount);
        IndexedSeq<KinesisInputDStream<Row>> kinesisStreams = createKinesisDStreams(awsAccessKey, awsAccessSecret, appName, batchDuration, streamingContext, consumerStreams, parameters);
        com$acxiom$aws$drivers$KinesisPipelineDriver$$logger().info("Created " + kinesisStreams.size() + " Kinesis DStreams");

        // Merge all receivers into one stream so each batch is processed by a single handler.
        streamingContext.union(kinesisStreams, ClassTag$.MODULE$.apply(Row.class)).foreachRDD(new KinesisPipelineDriver$$anonfun$main$1(driverSetup, executionPlan, sparkSession));
        streamingContext.start();
        // NOTE(review): presumably waits for termination according to the parameters - confirm
        // against StreamingUtils.setTerminationState.
        StreamingUtils$.MODULE$.setTerminationState(streamingContext, parameters);
        com$acxiom$aws$drivers$KinesisPipelineDriver$$logger().info("Shutting down Kinesis Pipeline Driver");
    }

    /**
     * Asks Kinesis how many shards the configured stream has.
     *
     * <p>The client is only needed for this lookup, so it is shut down before returning to
     * release its underlying resources.
     *
     * @param parameters      parsed driver parameters; reads {@code endPointURL} and {@code streamName}
     * @param awsAccessKey    AWS access key used to build the credentials
     * @param awsAccessSecret AWS secret key used to build the credentials
     * @return the number of shards reported by {@code describeStream}
     */
    private int countShards(Map<String, Object> parameters, String awsAccessKey, String awsAccessSecret) {
        AmazonKinesisClient kinesisClient = new AmazonKinesisClient(new BasicAWSCredentials(awsAccessKey, awsAccessSecret));
        try {
            kinesisClient.setEndpoint((String) parameters.apply("endPointURL"));
            return kinesisClient.describeStream((String) parameters.apply("streamName")).getStreamDescription().getShards().size();
        } finally {
            kinesisClient.shutdown();
        }
    }

    /**
     * Determines how many Kinesis DStreams to create.
     *
     * <p>Reads the optional {@code consumerStreams} parameter. A {@code String} value is parsed
     * as a decimal integer; any other value is unboxed exactly as before. When the parameter is
     * absent the fallback closure built from {@code shardCount} supplies the value
     * (presumably the shard count itself - the closure is defined outside this file).
     *
     * @param parameters parsed driver parameters
     * @param shardCount number of shards of the stream, handed to the fallback closure
     * @return the number of consumer streams to create
     * @throws NumberFormatException if a string value is not a valid integer
     */
    private int resolveConsumerStreams(Map<String, Object> parameters, int shardCount) {
        Object configured = parameters.getOrElse("consumerStreams", new KinesisPipelineDriver$$anonfun$1(shardCount));
        if (configured instanceof String) {
            return Integer.parseInt(((String) configured).trim());
        }
        return BoxesRunTime.unboxToInt(configured);
    }

    /**
     * Builds one Kinesis input DStream per index in {@code [0, i)}.
     *
     * <p>The actual stream construction lives in a compiler-generated closure that receives
     * every argument except the count.
     *
     * @param str              AWS access key
     * @param str2             AWS secret key
     * @param str3             application name
     * @param duration         batch duration
     * @param streamingContext streaming context the DStreams are attached to
     * @param i                number of DStreams to create
     * @param map              parsed driver parameters
     * @return the created DStreams, one per index
     */
    private IndexedSeq<KinesisInputDStream<Row>> createKinesisDStreams(String str, String str2, String str3, Duration duration, StreamingContext streamingContext, int i, Map<String, Object> map) {
        return (IndexedSeq) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), i).map(new KinesisPipelineDriver$$anonfun$createKinesisDStreams$1(str, str2, str3, duration, streamingContext, map), IndexedSeq$.MODULE$.canBuildFrom());
    }

    /** Creates the singleton instance and its logger; only invoked from the static initialiser. */
    private KinesisPipelineDriver$() {
        this.com$acxiom$aws$drivers$KinesisPipelineDriver$$logger = Logger.getLogger(getClass());
    }
}
