/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.pulsar.internal;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.streaming.connectors.pulsar.internal.IncompatibleSchemaException;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader;
import org.apache.flink.streaming.connectors.pulsar.internal.SchemaTranslator;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.pulsar.TableSchemaHelper;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.SchemaInfo;

/**
 * Maps Flink catalog operations (databases, tables) onto Pulsar metadata
 * (namespaces, topics, schemas) through a {@link PulsarMetadataReader}.
 *
 * <p>Two kinds of databases are distinguished by {@link #isNativeFlinkDatabase(String)}:
 * <ul>
 *   <li>names without a {@code /} are "native Flink" databases and live as namespaces
 *       under the dedicated {@code __flink_catalog} tenant; their tables are topics whose
 *       local name carries the {@code table_} prefix;</li>
 *   <li>names containing a {@code /} are used verbatim as a Pulsar {@code tenant/namespace},
 *       and their tables are plain topics.</li>
 * </ul>
 */
public class PulsarCatalogSupport {
    private static final String FLINK_CATALOG_TENANT = "__flink_catalog";
    /** Namespace prefix ({@code tenant + "/"}) shared by every native Flink database. */
    private static final String FLINK_CATALOG_NAMESPACE_PREFIX = FLINK_CATALOG_TENANT + "/";
    private static final String TABLE_PREFIX = "table_";
    private static final String TABLE_COMMENT = "table.comment";
    private static final String IS_CATALOG_TOPIC = "is.catalog.topic";
    private final Map<String, String> properties;
    private final PulsarMetadataReader pulsarMetadataReader;
    private final SchemaTranslator schemaTranslator;
    private final ClientConfigurationData clientConf;

    /**
     * Creates the support object, building its own {@link PulsarMetadataReader} and making
     * sure the {@code __flink_catalog} tenant exists.
     *
     * @param adminUrl              admin URL handed to the metadata reader
     * @param properties            catalog properties; {@code properties.auth-params} and
     *                              {@code properties.auth-plugin-classname} are copied into
     *                              the client configuration (must not be {@code null})
     * @param subscriptionName      passed through to the metadata reader
     * @param caseInsensitiveParams passed through to the metadata reader
     * @param indexOfThisSubtask    passed through to the metadata reader
     * @param numParallelSubtasks   passed through to the metadata reader
     * @param schemaTranslator      translator used for plain (non-catalog) Pulsar schemas
     * @throws PulsarClientException if the metadata reader cannot be created
     * @throws PulsarAdminException  if checking or creating the catalog tenant fails
     */
    public PulsarCatalogSupport(String adminUrl, Map<String, String> properties, String subscriptionName, Map<String, String> caseInsensitiveParams, int indexOfThisSubtask, int numParallelSubtasks, SchemaTranslator schemaTranslator) throws PulsarClientException, PulsarAdminException {
        this.properties = properties;
        this.clientConf = new ClientConfigurationData();
        this.clientConf.setAuthParams(properties.get("properties.auth-params"));
        this.clientConf.setAuthPluginClassName(properties.get("properties.auth-plugin-classname"));
        this.pulsarMetadataReader = new PulsarMetadataReader(adminUrl, this.clientConf, subscriptionName, caseInsensitiveParams, indexOfThisSubtask, numParallelSubtasks);
        this.schemaTranslator = schemaTranslator;
        try {
            if (!this.pulsarMetadataReader.tenantExists(FLINK_CATALOG_TENANT)) {
                this.pulsarMetadataReader.createTenant(FLINK_CATALOG_TENANT);
            }
        } catch (Exception e) {
            // The caller never receives this instance when construction fails, so it
            // cannot call close(); release the reader here to avoid leaking it.
            try {
                this.pulsarMetadataReader.close();
            } catch (RuntimeException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Test-only constructor that uses the supplied reader and starts with empty
     * properties; it does not touch the catalog tenant.
     *
     * @param metadataReader   reader to delegate all metadata operations to
     * @param schemaTranslator translator used for plain (non-catalog) Pulsar schemas
     */
    @VisibleForTesting
    protected PulsarCatalogSupport(PulsarMetadataReader metadataReader, SchemaTranslator schemaTranslator) {
        this.pulsarMetadataReader = metadataReader;
        this.schemaTranslator = schemaTranslator;
        this.properties = new HashMap<>();
        this.clientConf = new ClientConfigurationData();
    }

    /**
     * Tells whether a database name denotes a native Flink database.
     *
     * @param name database name
     * @return {@code true} when {@code name} contains no {@code /}
     */
    public static boolean isNativeFlinkDatabase(String name) {
        return !name.contains("/");
    }

    /**
     * Lists all databases: namespaces under the catalog tenant are reported without the
     * {@code __flink_catalog/} prefix, every other namespace is reported unchanged.
     *
     * @return database names in the order the namespaces were returned by the reader
     * @throws PulsarAdminException if the namespaces cannot be listed
     */
    public List<String> listDatabases() throws PulsarAdminException {
        List<String> databases = new ArrayList<>();
        for (String ns : this.pulsarMetadataReader.listNamespaces()) {
            // Match on "tenant/" rather than the bare tenant name so that a different
            // tenant that merely starts with the same characters is not truncated.
            if (ns.startsWith(FLINK_CATALOG_NAMESPACE_PREFIX)) {
                databases.add(ns.substring(FLINK_CATALOG_NAMESPACE_PREFIX.length()));
            } else {
                databases.add(ns);
            }
        }
        return databases;
    }

    /**
     * Checks whether the namespace backing a database exists.
     *
     * @param name database name
     * @return result of the reader's namespace existence check
     * @throws PulsarAdminException if the check fails
     */
    public boolean databaseExists(String name) throws PulsarAdminException {
        if (PulsarCatalogSupport.isNativeFlinkDatabase(name)) {
            return this.pulsarMetadataReader.namespaceExists(FLINK_CATALOG_NAMESPACE_PREFIX + name);
        }
        return this.pulsarMetadataReader.namespaceExists(name);
    }

    /**
     * Creates the namespace {@code __flink_catalog/<name>}.
     *
     * @param name database name; it is always placed under the catalog tenant
     * @throws PulsarAdminException if the namespace cannot be created
     */
    public void createDatabase(String name) throws PulsarAdminException {
        this.pulsarMetadataReader.createNamespace(FLINK_CATALOG_NAMESPACE_PREFIX + name);
    }

    /**
     * Lists the tables of a database.
     *
     * <p>For a native Flink database the first {@code "table_".length()} characters of each
     * topic name are dropped; for any other database the topic names are returned as the
     * reader provides them.
     *
     * @param database database name
     * @return table names
     * @throws PulsarAdminException if the topics cannot be listed
     */
    public List<String> listTables(String database) throws PulsarAdminException {
        if (PulsarCatalogSupport.isNativeFlinkDatabase(database)) {
            List<String> tables = new ArrayList<>();
            List<String> topics = this.pulsarMetadataReader.getTopics(FLINK_CATALOG_NAMESPACE_PREFIX + database);
            for (String topic : topics) {
                // NOTE(review): the prefix is stripped unconditionally; this presumes every
                // topic name returned here starts with "table_" - confirm against getTopics.
                tables.add(topic.substring(TABLE_PREFIX.length()));
            }
            return tables;
        }
        return this.pulsarMetadataReader.getTopics(database);
    }

    /**
     * Checks whether the topic backing a table exists.
     *
     * @param tablePath database and table name
     * @return result of the reader's topic existence check
     * @throws PulsarAdminException if the check fails
     */
    public boolean tableExists(ObjectPath tablePath) throws PulsarAdminException {
        String topicName = this.objectNameToTopicName(tablePath);
        return this.pulsarMetadataReader.topicExists(topicName);
    }

    /**
     * Loads a table definition from the schema stored on its topic.
     *
     * <p>Tables of native Flink databases are deserialized with {@link TableSchemaHelper}
     * and the default connection options; other tables are derived from the Pulsar schema.
     *
     * @param tablePath database and table name
     * @return the catalog table
     * @throws PulsarAdminException        if the schema cannot be fetched
     * @throws IncompatibleSchemaException if a plain Pulsar schema cannot be translated
     * @throws CatalogException            if the stored metadata of a native Flink table
     *                                     cannot be deserialized (the failure is attached
     *                                     as the cause)
     */
    public CatalogTable getTable(ObjectPath tablePath) throws PulsarAdminException, IncompatibleSchemaException {
        String topicName = this.objectNameToTopicName(tablePath);
        if (PulsarCatalogSupport.isNativeFlinkDatabase(tablePath.getDatabaseName())) {
            SchemaInfo metadataSchema = this.pulsarMetadataReader.getPulsarSchema(topicName);
            try {
                return TableSchemaHelper.deserialize(metadataSchema, this.generateDefaultTableOptions());
            }
            catch (Exception e) {
                // Chain the cause instead of printing it so callers and logs see the root failure.
                throw new CatalogException("Failed to fetch metadata for flink table: " + tablePath.getObjectName(), e);
            }
        }
        SchemaInfo pulsarSchema = this.pulsarMetadataReader.getPulsarSchema(topicName);
        return this.schemaToCatalogTable(pulsarSchema, tablePath, this.properties);
    }

    /**
     * Builds the connection options attached to deserialized native Flink tables.
     *
     * <p>{@code service-url} and {@code admin-url} are always copied (the value is
     * {@code null} when the property is absent); the two auth options are copied only
     * when present.
     *
     * <p>NOTE(review): the auth keys read here ({@code auth-params},
     * {@code auth-plugin-classname}) differ from the {@code properties.}-prefixed keys read
     * in the public constructor - confirm which spelling callers actually supply.
     */
    private Map<String, String> generateDefaultTableOptions() {
        Map<String, String> defaultTableOptions = new HashMap<>();
        defaultTableOptions.put("service-url", this.properties.get("service-url"));
        defaultTableOptions.put("admin-url", this.properties.get("admin-url"));
        String authParams = this.properties.get("auth-params");
        if (authParams != null) {
            defaultTableOptions.put("auth-params", authParams);
        }
        String authPluginClassName = this.properties.get("auth-plugin-classname");
        if (authPluginClassName != null) {
            defaultTableOptions.put("auth-plugin-classname", authPluginClassName);
        }
        return defaultTableOptions;
    }

    /**
     * Drops a native Flink table by deleting the schema and then the topic backing it.
     *
     * @param tablePath         database and table name
     * @param ignoreIfNotExists when {@code true}, a table whose topic does not exist is
     *                          silently skipped
     * @throws PulsarAdminException if an admin call fails
     * @throws CatalogException     if the database is a plain Pulsar namespace
     */
    public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws PulsarAdminException {
        if (!PulsarCatalogSupport.isNativeFlinkDatabase(tablePath.getDatabaseName())) {
            throw new CatalogException("Can't delete normal pulsar topic");
        }
        String topicName = this.objectNameToTopicName(tablePath);
        // Honor the flag: without this check a missing table always surfaced as an
        // admin error regardless of what the caller asked for.
        if (ignoreIfNotExists && !this.pulsarMetadataReader.topicExists(topicName)) {
            return;
        }
        this.pulsarMetadataReader.deleteSchema(topicName);
        this.pulsarMetadataReader.deleteTopic(topicName);
    }

    /**
     * Creates a native Flink table: creates the backing topic and uploads the serialized
     * table definition as its schema. If the upload fails the topic is deleted again
     * (best effort).
     *
     * @param tablePath database and table name
     * @param table     table definition to store
     * @throws PulsarAdminException        if the topic cannot be created
     * @throws IncompatibleSchemaException declared for interface compatibility
     * @throws CatalogException            if the database is a plain Pulsar namespace, or
     *                                     if the metadata cannot be stored (the failure is
     *                                     attached as the cause)
     */
    public void createTable(ObjectPath tablePath, CatalogBaseTable table) throws PulsarAdminException, IncompatibleSchemaException {
        if (!PulsarCatalogSupport.isNativeFlinkDatabase(tablePath.getDatabaseName())) {
            throw new CatalogException(String.format("Can't create flink table under pulsar tenant/namespace: %s", tablePath.getDatabaseName()));
        }
        String topicName = this.objectNameToTopicName(tablePath);
        this.pulsarMetadataReader.createTopic(topicName, 0);
        try {
            this.pulsarMetadataReader.uploadSchema(topicName, TableSchemaHelper.serialize(table));
        }
        catch (Exception e) {
            try {
                // Roll back the topic so a half-created table is not left behind.
                this.pulsarMetadataReader.deleteTopic(topicName);
            }
            catch (Exception cleanupFailure) {
                // Cleanup is best effort; keep the original failure primary but do not
                // lose the information that the rollback failed as well.
                e.addSuppressed(cleanupFailure);
            }
            throw new CatalogException("Can't store table metadata", e);
        }
    }

    /**
     * Converts a Pulsar schema into a catalog table.
     *
     * <p>When the schema properties mark the topic as a catalog topic
     * ({@code is.catalog.topic=true}), the table schema and options are rebuilt from those
     * properties, overlaid with {@code flinkProperties}; otherwise the schema is translated
     * by the {@link SchemaTranslator} and {@code flinkProperties} become the table options.
     *
     * @param pulsarSchema    schema fetched from the topic
     * @param tablePath       used only for the error message
     * @param flinkProperties catalog-level properties to attach to the table
     * @return the catalog table
     * @throws IncompatibleSchemaException if the translator rejects the schema
     * @throws CatalogException            if a catalog topic carries no table schema
     */
    private CatalogTable schemaToCatalogTable(SchemaInfo pulsarSchema, ObjectPath tablePath, Map<String, String> flinkProperties) throws IncompatibleSchemaException {
        boolean isCatalogTopic = Boolean.parseBoolean(pulsarSchema.getProperties().get(IS_CATALOG_TOPIC));
        if (isCatalogTopic) {
            // Start from the properties stored with the schema. Starting from an empty
            // map (as before) made both schema lookups below fail unconditionally.
            Map<String, String> storedProperties = new HashMap<>(pulsarSchema.getProperties());
            DescriptorProperties tableSchemaProps = new DescriptorProperties(true);
            tableSchemaProps.putProperties(storedProperties);
            TableSchema tableSchema = tableSchemaProps.getOptionalTableSchema("schema")
                    .orElseGet(() -> tableSchemaProps.getOptionalTableSchema("generic.table.schema")
                            .orElseThrow(() -> new CatalogException("Failed to get table schema from properties for generic table " + tablePath)));
            List<String> partitionKeys = tableSchemaProps.getPartitionKeys();
            Map<String, String> tableOptions = CatalogTableImpl.removeRedundant(storedProperties, tableSchema, partitionKeys);
            tableOptions.putAll(flinkProperties);
            tableOptions.remove(IS_CATALOG_TOPIC);
            String comment = tableOptions.remove(TABLE_COMMENT);
            return new CatalogTableImpl(tableSchema, tableOptions, comment);
        }
        TableSchema tableSchema = this.schemaTranslator.pulsarSchemaToTableSchema(pulsarSchema);
        return new CatalogTableImpl(tableSchema, flinkProperties, "");
    }

    /**
     * Resolves a table path to the fully qualified persistent topic name.
     *
     * <p>Native Flink databases map to {@code __flink_catalog/<database>} with the
     * {@code table_} prefix on the topic; other databases and table names are used as is.
     */
    private String objectNameToTopicName(ObjectPath objectPath) {
        String databaseName = objectPath.getDatabaseName();
        String namespace;
        String topic;
        if (PulsarCatalogSupport.isNativeFlinkDatabase(databaseName)) {
            namespace = FLINK_CATALOG_NAMESPACE_PREFIX + databaseName;
            topic = TABLE_PREFIX + objectPath.getObjectName();
        } else {
            namespace = databaseName;
            topic = objectPath.getObjectName();
        }
        NamespaceName ns = NamespaceName.get(namespace);
        TopicName fullName = TopicName.get(TopicDomain.persistent.toString(), ns, topic);
        return fullName.toString();
    }

    /** Closes the underlying metadata reader, if there is one. */
    public void close() {
        if (this.pulsarMetadataReader != null) {
            this.pulsarMetadataReader.close();
        }
    }

    /**
     * Deletes a namespace by the exact name given.
     *
     * <p>NOTE(review): unlike {@link #createDatabase(String)}, no {@code __flink_catalog/}
     * prefix is added here - callers presumably pass the full namespace; confirm.
     *
     * @param name namespace name passed straight to the reader
     * @throws PulsarAdminException if the namespace cannot be deleted
     */
    public void deleteNamespace(String name) throws PulsarAdminException {
        this.pulsarMetadataReader.deleteNamespace(name);
    }
}

