package ideal.sylph.plugins.kafka.spark.util;

import ideal.sylph.etl.api.Sink;
import java.lang.invoke.SerializedLambda;
import org.apache.spark.Dependency;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Row;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.dstream.DStream;
import org.apache.spark.streaming.kafka010.CanCommitOffsets;
import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.OffsetRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ideal/sylph/plugins/kafka/spark/util/DStreamUtil.class */
/**
 * Static helpers for walking Spark Streaming lineage and for wiring a {@link Sink}
 * onto a {@link JavaDStream}.
 *
 * <p>Not instantiable; all members are static.
 */
public class DStreamUtil {
    private static final Logger logger = LoggerFactory.getLogger(DStreamUtil.class);

    private DStreamUtil() {
    }

    /**
     * Follows the first dependency of the given stream repeatedly until a stream
     * with no dependencies is reached, and returns that stream.
     *
     * @param dStream the stream whose lineage is walked
     * @return the stream at the end of the first-dependency chain; {@code dStream}
     *         itself when it has no dependencies
     */
    public static DStream<?> getFristDStream(DStream<?> dStream) {
        DStream<?> current = dStream;
        while (!current.dependencies().isEmpty()) {
            current = (DStream<?>) current.dependencies().head();
        }
        return current;
    }

    /**
     * Follows the first dependency of the given RDD repeatedly until an RDD with
     * no dependencies is reached, and returns that RDD.
     *
     * @param rdd the RDD whose lineage is walked
     * @return the RDD at the end of the first-dependency chain; {@code rdd} itself
     *         when it has no dependencies
     */
    public static RDD<?> getFristRdd(RDD<?> rdd) {
        RDD<?> current = rdd;
        while (!current.dependencies().isEmpty()) {
            current = ((Dependency<?>) current.dependencies().head()).rdd();
        }
        return current;
    }

    /**
     * Registers {@code sink} as the per-batch output action of {@code javaDStream}.
     *
     * <p>When the simple class name of the stream found by
     * {@link #getFristDStream(DStream)} is {@code DirectKafkaInputDStream}, each batch
     * is handled as follows: the offset ranges are read from the RDD found by
     * {@link #getFristRdd(RDD)}, the sink is run only if that RDD contains at least
     * one record, and the offset ranges are then committed asynchronously through the
     * source stream. The commit is issued after the sink call returns, so it is not
     * reached if the sink throws.
     *
     * <p>For any other source the sink is simply run on every batch.
     *
     * @param javaDStream the stream to attach the output action to
     * @param sink        the sink invoked with each batch RDD
     */
    public static void dstreamAction(JavaDStream<Row> javaDStream, Sink<JavaRDD<Row>> sink) {
        DStream<?> fristDStream = getFristDStream(javaDStream.dstream());
        logger.info("数据源驱动:{}", fristDStream.getClass().getName());

        // The source is recognised by simple class name rather than by a type check.
        boolean kafkaSource = "DirectKafkaInputDStream".equals(fristDStream.getClass().getSimpleName());
        if (!kafkaSource) {
            javaDStream.foreachRDD(sink::run);
            return;
        }

        logger.info("发现job 数据源是kafka,将开启空job优化 且 自动上报offect");
        javaDStream.foreachRDD(javaRDD -> {
            RDD<?> kafkaRdd = getFristRdd(javaRDD.rdd());
            // Read the ranges before running the sink so the commit below uses this batch's offsets.
            OffsetRange[] offsetRanges = ((HasOffsetRanges) kafkaRdd).offsetRanges();
            // Skip the sink entirely for empty batches.
            if (kafkaRdd.count() > 0) {
                sink.run(javaRDD);
            }
            ((CanCommitOffsets) fristDStream).commitAsync(offsetRanges);
        });
    }
}
