package io.openlineage.spark.agent.lifecycle.plan;

import io.openlineage.client.OpenLineage;
import io.openlineage.spark.agent.util.ScalaConversionUtils;
import io.openlineage.spark.api.DatasetFactory;
import io.openlineage.spark.shaded.org.apache.commons.lang3.reflect.FieldUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.connector.read.streaming.SparkDataStream;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/openlineage/spark/agent/lifecycle/plan/KafkaMicroBatchStreamStrategy.class */
public final class KafkaMicroBatchStreamStrategy extends StreamStrategy {
    private static final Logger log = LoggerFactory.getLogger(KafkaMicroBatchStreamStrategy.class);
    private static final String KAFKA_SOURCE_OFFSET_CLASS_NAME = "org.apache.spark.sql.kafka010.KafkaSourceOffset";

    public KafkaMicroBatchStreamStrategy(DatasetFactory<OpenLineage.InputDataset> datasetFactory, StructType structType, SparkDataStream sparkDataStream, Optional<Offset> optional) {
        super(datasetFactory, structType, sparkDataStream, optional);
    }

    @Override // io.openlineage.spark.agent.lifecycle.plan.StreamStrategy
    public List<OpenLineage.InputDataset> getInputDatasets() {
        Optional<String> bootstrapServers = getBootstrapServers();
        Set<String> topics = getTopics();
        if (bootstrapServers.isPresent() && !topics.isEmpty()) {
            return generateInputDatasets(bootstrapServers, topics);
        }
        log.warn("Could not generate an input dataset because bootstrapServers need to be present and at least one topic must exist");
        return Collections.emptyList();
    }

    private Optional<String> getBootstrapServers() {
        return tryReadField(this.stream, "executorKafkaParams").map(map -> {
            return (String) map.get("bootstrap.servers");
        });
    }

    private Set<String> getTopics() {
        if (!this.offsetOption.isPresent()) {
            return Collections.emptySet();
        }
        Offset offset = this.offsetOption.get();
        String canonicalName = offset.getClass().getCanonicalName();
        if (KAFKA_SOURCE_OFFSET_CLASS_NAME.equals(canonicalName)) {
            return (Set) tryReadField(offset, "partitionToOffsets").map(ScalaConversionUtils::toJavaMap).map((v0) -> {
                return v0.keySet();
            }).map((v1) -> {
                return convertTopicPartitions(v1);
            }).orElseGet(Collections::emptySet);
        }
        log.warn("Encountered an unsupported offset class: {}", canonicalName);
        return Collections.emptySet();
    }

    private List<OpenLineage.InputDataset> generateInputDatasets(Optional<String> optional, Collection<String> collection) {
        String resolve = KafkaBootstrapServerResolver.resolve(optional);
        ArrayList arrayList = new ArrayList();
        Iterator<String> it = collection.iterator();
        while (it.hasNext()) {
            arrayList.add(this.datasetFactory.getDataset(it.next(), resolve, this.schema));
        }
        return arrayList;
    }

    private <T> Optional<T> tryReadField(Object obj, String str) {
        try {
            return Optional.ofNullable(FieldUtils.readDeclaredField(obj, str, true));
        } catch (IllegalAccessException e) {
            log.error("Could not read the field '{}'", str, e);
            return Optional.empty();
        } catch (IllegalArgumentException e2) {
            log.error("Could not read the field '{}' because it does not exist", str, e2);
            return Optional.empty();
        }
    }

    private Set<String> convertTopicPartitions(Collection collection) {
        HashSet hashSet = new HashSet();
        Iterator it = collection.iterator();
        while (it.hasNext()) {
            Optional<String> optional = new TopicPartitionProxy(it.next()).topic();
            Objects.requireNonNull(hashSet);
            optional.ifPresent((v1) -> {
                r1.add(v1);
            });
        }
        return hashSet;
    }
}
