package ideal.sylph.plugins.kafka.spark.structured;

import com.github.harbby.gadtry.collection.mutable.MutableList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ideal/sylph/plugins/kafka/spark/structured/KafkaSourceUtil.class */
public class KafkaSourceUtil {
    private static final Logger logger = LoggerFactory.getLogger(KafkaSourceUtil.class);
    private static final List<String> filterKeys = MutableList.of(new String[]{"kafka_group_id", "group.id", "key.deserializer", "value.deserializer", "key.serializer", "value.serializer", "enable.auto.commit", "interceptor.classes"});

    private KafkaSourceUtil() {
    }

    private static Map<String, String> configParser(Map<String, Object> map) {
        return (Map) map.entrySet().stream().filter(entry -> {
            if (filterKeys.contains(entry.getKey())) {
                logger.warn("spark结构化流引擎 忽略参数:key[{}] value[{}]", entry.getKey(), entry.getValue());
                return false;
            }
            if (entry.getValue() != null) {
                return true;
            }
            logger.warn("spark结构化流引擎 忽略value null参数:key[{}] value[null]", entry.getKey());
            return false;
        }).collect(Collectors.toMap(entry2 -> {
            String str = (String) entry2.getKey();
            boolean z = -1;
            switch (str.hashCode()) {
                case -1690515694:
                    if (str.equals("kafka_broker")) {
                        z = true;
                        break;
                    }
                    break;
                case -37997866:
                    if (str.equals("kafka_topic")) {
                        z = false;
                        break;
                    }
                    break;
                case 208582163:
                    if (str.equals("auto.offset.reset")) {
                        z = 2;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    return "subscribe";
                case true:
                    return "kafka.bootstrap.servers";
                case true:
                    return "startingOffsets";
                default:
                    return (String) entry2.getKey();
            }
        }, entry3 -> {
            return entry3.getValue().toString();
        }));
    }

    public static Dataset<Row> getSource(SparkSession sparkSession, Map<String, Object> map) {
        return sparkSession.readStream().format("kafka").options(configParser(map)).load();
    }
}
