package io.openlineage.spark3.agent.lifecycle.plan.catalog;

import io.openlineage.client.OpenLineage;
import io.openlineage.client.utils.DatasetIdentifier;
import io.openlineage.client.utils.filesystem.FilesystemDatasetUtils;
import io.openlineage.spark.agent.util.PathUtils;
import io.openlineage.spark.agent.util.ScalaConversionUtils;
import io.openlineage.spark.agent.util.SparkConfUtils;
import io.openlineage.spark.api.OpenLineageContext;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.StringJoiner;
import javax.annotation.Nullable;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.spark.SparkCatalog;
import org.apache.iceberg.spark.SparkSessionCatalog;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/openlineage/spark3/agent/lifecycle/plan/catalog/IcebergHandler.class */
/**
 * {@link CatalogHandler} for Iceberg tables accessed through {@link SparkCatalog} or
 * {@link SparkSessionCatalog}.
 *
 * <p>The primary dataset identifier is derived from the table's physical location. When the table
 * is resolved through a configured catalog, a symlink identifier describing the table in that
 * catalog (hive, rest, nessie, glue, or warehouse-based) is attached as well.
 */
public class IcebergHandler implements CatalogHandler {
    private static final Logger log = LoggerFactory.getLogger(IcebergHandler.class);
    private static final String ICEBERG_PATH_IDENTIFIER_CLASS_NAME = "org.apache.iceberg.spark.PathIdentifier";
    private static final String DEFAULT_ICEBERG_CATALOG_NAME = "default_iceberg";
    private static final String TYPE = "type";
    private static final String CATALOG_IMPL = "catalog-impl";
    private static final String WAREHOUSE = "warehouse";
    private static final String URI_PROPERTY = "uri";

    private final OpenLineageContext context;

    /**
     * @param openLineageContext context used to obtain the {@link OpenLineage} facet factory
     */
    public IcebergHandler(OpenLineageContext openLineageContext) {
        this.context = openLineageContext;
    }

    /**
     * Checks whether the Iceberg catalog API can be loaded by this class's class loader.
     *
     * @return {@code true} if {@code org.apache.iceberg.catalog.Catalog} loads, {@code false} otherwise
     */
    @Override // io.openlineage.spark3.agent.lifecycle.plan.catalog.CatalogHandler
    public boolean hasClasses() {
        try {
            IcebergHandler.class.getClassLoader().loadClass("org.apache.iceberg.catalog.Catalog");
            return true;
        } catch (Exception e) {
            log.debug("The iceberg catalog is not present");
            return false;
        }
    }

    /**
     * @return {@code true} if the catalog is one of the two Iceberg Spark catalog implementations
     */
    @Override // io.openlineage.spark3.agent.lifecycle.plan.catalog.CatalogHandler
    public boolean isClass(TableCatalog tableCatalog) {
        return (tableCatalog instanceof SparkCatalog) || (tableCatalog instanceof SparkSessionCatalog);
    }

    /**
     * Builds the dataset identifier for an Iceberg table.
     *
     * <p>The location of the loaded table is used when the table can be loaded; otherwise the
     * location is reconstructed from the catalog's {@code warehouse} property and the identifier.
     * A catalog symlink is attached unless the table is a path-based identifier served by the
     * {@code default_iceberg} catalog with no warehouse configured.
     *
     * @param sparkSession session whose configuration holds the {@code spark.sql.catalog.<name>.*} settings
     * @param tableCatalog catalog the table is resolved through
     * @param identifier identifier of the table inside the catalog
     * @param map table properties (not used by this handler)
     * @return identifier based on the table location, optionally carrying a TABLE symlink
     * @throws IllegalArgumentException if the table cannot be loaded and no warehouse is configured
     * @throws UnsupportedCatalogException if the catalog settings needed for the symlink are missing
     */
    @Override // io.openlineage.spark3.agent.lifecycle.plan.catalog.CatalogHandler
    public DatasetIdentifier getDatasetIdentifier(SparkSession sparkSession, TableCatalog tableCatalog, Identifier identifier, Map<String, String> map) {
        String catalogName = tableCatalog.name();
        Map<String, String> catalogProperties = getCatalogProperties(ScalaConversionUtils.fromMap(sparkSession.conf().getAll()), catalogName);
        String catalogType = getCatalogType(catalogProperties);
        String warehouse = catalogProperties.get(WAREHOUSE);

        boolean isDefaultCatalog = DEFAULT_ICEBERG_CATALOG_NAME.equals(catalogName);
        boolean warehouseMissing = isBlank(warehouse);
        boolean isPathIdentifier = ICEBERG_PATH_IDENTIFIER_CLASS_NAME.equals(identifier.getClass().getName());

        Optional<Path> tableLocation = getIcebergTable(tableCatalog, identifier).map(table -> new Path(table.location()));

        Optional<DatasetIdentifier> symlink = Optional.empty();
        if (isDefaultCatalog && warehouseMissing && isPathIdentifier) {
            // Path-based table outside any configured catalog: there is no catalog entry to link to.
            if (log.isDebugEnabled()) {
                log.debug("Encountered an Iceberg-formatted dataset ({}) that does not belong to the configured Iceberg catalog (catalog={})", identifierToString(identifier), catalogName);
            }
        } else {
            if (log.isDebugEnabled()) {
                log.debug("Encountered an Iceberg-formatted dataset ({}) that belongs to the configured Iceberg catalog (catalog={})", identifierToString(identifier), catalogName);
            }
            symlink = getSymlinkIdentifier(sparkSession, catalogType, catalogProperties, identifier.toString());
        }

        Path location = tableLocation.orElseGet(() -> {
            if (warehouseMissing) {
                // Fail with an explanatory message instead of Path's generic null/empty string error.
                throw new IllegalArgumentException(String.format(
                        "Cannot determine the location of Iceberg table %s: the table could not be loaded and catalog '%s' has no 'warehouse' configured",
                        identifier, catalogName));
            }
            return reconstructDefaultLocation(new Path(warehouse), identifier);
        });

        DatasetIdentifier dataset = PathUtils.fromPath(location);
        symlink.ifPresent(link -> dataset.withSymlink(link.getName(), link.getNamespace(), DatasetIdentifier.SymlinkType.TABLE));
        return dataset;
    }

    /** Returns {@code true} when the value is {@code null} or contains only whitespace. */
    private static boolean isBlank(@Nullable String value) {
        return value == null || value.trim().isEmpty();
    }

    /**
     * Renders an identifier for log messages. A single-level namespace is printed bare; an empty
     * or multi-level namespace is printed in {@link Arrays#toString(Object[])} form, so an empty
     * namespace no longer triggers an out-of-bounds access.
     */
    private String identifierToString(Identifier identifier) {
        String[] namespace = identifier.namespace();
        String namespaceString = namespace.length == 1 ? namespace[0] : Arrays.toString(namespace);
        return String.format("%s(namespace=%s; name=%s)", identifier.getClass().getSimpleName(), namespaceString, identifier.name());
    }

    /**
     * Builds the identifier the table is known by inside its catalog.
     *
     * @param sparkSession session used to look up hive/glue settings not present in the catalog properties
     * @param catalogType value resolved by {@link #getCatalogType(Map)}, may be {@code null}
     * @param catalogProperties catalog properties with the {@code spark.sql.catalog.<name>.} prefix removed
     * @param tableName string form of the table identifier
     * @return the symlink identifier, or empty when neither a known catalog type nor a warehouse is configured
     * @throws UnsupportedCatalogException if a setting required by the catalog type is missing
     */
    private Optional<DatasetIdentifier> getSymlinkIdentifier(SparkSession sparkSession, @Nullable String catalogType, Map<String, String> catalogProperties, String tableName) {
        String confUri = catalogProperties.get(URI_PROPERTY);
        DatasetIdentifier symlink;
        if ("hive".equals(catalogType)) {
            log.debug("Getting symlink for hive");
            symlink = getHiveIdentifier(sparkSession, confUri, tableName);
        } else if ("rest".equals(catalogType)) {
            log.debug("Getting symlink for rest");
            symlink = getUriBasedIdentifier("rest", confUri, tableName);
        } else if ("nessie".equals(catalogType)) {
            log.debug("Getting symlink for nessie");
            symlink = getUriBasedIdentifier("nessie", confUri, tableName);
        } else if ("glue".equals(catalogType)) {
            log.debug("Getting symlink for glue");
            symlink = getGlueIdentifier(tableName, sparkSession);
        } else {
            String warehouse = catalogProperties.get(WAREHOUSE);
            if (isBlank(warehouse)) {
                // Without a warehouse there is nothing to anchor the symlink to; the dataset can
                // still be identified by its physical location, so do not fail the whole lookup.
                log.debug("Skipping symlink for '{}': no known catalog type and no warehouse location configured", tableName);
                return Optional.empty();
            }
            log.debug("Getting symlink using warehouse location and table name");
            symlink = FilesystemDatasetUtils.fromLocationAndName(new Path(warehouse).toUri(), tableName);
        }
        return Optional.ofNullable(symlink);
    }

    /**
     * Writes the map to the trace log, one {@code key: value} entry per line in sorted order.
     * Does nothing unless trace logging is enabled.
     */
    private void logMap(String header, Map<String, String> map) {
        if (!log.isTraceEnabled()) {
            return;
        }
        List<String> lines = new ArrayList<>(map.size());
        for (Map.Entry<String, String> entry : map.entrySet()) {
            lines.add(entry.getKey() + ": " + entry.getValue());
        }
        lines.sort(Comparator.naturalOrder());
        StringJoiner joiner = new StringJoiner("\n\t", "\t", "");
        lines.forEach(joiner::add);
        log.trace("{}\n{}", header, joiner);
    }

    /**
     * Extracts the settings of one catalog from the Spark configuration.
     *
     * @param sparkProperties full Spark configuration
     * @param catalogName name of the catalog
     * @return entries whose key starts with {@code spark.sql.catalog.<catalogName>.}, with that prefix removed
     */
    private Map<String, String> getCatalogProperties(Map<String, String> sparkProperties, String catalogName) {
        String prefix = String.format("spark.sql.catalog.%s.", catalogName);
        log.debug("Searching for spark properties pertaining to the catalog '{}'. The catalog settings are prefixed with '{}'.", catalogName, prefix);
        logMap("The spark properties are:", sparkProperties);
        Map<String, String> catalogProperties = new HashMap<>();
        for (Map.Entry<String, String> entry : sparkProperties.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(prefix)) {
                catalogProperties.put(key.substring(prefix.length()), entry.getValue());
            }
        }
        logMap("That catalog properties are:", catalogProperties);
        return catalogProperties;
    }

    /**
     * Builds the hive symlink from the catalog's {@code uri} property, falling back to the
     * metastore URI resolved from the Spark context when the property is absent.
     *
     * @throws UnsupportedCatalogException if neither source provides a metastore URI
     * @throws IllegalArgumentException if the configured URI is not a valid URI
     */
    private DatasetIdentifier getHiveIdentifier(SparkSession sparkSession, @Nullable String confUri, String table) {
        URI metastoreUri;
        if (confUri != null) {
            metastoreUri = URI.create(confUri);
        } else {
            metastoreUri = SparkConfUtils.getMetastoreUri(sparkSession.sparkContext())
                    .orElseThrow(() -> new UnsupportedCatalogException("hive"));
        }
        return new DatasetIdentifier(table, PathUtils.prepareHiveUri(metastoreUri).toString());
    }

    /**
     * Builds a symlink whose namespace is the catalog's {@code uri} property (used for the rest
     * and nessie catalog types).
     *
     * @param catalogType catalog type reported in the exception when the URI is missing
     * @throws UnsupportedCatalogException if no URI is configured, mirroring the hive handling
     * @throws IllegalArgumentException if the configured URI is not a valid URI
     */
    private DatasetIdentifier getUriBasedIdentifier(String catalogType, @Nullable String confUri, String table) {
        if (confUri == null) {
            throw new UnsupportedCatalogException(catalogType);
        }
        // Parsing validates the value; toString() yields the configured string back.
        return new DatasetIdentifier(table, URI.create(confUri).toString());
    }

    /**
     * Builds the glue symlink: the table name with dots turned into slashes under the glue table
     * prefix, in the namespace of the glue ARN resolved from the Spark and Hadoop configuration.
     *
     * @throws UnsupportedCatalogException if the glue ARN cannot be resolved, mirroring the hive handling
     */
    private DatasetIdentifier getGlueIdentifier(String table, SparkSession sparkSession) {
        SparkContext sparkContext = sparkSession.sparkContext();
        String glueArn = PathUtils.getGlueArn(sparkContext.getConf(), sparkContext.hadoopConfiguration())
                .orElseThrow(() -> new UnsupportedCatalogException("glue"));
        return new DatasetIdentifier(PathUtils.GLUE_TABLE_PREFIX + table.replace(".", "/"), glueArn);
    }

    /**
     * @param map table properties; the {@code format} entry, with any {@code iceberg/} text removed,
     *     becomes the file format of the facet (empty string when the entry is absent)
     * @return a storage facet whose storage layer is {@code iceberg}
     */
    @Override // io.openlineage.spark3.agent.lifecycle.plan.catalog.CatalogHandler
    public Optional<OpenLineage.StorageDatasetFacet> getStorageDatasetFacet(Map<String, String> map) {
        String fileFormat = map.getOrDefault("format", "").replace("iceberg/", "");
        return Optional.of(this.context.getOpenLineage().newStorageDatasetFacet("iceberg", fileFormat));
    }

    /**
     * @return the id of the table's current snapshot as a string, or empty when the table cannot
     *     be loaded or has no current snapshot
     */
    @Override // io.openlineage.spark3.agent.lifecycle.plan.catalog.CatalogHandler
    public Optional<String> getDatasetVersion(TableCatalog tableCatalog, Identifier identifier, Map<String, String> map) {
        return getIcebergTable(tableCatalog, identifier)
                .map(Table::currentSnapshot)
                .map(snapshot -> Long.toString(snapshot.snapshotId()));
    }

    /**
     * Loads the Iceberg table behind the identifier.
     *
     * <p>A {@link SparkCatalog} is asked directly; any other catalog is treated as a
     * {@link SparkSessionCatalog} and its underlying Iceberg catalog is used.
     *
     * @return the table, or empty when it does not exist or the catalog/table is not of the expected type
     */
    private Optional<Table> getIcebergTable(TableCatalog tableCatalog, Identifier identifier) {
        try {
            if (tableCatalog instanceof SparkCatalog) {
                return Optional.ofNullable(((SparkCatalog) tableCatalog).loadTable(identifier).table());
            }
            SparkSessionCatalog<?> sessionCatalog = (SparkSessionCatalog<?>) tableCatalog;
            return Optional.ofNullable(sessionCatalog.icebergCatalog().loadTable(TableIdentifier.parse(identifier.toString())));
        } catch (NoSuchTableException | org.apache.iceberg.exceptions.NoSuchTableException | ClassCastException e) {
            // The Iceberg catalog API reports a missing table with its own unchecked
            // NoSuchTableException, distinct from Spark's; both mean "no table to inspect".
            log.error("Failed to load table from catalog: {}", identifier, e);
            return Optional.empty();
        }
    }

    /**
     * Derives a table location as {@code <warehouse>/<namespace...>/<name>}.
     */
    private Path reconstructDefaultLocation(Path warehouse, Identifier identifier) {
        String[] namespace = identifier.namespace();
        List<String> segments = new ArrayList<>(namespace.length + 1);
        segments.addAll(Arrays.asList(namespace));
        segments.add(identifier.name());
        return new Path(warehouse, String.join("/", segments));
    }

    @Override // io.openlineage.spark3.agent.lifecycle.plan.catalog.CatalogHandler
    public String getName() {
        return "iceberg";
    }

    /**
     * Resolves the catalog type from the catalog properties.
     *
     * @return the {@code type} property when present; otherwise {@code "glue"} when
     *     {@code catalog-impl} ends with {@code GlueCatalog}; otherwise {@code null}
     */
    @Nullable
    private String getCatalogType(Map<String, String> catalogProperties) {
        if (catalogProperties.containsKey(TYPE)) {
            String catalogType = catalogProperties.get(TYPE);
            log.debug("Found the catalog type using the 'type' property. The catalog type is '{}'", catalogType);
            return catalogType;
        }
        String catalogImpl = catalogProperties.get(CATALOG_IMPL);
        if (catalogImpl != null && catalogImpl.endsWith("GlueCatalog")) {
            log.debug("Default the catalog type to 'glue' because the catalog impl is {}", catalogImpl);
            return "glue";
        }
        return null;
    }
}
