package io.openlineage.spark.agent.lifecycle.plan;

import io.openlineage.client.OpenLineage;
import io.openlineage.client.OpenLineage.Dataset;
import io.openlineage.spark.agent.util.ScalaConversionUtils;
import io.openlineage.spark.api.DatasetFactory;
import io.openlineage.spark.api.OpenLineageContext;
import io.openlineage.spark.api.QueryPlanVisitor;
import io.openlineage.spark.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.util.List;
import java.util.Optional;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.datasources.LogicalRelation;
import org.apache.spark.sql.sources.BaseRelation;
import org.apache.spark.sql.sources.CreatableRelationProvider;
import org.apache.spark.sql.types.StructType;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.immutable.Map;
import scala.collection.immutable.Map$;

/* loaded from: input_file:io/openlineage/spark/agent/lifecycle/plan/KafkaRelationVisitor.class */
public class KafkaRelationVisitor<D extends OpenLineage.Dataset> extends QueryPlanVisitor<LogicalRelation, D> {
    private static final String KAFKA_SOURCE_PROVIDER_CLASS_NAME = "org.apache.spark.sql.kafka010.KafkaSourceProvider";
    private static final String KAFKA_RELATION_CLASS_NAME = "org.apache.spark.sql.kafka010.KafkaRelation";
    private final DatasetFactory<D> datasetFactory;
    private static final Logger log = LoggerFactory.getLogger(KafkaRelationVisitor.class);
    private static final AtomicBoolean KAFKA_PROVIDER_CLASS_PRESENT = new AtomicBoolean(false);
    private static final AtomicBoolean KAFKA_PROVIDER_CHECKED = new AtomicBoolean(false);
    private static final AtomicBoolean KAFKA_RELATION_CLASS_PRESENT = new AtomicBoolean(false);
    private static final AtomicBoolean KAFKA_RELATION_CHECKED = new AtomicBoolean(false);

    /**
     * Creates a visitor for Kafka relations.
     *
     * @param openLineageContext context passed unchanged to the {@link QueryPlanVisitor} constructor
     * @param datasetFactory factory kept on this instance; {@code apply} hands it to
     *     {@code createDatasetsFromOptions} to build the resulting datasets
     */
    public KafkaRelationVisitor(OpenLineageContext openLineageContext, DatasetFactory<D> datasetFactory) {
        super(openLineageContext);
        this.datasetFactory = datasetFactory;
    }

    /**
     * Reports whether {@code org.apache.spark.sql.kafka010.KafkaSourceProvider} can be loaded.
     *
     * <p>The class is probed first with this class's own class loader and then with the current
     * thread's context class loader. The probe runs at most once: its outcome is stored in
     * {@code KAFKA_PROVIDER_CLASS_PRESENT} and {@code KAFKA_PROVIDER_CHECKED} marks it as done, so
     * later calls only read the cached flag.
     *
     * @return {@code true} if either class loader could load the Kafka source provider class
     */
    public static boolean hasKafkaClasses() {
        log.debug("Checking if Kafka classes are available");
        if (KAFKA_PROVIDER_CHECKED.get()) {
            // Fast path: the probe already ran, no need to take the lock.
            return KAFKA_PROVIDER_CLASS_PRESENT.get();
        }
        log.debug("Kafka classes have not been checked yet");
        synchronized (KafkaRelationVisitor.class) {
            // Re-check under the lock so that only one thread performs the class loading probe.
            if (!KAFKA_PROVIDER_CHECKED.get()) {
                boolean present =
                        checkWithCurrentClassClassLoader(KAFKA_SOURCE_PROVIDER_CLASS_NAME)
                                || checkWithCurrentThreadContextClassLoader(KAFKA_SOURCE_PROVIDER_CLASS_NAME);
                // Publish the result before the "checked" flag so readers never see a stale answer.
                KAFKA_PROVIDER_CLASS_PRESENT.set(present);
                KAFKA_PROVIDER_CHECKED.set(true);
                // Parameterized form: the message is only formatted when debug logging is enabled.
                log.debug("Kafka classes availability: {}", present);
            }
        }
        return KAFKA_PROVIDER_CLASS_PRESENT.get();
    }

    /**
     * Reports whether {@code org.apache.spark.sql.kafka010.KafkaRelation} can be loaded.
     *
     * <p>The class is probed first with this class's own class loader and then with the current
     * thread's context class loader. The probe runs at most once: its outcome is stored in
     * {@code KAFKA_RELATION_CLASS_PRESENT} and {@code KAFKA_RELATION_CHECKED} marks it as done, so
     * later calls only read the cached flag.
     *
     * @return {@code true} if either class loader could load the Kafka relation class
     */
    private static boolean hasKafkaRelationClass() {
        log.debug("Checking if KafkaRelation class is available");
        if (KAFKA_RELATION_CHECKED.get()) {
            // Fast path: the probe already ran, no need to take the lock.
            return KAFKA_RELATION_CLASS_PRESENT.get();
        }
        log.debug("KafkaRelation class has not been checked yet");
        synchronized (KafkaRelationVisitor.class) {
            // Re-check under the lock so that only one thread performs the class loading probe.
            if (!KAFKA_RELATION_CHECKED.get()) {
                boolean present =
                        checkWithCurrentClassClassLoader(KAFKA_RELATION_CLASS_NAME)
                                || checkWithCurrentThreadContextClassLoader(KAFKA_RELATION_CLASS_NAME);
                // Publish the result before the "checked" flag so readers never see a stale answer.
                KAFKA_RELATION_CLASS_PRESENT.set(present);
                KAFKA_RELATION_CHECKED.set(true);
                // Parameterized form: the message is only formatted when debug logging is enabled.
                log.debug("KafkaRelation class availability: {}", present);
            }
        }
        return KAFKA_RELATION_CLASS_PRESENT.get();
    }

    /**
     * Probes whether the class loader that loaded {@code KafkaRelationVisitor} can load a class.
     *
     * <p>This is a pure yes/no probe: any failure to load is reported as {@code false} rather than
     * propagated.
     *
     * @param str fully qualified name of the class to look up
     * @return {@code true} if the class was loaded, {@code false} on any loading failure
     */
    private static boolean checkWithCurrentClassClassLoader(String str) {
        try {
            KafkaRelationVisitor.class.getClassLoader().loadClass(str);
            return true;
        } catch (Exception | LinkageError ignored) {
            // Exception covers the expected ClassNotFoundException and the NullPointerException
            // raised when getClassLoader() returns null. LinkageError (e.g. NoClassDefFoundError,
            // UnsupportedClassVersionError) is not an Exception and would otherwise escape the
            // probe; a class that cannot be linked is treated as absent.
            return false;
        }
    }

    /**
     * Probes whether the current thread's context class loader can load a class.
     *
     * @param str fully qualified name of the class to look up
     * @return {@code true} if {@code loadClassWithCurrentThreadContextClassLoader} returned a class,
     *     {@code false} if it returned {@code null}
     */
    private static boolean checkWithCurrentThreadContextClassLoader(String str) {
        Class<?> loaded = loadClassWithCurrentThreadContextClassLoader(str);
        return loaded != null;
    }

    /**
     * Loads a class through the current thread's context class loader.
     *
     * <p>Failures are never propagated; callers distinguish success from failure by the
     * {@code null} return value.
     *
     * @param str fully qualified name of the class to load
     * @return the loaded class, or {@code null} if it could not be loaded
     */
    @Nullable
    private static Class<?> loadClassWithCurrentThreadContextClassLoader(String str) {
        try {
            return Thread.currentThread().getContextClassLoader().loadClass(str);
        } catch (Exception | LinkageError ignored) {
            // Exception covers the expected ClassNotFoundException and the NullPointerException
            // raised when the thread has no context class loader. LinkageError (e.g.
            // NoClassDefFoundError, UnsupportedClassVersionError) is not an Exception and would
            // otherwise escape; a class that cannot be linked is treated as unavailable.
            return null;
        }
    }

    /**
     * Tells whether the given provider is an instance of
     * {@code org.apache.spark.sql.kafka010.KafkaSourceProvider}.
     *
     * <p>The Kafka provider class is resolved through the current thread's context class loader, so
     * this class never links against the Kafka connector directly.
     *
     * @param creatableRelationProvider provider to test; {@code null} yields {@code false}
     * @return {@code true} only if the Kafka provider class is available and the provider is an
     *     instance of it
     */
    public static boolean isKafkaSource(CreatableRelationProvider creatableRelationProvider) {
        log.debug("Checking if provider is KafkaSourceProvider");
        if (!hasKafkaClasses()) {
            log.debug("Kafka classes are not available to check whether provider is KafkaSourceProvider");
            return false;
        }
        Class<?> kafkaSourceProviderClass =
                loadClassWithCurrentThreadContextClassLoader(KAFKA_SOURCE_PROVIDER_CLASS_NAME);
        // Class.isInstance is the null-safe equivalent of isAssignableFrom(obj.getClass()): it
        // returns false for a null provider, so no catch-all exception handler is needed.
        return kafkaSourceProviderClass != null
                && kafkaSourceProviderClass.isInstance(creatableRelationProvider);
    }

    /**
     * Builds datasets from Kafka options by delegating to {@code createDatasetsFromOptions}.
     *
     * @param datasetFactory factory used to create each dataset
     * @param creatableRelationProvider not used by this method
     * @param map Kafka options forwarded to {@code createDatasetsFromOptions}
     * @param saveMode not used by this method
     * @param structType schema forwarded to {@code createDatasetsFromOptions}
     * @return the datasets produced by {@code createDatasetsFromOptions}
     */
    public static <D extends OpenLineage.Dataset> List<D> createKafkaDatasets(DatasetFactory<D> datasetFactory, CreatableRelationProvider creatableRelationProvider, Map<String, String> map, SaveMode saveMode, StructType structType) {
        return createDatasetsFromOptions(datasetFactory, map, structType);
    }

    /**
     * Decides whether this visitor handles the given plan node.
     *
     * <p>The node is accepted only when the Kafka relation class is available, the node is a
     * {@link LogicalRelation}, and the relation it wraps is assignable to
     * {@code org.apache.spark.sql.kafka010.KafkaRelation} as loaded by the thread context class
     * loader.
     *
     * @param logicalPlan plan node to inspect
     * @return {@code true} if the node wraps a Kafka relation
     */
    @Override
    public boolean isDefinedAt(LogicalPlan logicalPlan) {
        if (!hasKafkaRelationClass()) {
            return false;
        }
        if (!(logicalPlan instanceof LogicalRelation)) {
            return false;
        }
        Class<?> kafkaRelationClass = loadClassWithCurrentThreadContextClassLoader(KAFKA_RELATION_CLASS_NAME);
        if (kafkaRelationClass == null) {
            return false;
        }
        BaseRelation wrapped = ((LogicalRelation) logicalPlan).relation();
        // Guarded so the canonical names are only computed when they will actually be logged.
        if (log.isDebugEnabled()) {
            log.debug(
                    "Checking if {} is assignable from {}",
                    kafkaRelationClass.getCanonicalName(),
                    wrapped.getClass().getCanonicalName());
        }
        return kafkaRelationClass.isAssignableFrom(wrapped.getClass());
    }

    /**
     * Produces the datasets described by the Kafka relation wrapped in the given plan node.
     *
     * <p>The node is cast to {@link LogicalRelation} without a check. The relation's
     * {@code sourceOptions} field is read reflectively and, together with the relation schema,
     * passed to {@code createDatasetsFromOptions}.
     *
     * @param logicalPlan plan node wrapping the relation
     * @return datasets built from the relation's options
     */
    public List<D> apply(LogicalPlan logicalPlan) {
        BaseRelation kafkaRelation = ((LogicalRelation) logicalPlan).relation();
        return createDatasetsFromOptions(
                this.datasetFactory, readSourceOptions(kafkaRelation), kafkaRelation.schema());
    }

    /**
     * Reads the {@code sourceOptions} field declared on the relation's class via reflection,
     * logging an error and falling back to an empty map when the field cannot be read.
     */
    @SuppressWarnings("unchecked") // the reflective read is cast to Map<String, String> unchecked
    private static Map<String, String> readSourceOptions(BaseRelation kafkaRelation) {
        Map<String, String> sourceOptions;
        try {
            Field optionsField = kafkaRelation.getClass().getDeclaredField("sourceOptions");
            optionsField.setAccessible(true);
            sourceOptions = (Map<String, String>) optionsField.get(kafkaRelation);
        } catch (Exception e) {
            log.error("Can't extract kafka server options", e);
            sourceOptions = Map$.MODULE$.empty();
        }
        return sourceOptions;
    }

    /**
     * Builds one dataset per topic named in the given Kafka options.
     *
     * <p>Topic names are collected from the {@code subscribe} and {@code topic} options (each value
     * used as a single name, in that order) followed by the top-level field names of the JSON held
     * in the {@code assign} option. The dataset namespace is {@code kafka://host:port}, taken from
     * the first comma-separated entry of {@code kafka.bootstrap.servers}; when that option is
     * absent the namespace is just {@code kafka://}.
     *
     * @param datasetFactory factory used to create each dataset
     * @param map Kafka options to read
     * @param structType schema attached to every created dataset
     * @return one dataset per collected topic name, in collection order
     */
    private static <D extends OpenLineage.Dataset> List<D> createDatasetsFromOptions(DatasetFactory<D> datasetFactory, Map<String, String> map, StructType structType) {
        Optional<String> bootstrapServers =
                ScalaConversionUtils.asJavaOptional(map.get("kafka.bootstrap.servers"));

        // "subscribe" and "topic" each contribute their value as a single topic name.
        Stream<String> singleTopics =
                Stream.of("subscribe", "topic")
                        .map(key -> map.get(key))
                        .filter(value -> value.nonEmpty())
                        .map(value -> value.get())
                        .map(String.class::cast);

        // "assign" holds JSON; its top-level field names are taken as topic names.
        Stream<String> assignedTopics =
                ScalaConversionUtils.asJavaOptional(map.get("assign"))
                        .map(assignJson -> topicsFromAssignOption(assignJson))
                        .orElse(Stream.empty());

        List<String> topics = Stream.concat(singleTopics, assignedTopics).collect(Collectors.toList());

        String namespace =
                "kafka://"
                        + bootstrapServers
                                // URI parsing needs a scheme; add one when the value has none.
                                .map(servers -> servers.matches("\\w+://.*") ? servers : "PLAINTEXT://" + servers)
                                // Only the first server of the comma-separated list is used.
                                .map(servers -> URI.create(servers.split(",")[0]))
                                .map(uri -> uri.getHost() + ":" + uri.getPort())
                                .orElse("");

        return topics.stream()
                .map(topic -> datasetFactory.getDataset(topic, namespace, structType))
                .collect(Collectors.toList());
    }

    /**
     * Parses the JSON value of the {@code assign} option and streams its top-level field names,
     * logging a warning and returning an empty stream when the JSON cannot be parsed.
     */
    private static Stream<String> topicsFromAssignOption(String assignJson) {
        try {
            return StreamSupport.stream(
                    Spliterators.spliteratorUnknownSize(
                            new ObjectMapper().readTree(assignJson).fieldNames(), 0),
                    false);
        } catch (IOException e) {
            log.warn("Unable to find topics from Kafka source configuration {}", assignJson, e);
            return Stream.empty();
        }
    }
}
