package ideal.sylph.plugins.kafka.spark;

import com.github.harbby.gadtry.base.Lazys;
import ideal.sylph.annotation.Description;
import ideal.sylph.annotation.Name;
import ideal.sylph.annotation.Version;
import ideal.sylph.etl.PluginConfig;
import ideal.sylph.etl.SourceContext;
import ideal.sylph.etl.api.Source;
import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

/**
 * Spark Streaming source plugin that reads text lines from one or more sockets.
 *
 * <p>Every configured {@code host:port} endpoint is opened with
 * {@link JavaStreamingContext#socketTextStream(String, int)}; the per-endpoint streams are
 * unioned into a single stream of rows with three string columns: {@code host}, {@code port}
 * and {@code value} (the received line).
 *
 * <p>The lambdas handed to gadtry and Spark are serializable lambdas. Their
 * {@code $deserializeLambda$} hook is synthesized by the Java compiler, so it must not be
 * written by hand in this class.
 */
@Name("socket")
@Version("1.0.0")
@Description("this spark socket source inputStream")
public class SocketSource implements Source<JavaDStream<Row>> {
    private static final long serialVersionUID = 1;

    /**
     * Deferred factory for the unioned socket stream; nothing is opened until
     * {@link #getSource()} asks for it.
     * NOTE(review): presumably {@code Lazys.goLazy} memoizes the first result - confirm against gadtry.
     */
    private final transient Supplier<JavaDStream<Row>> loadStream;

    /** Plugin configuration holding the socket endpoints to read from. */
    public static class SocketSourceConfig extends PluginConfig {
        private static final long serialVersionUID = 2;

        /** Comma-separated list of {@code host:port} endpoints. */
        @Name("socket_hosts")
        @Description("this is socket_hosts list")
        private String hosts = "localhost:9999";

        private SocketSourceConfig() {
        }
    }

    /**
     * Creates the source; the actual streams are built lazily on first access.
     *
     * @param javaStreamingContext streaming context used to open the socket streams
     * @param socketSourceConfig   configuration providing the endpoint list
     * @param sourceContext        source context, forwarded to {@link #createSource}
     */
    public SocketSource(JavaStreamingContext javaStreamingContext, SocketSourceConfig socketSourceConfig, SourceContext sourceContext) {
        this.loadStream = Lazys.goLazy(() -> createSource(javaStreamingContext, socketSourceConfig, sourceContext));
    }

    /**
     * Builds one socket text stream per distinct configured endpoint and unions them.
     *
     * <p>Entries are trimmed; entries without a {@code ':'} are ignored; duplicates are opened
     * only once.
     *
     * @param javaStreamingContext streaming context used to open the socket streams
     * @param socketSourceConfig   configuration providing the comma-separated endpoint list
     * @param sourceContext        source context (not used by this implementation)
     * @return the union of all per-endpoint row streams
     * @throws NullPointerException     if the configured host list is {@code null}
     * @throws IllegalArgumentException if an endpoint is malformed or no usable endpoint is configured
     */
    public JavaDStream<Row> createSource(JavaStreamingContext javaStreamingContext, SocketSourceConfig socketSourceConfig, SourceContext sourceContext) {
        final String hosts = Objects.requireNonNull(socketSourceConfig.hosts, "socketLoad is not setting");

        // All three columns are declared as strings, so every row value must be a String too.
        final StructType schema = new StructType(new StructField[] {
                new StructField("host", DataTypes.StringType, true, Metadata.empty()),
                new StructField("port", DataTypes.StringType, true, Metadata.empty()),
                new StructField("value", DataTypes.StringType, true, Metadata.empty())
        });

        return Arrays.stream(hosts.split(","))
                .map(String::trim)
                .filter(endpoint -> endpoint.contains(":"))
                .distinct()
                .map(endpoint -> openSocketStream(javaStreamingContext, endpoint, schema))
                .reduce(JavaDStream::union)
                .orElseThrow(() -> new IllegalArgumentException(
                        "No valid host:port entry found in socket_hosts: '" + hosts + "'"));
    }

    /**
     * Opens the text stream for a single {@code host:port} endpoint and maps each line to a row.
     *
     * <p>Static on purpose: the mapping lambda is shipped to Spark and must capture only the
     * host, port and schema, never the enclosing {@code SocketSource}.
     */
    private static JavaDStream<Row> openSocketStream(JavaStreamingContext javaStreamingContext, String endpoint, StructType schema) {
        final String[] parts = endpoint.split(":");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Invalid socket endpoint '" + endpoint + "', expected host:port");
        }

        final String host = parts[0].trim();
        final int port;
        try {
            port = Integer.parseInt(parts[1].trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid port in socket endpoint '" + endpoint + "'", e);
        }
        // The "port" column is a StringType column; store the textual form, not an Integer.
        final String portText = String.valueOf(port);

        return javaStreamingContext.socketTextStream(host, port)
                .map(line -> new GenericRowWithSchema(new Object[] {host, portText, line}, schema));
    }

    /**
     * Returns the unioned socket stream, building it on first access.
     *
     * @return the stream of {@code (host, port, value)} rows
     */
    public JavaDStream<Row> getSource() {
        return this.loadStream.get();
    }

    /**
     * Alias of {@link #getSource()} under the name produced by decompilation; kept so existing
     * callers of that name keep working.
     *
     * @return the stream of {@code (host, port, value)} rows
     * @deprecated use {@link #getSource()}
     */
    @Deprecated
    public JavaDStream<Row> m2getSource() {
        return getSource();
    }
}
