package com.gengoai.stream.spark;

import com.gengoai.collection.Lists;
import com.gengoai.config.Config;
import com.gengoai.config.ConfigScanner;
import com.gengoai.conversion.Cast;
import com.gengoai.io.resource.Resource;
import com.gengoai.stream.MAccumulator;
import com.gengoai.stream.MCounterAccumulator;
import com.gengoai.stream.MDoubleAccumulator;
import com.gengoai.stream.MLongAccumulator;
import com.gengoai.stream.MMapAccumulator;
import com.gengoai.stream.MMultiCounterAccumulator;
import com.gengoai.stream.MStatisticsAccumulator;
import com.gengoai.stream.MStream;
import com.gengoai.stream.StreamingContext;
import com.gengoai.stream.local.LocalMSetAccumulator;
import com.gengoai.string.StringMatcher;
import com.gengoai.string.Strings;
import java.lang.invoke.SerializedLambda;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.DoubleStream;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.AccumulatorV2;
import org.apache.spark.util.CollectionAccumulator;
import scala.Option;

/* loaded from: input_file:com/gengoai/stream/spark/SparkStreamingContext.class */
public final class SparkStreamingContext extends StreamingContext {
    public static final SparkStreamingContext INSTANCE = new SparkStreamingContext();
    public static final String SPARK_APPNAME = "spark.appName";
    public static final String SPARK_MASTER = "spark.master";
    private static final long serialVersionUID = 1;
    public static volatile JavaSparkContext context;
    public static volatile SparkSession sparkSession;
    private volatile Broadcast<Config> configBroadcast;

    public static SparkStreamingContext contextOf(SparkStream<?> sparkStream) {
        return contextOf(sparkStream.getRDD().context());
    }

    public static SparkStreamingContext contextOf(SparkDoubleStream sparkDoubleStream) {
        return contextOf(sparkDoubleStream.getRDD().context());
    }

    public static SparkStreamingContext contextOf(SparkPairStream<?, ?> sparkPairStream) {
        return contextOf(sparkPairStream.getRDD().context());
    }

    private static SparkStreamingContext contextOf(SparkContext sparkContext) {
        if (context == null || context.sc().isStopped()) {
            synchronized (SparkStreamingContext.class) {
                if (context == null || context.sc().isStopped()) {
                    context = new JavaSparkContext(sparkContext);
                }
            }
        }
        return INSTANCE;
    }

    private static JavaSparkContext getSparkContext() {
        if (context == null || context.sc().isStopped()) {
            synchronized (SparkStreamingContext.class) {
                if (context == null || context.sc().isStopped()) {
                    context = new JavaSparkContext(getSparkSession().sparkContext());
                }
            }
        }
        return context;
    }

    private static SparkSession getSparkSession() {
        if (sparkSession == null || sparkSession.sparkContext().isStopped()) {
            synchronized (SparkStreamingContext.class) {
                if (sparkSession == null || sparkSession.sparkContext().isStopped()) {
                    SparkSession.Builder appName = SparkSession.builder().master(Config.get(SPARK_MASTER, new Object[0]).asString("local[*]")).appName(Config.get(SPARK_APPNAME, new Object[0]).asString(Strings.randomHexString(20)));
                    Config.getPropertiesMatching(StringMatcher.startsWith("spark.config.")).forEach(str -> {
                        appName.config(str.substring("spark.config.".length()), Config.get(str, new Object[0]).asString());
                    });
                    sparkSession = appName.getOrCreate();
                }
            }
        }
        return sparkSession;
    }

    public <T> Broadcast<T> broadcast(T t) {
        return getSparkContext().broadcast(t);
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        context.close();
    }

    @Override // com.gengoai.stream.StreamingContext
    public <E> MCounterAccumulator<E> counterAccumulator(String str) {
        SparkMCounterAccumulator sparkMCounterAccumulator = new SparkMCounterAccumulator(str);
        sparkMCounterAccumulator.register();
        return sparkMCounterAccumulator;
    }

    @Override // com.gengoai.stream.StreamingContext
    public MDoubleAccumulator doubleAccumulator(double d, String str) {
        SparkMDoubleAccumulator sparkMDoubleAccumulator = new SparkMDoubleAccumulator(str);
        sparkMDoubleAccumulator.add(d);
        sparkMDoubleAccumulator.register();
        return sparkMDoubleAccumulator;
    }

    @Override // com.gengoai.stream.StreamingContext
    public SparkDoubleStream doubleStream(DoubleStream doubleStream) {
        return doubleStream == null ? empty().mapToDouble(obj -> {
            return Double.NaN;
        }) : new SparkDoubleStream(getSparkContext().parallelizeDoubles((List) doubleStream.boxed().collect(Collectors.toList())));
    }

    @Override // com.gengoai.stream.StreamingContext
    public <T> SparkStream<T> empty() {
        return new SparkStream<>(sparkContext().emptyRDD());
    }

    public Broadcast<Config> getConfigBroadcast() {
        if (this.configBroadcast == null || !this.configBroadcast.isValid()) {
            synchronized (SparkStreamingContext.class) {
                if (this.configBroadcast == null || !this.configBroadcast.isValid()) {
                    this.configBroadcast = broadcast(Config.getInstance());
                }
            }
        }
        return this.configBroadcast;
    }

    @Override // com.gengoai.stream.StreamingContext
    public boolean isDistributed() {
        return true;
    }

    @Override // com.gengoai.stream.StreamingContext
    public <E> MAccumulator<E, List<E>> listAccumulator(String str) {
        CollectionAccumulator collectionAccumulator = new CollectionAccumulator();
        collectionAccumulator.register(sparkContext().sc(), Option.apply(str), false);
        return new SparkMAccumulator((AccumulatorV2) collectionAccumulator);
    }

    @Override // com.gengoai.stream.StreamingContext
    public MLongAccumulator longAccumulator(long j, String str) {
        SparkMLongAccumulator sparkMLongAccumulator = new SparkMLongAccumulator(str);
        sparkMLongAccumulator.add(j);
        sparkMLongAccumulator.register();
        return sparkMLongAccumulator;
    }

    @Override // com.gengoai.stream.StreamingContext
    public <K, V> MMapAccumulator<K, V> mapAccumulator(String str) {
        SparkMMapAccumulator sparkMMapAccumulator = new SparkMMapAccumulator(str);
        sparkMMapAccumulator.register();
        return sparkMMapAccumulator;
    }

    @Override // com.gengoai.stream.StreamingContext
    public <K1, K2> MMultiCounterAccumulator<K1, K2> multiCounterAccumulator(String str) {
        SparkMMultiCounterAccumulator sparkMMultiCounterAccumulator = new SparkMMultiCounterAccumulator(str);
        sparkMMultiCounterAccumulator.register();
        return sparkMMultiCounterAccumulator;
    }

    @Override // com.gengoai.stream.StreamingContext
    public <K, V> SparkPairStream<K, V> pairStream(Map<? extends K, ? extends V> map) {
        return map == null ? new SparkPairStream<>(new HashMap()) : new SparkPairStream<>(map);
    }

    @Override // com.gengoai.stream.StreamingContext
    public <K, V> SparkPairStream<K, V> pairStream(Collection<Map.Entry<? extends K, ? extends V>> collection) {
        return stream((Iterable) collection).mapToPair(entry -> {
            return entry;
        });
    }

    @Override // com.gengoai.stream.StreamingContext
    public SparkStream<Integer> range(int i, int i2) {
        return new SparkStream<>((List) IntStream.range(i, i2).boxed().collect(Collectors.toList()));
    }

    @Override // com.gengoai.stream.StreamingContext
    public <E> MAccumulator<E, Set<E>> setAccumulator(String str) {
        SparkMAccumulator sparkMAccumulator = new SparkMAccumulator(new LocalMSetAccumulator(str));
        sparkMAccumulator.register();
        return sparkMAccumulator;
    }

    public JavaSparkContext sparkContext() {
        return getSparkContext();
    }

    public SparkSession sparkSession() {
        return getSparkSession();
    }

    @Override // com.gengoai.stream.StreamingContext
    public MStatisticsAccumulator statisticsAccumulator(String str) {
        SparkMStatisticsAccumulator sparkMStatisticsAccumulator = new SparkMStatisticsAccumulator(str);
        sparkMStatisticsAccumulator.register();
        return sparkMStatisticsAccumulator;
    }

    @Override // com.gengoai.stream.StreamingContext
    public <T> SparkStream<T> stream(Stream<T> stream) {
        return stream == null ? empty() : new SparkStream<>((List) stream.collect(Collectors.toList()));
    }

    @Override // com.gengoai.stream.StreamingContext
    public <T> SparkStream<T> stream(Iterable<? extends T> iterable) {
        if (iterable == null) {
            return empty();
        }
        return new SparkStream<>(iterable instanceof List ? getSparkContext().parallelize((List) Cast.as(iterable)) : getSparkContext().parallelize(Lists.asArrayList(iterable)));
    }

    @Override // com.gengoai.stream.StreamingContext
    public SparkStream<String> textFile(String str) {
        return Strings.isNullOrBlank(str) ? empty() : new SparkStream<>(getSparkContext().textFile(str));
    }

    @Override // com.gengoai.stream.StreamingContext
    public SparkStream<String> textFile(Resource resource) {
        return resource == null ? empty() : textFile(resource.path());
    }

    @Override // com.gengoai.stream.StreamingContext
    public synchronized void updateConfig() {
        if (this.configBroadcast != null && this.configBroadcast.isValid()) {
            this.configBroadcast.destroy();
        }
        this.configBroadcast = broadcast(Config.getInstance());
    }

    @Override // com.gengoai.stream.StreamingContext
    public MStream<String> textFile(Resource resource, boolean z) {
        return z ? new SparkStream(getSparkContext().wholeTextFiles(resource.path()).values()) : textFile(resource);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1143869002:
                if (implMethodName.equals("lambda$pairStream$d479d350$1")) {
                    z = false;
                    break;
                }
                break;
            case -557250025:
                if (implMethodName.equals("lambda$doubleStream$54d1d491$1")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case ConfigScanner.YYINITIAL /* 0 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/gengoai/function/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/gengoai/stream/spark/SparkStreamingContext") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Map$Entry;)Ljava/util/Map$Entry;")) {
                    return entry -> {
                        return entry;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/gengoai/function/SerializableToDoubleFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyAsDouble") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)D") && serializedLambda.getImplClass().equals("com/gengoai/stream/spark/SparkStreamingContext") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)D")) {
                    return obj -> {
                        return Double.NaN;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
