package org.apache.flink.streaming.connectors.pulsar.table;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.IntStream;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.shaded.guava18.com.google.common.base.CaseFormat;
import org.apache.flink.streaming.connectors.pulsar.config.StartupMode;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions;
import org.apache.flink.streaming.connectors.pulsar.util.KeyHashMessageRouterImpl;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.schema.LocalDateTimeSchema;
import org.apache.pulsar.shade.org.glassfish.hk2.utilities.BuilderHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/pulsar/table/PulsarTableOptions.class */
public class PulsarTableOptions {
    // ------------------------------------------------------------------
    // Accepted string values of the enumerated options
    // ------------------------------------------------------------------

    public static final String SCAN_STARTUP_MODE_VALUE_EARLIEST = "earliest";
    public static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest";
    public static final String SCAN_STARTUP_MODE_VALUE_EXTERNAL_SUBSCRIPTION = "external-subscription";
    public static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS = "specific-offsets";
    public static final String SINK_SEMANTIC_VALUE_NONE = "none";

    /** Prefix of table option keys that are forwarded by {@link #getPulsarProperties(Map)}. */
    public static final String PROPERTIES_PREFIX = "properties.";

    private static final String PARTITION = "partition";
    private static final String OFFSET = "offset";
    private static final Logger log = LoggerFactory.getLogger(PulsarTableOptions.class);

    // ------------------------------------------------------------------
    // Format options
    // ------------------------------------------------------------------

    public static final ConfigOption<String> KEY_FORMAT = ConfigOptions.key("key.format")
            .stringType()
            .noDefaultValue()
            .withDescription("Defines the format identifier for encoding key data. The identifier is used to discover a suitable format factory.");

    public static final ConfigOption<String> VALUE_FORMAT = ConfigOptions.key("value.format")
            .stringType()
            .noDefaultValue()
            .withDescription("Defines the format identifier for encoding value data. The identifier is used to discover a suitable format factory.");

    public static final ConfigOption<List<String>> KEY_FIELDS = ConfigOptions.key("key.fields")
            .stringType()
            .asList()
            .defaultValues(new String[0])
            .withDescription("Defines an explicit list of physical columns from the table schema that configure the data type for the key format. By default, this list is empty and thus a key is undefined.");

    public static final ConfigOption<ValueFieldsStrategy> VALUE_FIELDS_INCLUDE = ConfigOptions.key("value.fields-include")
            .enumType(ValueFieldsStrategy.class)
            .defaultValue(ValueFieldsStrategy.ALL)
            .withDescription("Defines a strategy how to deal with key columns in the data type of the value format. By default, '" + ValueFieldsStrategy.ALL + "' physical columns of the table schema will be included in the value format which means that key columns appear in the data type for both the key and value format.");

    // Declared after KEY_FIELDS and VALUE_FIELDS_INCLUDE because its description reads their keys.
    public static final ConfigOption<String> KEY_FIELDS_PREFIX = ConfigOptions.key("key.fields-prefix")
            .stringType()
            .noDefaultValue()
            .withDescription("Defines a custom prefix for all fields of the key format to avoid name clashes with fields of the value format. By default, the prefix is empty. If a custom prefix is defined, both the table schema and '" + KEY_FIELDS.key() + "' will work with prefixed names. When constructing the data type of the key format, the prefix will be removed and the non-prefixed names will be used within the key format. Please note that this option requires that '" + VALUE_FIELDS_INCLUDE.key() + "' must be '" + ValueFieldsStrategy.EXCEPT_KEY + "'.");

    // ------------------------------------------------------------------
    // Connection and topic options
    // ------------------------------------------------------------------

    public static final ConfigOption<String> SERVICE_URL = ConfigOptions.key("service-url")
            .stringType()
            .noDefaultValue()
            .withDescription("Required pulsar server connection string");

    public static final ConfigOption<String> ADMIN_URL = ConfigOptions.key("admin-url")
            .stringType()
            .noDefaultValue()
            .withDescription("Required pulsar admin connection string");

    public static final ConfigOption<List<String>> TOPIC = ConfigOptions.key("topic")
            .stringType()
            .asList()
            .noDefaultValue()
            .withDescription("Topic names from which the table is read. Either 'topic' or 'topic-pattern' must be set for source. Option 'topic' is required for sink.");

    public static final ConfigOption<String> TOPIC_PATTERN = ConfigOptions.key("topic-pattern")
            .stringType()
            .noDefaultValue()
            .withDescription("Optional topic pattern from which the table is read for source. Either 'topic' or 'topic-pattern' must be set.");

    // ------------------------------------------------------------------
    // Scan (source) options
    // ------------------------------------------------------------------

    public static final ConfigOption<String> SCAN_STARTUP_MODE = ConfigOptions.key("scan.startup.mode")
            .stringType()
            .defaultValue("latest")
            .withDescription("Optional startup mode for Pulsar consumer, valid enumerations are \"earliest\", \"latest\", \"external-subscription\",\nor \"specific-offsets\"");

    public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSETS = ConfigOptions.key("scan.startup.specific-offsets")
            .stringType()
            .noDefaultValue()
            .withDescription("Optional offsets used in case of \"specific-offsets\" startup mode");

    public static final ConfigOption<String> SCAN_STARTUP_SUB_NAME = ConfigOptions.key("scan.startup.sub-name")
            .stringType()
            .noDefaultValue()
            .withDescription("Optional sub-name used in case of \"external-subscription\" startup mode");

    public static final ConfigOption<String> SCAN_STARTUP_SUB_START_OFFSET = ConfigOptions.key("scan.startup.sub-start-offset")
            .stringType()
            .defaultValue("latest")
            .withDescription("Optional sub-start-offset used in case of \"external-subscription\" startup mode");

    public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP_MILLIS = ConfigOptions.key("scan.startup.timestamp-millis")
            .longType()
            .noDefaultValue()
            .withDescription("Optional timestamp used in case of \"timestamp\" startup mode");

    public static final ConfigOption<Long> PARTITION_DISCOVERY_INTERVAL_MILLIS = ConfigOptions.key(PulsarOptions.PARTITION_DISCOVERY_INTERVAL_MS_OPTION_KEY)
            .longType()
            .noDefaultValue()
            .withDescription("Optional discovery topic interval of \"interval-millis\" millis");

    // ------------------------------------------------------------------
    // Sink options
    // ------------------------------------------------------------------

    public static final ConfigOption<String> SINK_MESSAGE_ROUTER = ConfigOptions.key("sink.message-router")
            .stringType()
            .noDefaultValue()
            .withDescription("Optional output MessageRouter \ninto pulsar's partitions valid enumerations are\n\"key-hash\": (each Flink partition ends up in at most one pulsar partition by key'hash, must set key for message),\n\"round-robin\": (a Flink partition is distributed to pulsar partitions round-robin, it's default messageRouter in pulsar)\n\"custom class name\": (use a custom MessageRouter subclass)");

    public static final String SINK_SEMANTIC_VALUE_AT_LEAST_ONCE = "at-least-once";

    public static final ConfigOption<String> SINK_SEMANTIC = ConfigOptions.key("sink.semantic")
            .stringType()
            .defaultValue(SINK_SEMANTIC_VALUE_AT_LEAST_ONCE)
            .withDescription("Optional semantic when commit. Valid enumerationns are [\"at-least-once\", \"exactly-once\", \"none\"]");

    // NOTE(review): the option key comes from TypedMessageBuilder.CONF_PROPERTIES; presumably this
    // resolves to "properties" - confirm against the Pulsar client version in use.
    public static final ConfigOption<Map<String, String>> PROPERTIES = ConfigOptions.key(TypedMessageBuilder.CONF_PROPERTIES)
            .mapType()
            .defaultValue(Collections.emptyMap())
            .withDescription("Optional pulsar config.");

    // ------------------------------------------------------------------
    // Lookup sets used by the validators (typed and read-only)
    // ------------------------------------------------------------------

    private static final Set<String> SCAN_STARTUP_MODE_ENUMS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
            SCAN_STARTUP_MODE_VALUE_EARLIEST,
            SCAN_STARTUP_MODE_VALUE_LATEST,
            SCAN_STARTUP_MODE_VALUE_EXTERNAL_SUBSCRIPTION,
            SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS)));

    public static final String SINK_MESSAGE_ROUTER_VALUE_KEY_HASH = "key-hash";
    public static final String SINK_MESSAGE_ROUTER_VALUE_ROUND_ROBIN = "round-robin";

    private static final Set<String> SINK_MESSAGE_ROUTER_ENUMS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
            SINK_MESSAGE_ROUTER_VALUE_KEY_HASH,
            SINK_MESSAGE_ROUTER_VALUE_ROUND_ROBIN)));

    public static final String SINK_SEMANTIC_VALUE_EXACTLY_ONCE = "exactly-once";

    private static final Set<String> SINK_SEMANTIC_ENUMS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
            SINK_SEMANTIC_VALUE_AT_LEAST_ONCE,
            SINK_SEMANTIC_VALUE_EXACTLY_ONCE,
            SINK_SEMANTIC_VALUE_NONE)));

    /**
     * Mutable holder for the resolved source startup configuration: the startup mode plus the
     * mode-specific settings (explicit offsets, or the external subscription name and its start
     * offset).
     */
    public static class StartupOptions {
        public StartupMode startupMode;
        public Map<String, MessageId> specificOffsets = new HashMap<>();
        public String externalSubscriptionName;
        public String externalSubStartOffset;

        /** Two instances are equal when all four fields are equal (null matches only null). */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof StartupOptions)) {
                return false;
            }
            StartupOptions that = (StartupOptions) obj;
            return that.canEqual(this)
                    && sameValue(this.startupMode, that.startupMode)
                    && sameValue(this.specificOffsets, that.specificOffsets)
                    && sameValue(this.externalSubscriptionName, that.externalSubscriptionName)
                    && sameValue(this.externalSubStartOffset, that.externalSubStartOffset);
        }

        /** Hook that lets subclasses veto equality with instances of a different type. */
        protected boolean canEqual(Object obj) {
            return obj instanceof StartupOptions;
        }

        /** Combines the four field hashes with multiplier 59; a null field contributes 43. */
        @Override
        public int hashCode() {
            int result = 1;
            result = result * 59 + fieldHash(this.startupMode);
            result = result * 59 + fieldHash(this.specificOffsets);
            result = result * 59 + fieldHash(this.externalSubscriptionName);
            result = result * 59 + fieldHash(this.externalSubStartOffset);
            return result;
        }

        /** Null-safe equality: null only equals null, otherwise delegates to {@code left.equals}. */
        private static boolean sameValue(Object left, Object right) {
            return left == null ? right == null : left.equals(right);
        }

        /** Hash contribution of a single field; 43 stands in for null. */
        private static int fieldHash(Object value) {
            return value == null ? 43 : value.hashCode();
        }
    }

    /**
     * Strategies for dealing with key columns in the data type of the value format; selected
     * through the 'value.fields-include' option.
     */
    public enum ValueFieldsStrategy {
        /** Every physical column is part of the value format, key columns included. */
        ALL,
        /** Columns selected by the key projection are left out of the value format. */
        EXCEPT_KEY
    }

    /**
     * Validates the options of a table source: first the topic / topic-pattern combination,
     * then the scan startup mode and the options it depends on.
     *
     * @param readableConfig the table options to check
     * @throws ValidationException if one of the delegated checks rejects the options
     */
    public static void validateTableSourceOptions(ReadableConfig readableConfig) {
        validateSourceTopic(readableConfig);
        validateScanStartupMode(readableConfig);
    }

    /**
     * Ensures that exactly one of 'topic' and 'topic-pattern' is configured for a source.
     *
     * @param readableConfig the table options to check
     * @throws ValidationException if both options are set, or if neither is set
     */
    public static void validateSourceTopic(ReadableConfig readableConfig) {
        final boolean topicSet = readableConfig.getOptional(TOPIC).isPresent();
        final boolean patternSet = readableConfig.getOptional(TOPIC_PATTERN).isPresent();

        if (topicSet && patternSet) {
            throw new ValidationException("Option 'topic' and 'topic-pattern' shouldn't be set together.");
        }
        if (!(topicSet || patternSet)) {
            throw new ValidationException("Either 'topic' or 'topic-pattern' must be set.");
        }
    }

    /**
     * Validates 'scan.startup.mode' (case-insensitively) together with the options the chosen
     * mode depends on. Nothing is checked when the mode is not explicitly configured.
     *
     * <ul>
     *   <li>"external-subscription" requires 'scan.startup.sub-name';</li>
     *   <li>"specific-offsets" requires 'scan.startup.specific-offsets', which must also parse.</li>
     * </ul>
     *
     * @param readableConfig the table options to check
     * @throws ValidationException if the mode is unknown, a required companion option is missing,
     *     or the specific offsets are malformed
     */
    private static void validateScanStartupMode(ReadableConfig readableConfig) {
        Optional<String> configuredMode = readableConfig.getOptional(SCAN_STARTUP_MODE);
        if (!configuredMode.isPresent()) {
            return;
        }

        // Locale.ROOT keeps the lower-casing independent of the JVM default locale
        // (e.g. the Turkish dotless i would otherwise break "SPECIFIC-OFFSETS").
        String mode = configuredMode.get().toLowerCase(Locale.ROOT);
        if (!SCAN_STARTUP_MODE_ENUMS.contains(mode)) {
            throw new ValidationException(String.format("Invalid value for option '%s'. Supported values are %s, but was: %s", SCAN_STARTUP_MODE.key(), "[earliest, latest, specific-offsets, external-subscription]", mode));
        }

        if (mode.equals(SCAN_STARTUP_MODE_VALUE_EXTERNAL_SUBSCRIPTION)
                && !readableConfig.getOptional(SCAN_STARTUP_SUB_NAME).isPresent()) {
            // The second placeholder is the startup mode name, not the option object.
            throw new ValidationException(String.format("'%s' is required in '%s' startup mode but missing.", SCAN_STARTUP_SUB_NAME.key(), SCAN_STARTUP_MODE_VALUE_EXTERNAL_SUBSCRIPTION));
        }

        if (mode.equals(SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS)) {
            Optional<String> offsets = readableConfig.getOptional(SCAN_STARTUP_SPECIFIC_OFFSETS);
            if (!offsets.isPresent()) {
                throw new ValidationException(String.format("'%s' is required in '%s' startup mode but missing.", SCAN_STARTUP_SPECIFIC_OFFSETS.key(), SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS));
            }
            // Parsed only for its validation side effect; the result is discarded here.
            parseSpecificOffsets(offsets.get(), SCAN_STARTUP_SPECIFIC_OFFSETS.key());
        }
    }

    /**
     * Validates the options of a table sink: first the sink topic, then the sink semantic.
     *
     * @param readableConfig the table options to check
     * @throws ValidationException if one of the delegated checks rejects the options
     */
    public static void validateTableSinkOptions(ReadableConfig readableConfig) {
        validateSinkTopic(readableConfig);
        validateSinkSemantic(readableConfig);
    }

    /**
     * Checks the 'sink.message-router' option when it is configured: a value that is neither one
     * of the built-in router names (compared in lower case) nor a non-empty string is rejected.
     *
     * @param readableConfig the table options to check
     * @throws ValidationException if the configured router name is an empty string
     */
    public static void validateSinkMessageRouter(ReadableConfig readableConfig) {
        Optional<String> configuredRouter = readableConfig.getOptional(SINK_MESSAGE_ROUTER);
        if (!configuredRouter.isPresent()) {
            return;
        }

        String routerName = configuredRouter.get();
        boolean builtIn = SINK_MESSAGE_ROUTER_ENUMS.contains(routerName.toLowerCase());
        if (builtIn || !routerName.isEmpty()) {
            return;
        }
        throw new ValidationException(String.format("Option '%s' should be a non-empty string.", SINK_MESSAGE_ROUTER.key()));
    }

    /**
     * Ensures the sink writes to exactly one topic, i.e. 'topic' is set and lists a single entry.
     * The error names 'topic-pattern' when a pattern is configured and 'topic' otherwise.
     *
     * @param readableConfig the table options to check
     * @throws ValidationException if the options do not describe exactly one topic
     */
    public static void validateSinkTopic(ReadableConfig readableConfig) {
        if (isSingleTopic(readableConfig)) {
            return;
        }

        final String template = "Flink Pulsar sink currently only supports single topic, but got %s: %s.";
        final boolean patternConfigured = readableConfig.getOptional(TOPIC_PATTERN).isPresent();

        final String offendingOption = patternConfigured ? "'topic-pattern'" : "'topic'";
        final Object offendingValue = patternConfigured
                ? readableConfig.get(TOPIC_PATTERN)
                : readableConfig.get(TOPIC);
        throw new ValidationException(String.format(template, offendingOption, offendingValue));
    }

    /**
     * Checks that an explicitly configured 'sink.semantic' is one of the supported values.
     * The comparison is case-sensitive; an unset option is accepted.
     *
     * @param readableConfig the table options to check
     * @throws ValidationException if the configured semantic is not supported
     */
    private static void validateSinkSemantic(ReadableConfig readableConfig) {
        Optional<String> configuredSemantic = readableConfig.getOptional(SINK_SEMANTIC);
        if (configuredSemantic.isPresent() && !SINK_SEMANTIC_ENUMS.contains(configuredSemantic.get())) {
            throw new ValidationException(String.format("Unsupported value '%s' for '%s'. Supported values are ['at-least-once', 'exactly-once', 'none'].", configuredSemantic.get(), SINK_SEMANTIC.key()));
        }
    }

    /**
     * Tells whether the 'topic' option is set and contains exactly one entry.
     *
     * @param readableConfig the table options to inspect
     * @return {@code true} only for an explicitly configured single-element topic list
     */
    private static boolean isSingleTopic(ReadableConfig readableConfig) {
        Optional<List<String>> topics = readableConfig.getOptional(TOPIC);
        if (!topics.isPresent()) {
            return false;
        }
        return topics.get().size() == 1;
    }

    /**
     * Maps the 'sink.semantic' option (or its default, "at-least-once") to the matching
     * {@link PulsarSinkSemantic}. The match is case-sensitive.
     *
     * @param readableConfig the table options to read the semantic from
     * @return the sink semantic that corresponds to the configured value
     * @throws TableException if the value is none of "exactly-once", "at-least-once" or "none";
     *     validation is expected to have rejected such values earlier
     */
    public static PulsarSinkSemantic getSinkSemantic(ReadableConfig readableConfig) {
        String semantic = readableConfig.get(SINK_SEMANTIC);
        switch (semantic) {
            case SINK_SEMANTIC_VALUE_EXACTLY_ONCE:
                return PulsarSinkSemantic.EXACTLY_ONCE;
            case SINK_SEMANTIC_VALUE_AT_LEAST_ONCE:
                return PulsarSinkSemantic.AT_LEAST_ONCE;
            case SINK_SEMANTIC_VALUE_NONE:
                return PulsarSinkSemantic.NONE;
            default:
                throw new TableException("Validator should have checked that");
        }
    }

    /**
     * Extracts the Pulsar properties from raw table options.
     *
     * <p>Every option whose key starts with {@link #PROPERTIES_PREFIX} is copied with the prefix
     * stripped. If the remaining key starts with {@code PulsarOptions.PULSAR_OPTION_KEY_PREFIX}
     * it is additionally converted from lower-hyphen to lower-camel case. Finally, the partition
     * discovery interval option is copied under its own key when the table options contain it
     * and no property with that key has been set already.
     *
     * @param map the raw table options
     * @return a new {@link Properties} instance holding the extracted entries
     */
    public static Properties getPulsarProperties(Map<String, String> map) {
        Properties properties = new Properties();
        if (hasPulsarClientProperties(map)) {
            for (Map.Entry<String, String> option : map.entrySet()) {
                String optionKey = option.getKey();
                if (!optionKey.startsWith(PROPERTIES_PREFIX)) {
                    continue;
                }
                String propertyKey = optionKey.substring(PROPERTIES_PREFIX.length());
                if (propertyKey.startsWith(PulsarOptions.PULSAR_OPTION_KEY_PREFIX)) {
                    propertyKey = CaseFormat.LOWER_HYPHEN.to(CaseFormat.LOWER_CAMEL, propertyKey);
                }
                properties.put(propertyKey, option.getValue());
            }
        }
        // computeIfAbsent stores nothing when the lookup yields null, so an unset interval is skipped.
        properties.computeIfAbsent(PARTITION_DISCOVERY_INTERVAL_MILLIS.key(), map::get);
        return properties;
    }

    /**
     * Resolves the 'sink.message-router' option to a {@link MessageRouter}.
     *
     * <ul>
     *   <li>option not set, or "round-robin": empty result, no explicit router is supplied;</li>
     *   <li>"key-hash": the shared {@code KeyHashMessageRouterImpl.INSTANCE};</li>
     *   <li>anything else: treated as a class name and instantiated through the class loader.</li>
     * </ul>
     *
     * The built-in names are matched case-sensitively.
     *
     * @param readableConfig the table options to read the router from
     * @param classLoader the class loader used to load a custom router class
     * @return the router to use, or empty when no explicit router applies
     * @throws ValidationException if a custom router class cannot be loaded or instantiated, or
     *     is not a {@link MessageRouter}
     */
    public static Optional<MessageRouter> getMessageRouter(ReadableConfig readableConfig, ClassLoader classLoader) {
        Optional<String> configuredRouter = readableConfig.getOptional(SINK_MESSAGE_ROUTER);
        if (!configuredRouter.isPresent()) {
            return Optional.empty();
        }

        String routerName = configuredRouter.get();
        switch (routerName) {
            case SINK_MESSAGE_ROUTER_VALUE_KEY_HASH:
                return Optional.<MessageRouter>of(KeyHashMessageRouterImpl.INSTANCE);
            case SINK_MESSAGE_ROUTER_VALUE_ROUND_ROBIN:
                return Optional.empty();
            default:
                return Optional.of(initializeMessageRouter(routerName, classLoader));
        }
    }

    /**
     * Loads the named class, verifies that it is a {@link MessageRouter} and instantiates it.
     *
     * @param str fully qualified name of the router class
     * @param classLoader the class loader used to load and instantiate the class
     * @return a new instance of the router class
     * @throws ValidationException if the class is not a {@link MessageRouter}, cannot be found,
     *     or cannot be instantiated
     */
    private static MessageRouter initializeMessageRouter(String str, ClassLoader classLoader) {
        try {
            Class<?> candidate = Class.forName(str, true, classLoader);
            boolean isRouter = MessageRouter.class.isAssignableFrom(candidate);
            if (!isRouter) {
                throw new ValidationException(String.format("Sink messageRouter class '%s' should extend from the required class %s", str, MessageRouter.class.getName()));
            }
            return (MessageRouter) InstantiationUtil.instantiate(str, MessageRouter.class, classLoader);
        } catch (ClassNotFoundException | FlinkException e) {
            throw new ValidationException(String.format("Could not find and instantiate messageRouter class '%s'", str), e);
        }
    }

    /**
     * Tells whether at least one option key starts with {@link #PROPERTIES_PREFIX}.
     *
     * @param map the raw table options
     * @return {@code true} if a prefixed key exists, {@code false} otherwise
     */
    private static boolean hasPulsarClientProperties(Map<String, String> map) {
        for (String optionKey : map.keySet()) {
            if (optionKey.startsWith(PROPERTIES_PREFIX)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Builds the {@link StartupOptions} described by the scan startup options.
     *
     * <p>The mode is matched case-insensitively, mirroring the validation of
     * 'scan.startup.mode'. When the mode is not explicitly configured, {@code LATEST} is used.
     * For "specific-offsets" every configured entry is decoded into a message id and stored
     * under its partition index (as a string). For "external-subscription" the subscription
     * name and its start offset are copied from the options.
     *
     * @param readableConfig the table options to read the startup settings from
     * @return the populated startup options
     * @throws TableException if the configured mode is not supported
     * @throws ValidationException if the specific offsets do not follow the expected format
     * @throws RuntimeException if a message id cannot be decoded
     */
    public static StartupOptions getStartupOptions(ReadableConfig readableConfig) {
        StartupOptions startupOptions = new StartupOptions();

        Optional<String> configuredMode = readableConfig.getOptional(SCAN_STARTUP_MODE);
        if (!configuredMode.isPresent()) {
            startupOptions.startupMode = StartupMode.LATEST;
            return startupOptions;
        }

        // Validation lower-cases the mode before checking it, so the same normalization is
        // applied here; otherwise e.g. "EARLIEST" would pass validation and fail below.
        String mode = configuredMode.get().toLowerCase(Locale.ROOT);
        switch (mode) {
            case SCAN_STARTUP_MODE_VALUE_EARLIEST:
                startupOptions.startupMode = StartupMode.EARLIEST;
                break;
            case SCAN_STARTUP_MODE_VALUE_LATEST:
                startupOptions.startupMode = StartupMode.LATEST;
                break;
            case SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS:
                startupOptions.specificOffsets = decodeSpecificOffsets(readableConfig);
                startupOptions.startupMode = StartupMode.SPECIFIC_OFFSETS;
                break;
            case SCAN_STARTUP_MODE_VALUE_EXTERNAL_SUBSCRIPTION:
                startupOptions.externalSubscriptionName = readableConfig.get(SCAN_STARTUP_SUB_NAME);
                startupOptions.externalSubStartOffset = readableConfig.get(SCAN_STARTUP_SUB_START_OFFSET);
                startupOptions.startupMode = StartupMode.EXTERNAL_SUBSCRIPTION;
                break;
            default:
                throw new TableException("Unsupported startup mode. Validator should have checked that.");
        }
        return startupOptions;
    }

    /** Decodes 'scan.startup.specific-offsets' into message ids keyed by partition index. */
    private static Map<String, MessageId> decodeSpecificOffsets(ReadableConfig readableConfig) {
        Map<String, MessageId> decoded = new HashMap<>();
        Map<Integer, String> rawOffsets = parseSpecificOffsets(readableConfig.get(SCAN_STARTUP_SPECIFIC_OFFSETS), SCAN_STARTUP_SPECIFIC_OFFSETS.key());
        rawOffsets.forEach((partition, rawMessageId) -> {
            try {
                decoded.put(partition.toString(), parseMessageId(rawMessageId));
            } catch (Exception e) {
                log.error("Failed to decode message id from properties {}", ExceptionUtils.stringifyException(e));
                throw new RuntimeException(e);
            }
        });
        return decoded;
    }

    /**
     * Builds a {@link MessageIdImpl} from a delimited string with at least three numeric parts.
     *
     * @param str the textual message id, split on {@code LocalDateTimeSchema.DELIMITER}
     * @return the message id built from the first three parts
     * @throws NumberFormatException if one of the parts is not a valid number
     * @throws ArrayIndexOutOfBoundsException if fewer than three parts are present
     */
    private static MessageIdImpl parseMessageId(String str) {
        // NOTE(review): local names assume the constructor order (ledgerId, entryId,
        // partitionIndex) - confirm against the MessageIdImpl version in use.
        final String[] parts = str.split(LocalDateTimeSchema.DELIMITER);
        final long ledgerId = Long.parseLong(parts[0]);
        final long entryId = Long.parseLong(parts[1]);
        final int partitionIndex = Integer.parseInt(parts[2]);
        return new MessageIdImpl(ledgerId, entryId, partitionIndex);
    }

    /**
     * Splits a specific-offsets string into its entries and indexes each entry by the integer
     * value of its third part.
     *
     * <p>Entries are separated by {@code BuilderHelper.TOKEN_SEPARATOR} and each entry must
     * consist of exactly three parts separated by {@code LocalDateTimeSchema.DELIMITER}; the
     * error message gives '42:1012:0;44:1011:1' as an example. Parsing stops silently at the
     * first null or empty entry, and a later entry replaces an earlier one with the same key.
     *
     * @param str the raw option value
     * @param str2 the option key, used only in error messages
     * @return a map from the parsed third part of each entry to the complete entry text
     * @throws ValidationException if there are no entries, an entry contains a comma, does not
     *     have exactly three parts, or its third part is not an integer
     */
    public static Map<Integer, String> parseSpecificOffsets(String str, String str2) {
        final Map<Integer, String> offsetsByPartition = new HashMap<>();
        final String[] entries = str.split(BuilderHelper.TOKEN_SEPARATOR);
        final String errorMessage = String.format("Invalid properties '%s' should follow the format messageId with partition'42:1012:0;44:1011:1', but is '%s'.", str2, str);

        if (entries.length == 0) {
            throw new ValidationException(errorMessage);
        }

        for (String entry : entries) {
            if (entry == null || entry.isEmpty()) {
                // An empty entry ends the scan; the entries after it are ignored.
                break;
            }
            if (entry.contains(",")) {
                throw new ValidationException(errorMessage);
            }
            final String[] parts = entry.split(LocalDateTimeSchema.DELIMITER);
            if (parts.length != 3) {
                throw new ValidationException(errorMessage);
            }
            try {
                offsetsByPartition.put(Integer.valueOf(parts[2]), entry);
            } catch (NumberFormatException e) {
                throw new ValidationException(errorMessage, e);
            }
        }
        return offsetsByPartition;
    }

    /**
     * Computes the positions of the key fields within the physical row type.
     *
     * <p>'key.fields' may only be declared together with 'key.format', and a declared key format
     * needs at least one key field. Without a key format the projection is empty. Every key
     * field must exist in the row type and must start with the configured 'key.fields-prefix'
     * (the empty string when no prefix is set).
     *
     * @param readableConfig the table options holding the key format settings
     * @param dataType the physical data type of the table; must be a row type
     * @return the index of each key field in the row type, in the order of 'key.fields'
     * @throws IllegalArgumentException if {@code dataType} is not a row type
     * @throws ValidationException if the key options are inconsistent, a key field is unknown,
     *     or a key field lacks the configured prefix
     */
    public static int[] createKeyFormatProjection(ReadableConfig readableConfig, DataType dataType) {
        LogicalType logicalType = dataType.getLogicalType();
        Preconditions.checkArgument(LogicalTypeChecks.hasRoot(logicalType, LogicalTypeRoot.ROW), "Row data type expected.");

        Optional<String> keyFormat = readableConfig.getOptional(KEY_FORMAT);
        Optional<List<String>> keyFields = readableConfig.getOptional(KEY_FIELDS);

        if (!keyFormat.isPresent() && keyFields.isPresent()) {
            throw new ValidationException(String.format("The option '%s' can only be declared if a key format is defined using '%s'.", KEY_FIELDS.key(), KEY_FORMAT.key()));
        }
        if (keyFormat.isPresent() && (!keyFields.isPresent() || keyFields.get().isEmpty())) {
            throw new ValidationException(String.format("A key format '%s' requires the declaration of one or more of key fields using '%s'.", KEY_FORMAT.key(), KEY_FIELDS.key()));
        }
        if (!keyFormat.isPresent()) {
            return new int[0];
        }

        String keyPrefix = readableConfig.getOptional(KEY_FIELDS_PREFIX).orElse("");
        List<String> declaredKeyFields = keyFields.get();
        List<String> physicalFields = LogicalTypeChecks.getFieldNames(logicalType);

        return declaredKeyFields.stream().mapToInt(keyField -> {
            int position = physicalFields.indexOf(keyField);
            if (position < 0) {
                throw new ValidationException(String.format("Could not find the field '%s' in the table schema for usage in the key format. A key field must be a regular, physical column. The following columns can be selected in the '%s' option:\n%s", keyField, KEY_FIELDS.key(), physicalFields));
            }
            if (!keyField.startsWith(keyPrefix)) {
                throw new ValidationException(String.format("All fields in '%s' must be prefixed with '%s' when option '%s' is set but field '%s' is not prefixed.", KEY_FIELDS.key(), keyPrefix, KEY_FIELDS_PREFIX.key(), keyField));
            }
            return position;
        }).toArray();
    }

    /**
     * Computes the positions of the physical row fields that belong to the value format.
     *
     * <p>With the {@code ALL} strategy every field is part of the value format, and a non-empty
     * 'key.fields-prefix' is rejected. With {@code EXCEPT_KEY} the positions returned by
     * {@link #createKeyFormatProjection(ReadableConfig, DataType)} are removed.
     *
     * @param readableConfig the table options holding the value format settings
     * @param dataType the physical data type of the table; must be a row type
     * @return the indices of the value fields in ascending order
     * @throws IllegalArgumentException if {@code dataType} is not a row type
     * @throws ValidationException if a key prefix is combined with the {@code ALL} strategy, or
     *     the key projection cannot be computed
     * @throws TableException if the configured strategy is not known
     */
    public static int[] createValueFormatProjection(ReadableConfig readableConfig, DataType dataType) {
        LogicalType logicalType = dataType.getLogicalType();
        Preconditions.checkArgument(LogicalTypeChecks.hasRoot(logicalType, LogicalTypeRoot.ROW), "Row data type expected.");

        IntStream allPositions = IntStream.range(0, LogicalTypeChecks.getFieldCount(logicalType));
        String keyPrefix = readableConfig.getOptional(KEY_FIELDS_PREFIX).orElse("");
        ValueFieldsStrategy strategy = readableConfig.get(VALUE_FIELDS_INCLUDE);

        if (strategy == ValueFieldsStrategy.ALL) {
            if (keyPrefix.length() > 0) {
                throw new ValidationException(String.format("A key prefix is not allowed when option '%s' is set to '%s'. Set it to '%s' instead to avoid field overlaps.", VALUE_FIELDS_INCLUDE.key(), ValueFieldsStrategy.ALL, ValueFieldsStrategy.EXCEPT_KEY));
            }
            return allPositions.toArray();
        }
        if (strategy != ValueFieldsStrategy.EXCEPT_KEY) {
            throw new TableException("Unknown value fields strategy:" + strategy);
        }

        int[] keyProjection = createKeyFormatProjection(readableConfig, dataType);
        // Keep a position only when it is not one of the key positions.
        return allPositions
                .filter(position -> IntStream.of(keyProjection).noneMatch(keyPosition -> keyPosition == position))
                .toArray();
    }

    /** Private constructor: the members declared in this class are all static. */
    private PulsarTableOptions() {
    }
}
