package org.apache.flink.streaming.connectors.pulsar;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.pulsar.config.StartupMode;
import org.apache.flink.streaming.connectors.pulsar.internal.JsonSer;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarCatalogSupport;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.PulsarValidator;
import org.apache.flink.table.descriptors.SchemaValidator;
import org.apache.flink.table.factories.DeserializationSchemaFactory;
import org.apache.flink.table.factories.SerializationSchemaFactory;
import org.apache.flink.table.factories.StreamTableSinkFactory;
import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.factories.TableSinkFactory;
import org.apache.flink.table.sinks.StreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.types.Row;
import org.apache.flink.util.ExceptionUtils;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.shade.io.netty.handler.codec.rtsp.RtspHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/pulsar/PulsarTableSourceSinkFactory.class */
public class PulsarTableSourceSinkFactory implements StreamTableSourceFactory<Row>, StreamTableSinkFactory<Row> {
    private static final Logger log = LoggerFactory.getLogger(PulsarTableSourceSinkFactory.class);
    private Properties catalogProperties;
    private boolean isInPulsarCatalog;
    private boolean isInDDL;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/pulsar/PulsarTableSourceSinkFactory$StartupOptions.class */
    public static class StartupOptions {
        private StartupMode startupMode;
        private Map<String, MessageId> specificOffsets;
        private String externalSubscriptionName;

        private StartupOptions() {
        }
    }

    public PulsarTableSourceSinkFactory(Properties properties) {
        this.catalogProperties = properties;
        this.isInPulsarCatalog = properties.size() != 0;
        this.isInDDL = false;
    }

    public PulsarTableSourceSinkFactory() {
        this(new Properties());
    }

    public StreamTableSink<Row> createStreamTableSink(Map<String, String> map) {
        Properties pulsarProperties;
        DescriptorProperties validatedProperties = getValidatedProperties(map);
        TableSchema tableSchema = validatedProperties.getTableSchema("schema");
        String string = validatedProperties.getString(PulsarValidator.CONNECTOR_TOPIC);
        String string2 = validatedProperties.getString(PulsarValidator.CONNECTOR_SERVICE_URL);
        String string3 = validatedProperties.getString(PulsarValidator.CONNECTOR_ADMIN_URL);
        String property = this.isInPulsarCatalog ? this.catalogProperties.getProperty("format.type") : validatedProperties.getString("format.type");
        SchemaValidator.deriveProctimeAttribute(validatedProperties);
        SchemaValidator.deriveRowtimeAttributes(validatedProperties);
        if (this.isInPulsarCatalog) {
            pulsarProperties = new Properties();
            pulsarProperties.putAll(this.catalogProperties);
        } else {
            pulsarProperties = getPulsarProperties(validatedProperties);
        }
        pulsarProperties.put(PulsarValidator.CONNECTOR_TOPIC, string);
        pulsarProperties.put("format.type", property);
        Properties removeConnectorPrefix = removeConnectorPrefix(pulsarProperties);
        SerializationSchema<Row> serializationSchema = getSerializationSchema(map);
        log.info("stream table sink use {} to serialize data", serializationSchema);
        return new PulsarTableSink(string2, string3, tableSchema, string, removeConnectorPrefix, serializationSchema);
    }

    public TableSink<Row> createTableSink(TableSinkFactory.Context context) {
        HashMap hashMap = new HashMap(context.getTable().toProperties());
        if (!hashMap.containsKey(PulsarValidator.CONNECTOR_TOPIC)) {
            hashMap.put(PulsarValidator.CONNECTOR_TOPIC, PulsarCatalogSupport.objectPath2TopicName(context.getObjectIdentifier().toObjectPath()));
        }
        return createStreamTableSink(hashMap);
    }

    public StreamTableSource<Row> createStreamTableSource(Map<String, String> map) {
        Properties pulsarProperties;
        DescriptorProperties validatedProperties = getValidatedProperties(map);
        String string = validatedProperties.getString(PulsarValidator.CONNECTOR_TOPIC);
        String string2 = validatedProperties.getString(PulsarValidator.CONNECTOR_SERVICE_URL);
        String string3 = validatedProperties.getString(PulsarValidator.CONNECTOR_ADMIN_URL);
        StartupOptions startupOptions = getStartupOptions(validatedProperties);
        Optional empty = Optional.empty();
        if (this.isInPulsarCatalog || this.isInDDL) {
            empty = Optional.of(validatedProperties.getTableSchema("schema"));
        }
        if (this.isInPulsarCatalog) {
            pulsarProperties = new Properties();
            pulsarProperties.putAll(this.catalogProperties);
        } else {
            pulsarProperties = getPulsarProperties(validatedProperties);
        }
        pulsarProperties.put("connector.use-extend-field", ((Boolean) validatedProperties.getOptionalBoolean("connector.use-extend-field").orElse(false)).booleanValue() ? "true" : "false");
        pulsarProperties.put(PulsarValidator.CONNECTOR_TOPIC, string);
        Properties removeConnectorPrefix = removeConnectorPrefix(pulsarProperties);
        DeserializationSchema<Row> deserializationSchema = getDeserializationSchema(map);
        Optional map2 = Optional.ofNullable(deserializationSchema).map((v0) -> {
            return v0.getProducedType();
        }).map(typeInformation -> {
            return SchemaValidator.deriveFieldMapping(validatedProperties, Optional.of(typeInformation));
        });
        log.info("stream table source use {} to deserialize data", deserializationSchema);
        return new PulsarTableSource(empty, SchemaValidator.deriveProctimeAttribute(validatedProperties), SchemaValidator.deriveRowtimeAttributes(validatedProperties), map2, string2, string3, removeConnectorPrefix, deserializationSchema, startupOptions.startupMode, startupOptions.specificOffsets, startupOptions.externalSubscriptionName);
    }

    public TableSource<Row> createTableSource(ObjectPath objectPath, CatalogTable catalogTable) {
        HashMap hashMap = new HashMap();
        hashMap.putAll(catalogTable.toProperties());
        this.isInDDL = hashMap.size() != 0;
        if (hashMap.get(PulsarValidator.CONNECTOR_TOPIC) == null) {
            hashMap.put(PulsarValidator.CONNECTOR_TOPIC, PulsarCatalogSupport.objectPath2TopicName(objectPath));
        }
        return createStreamTableSource(hashMap);
    }

    private static Properties removeConnectorPrefix(Properties properties) {
        Properties properties2 = new Properties();
        for (Map.Entry entry : properties.entrySet()) {
            String str = (String) entry.getKey();
            String str2 = (String) entry.getValue();
            if (str.startsWith("connector.")) {
                properties2.put(str.substring("connector.".length()), str2);
            } else {
                properties2.put(str, str2);
            }
        }
        return properties2;
    }

    public Map<String, String> requiredContext() {
        HashMap hashMap = new HashMap();
        hashMap.put("update-mode", RtspHeaders.Values.APPEND);
        hashMap.put("connector.type", "pulsar");
        hashMap.put("connector.property-version", "1");
        return hashMap;
    }

    public List<String> supportedProperties() {
        ArrayList arrayList = new ArrayList();
        arrayList.add("update-mode");
        arrayList.add("connector.version");
        arrayList.add(PulsarValidator.CONNECTOR_TOPIC);
        arrayList.add(PulsarValidator.CONNECTOR_SERVICE_URL);
        arrayList.add(PulsarValidator.CONNECTOR_ADMIN_URL);
        arrayList.add(PulsarValidator.CONNECTOR_STARTUP_MODE);
        arrayList.add("connector.specific-offsets.#.partition");
        arrayList.add("connector.specific-offsets.#.offset");
        arrayList.add("connector.properties.*");
        arrayList.add(PulsarValidator.CONNECTOR_EXTERNAL_SUB_NAME);
        arrayList.add("connector.use-extend-field");
        arrayList.add(PulsarValidator.CONNECTOR_PROPERTIES);
        arrayList.add("connector.properties.#.key");
        arrayList.add("connector.properties.#.value");
        arrayList.add(PulsarValidator.CONNECTOR_SINK_EXTRACTOR);
        arrayList.add(PulsarValidator.CONNECTOR_SINK_EXTRACTOR_CLASS);
        arrayList.add("schema.#.name");
        arrayList.add("schema.#.from");
        arrayList.add("schema.#.data-type");
        arrayList.add("schema.#.expr");
        arrayList.add("schema.#.proctime");
        arrayList.add("schema.#.rowtime.timestamps.type");
        arrayList.add("schema.#.rowtime.timestamps.from");
        arrayList.add("schema.#.rowtime.timestamps.class");
        arrayList.add("schema.#.rowtime.timestamps.serialized");
        arrayList.add("schema.#.rowtime.watermarks.type");
        arrayList.add("schema.#.rowtime.watermarks.class");
        arrayList.add("schema.#.rowtime.watermarks.serialized");
        arrayList.add("schema.#.rowtime.watermarks.delay");
        arrayList.add("schema.#.expr");
        arrayList.add("schema.watermark.#.rowtime");
        arrayList.add("schema.watermark.#.strategy.expr");
        arrayList.add("schema.watermark.#.strategy.data-type");
        arrayList.add("format.*");
        return arrayList;
    }

    private StartupOptions getStartupOptions(DescriptorProperties descriptorProperties) {
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList(1);
        StartupMode startupMode = (StartupMode) descriptorProperties.getOptionalString(PulsarValidator.CONNECTOR_STARTUP_MODE).map(str -> {
            boolean z = -1;
            switch (str.hashCode()) {
                case -1109880953:
                    if (str.equals("latest")) {
                        z = true;
                        break;
                    }
                    break;
                case -809579181:
                    if (str.equals("earliest")) {
                        z = false;
                        break;
                    }
                    break;
                case -410146651:
                    if (str.equals("specific-offsets")) {
                        z = 2;
                        break;
                    }
                    break;
                case 2052905951:
                    if (str.equals("external-subscription")) {
                        z = 3;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    return StartupMode.EARLIEST;
                case true:
                    return StartupMode.LATEST;
                case true:
                    descriptorProperties.getFixedIndexedProperties(PulsarValidator.CONNECTOR_SPECIFIC_OFFSETS, Arrays.asList("partition", PulsarValidator.CONNECTOR_SPECIFIC_OFFSETS_OFFSET)).forEach(map -> {
                        try {
                            hashMap.put(descriptorProperties.getString((String) map.get("partition")), MessageId.fromByteArray(descriptorProperties.getString((String) map.get(PulsarValidator.CONNECTOR_SPECIFIC_OFFSETS_OFFSET)).getBytes()));
                        } catch (IOException e) {
                            log.error("Failed to decode message id from properties {}", ExceptionUtils.stringifyException(e));
                            throw new RuntimeException(e);
                        }
                    });
                    return StartupMode.SPECIFIC_OFFSETS;
                case true:
                    arrayList.add(descriptorProperties.getString(PulsarValidator.CONNECTOR_EXTERNAL_SUB_NAME));
                    return StartupMode.EXTERNAL_SUBSCRIPTION;
                default:
                    throw new TableException("Unsupported startup mode. Validator should have checked that.");
            }
        }).orElse(StartupMode.LATEST);
        StartupOptions startupOptions = new StartupOptions();
        startupOptions.startupMode = startupMode;
        startupOptions.specificOffsets = hashMap;
        if (arrayList.size() != 0) {
            startupOptions.externalSubscriptionName = (String) arrayList.get(0);
        }
        return startupOptions;
    }

    private Properties getPulsarProperties(DescriptorProperties descriptorProperties) {
        Properties properties = new Properties();
        if (descriptorProperties.containsKey("connector.properties.0.key")) {
            descriptorProperties.getFixedIndexedProperties(PulsarValidator.CONNECTOR_PROPERTIES, Arrays.asList("key", PulsarValidator.CONNECTOR_PROPERTIES_VALUE)).forEach(map -> {
                properties.put(descriptorProperties.getString((String) map.get("key")), descriptorProperties.getString((String) map.get(PulsarValidator.CONNECTOR_PROPERTIES_VALUE)));
            });
        } else {
            descriptorProperties.asMap().keySet().stream().filter(str -> {
                return str.startsWith(PulsarValidator.CONNECTOR_PROPERTIES);
            }).forEach(str2 -> {
                properties.put(str2.substring("connector.properties.".length()), descriptorProperties.getString(str2));
            });
        }
        return properties;
    }

    private boolean checkForCustomFieldMapping(DescriptorProperties descriptorProperties, TableSchema tableSchema) {
        Map deriveFieldMapping = SchemaValidator.deriveFieldMapping(descriptorProperties, Optional.of(tableSchema.toRowType()));
        return (deriveFieldMapping.size() == tableSchema.getFieldNames().length && deriveFieldMapping.entrySet().stream().allMatch(entry -> {
            return ((String) entry.getKey()).equals(entry.getValue());
        })) ? false : true;
    }

    private DescriptorProperties getValidatedProperties(Map<String, String> map) {
        DescriptorProperties descriptorProperties = new DescriptorProperties(true);
        descriptorProperties.putProperties(map);
        new PulsarSchemaValidator(true, true, false).validate(descriptorProperties);
        new PulsarValidator().validate(descriptorProperties);
        return descriptorProperties;
    }

    private DeserializationSchema<Row> getDeserializationSchema(Map<String, String> map) {
        try {
            return TableFactoryService.find(DeserializationSchemaFactory.class, map, getClass().getClassLoader()).createDeserializationSchema(map);
        } catch (Exception e) {
            log.warn("get deserializer from properties failed. using pulsar inner schema instead.");
            return null;
        }
    }

    private SerializationSchema<Row> getSerializationSchema(Map<String, String> map) {
        try {
            return TableFactoryService.find(SerializationSchemaFactory.class, map, getClass().getClassLoader()).createSerializationSchema(map);
        } catch (Exception e) {
            log.warn("get deserializer from properties failed. using json schema instead.");
            return JsonSer.of(Row.class);
        }
    }
}
