/*
 * Decompiled with CFR 0.152.
 */
package io.deephaven.kafka;

import gnu.trove.map.hash.TIntLongHashMap;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.deephaven.UncheckedDeephavenException;
import io.deephaven.annotations.SimpleStyle;
import io.deephaven.base.Pair;
import io.deephaven.engine.liveness.LivenessReferent;
import io.deephaven.engine.liveness.LivenessScope;
import io.deephaven.engine.liveness.LivenessScopeStack;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.engine.rowset.TrackingRowSet;
import io.deephaven.engine.rowset.TrackingWritableRowSet;
import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.ModifiedColumnSet;
import io.deephaven.engine.table.PartitionedTable;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.TableUpdate;
import io.deephaven.engine.table.WritableColumnSource;
import io.deephaven.engine.table.impl.BaseTable;
import io.deephaven.engine.table.impl.BlinkTableTools;
import io.deephaven.engine.table.impl.QueryTable;
import io.deephaven.engine.table.impl.TableUpdateImpl;
import io.deephaven.engine.table.impl.partitioned.PartitionedTableImpl;
import io.deephaven.engine.table.impl.sources.ArrayBackedColumnSource;
import io.deephaven.engine.table.impl.sources.ring.RingTableTools;
import io.deephaven.engine.updategraph.UpdateGraphProcessor;
import io.deephaven.engine.updategraph.UpdateSourceCombiner;
import io.deephaven.engine.updategraph.UpdateSourceRegistrar;
import io.deephaven.engine.util.BigDecimalUtils;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.kafka.ImmutableAppend;
import io.deephaven.kafka.ImmutableBlink;
import io.deephaven.kafka.ImmutableRing;
import io.deephaven.kafka.KafkaSchemaUtils;
import io.deephaven.kafka.StreamPublisherImpl;
import io.deephaven.kafka.ingest.ConsumerRecordToStreamPublisherAdapter;
import io.deephaven.kafka.ingest.GenericRecordChunkAdapter;
import io.deephaven.kafka.ingest.JsonNodeChunkAdapter;
import io.deephaven.kafka.ingest.JsonNodeUtil;
import io.deephaven.kafka.ingest.KafkaIngester;
import io.deephaven.kafka.ingest.KafkaStreamConsumer;
import io.deephaven.kafka.ingest.KafkaStreamPublisher;
import io.deephaven.kafka.ingest.KeyOrValueProcessor;
import io.deephaven.kafka.publish.GenericRecordKeyOrValueSerializer;
import io.deephaven.kafka.publish.JsonKeyOrValueSerializer;
import io.deephaven.kafka.publish.KafkaPublisherException;
import io.deephaven.kafka.publish.KeyOrValueSerializer;
import io.deephaven.kafka.publish.PublishToKafka;
import io.deephaven.kafka.publish.SimpleKeyOrValueSerializer;
import io.deephaven.stream.StreamPublisher;
import io.deephaven.stream.StreamToBlinkTableAdapter;
import io.deephaven.time.DateTime;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.annotations.ReferentialIntegrity;
import io.deephaven.util.annotations.ScriptApi;
import io.deephaven.vector.ByteVector;
import java.io.IOException;
import java.io.Serializable;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.IntPredicate;
import java.util.function.IntToLongFunction;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericContainer;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.ByteBufferDeserializer;
import org.apache.kafka.common.serialization.ByteBufferSerializer;
import org.apache.kafka.common.serialization.DoubleDeserializer;
import org.apache.kafka.common.serialization.DoubleSerializer;
import org.apache.kafka.common.serialization.FloatDeserializer;
import org.apache.kafka.common.serialization.FloatSerializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.ShortDeserializer;
import org.apache.kafka.common.serialization.ShortSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.immutables.value.Value;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

public class KafkaTools {
    // ---- Property keys and default values for the names/types of the columns of consumed tables. ----
    // NOTE(review): the code that reads these keys is outside this part of the file; the pairing of each
    // *_PROPERTY key with its *_DEFAULT value is inferred from the names - confirm against the readers.
    public static final String KAFKA_PARTITION_COLUMN_NAME_PROPERTY = "deephaven.partition.column.name";
    public static final String KAFKA_PARTITION_COLUMN_NAME_DEFAULT = "KafkaPartition";
    public static final String OFFSET_COLUMN_NAME_PROPERTY = "deephaven.offset.column.name";
    public static final String OFFSET_COLUMN_NAME_DEFAULT = "KafkaOffset";
    public static final String TIMESTAMP_COLUMN_NAME_PROPERTY = "deephaven.timestamp.column.name";
    public static final String TIMESTAMP_COLUMN_NAME_DEFAULT = "KafkaTimestamp";
    public static final String KEY_COLUMN_NAME_PROPERTY = "deephaven.key.column.name";
    public static final String KEY_COLUMN_NAME_DEFAULT = "KafkaKey";
    public static final String VALUE_COLUMN_NAME_PROPERTY = "deephaven.value.column.name";
    public static final String VALUE_COLUMN_NAME_DEFAULT = "KafkaValue";
    public static final String KEY_COLUMN_TYPE_PROPERTY = "deephaven.key.column.type";
    public static final String VALUE_COLUMN_TYPE_PROPERTY = "deephaven.value.column.type";
    // Property key for the schema registry URL (presumably the Confluent schema registry - confirm).
    public static final String SCHEMA_SERVER_PROPERTY = "schema.registry.url";

    // ---- Fully qualified class names of the Kafka deserializers, one per supported wire type. ----
    public static final String SHORT_DESERIALIZER = ShortDeserializer.class.getName();
    public static final String INT_DESERIALIZER = IntegerDeserializer.class.getName();
    public static final String LONG_DESERIALIZER = LongDeserializer.class.getName();
    public static final String FLOAT_DESERIALIZER = FloatDeserializer.class.getName();
    public static final String DOUBLE_DESERIALIZER = DoubleDeserializer.class.getName();
    public static final String BYTE_ARRAY_DESERIALIZER = ByteArrayDeserializer.class.getName();
    public static final String STRING_DESERIALIZER = StringDeserializer.class.getName();
    public static final String BYTE_BUFFER_DESERIALIZER = ByteBufferDeserializer.class.getName();
    public static final String AVRO_DESERIALIZER = KafkaAvroDeserializer.class.getName();
    // Deserializer that consumeToResult hands to setDeserIfNotSet when a key or value spec is IGNORE.
    public static final String DESERIALIZER_FOR_IGNORE = BYTE_BUFFER_DESERIALIZER;

    // ---- Fully qualified class names of the Kafka serializers, mirroring the deserializer list above. ----
    public static final String SHORT_SERIALIZER = ShortSerializer.class.getName();
    public static final String INT_SERIALIZER = IntegerSerializer.class.getName();
    public static final String LONG_SERIALIZER = LongSerializer.class.getName();
    public static final String FLOAT_SERIALIZER = FloatSerializer.class.getName();
    public static final String DOUBLE_SERIALIZER = DoubleSerializer.class.getName();
    public static final String BYTE_ARRAY_SERIALIZER = ByteArraySerializer.class.getName();
    public static final String STRING_SERIALIZER = StringSerializer.class.getName();
    public static final String BYTE_BUFFER_SERIALIZER = ByteBufferSerializer.class.getName();
    public static final String AVRO_SERIALIZER = KafkaAvroSerializer.class.getName();
    // NOTE(review): presumably the producing-side counterpart of DESERIALIZER_FOR_IGNORE - usage is not visible here.
    public static final String SERIALIZER_FOR_IGNORE = BYTE_BUFFER_SERIALIZER;

    // Separator placed between the names of nested Avro record fields when building a field path.
    public static final String NESTED_FIELD_NAME_SEPARATOR = ".";
    // Pre-compiled, quoted (literal) form of the "." separator for splitting field paths.
    private static final Pattern NESTED_FIELD_NAME_SEPARATOR_PATTERN = Pattern.compile(Pattern.quote("."));
    // Replacement for the field path separator in column names; see DIRECT_MAPPING.
    public static final String NESTED_FIELD_COLUMN_NAME_SEPARATOR = "__";
    // NOTE(review): presumably the schema version string meaning "use the latest registered version" - confirm.
    public static final String AVRO_LATEST_VERSION = "latest";
    private static final Logger log = LoggerFactory.getLogger(KafkaTools.class);
    // NOTE(review): presumably the capacity of the chunks allocated for ingestion - confirm against its users.
    private static final int CHUNK_SIZE = 2048;
    /**
     * Maps an input object, which must be a JSON {@link String}, to the result of
     * {@code JsonNodeUtil.makeJsonNode}. An input of any other type is rejected with an
     * {@link UncheckedDeephavenException} that wraps the underlying {@link ClassCastException}.
     */
    private static final Function<Object, Object> jsonToObjectChunkMapper = (final Object input) -> {
        final String jsonText;
        try {
            jsonText = (String) input;
        } catch (ClassCastException castFailure) {
            throw new UncheckedDeephavenException("Could not convert input to json string", castFailure);
        }
        return JsonNodeUtil.makeJsonNode(jsonText);
    };
    // ---- Values re-exported from KafkaIngester so callers can reference them through KafkaTools. ----
    // NOTE(review): their exact semantics are defined in KafkaIngester, which is not visible here; the names
    // suggest per-partition initial-offset sentinels and a predicate accepting every partition.
    public static final long SEEK_TO_BEGINNING = KafkaIngester.SEEK_TO_BEGINNING;
    public static final long DONT_SEEK = KafkaIngester.DONT_SEEK;
    public static final long SEEK_TO_END = KafkaIngester.SEEK_TO_END;
    public static final IntPredicate ALL_PARTITIONS = KafkaIngester.ALL_PARTITIONS;
    public static final IntToLongFunction ALL_PARTITIONS_SEEK_TO_BEGINNING = KafkaIngester.ALL_PARTITIONS_SEEK_TO_BEGINNING;
    public static final IntToLongFunction ALL_PARTITIONS_DONT_SEEK = KafkaIngester.ALL_PARTITIONS_DONT_SEEK;
    public static final IntToLongFunction ALL_PARTITIONS_SEEK_TO_END = KafkaIngester.ALL_PARTITIONS_SEEK_TO_END;
    // Field path to column name mapping that replaces every "." in the field path with "__".
    public static final Function<String, String> DIRECT_MAPPING = fieldName -> fieldName.replace(NESTED_FIELD_NAME_SEPARATOR, NESTED_FIELD_COLUMN_NAME_SEPARATOR);
    // Alias of Consume.KeyOrValueSpec.FROM_PROPERTIES.
    public static final Consume.KeyOrValueSpec FROM_PROPERTIES = Consume.KeyOrValueSpec.FROM_PROPERTIES;

    /**
     * Parses an Avro schema from its JSON text using a fresh {@link Schema.Parser}.
     *
     * @param avroSchemaAsJsonString the JSON representation of the Avro schema
     * @return the parsed schema
     */
    public static Schema getAvroSchema(String avroSchemaAsJsonString) {
        final Schema.Parser parser = new Schema.Parser();
        return parser.parse(avroSchemaAsJsonString);
    }

    /**
     * Builds an Avro record schema with one field per selected column of a table.
     *
     * @param t the table whose column definitions are converted
     * @param schemaName the name of the generated Avro record
     * @param namespace the namespace of the generated Avro record
     * @param colProps column properties; stored into {@code colPropsOut} before any column is processed
     * @param includeOnly if non-null, only columns whose name passes this predicate are included
     * @param exclude if non-null, columns whose name passes this predicate are skipped
     * @param colPropsOut receives {@code colProps}; the per-column conversion may replace or extend its value
     * @return the Avro schema for the selected columns
     */
    public static Schema columnDefinitionsToAvroSchema(Table t, String schemaName, String namespace, Properties colProps, Predicate<String> includeOnly, Predicate<String> exclude, MutableObject<Properties> colPropsOut) {
        SchemaBuilder.FieldAssembler<Schema> fass = SchemaBuilder.record(schemaName).namespace(namespace).fields();
        // Properly typed list: iterating a raw List as ColumnDefinition does not compile.
        final List<ColumnDefinition<?>> colDefs = t.getDefinition().getColumns();
        colPropsOut.setValue(colProps);
        for (final ColumnDefinition<?> colDef : colDefs) {
            final String colName = colDef.getName();
            if (includeOnly != null && !includeOnly.test(colName)) {
                continue;
            }
            if (exclude != null && exclude.test(colName)) {
                continue;
            }
            fass = addFieldForColDef(t, fass, colDef, colPropsOut);
        }
        return fass.endRecord();
    }

    /**
     * Verifies that both precision and scale are known for a {@link BigDecimal} column of a refreshing table.
     * A value of {@code -1} marks a missing precision or scale.
     *
     * @param names the column name and the names of its precision and scale properties
     * @param values the precision and scale found for the column
     * @throws IllegalArgumentException if the precision, the scale, or both are missing
     */
    private static void validatePrecisionAndScaleForRefreshingTable(BigDecimalUtils.PropertyNames names, BigDecimalUtils.PrecisionAndScale values) {
        final String messagePrefix = "Column " + names.columnName + " of type " + BigDecimal.class.getSimpleName() + " in a refreshing table implies both properties '" + names.precisionProperty + "' and '" + names.scaleProperty + "' should be defined; ";
        final boolean precisionMissing = values.precision == -1;
        final boolean scaleMissing = values.scale == -1;
        if (!precisionMissing && !scaleMissing) {
            return;
        }
        final String detail;
        if (precisionMissing && scaleMissing) {
            detail = " missing both";
        } else if (precisionMissing) {
            detail = " missing '" + names.precisionProperty + "'";
        } else {
            detail = " missing '" + names.scaleProperty + "'";
        }
        throw new IllegalArgumentException(messagePrefix + detail);
    }

    /**
     * Resolves the precision and scale for a {@link BigDecimal} column of a non-refreshing table.
     * <p>
     * When both values are already defined they are returned unchanged. When neither is defined, they are obtained
     * from {@code BigDecimalUtils.computePrecisionAndScale} and recorded in the column properties, creating the
     * {@link Properties} object (and storing it in {@code colPropsMu}) if there was none.
     *
     * @param colPropsMu holder of the column properties; may hold {@code null} on entry
     * @param t the table containing the column
     * @param names the column name and the names of its precision and scale properties
     * @param valuesIn the precision and scale found in the column properties; {@code -1} marks a missing value
     * @return the precision and scale to use for the column
     * @throws IllegalArgumentException if exactly one of precision and scale is defined
     */
    private static BigDecimalUtils.PrecisionAndScale ensurePrecisionAndScaleForStaticTable(MutableObject<Properties> colPropsMu, Table t, BigDecimalUtils.PropertyNames names, BigDecimalUtils.PrecisionAndScale valuesIn) {
        final boolean hasPrecision = valuesIn.precision != -1;
        final boolean hasScale = valuesIn.scale != -1;
        if (hasPrecision && hasScale) {
            return valuesIn;
        }
        final String messagePrefix = "Column " + names.columnName + " of type " + BigDecimal.class.getSimpleName() + " in a non refreshing table implies either both properties '" + names.precisionProperty + "' and '" + names.scaleProperty + "' should be defined, or none of them;";
        if (hasPrecision) {
            throw new IllegalArgumentException(messagePrefix + " only '" + names.precisionProperty + "' is defined, missing '" + names.scaleProperty + "'");
        }
        if (hasScale) {
            throw new IllegalArgumentException(messagePrefix + " only '" + names.scaleProperty + "' is defined, missing '" + names.precisionProperty + "'");
        }
        // Neither value was supplied: compute both and remember them in the column properties.
        final BigDecimalUtils.PrecisionAndScale computed = BigDecimalUtils.computePrecisionAndScale(t, names.columnName);
        Properties target = colPropsMu.getValue();
        if (target == null) {
            target = new Properties();
            colPropsMu.setValue(target);
        }
        BigDecimalUtils.setProperties(target, names, computed);
        return computed;
    }

    /**
     * Appends a nullable Avro field, without default, for one table column.
     * <p>
     * Type mapping: {@code byte}/{@code char}/{@code short} become an Avro int carrying a {@code dhType} property
     * with the Java type name; {@code int}, {@code long}, {@code float}, {@code double} and {@link String} map to
     * the matching Avro primitive; {@link DateTime} becomes a long with logical type {@code timestamp-micros};
     * {@link BigDecimal} becomes bytes with logical type {@code decimal} plus {@code precision} and {@code scale};
     * every other type becomes bytes carrying a {@code dhType} property.
     *
     * @param t the table the column belongs to
     * @param fassIn the field assembler to extend
     * @param colDef the column to add a field for
     * @param colPropsMu holder of the column properties; may be updated with computed precision and scale
     * @return the field assembler after the new field was added
     * @throws IllegalArgumentException if the precision/scale properties of a {@link BigDecimal} column are
     *         inconsistent with what the table kind (refreshing or static) requires
     */
    private static SchemaBuilder.FieldAssembler<Schema> addFieldForColDef(Table t, SchemaBuilder.FieldAssembler<Schema> fassIn, ColumnDefinition<?> colDef, MutableObject<Properties> colPropsMu) {
        final String logicalTypeName = "logicalType";
        final String dhTypeAttribute = "dhType";
        final Class<?> type = colDef.getDataType();
        final String colName = colDef.getName();
        final SchemaBuilder.BaseFieldTypeBuilder<Schema> base = fassIn.name(colName).type().nullable();
        if (type == Byte.TYPE || type == Character.TYPE || type == Short.TYPE) {
            // Avro has no type narrower than int; keep the original Java type as a property.
            return base.intBuilder().prop(dhTypeAttribute, type.getName()).endInt().noDefault();
        }
        if (type == Integer.TYPE) {
            return base.intType().noDefault();
        }
        if (type == Long.TYPE) {
            return base.longType().noDefault();
        }
        if (type == Float.TYPE) {
            return base.floatType().noDefault();
        }
        if (type == Double.TYPE) {
            return base.doubleType().noDefault();
        }
        if (type == String.class) {
            return base.stringType().noDefault();
        }
        if (type == DateTime.class) {
            return base.longBuilder().prop(logicalTypeName, "timestamp-micros").endLong().noDefault();
        }
        if (type == BigDecimal.class) {
            final BigDecimalUtils.PropertyNames propertyNames = new BigDecimalUtils.PropertyNames(colName);
            BigDecimalUtils.PrecisionAndScale values = BigDecimalUtils.getPrecisionAndScaleFromColumnProperties(propertyNames, colPropsMu.getValue(), true);
            if (t.isRefreshing()) {
                validatePrecisionAndScaleForRefreshingTable(propertyNames, values);
            } else {
                // The helper may compute precision and scale when none were configured; the returned values must
                // be used, otherwise the schema would be emitted with the -1 "missing" markers.
                values = ensurePrecisionAndScaleForStaticTable(colPropsMu, t, propertyNames, values);
            }
            return base.bytesBuilder()
                    .prop(logicalTypeName, "decimal")
                    .prop("precision", (Object) values.precision)
                    .prop("scale", (Object) values.scale)
                    .endBytes()
                    .noDefault();
        }
        // Any other type is published as opaque bytes tagged with the Java type name.
        return base.bytesBuilder().prop(dhTypeAttribute, type.getName()).endBytes().noDefault();
    }

    /**
     * Adds the column definition(s) for one Avro field, unless the name mapping drops the field.
     *
     * @param columnsOut receives the resulting column definitions
     * @param fieldPathToColumnNameOut if non-null, receives the field path to column name mappings
     * @param fieldNamePrefix the path prefix of the enclosing records; empty for top-level fields
     * @param field the Avro field to convert
     * @param fieldPathToColumnName maps a full field path to a column name; a {@code null} result skips the field
     */
    private static void pushColumnTypesFromAvroField(List<ColumnDefinition<?>> columnsOut, Map<String, String> fieldPathToColumnNameOut, String fieldNamePrefix, Schema.Field field, Function<String, String> fieldPathToColumnName) {
        final Schema schemaOfField = field.schema();
        final String nameOfField = field.name();
        final String columnName = fieldPathToColumnName.apply(fieldNamePrefix + nameOfField);
        if (columnName != null) {
            pushColumnTypesFromAvroField(columnsOut, fieldPathToColumnNameOut, fieldNamePrefix, nameOfField, schemaOfField, columnName, schemaOfField.getType(), fieldPathToColumnName);
        }
    }

    /**
     * Returns the logical type of the schema that {@code KafkaSchemaUtils.getEffectiveSchema} yields for a field.
     *
     * @param fieldName the name of the field
     * @param fieldSchema the declared schema of the field
     * @return the logical type of the effective schema, as reported by {@link Schema#getLogicalType()}
     */
    private static LogicalType getEffectiveLogicalType(String fieldName, Schema fieldSchema) {
        return KafkaSchemaUtils.getEffectiveSchema(fieldName, fieldSchema).getLogicalType();
    }

    /**
     * Adds the column definition(s) for an Avro field of a known schema type.
     * <p>
     * Records are flattened: each nested field is processed with a prefix extended by this field's name and
     * {@link #NESTED_FIELD_NAME_SEPARATOR}, and no column is added for the record itself. A union whose effective
     * schema differs from the declared one is processed again as that effective schema; otherwise it becomes a
     * {@link GenericRecord} column. All other types add exactly one column and, when
     * {@code fieldPathToColumnNameOut} is non-null, record the field path to column name mapping.
     *
     * @param columnsOut receives the resulting column definitions
     * @param fieldPathToColumnNameOut if non-null, receives the field path to column name mappings
     * @param fieldNamePrefix the path prefix of the enclosing records
     * @param fieldName the name of the field
     * @param fieldSchema the schema to interpret for the field
     * @param mappedNameForColumn the column name chosen for the field
     * @param fieldType the type of {@code fieldSchema}
     * @param fieldPathToColumnName maps a full field path to a column name; used for nested fields
     */
    private static void pushColumnTypesFromAvroField(List<ColumnDefinition<?>> columnsOut, Map<String, String> fieldPathToColumnNameOut, String fieldNamePrefix, String fieldName, Schema fieldSchema, String mappedNameForColumn, Schema.Type fieldType, Function<String, String> fieldPathToColumnName) {
        final ColumnDefinition<?> column;
        switch (fieldType) {
            case BOOLEAN:
                column = ColumnDefinition.ofBoolean(mappedNameForColumn);
                break;
            case INT:
                column = ColumnDefinition.ofInt(mappedNameForColumn);
                break;
            case LONG: {
                final LogicalType longLogicalType = getEffectiveLogicalType(fieldName, fieldSchema);
                if (LogicalTypes.timestampMicros().equals(longLogicalType) || LogicalTypes.timestampMillis().equals(longLogicalType)) {
                    column = ColumnDefinition.ofTime(mappedNameForColumn);
                } else {
                    column = ColumnDefinition.ofLong(mappedNameForColumn);
                }
                break;
            }
            case FLOAT:
                column = ColumnDefinition.ofFloat(mappedNameForColumn);
                break;
            case DOUBLE:
                column = ColumnDefinition.ofDouble(mappedNameForColumn);
                break;
            case ENUM:
            case STRING:
                column = ColumnDefinition.ofString(mappedNameForColumn);
                break;
            case UNION: {
                final Schema unwrapped = KafkaSchemaUtils.getEffectiveSchema(fieldName, fieldSchema);
                if (unwrapped != fieldSchema) {
                    // The union reduced to another schema: process the field again as that schema.
                    pushColumnTypesFromAvroField(columnsOut, fieldPathToColumnNameOut, fieldNamePrefix, fieldName, unwrapped, mappedNameForColumn, unwrapped.getType(), fieldPathToColumnName);
                    return;
                }
                // The union could not be reduced: expose it as a generic record object.
                column = ColumnDefinition.fromGenericType(mappedNameForColumn, GenericRecord.class);
                break;
            }
            case RECORD: {
                // Flatten nested records; the record itself contributes no column and no mapping entry.
                final String nestedPrefix = fieldNamePrefix + fieldName + NESTED_FIELD_NAME_SEPARATOR;
                for (final Schema.Field nested : fieldSchema.getFields()) {
                    pushColumnTypesFromAvroField(columnsOut, fieldPathToColumnNameOut, nestedPrefix, nested, fieldPathToColumnName);
                }
                return;
            }
            case BYTES:
            case FIXED:
                if (getEffectiveLogicalType(fieldName, fieldSchema) instanceof LogicalTypes.Decimal) {
                    column = ColumnDefinition.fromGenericType(mappedNameForColumn, BigDecimal.class);
                } else {
                    column = ColumnDefinition.ofVector(mappedNameForColumn, ByteVector.class);
                }
                break;
            case ARRAY: {
                Schema elementSchema = fieldSchema.getElementType();
                Schema.Type elementType = elementSchema.getType();
                if (elementType == Schema.Type.UNION) {
                    elementSchema = KafkaSchemaUtils.getEffectiveSchema(fieldName, elementSchema);
                    elementType = elementSchema.getType();
                }
                // Choose the Java array class that matches the element type; anything else is Object[].
                final Class<?> arrayClass;
                switch (elementType) {
                    case INT:
                        arrayClass = int[].class;
                        break;
                    case LONG: {
                        final LogicalType elementLogicalType = getEffectiveLogicalType(fieldName, elementSchema);
                        if (LogicalTypes.timestampMicros().equals(elementLogicalType) || LogicalTypes.timestampMillis().equals(elementLogicalType)) {
                            arrayClass = DateTime[].class;
                        } else {
                            arrayClass = long[].class;
                        }
                        break;
                    }
                    case FLOAT:
                        arrayClass = float[].class;
                        break;
                    case DOUBLE:
                        arrayClass = double[].class;
                        break;
                    case BOOLEAN:
                        arrayClass = Boolean[].class;
                        break;
                    case ENUM:
                    case STRING:
                        arrayClass = String[].class;
                        break;
                    default:
                        arrayClass = Object[].class;
                        break;
                }
                column = ColumnDefinition.fromGenericType(mappedNameForColumn, arrayClass);
                break;
            }
            case MAP:
                column = ColumnDefinition.fromGenericType(mappedNameForColumn, Map.class);
                break;
            default:
                column = ColumnDefinition.fromGenericType(mappedNameForColumn, GenericContainer.class);
                break;
        }
        columnsOut.add(column);
        if (fieldPathToColumnNameOut != null) {
            fieldPathToColumnNameOut.put(fieldNamePrefix + fieldName, mappedNameForColumn);
        }
    }

    /**
     * Converts the fields of a top-level Avro record schema to column definitions.
     *
     * @param columnsOut receives the resulting column definitions
     * @param fieldPathToColumnNameOut if non-null, receives the field path to column name mappings
     * @param schema the Avro schema; must be a record and must not be a union
     * @param requestedFieldPathToColumnName maps a field path to a column name; a {@code null} result skips the
     *        field
     * @throws UnsupportedOperationException if {@code schema} is a union
     * @throws IllegalArgumentException if {@code schema} is not a record
     */
    public static void avroSchemaToColumnDefinitions(List<ColumnDefinition<?>> columnsOut, Map<String, String> fieldPathToColumnNameOut, Schema schema, Function<String, String> requestedFieldPathToColumnName) {
        if (schema.isUnion()) {
            throw new UnsupportedOperationException("Schemas defined as a union of records are not supported");
        }
        if (schema.getType() != Schema.Type.RECORD) {
            throw new IllegalArgumentException("The schema is not a toplevel record definition.");
        }
        // Iterate the typed field list directly: a raw List cannot be iterated as Schema.Field.
        for (final Schema.Field field : schema.getFields()) {
            pushColumnTypesFromAvroField(columnsOut, fieldPathToColumnNameOut, "", field, requestedFieldPathToColumnName);
        }
    }

    /**
     * Converts the fields of a top-level Avro record schema to column definitions, without reporting the field path
     * to column name mappings.
     *
     * @param columnsOut receives the resulting column definitions
     * @param schema the Avro schema to convert
     * @param requestedFieldPathToColumnName maps a field path to a column name
     */
    public static void avroSchemaToColumnDefinitions(List<ColumnDefinition<?>> columnsOut, Schema schema, Function<String, String> requestedFieldPathToColumnName) {
        // A null mapping output means the caller is not interested in the mappings.
        final Map<String, String> noMappingOutput = null;
        avroSchemaToColumnDefinitions(columnsOut, noMappingOutput, schema, requestedFieldPathToColumnName);
    }

    /**
     * Converts the fields of a top-level Avro record schema to column definitions, naming columns with
     * {@link #DIRECT_MAPPING}.
     *
     * @param columnsOut receives the resulting column definitions
     * @param schema the Avro schema to convert
     */
    public static void avroSchemaToColumnDefinitions(List<ColumnDefinition<?>> columnsOut, Schema schema) {
        final Function<String, String> nameMapping = DIRECT_MAPPING;
        avroSchemaToColumnDefinitions(columnsOut, schema, nameMapping);
    }

    /**
     * Parses a user-facing table type name.
     * <p>
     * Accepted forms are {@code "blink"} (or {@code "stream"}), {@code "append"} and {@code "ring:<capacity>"},
     * where the capacity is an integer; the parts may be surrounded by whitespace.
     *
     * @param typeName the table type name
     * @return the corresponding table type
     * @throws IllegalArgumentException if {@code typeName} has none of the accepted forms
     */
    @ScriptApi
    public static TableType friendlyNameToTableType(@NotNull String typeName) {
        final String[] split = typeName.split(":");
        // String.split drops trailing empty strings, so input made only of ':' yields an empty array;
        // report it as a malformed name instead of failing on split[0].
        if (split.length == 0) {
            throw unexpectedType(typeName, null);
        }
        switch (split[0].trim()) {
            case "blink":
            case "stream": {
                if (split.length != 1) {
                    throw unexpectedType(typeName, null);
                }
                return TableType.blink();
            }
            case "append": {
                if (split.length != 1) {
                    throw unexpectedType(typeName, null);
                }
                return TableType.append();
            }
            case "ring": {
                if (split.length != 2) {
                    throw unexpectedType(typeName, null);
                }
                try {
                    return TableType.ring(Integer.parseInt(split[1].trim()));
                } catch (NumberFormatException e) {
                    throw unexpectedType(typeName, e);
                }
            }
            default:
                throw unexpectedType(typeName, null);
        }
    }

    /**
     * Creates (but does not throw) the exception that reports a malformed table type name.
     *
     * @param typeName the rejected table type name
     * @param cause the underlying failure, or {@code null} if there is none
     * @return the exception to throw
     */
    private static IllegalArgumentException unexpectedType(@NotNull String typeName, @Nullable Exception cause) {
        final String message = "Unexpected type format \"" + typeName + "\", expected \"blink\", \"append\", or \"ring:<capacity>\"";
        return new IllegalArgumentException(message, cause);
    }

    /**
     * Consumes a Kafka topic into a single table by delegating to {@code consumeToResult} with a
     * {@code TableResultFactory}.
     *
     * @param kafkaProperties the Kafka consumer properties
     * @param topic the topic to consume
     * @param partitionFilter selects the partitions to consume
     * @param partitionToInitialOffset maps a partition to its initial offset
     * @param keySpec how to interpret record keys
     * @param valueSpec how to interpret record values
     * @param tableType the kind of table to produce
     * @return the resulting table
     */
    public static Table consumeToTable(@NotNull Properties kafkaProperties, @NotNull String topic, @NotNull IntPredicate partitionFilter, @NotNull IntToLongFunction partitionToInitialOffset, @NotNull Consume.KeyOrValueSpec keySpec, @NotNull Consume.KeyOrValueSpec valueSpec, @NotNull TableType tableType) {
        final TableResultFactory tableFactory = new TableResultFactory();
        return consumeToResult(kafkaProperties, topic, partitionFilter, partitionToInitialOffset, keySpec, valueSpec, tableType, tableFactory);
    }

    /**
     * Consumes a Kafka topic into a partitioned table by delegating to {@code consumeToResult} with a
     * {@code PartitionedTableResultFactory}.
     *
     * @param kafkaProperties the Kafka consumer properties
     * @param topic the topic to consume
     * @param partitionFilter selects the partitions to consume
     * @param partitionToInitialOffset maps a partition to its initial offset
     * @param keySpec how to interpret record keys
     * @param valueSpec how to interpret record values
     * @param tableType the kind of table to produce
     * @return the resulting partitioned table
     */
    public static PartitionedTable consumeToPartitionedTable(@NotNull Properties kafkaProperties, @NotNull String topic, @NotNull IntPredicate partitionFilter, @NotNull IntToLongFunction partitionToInitialOffset, @NotNull Consume.KeyOrValueSpec keySpec, @NotNull Consume.KeyOrValueSpec valueSpec, @NotNull TableType tableType) {
        final PartitionedTableResultFactory partitionedFactory = new PartitionedTableResultFactory();
        return consumeToResult(kafkaProperties, topic, partitionFilter, partitionToInitialOffset, keySpec, valueSpec, tableType, partitionedFactory);
    }

    /**
     * Consumes a Kafka topic into a result built by the supplied factory, and starts the ingester.
     * <p>
     * The table definition is assembled from up to three common columns (as filled in by {@code getCommonCols})
     * followed by the columns contributed by the key and value specs. When a spec's data format is
     * {@code IGNORE}, {@link #DESERIALIZER_FOR_IGNORE} is passed to {@code setDeserIfNotSet} for that side.
     * NOTE(review): {@code setDeserIfNotSet} presumably updates {@code kafkaProperties} in place - confirm.
     *
     * @param kafkaProperties the Kafka consumer properties
     * @param topic the topic to consume
     * @param partitionFilter selects the partitions to consume
     * @param partitionToInitialOffset maps a partition to its initial offset
     * @param keySpec how to interpret record keys
     * @param valueSpec how to interpret record values
     * @param tableType the kind of table to produce
     * @param resultFactory creates the result and the per-partition consumer factory
     * @param <RESULT_TYPE> the type of the result
     * @return the result created by {@code resultFactory}
     * @throws IllegalArgumentException if both the key and the value spec are ignore specs
     */
    public static <RESULT_TYPE> RESULT_TYPE consumeToResult(@NotNull Properties kafkaProperties, @NotNull String topic, @NotNull IntPredicate partitionFilter, @NotNull IntToLongFunction partitionToInitialOffset, @NotNull Consume.KeyOrValueSpec keySpec, @NotNull Consume.KeyOrValueSpec valueSpec, @NotNull TableType tableType, @NotNull ResultFactory<RESULT_TYPE> resultFactory) {
        final boolean ignoreKey = keySpec.dataFormat() == DataFormat.IGNORE;
        final boolean ignoreValue = valueSpec.dataFormat() == DataFormat.IGNORE;
        if (ignoreKey && ignoreValue) {
            throw new IllegalArgumentException("can't ignore both key and value: keySpec and valueSpec can't both be ignore specs");
        }
        if (ignoreKey) {
            setDeserIfNotSet(kafkaProperties, KeyOrValue.KEY, DESERIALIZER_FOR_IGNORE);
        }
        if (ignoreValue) {
            setDeserIfNotSet(kafkaProperties, KeyOrValue.VALUE, DESERIALIZER_FOR_IGNORE);
        }

        // Common columns come first; an index of -1 marks a common column that is absent from the table.
        final ColumnDefinition<?>[] commonColumns = new ColumnDefinition<?>[3];
        getCommonCols(commonColumns, 0, kafkaProperties);
        final List<ColumnDefinition<?>> columnDefinitions = new ArrayList<>();
        final int[] commonColumnIndices = new int[3];
        int nextColumnIndex = 0;
        for (int i = 0; i < 3; ++i) {
            if (commonColumns[i] != null) {
                commonColumnIndices[i] = nextColumnIndex++;
                columnDefinitions.add(commonColumns[i]);
            } else {
                commonColumnIndices[i] = -1;
            }
        }

        // Key columns are added before value columns; both share the running column index.
        final MutableInt nextColumnIndexMut = new MutableInt(nextColumnIndex);
        final KeyOrValueIngestData keyIngestData = getIngestData(KeyOrValue.KEY, kafkaProperties, columnDefinitions, nextColumnIndexMut, keySpec);
        final KeyOrValueIngestData valueIngestData = getIngestData(KeyOrValue.VALUE, kafkaProperties, columnDefinitions, nextColumnIndexMut, valueSpec);

        final TableDefinition tableDefinition = TableDefinition.of(columnDefinitions);
        final UpdateSourceRegistrar updateSourceRegistrar = resultFactory.getSourceRegistrar();

        // Each invocation creates an independent publisher/adapter pair for the result factory to use.
        final Supplier<Pair<StreamToBlinkTableAdapter, ConsumerRecordToStreamPublisherAdapter>> adapterFactory = () -> {
            final StreamPublisherImpl streamPublisher = new StreamPublisherImpl();
            final StreamToBlinkTableAdapter streamToBlinkTableAdapter = new StreamToBlinkTableAdapter(tableDefinition, streamPublisher, updateSourceRegistrar, "Kafka-" + topic + "-" + partitionFilter);
            streamPublisher.setChunkFactory(() -> streamToBlinkTableAdapter.makeChunksForDefinition(CHUNK_SIZE), streamToBlinkTableAdapter::chunkTypeForIndex);
            final KeyOrValueProcessor keyProcessor = getProcessor(keySpec, tableDefinition, streamToBlinkTableAdapter, keyIngestData);
            final KeyOrValueProcessor valueProcessor = getProcessor(valueSpec, tableDefinition, streamToBlinkTableAdapter, valueIngestData);
            return new Pair<>(
                    streamToBlinkTableAdapter,
                    KafkaStreamPublisher.make(
                            streamPublisher,
                            commonColumnIndices[0],
                            commonColumnIndices[1],
                            commonColumnIndices[2],
                            keyProcessor,
                            valueProcessor,
                            keyIngestData == null ? -1 : keyIngestData.simpleColumnIndex,
                            valueIngestData == null ? -1 : valueIngestData.simpleColumnIndex,
                            keyIngestData == null ? Function.identity() : keyIngestData.toObjectChunkMapper,
                            valueIngestData == null ? Function.identity() : valueIngestData.toObjectChunkMapper));
        };

        // The holder is handed to the factory before the ingester exists; it is filled in below, before start().
        final MutableObject<KafkaIngester> kafkaIngesterHolder = new MutableObject<>();
        final Pair<RESULT_TYPE, IntFunction<KafkaStreamConsumer>> resultAndConsumerFactoryPair = resultFactory.makeResultAndConsumerFactoryPair(tableDefinition, tableType, adapterFactory, kafkaIngesterHolder);
        final RESULT_TYPE result = resultAndConsumerFactoryPair.getFirst();
        final IntFunction<KafkaStreamConsumer> partitionToConsumer = resultAndConsumerFactoryPair.getSecond();

        final KafkaIngester ingester = new KafkaIngester(log, kafkaProperties, topic, partitionFilter, partitionToConsumer, partitionToInitialOffset);
        kafkaIngesterHolder.setValue(ingester);
        ingester.start();
        return result;
    }

    /**
     * Creates the Avro serializer for the given columns of a table.
     *
     * @param t the table to publish
     * @param avroSpec the Avro spec supplying the schema, timestamp field name and column properties
     * @param columnNames the names of the columns to serialize
     * @return a {@link GenericRecordKeyOrValueSerializer} configured from {@code avroSpec}
     */
    private static KeyOrValueSerializer<?> getAvroSerializer(@NotNull Table t, @NotNull Produce.KeyOrValueSpec.Avro avroSpec, @NotNull String[] columnNames) {
        final Properties columnProperties = (Properties) avroSpec.columnProperties.getValue();
        return new GenericRecordKeyOrValueSerializer(t, avroSpec.schema, columnNames, avroSpec.timestampFieldName, columnProperties);
    }

    /**
     * Creates the JSON serializer for the given columns of a table.
     *
     * @param t the table to publish
     * @param jsonSpec the JSON spec supplying the field names and output options
     * @param columnNames the names of the columns to serialize
     * @return a {@link JsonKeyOrValueSerializer} configured from {@code jsonSpec}
     */
    private static KeyOrValueSerializer<?> getJsonSerializer(@NotNull Table t, @NotNull Produce.KeyOrValueSpec.Json jsonSpec, @NotNull String[] columnNames) {
        return new JsonKeyOrValueSerializer(
                t,
                columnNames,
                jsonSpec.getFieldNames(columnNames),
                jsonSpec.timestampFieldName,
                jsonSpec.nestedObjectDelimiter,
                jsonSpec.outputNulls);
    }

    /**
     * Selects and builds the serializer matching the data format of {@code spec}.
     *
     * @param t the table whose rows will be serialized
     * @param spec the key or value spec
     * @param columnNames the columns to serialize (only used by the Avro and JSON formats)
     * @return the serializer, or {@code null} when the spec's format is {@code IGNORE}
     * @throws IllegalStateException if the data format is not one of the handled values
     */
    private static KeyOrValueSerializer<?> getSerializer(@NotNull Table t, @NotNull Produce.KeyOrValueSpec spec, @NotNull String[] columnNames) {
        switch (spec.dataFormat()) {
            case IGNORE:
                return null;
            case SIMPLE:
                return new SimpleKeyOrValueSerializer(t, ((Produce.KeyOrValueSpec.Simple)spec).columnName);
            case JSON:
                return KafkaTools.getJsonSerializer(t, (Produce.KeyOrValueSpec.Json)spec, columnNames);
            case AVRO:
                return KafkaTools.getAvroSerializer(t, (Produce.KeyOrValueSpec.Avro)spec, columnNames);
            default:
                throw new IllegalStateException("Unrecognized spec type");
        }
    }

    /**
     * Determines which columns of {@code t} a key or value spec will publish.
     *
     * @param kafkaProperties the Kafka properties (consulted by the Avro format only)
     * @param t the table being published
     * @param spec the key or value spec
     * @return the column names, or {@code null} when the spec's format is {@code IGNORE}
     * @throws IllegalStateException if the data format is not one of the handled values
     */
    private static String[] getColumnNames(@NotNull Properties kafkaProperties, @NotNull Table t, @NotNull Produce.KeyOrValueSpec spec) {
        switch (spec.dataFormat()) {
            case IGNORE:
                return null;
            case SIMPLE:
                return new String[]{((Produce.KeyOrValueSpec.Simple)spec).columnName};
            case JSON:
                return ((Produce.KeyOrValueSpec.Json)spec).getColumnNames(t);
            case AVRO:
                return ((Produce.KeyOrValueSpec.Avro)spec).getColumnNames(t, kafkaProperties);
            default:
                throw new IllegalStateException("Unrecognized spec type");
        }
    }

    /**
     * Publishes the contents of a table to a Kafka topic.
     *
     * @param table the table to publish
     * @param kafkaProperties the Kafka producer properties; missing key/value serializer entries are filled in
     * @param topic the destination topic
     * @param keySpec how to produce the Kafka record key
     * @param valueSpec how to produce the Kafka record value
     * @param lastByKeyColumns when {@code true} and the key is not ignored, publish {@code table.lastBy(keyColumns)}
     *        instead of the coalesced table
     * @return a callback that releases the liveness scope holding the publisher
     * @throws KafkaPublisherException if {@code table} is refreshing and the calling thread holds neither the
     *         exclusive nor the shared UpdateGraphProcessor lock
     * @throws IllegalArgumentException if both {@code keySpec} and {@code valueSpec} are ignore specs
     */
    public static Runnable produceFromTable(@NotNull Table table, @NotNull Properties kafkaProperties, @NotNull String topic, @NotNull Produce.KeyOrValueSpec keySpec, @NotNull Produce.KeyOrValueSpec valueSpec, boolean lastByKeyColumns) {
        if (table.isRefreshing()) {
            final boolean lockHeld = UpdateGraphProcessor.DEFAULT.exclusiveLock().isHeldByCurrentThread()
                    || UpdateGraphProcessor.DEFAULT.sharedLock().isHeldByCurrentThread();
            if (!lockHeld) {
                throw new KafkaPublisherException("Calling thread must hold an exclusive or shared UpdateGraphProcessor lock to publish live sources");
            }
        }

        final boolean keyIgnored = keySpec.dataFormat() == DataFormat.IGNORE;
        final boolean valueIgnored = valueSpec.dataFormat() == DataFormat.IGNORE;
        if (keyIgnored && valueIgnored) {
            throw new IllegalArgumentException("can't ignore both key and value: keySpec and valueSpec can't both be ignore specs");
        }

        KafkaTools.setSerIfNotSet(kafkaProperties, KeyOrValue.KEY, keySpec, table);
        KafkaTools.setSerIfNotSet(kafkaProperties, KeyOrValue.VALUE, valueSpec, table);
        final String[] keyColumns = KafkaTools.getColumnNames(kafkaProperties, table, keySpec);
        final String[] valueColumns = KafkaTools.getColumnNames(kafkaProperties, table, valueSpec);

        final LivenessScope publisherScope = new LivenessScope(true);
        try (SafeCloseable ignored = LivenessScopeStack.open(publisherScope, false)) {
            final Table effectiveTable;
            if (lastByKeyColumns && !keyIgnored) {
                effectiveTable = (Table)table.lastBy(keyColumns);
            } else {
                effectiveTable = table.coalesce();
            }
            final KeyOrValueSerializer<?> keySerializer = KafkaTools.getSerializer(effectiveTable, keySpec, keyColumns);
            final KeyOrValueSerializer<?> valueSerializer = KafkaTools.getSerializer(effectiveTable, valueSpec, valueColumns);
            // Constructed for its side effects while publisherScope is the open liveness scope.
            new PublishToKafka(kafkaProperties, effectiveTable, topic, keyColumns, keySerializer, valueColumns, valueSerializer);
        }
        return publisherScope::release;
    }

    /**
     * Installs a default key or value serializer class in {@code prop}, unless one is already configured.
     *
     * @param prop the Kafka producer properties to update
     * @param keyOrValue whether the key or the value serializer is being configured
     * @param spec the spec whose data format determines the serializer
     * @param table the table being published (used to look up the column type for simple specs)
     * @throws IllegalStateException if the spec's data format is not recognized
     */
    private static void setSerIfNotSet(@NotNull Properties prop, @NotNull KeyOrValue keyOrValue, @NotNull Produce.KeyOrValueSpec spec, @NotNull Table table) {
        final String propKey;
        if (keyOrValue == KeyOrValue.KEY) {
            propKey = "key.serializer";
        } else {
            propKey = "value.serializer";
        }
        if (!prop.containsKey(propKey)) {
            final String serializerClassName;
            switch (spec.dataFormat()) {
                case AVRO:
                    serializerClassName = AVRO_SERIALIZER;
                    break;
                case JSON:
                    serializerClassName = STRING_SERIALIZER;
                    break;
                case SIMPLE:
                    serializerClassName = KafkaTools.getSerializerNameForSimpleSpec(keyOrValue, (Produce.KeyOrValueSpec.Simple)spec, table);
                    break;
                case IGNORE:
                    serializerClassName = SERIALIZER_FOR_IGNORE;
                    break;
                default:
                    throw new IllegalStateException("Unknown dataFormat=" + spec.dataFormat());
            }
            prop.setProperty(propKey, serializerClassName);
        }
    }

    /**
     * Maps the data type of a simple spec's column to a Kafka serializer class name.
     *
     * @param keyOrValue whether this is for the key or the value (used in the error message only)
     * @param simpleSpec the simple spec naming the column
     * @param table the table containing the column
     * @return the serializer class name for short, int, long, float, double or String columns
     * @throws UncheckedDeephavenException if the column has any other data type
     */
    private static String getSerializerNameForSimpleSpec(@NotNull KeyOrValue keyOrValue, @NotNull Produce.KeyOrValueSpec.Simple simpleSpec, @NotNull Table table) {
        final Class<?> columnType = table.getDefinition().getColumn(simpleSpec.columnName).getDataType();
        final String serializerName;
        if (columnType == short.class) {
            serializerName = SHORT_SERIALIZER;
        } else if (columnType == int.class) {
            serializerName = INT_SERIALIZER;
        } else if (columnType == long.class) {
            serializerName = LONG_SERIALIZER;
        } else if (columnType == float.class) {
            serializerName = FLOAT_SERIALIZER;
        } else if (columnType == double.class) {
            serializerName = DOUBLE_SERIALIZER;
        } else if (columnType == String.class) {
            serializerName = STRING_SERIALIZER;
        } else {
            throw new UncheckedDeephavenException("Serializer for " + keyOrValue + " not set in kafka consumer properties and can't automatically set it for type " + columnType);
        }
        return serializerName;
    }

    /**
     * Builds the chunk processor that turns deserialized Avro or JSON payloads into column chunks.
     *
     * @param spec the key or value spec
     * @param tableDef the definition of the table being populated
     * @param streamToBlinkTableAdapter the adapter queried for the chunk type of each column index
     * @param data the ingest data for this key or value; dereferenced for the AVRO and JSON formats only
     * @return the processor, or {@code null} for the IGNORE and SIMPLE formats
     * @throws IllegalStateException if the data format is not one of the handled values
     */
    private static KeyOrValueProcessor getProcessor(Consume.KeyOrValueSpec spec, TableDefinition tableDef, StreamToBlinkTableAdapter streamToBlinkTableAdapter, KeyOrValueIngestData data) {
        switch (spec.dataFormat()) {
            case JSON:
                return JsonNodeChunkAdapter.make(
                        tableDef,
                        columnIndex -> streamToBlinkTableAdapter.chunkTypeForIndex(columnIndex),
                        data.fieldPathToColumnName,
                        true);
            case AVRO:
                return GenericRecordChunkAdapter.make(
                        tableDef,
                        columnIndex -> streamToBlinkTableAdapter.chunkTypeForIndex(columnIndex),
                        data.fieldPathToColumnName,
                        NESTED_FIELD_NAME_SEPARATOR_PATTERN,
                        (Schema)data.extra,
                        true);
            case SIMPLE:
            case IGNORE:
                return null;
            default:
                throw new IllegalStateException("Unknown KeyOrvalueSpec value" + spec.dataFormat());
        }
    }

    /**
     * Sets {@code key} to {@code value} in {@code prop} only when {@code prop} does not already contain the key.
     *
     * @param prop the properties to update
     * @param key the property name
     * @param value the value to store when the key is absent
     */
    private static void setIfNotSet(Properties prop, String key, String value) {
        if (!prop.containsKey(key)) {
            prop.setProperty(key, value);
        }
    }

    /**
     * Sets the key or value deserializer class in {@code prop} unless one is already configured.
     *
     * @param prop the Kafka consumer properties to update
     * @param keyOrValue selects {@code "key.deserializer"} or {@code "value.deserializer"}
     * @param value the deserializer class name to install when none is present
     */
    private static void setDeserIfNotSet(Properties prop, KeyOrValue keyOrValue, String value) {
        final String deserializerKey;
        if (keyOrValue == KeyOrValue.KEY) {
            deserializerKey = "key.deserializer";
        } else {
            deserializerKey = "value.deserializer";
        }
        KafkaTools.setIfNotSet(prop, deserializerKey, value);
    }

    /**
     * Fetches an Avro schema from the schema registry configured in {@code kafkaProperties}.
     *
     * @param kafkaProperties properties used to create the schema registry client
     * @param schemaName the subject name of the schema
     * @param schemaVersion either {@code AVRO_LATEST_VERSION} or a string parseable as an integer version
     * @return the raw Avro schema
     * @throws UncheckedDeephavenException wrapping any {@link RestClientException} or {@link IOException}
     */
    static Schema getAvroSchema(Properties kafkaProperties, String schemaName, String schemaVersion) {
        try {
            final SchemaRegistryClient client = KafkaTools.createSchemaRegistryClient(kafkaProperties);
            final SchemaMetadata metadata;
            if (AVRO_LATEST_VERSION.equals(schemaVersion)) {
                metadata = client.getLatestSchemaMetadata(schemaName);
            } else {
                metadata = client.getSchemaMetadata(schemaName, Integer.parseInt(schemaVersion));
            }
            final int schemaId = metadata.getId();
            return (Schema)client.getSchemaById(schemaId).rawSchema();
        }
        catch (RestClientException | IOException e) {
            throw new UncheckedDeephavenException(e);
        }
    }

    /**
     * Creates a caching schema registry client from the registry settings found in {@code kafkaProperties}.
     *
     * @param kafkaProperties the Kafka properties, interpreted through {@link KafkaAvroDeserializerConfig}
     * @return a new {@link CachedSchemaRegistryClient} with an Avro schema provider
     */
    private static SchemaRegistryClient createSchemaRegistryClient(Properties kafkaProperties) {
        final KafkaAvroDeserializerConfig deserializerConfig =
                new KafkaAvroDeserializerConfig(Utils.propsToMap(kafkaProperties));
        return new CachedSchemaRegistryClient(
                deserializerConfig.getSchemaRegistryUrls(),
                deserializerConfig.getMaxSchemasPerSubject(),
                Collections.singletonList(new AvroSchemaProvider()),
                deserializerConfig.originalsWithPrefix(""),
                deserializerConfig.requestHeaders());
    }

    /**
     * Registers an Avro schema under {@code schemaName} in the configured schema registry.
     *
     * @param kafkaProperties properties used to create the schema registry client
     * @param schemaName the subject name to register under
     * @param schema the Avro schema to register
     * @return the value returned by the registry client's {@code register} call
     * @throws RestClientException if the registry client reports a REST failure
     * @throws IOException if the registry client reports an I/O failure
     */
    private static int putAvroSchema(Properties kafkaProperties, String schemaName, Schema schema) throws RestClientException, IOException {
        final SchemaRegistryClient client = KafkaTools.createSchemaRegistryClient(kafkaProperties);
        final ParsedSchema parsedSchema = new AvroSchema(schema);
        return client.register(schemaName, parsedSchema);
    }

    /*
     * WARNING - void declaration
     */
    private static KeyOrValueIngestData getIngestData(KeyOrValue keyOrValue, Properties kafkaConsumerProperties, List<ColumnDefinition<?>> columnDefinitions, MutableInt nextColumnIndexMut, Consume.KeyOrValueSpec keyOrValueSpec) {
        if (keyOrValueSpec.dataFormat() == DataFormat.IGNORE) {
            return null;
        }
        KeyOrValueIngestData data = new KeyOrValueIngestData();
        switch (keyOrValueSpec.dataFormat()) {
            case AVRO: {
                KafkaTools.setDeserIfNotSet(kafkaConsumerProperties, keyOrValue, AVRO_DESERIALIZER);
                Consume.KeyOrValueSpec.Avro avroSpec = (Consume.KeyOrValueSpec.Avro)keyOrValueSpec;
                data.fieldPathToColumnName = new HashMap<String, String>();
                Schema schema = avroSpec.schema != null ? avroSpec.schema : KafkaTools.getAvroSchema(kafkaConsumerProperties, avroSpec.schemaName, avroSpec.schemaVersion);
                KafkaTools.avroSchemaToColumnDefinitions(columnDefinitions, data.fieldPathToColumnName, schema, avroSpec.fieldPathToColumnName);
                data.extra = schema;
                break;
            }
            case JSON: {
                KafkaTools.setDeserIfNotSet(kafkaConsumerProperties, keyOrValue, STRING_DESERIALIZER);
                data.toObjectChunkMapper = jsonToObjectChunkMapper;
                Consume.KeyOrValueSpec.Json jsonSpec = (Consume.KeyOrValueSpec.Json)keyOrValueSpec;
                columnDefinitions.addAll(Arrays.asList(jsonSpec.columnDefinitions));
                data.fieldPathToColumnName = new HashMap<String, String>(jsonSpec.columnDefinitions.length);
                HashSet<String> coveredColumns = new HashSet<String>(jsonSpec.columnDefinitions.length);
                if (jsonSpec.fieldToColumnName != null) {
                    for (Map.Entry entry : jsonSpec.fieldToColumnName.entrySet()) {
                        String colName = (String)entry.getValue();
                        data.fieldPathToColumnName.put((String)entry.getKey(), colName);
                        coveredColumns.add(colName);
                    }
                }
                for (ColumnDefinition<?> colDef : jsonSpec.columnDefinitions) {
                    String colName = colDef.getName();
                    if (coveredColumns.contains(colName)) continue;
                    String jsonPtrStr = Consume.KeyOrValueSpec.Json.mapFieldNameToJsonPointerStr(colName);
                    data.fieldPathToColumnName.put(jsonPtrStr, colName);
                }
                break;
            }
            case SIMPLE: {
                void var11_16;
                String propKey;
                data.simpleColumnIndex = nextColumnIndexMut.getAndAdd(1);
                Consume.KeyOrValueSpec.Simple simpleSpec = (Consume.KeyOrValueSpec.Simple)keyOrValueSpec;
                if (simpleSpec.dataType == null) {
                    ColumnDefinition<?> columnDefinition = KafkaTools.getKeyOrValueCol(keyOrValue, kafkaConsumerProperties, simpleSpec.columnName, false);
                } else {
                    ColumnDefinition columnDefinition = ColumnDefinition.fromGenericType((String)simpleSpec.columnName, simpleSpec.dataType);
                }
                String string = propKey = keyOrValue == KeyOrValue.KEY ? "key.deserializer" : "value.deserializer";
                if (!kafkaConsumerProperties.containsKey(propKey)) {
                    Class dataType = var11_16.getDataType();
                    if (dataType == Short.TYPE) {
                        kafkaConsumerProperties.setProperty(propKey, SHORT_DESERIALIZER);
                    } else if (dataType == Integer.TYPE) {
                        kafkaConsumerProperties.setProperty(propKey, INT_DESERIALIZER);
                    } else if (dataType == Long.TYPE) {
                        kafkaConsumerProperties.setProperty(propKey, LONG_DESERIALIZER);
                    } else if (dataType == Float.TYPE) {
                        kafkaConsumerProperties.setProperty(propKey, FLOAT_DESERIALIZER);
                    } else if (dataType == Double.TYPE) {
                        kafkaConsumerProperties.setProperty(propKey, DOUBLE_DESERIALIZER);
                    } else if (dataType == String.class) {
                        kafkaConsumerProperties.setProperty(propKey, STRING_DESERIALIZER);
                    } else {
                        throw new UncheckedDeephavenException("Deserializer for " + keyOrValue + " not set in kafka consumer properties and can't automatically set it for type " + dataType);
                    }
                }
                KafkaTools.setDeserIfNotSet(kafkaConsumerProperties, keyOrValue, STRING_DESERIALIZER);
                columnDefinitions.add((ColumnDefinition<?>)var11_16);
                break;
            }
            default: {
                throw new IllegalStateException("Unhandled spec type:" + keyOrValueSpec.dataFormat());
            }
        }
        return data;
    }

    /**
     * Resolves one of the common Kafka metadata columns and stores its definition in {@code columnsToSet}.
     * <p>
     * When {@code consumerProperties} contains {@code columnNameProperty}, that property is consumed (removed)
     * and its value is used as the column name; a null or empty value stores {@code null} in the slot.
     * Otherwise the column is built with {@code columnNameDefault}.
     *
     * @param columnsToSet the output array
     * @param outOffset the index in {@code columnsToSet} to write
     * @param consumerProperties the consumer properties that may override the column name
     * @param columnNameProperty the name of the overriding property
     * @param columnNameDefault the column name to use when the property is absent
     * @param builder creates the column definition from a column name
     */
    private static void getCommonCol(@NotNull ColumnDefinition<?>[] columnsToSet, int outOffset, @NotNull Properties consumerProperties, @NotNull String columnNameProperty, @NotNull String columnNameDefault, @NotNull Function<String, ColumnDefinition<?>> builder) {
        if (!consumerProperties.containsKey(columnNameProperty)) {
            columnsToSet[outOffset] = builder.apply(columnNameDefault);
            return;
        }
        final String configuredName = consumerProperties.getProperty(columnNameProperty);
        if (configuredName == null || configuredName.isEmpty()) {
            columnsToSet[outOffset] = null;
        } else {
            columnsToSet[outOffset] = builder.apply(configuredName);
        }
        consumerProperties.remove(columnNameProperty);
    }

    /**
     * Fills three consecutive slots of {@code columnsToSet}, starting at {@code outOffset}, with the partition
     * (int), offset (long) and timestamp ({@code DateTime}) column definitions, in that order.
     *
     * @param columnsToSet the output array
     * @param outOffset the index of the first slot to write
     * @param consumerProperties the consumer properties that may override the column names
     * @return the index just past the last slot written, i.e. {@code outOffset + 3}
     */
    private static int getCommonCols(@NotNull ColumnDefinition<?>[] columnsToSet, int outOffset, @NotNull Properties consumerProperties) {
        final int partitionSlot = outOffset;
        final int offsetSlot = outOffset + 1;
        final int timestampSlot = outOffset + 2;
        KafkaTools.getCommonCol(columnsToSet, partitionSlot, consumerProperties, KAFKA_PARTITION_COLUMN_NAME_PROPERTY, KAFKA_PARTITION_COLUMN_NAME_DEFAULT, ColumnDefinition::ofInt);
        KafkaTools.getCommonCol(columnsToSet, offsetSlot, consumerProperties, OFFSET_COLUMN_NAME_PROPERTY, OFFSET_COLUMN_NAME_DEFAULT, ColumnDefinition::ofLong);
        KafkaTools.getCommonCol(columnsToSet, timestampSlot, consumerProperties, TIMESTAMP_COLUMN_NAME_PROPERTY, TIMESTAMP_COLUMN_NAME_DEFAULT, name -> ColumnDefinition.fromGenericType(name, DateTime.class));
        return timestampSlot + 1;
    }

    /**
     * Resolves the column definition for a simple key or value column and, as a side effect, may set the
     * matching deserializer property in {@code properties}.
     * <p>
     * The column name is {@code columnNameArg} when non-null, otherwise the value of the key/value column-name
     * property when present, otherwise the key/value default name. The column type is taken from the
     * key/value column-type property when present (the deserializer property is then overwritten to match);
     * otherwise, when no deserializer is configured, a String column with the String deserializer is used;
     * otherwise the type is derived from the configured deserializer.
     *
     * @param keyOrValue whether the key or the value column is being resolved
     * @param properties the Kafka consumer properties; the deserializer property may be set
     * @param columnNameArg an explicit column name, or {@code null} to consult the properties
     * @param allowEmpty whether an empty column-name property yields {@code null} instead of an exception
     * @return the column definition, or {@code null} when the column-name property is empty and
     *         {@code allowEmpty} is {@code true}
     * @throws IllegalArgumentException if the column-name property is empty and {@code allowEmpty} is
     *         {@code false}, or if the column-type property holds an unsupported value
     * @throws IllegalStateException if {@code keyOrValue} is not a recognized value
     */
    private static ColumnDefinition<?> getKeyOrValueCol(@NotNull KeyOrValue keyOrValue, @NotNull Properties properties, String columnNameArg, boolean allowEmpty) {
        final String typeProperty;
        final String deserializerProperty;
        final String nameProperty;
        final String nameDefault;
        switch (keyOrValue) {
            case KEY: {
                typeProperty = KEY_COLUMN_TYPE_PROPERTY;
                deserializerProperty = "key.deserializer";
                nameProperty = KEY_COLUMN_NAME_PROPERTY;
                nameDefault = KEY_COLUMN_NAME_DEFAULT;
                break;
            }
            case VALUE: {
                typeProperty = VALUE_COLUMN_TYPE_PROPERTY;
                deserializerProperty = "value.deserializer";
                nameProperty = VALUE_COLUMN_NAME_PROPERTY;
                nameDefault = VALUE_COLUMN_NAME_DEFAULT;
                break;
            }
            default: {
                throw new IllegalStateException("Unrecognized KeyOrValue value " + keyOrValue);
            }
        }

        final String columnName;
        if (columnNameArg != null) {
            columnName = columnNameArg;
        } else if (properties.containsKey(nameProperty)) {
            columnName = properties.getProperty(nameProperty);
            if (columnName == null || columnName.equals("")) {
                if (allowEmpty) {
                    return null;
                }
                throw new IllegalArgumentException("Property for " + nameDefault + " can't be empty.");
            }
        } else {
            columnName = nameDefault;
        }

        if (properties.containsKey(typeProperty)) {
            // An explicit type wins: force the deserializer to the one matching the requested column type.
            final String typeAsString = properties.getProperty(typeProperty);
            switch (typeAsString) {
                case "short": {
                    properties.setProperty(deserializerProperty, SHORT_DESERIALIZER);
                    return ColumnDefinition.ofShort(columnName);
                }
                case "int": {
                    properties.setProperty(deserializerProperty, INT_DESERIALIZER);
                    return ColumnDefinition.ofInt(columnName);
                }
                case "long": {
                    properties.setProperty(deserializerProperty, LONG_DESERIALIZER);
                    return ColumnDefinition.ofLong(columnName);
                }
                case "float": {
                    // The float deserializer yields Float values, so the column must be a float column;
                    // this previously returned a double column, mismatching the deserializer.
                    properties.setProperty(deserializerProperty, FLOAT_DESERIALIZER);
                    return ColumnDefinition.ofFloat(columnName);
                }
                case "double": {
                    properties.setProperty(deserializerProperty, DOUBLE_DESERIALIZER);
                    return ColumnDefinition.ofDouble(columnName);
                }
                case "byte[]": {
                    properties.setProperty(deserializerProperty, BYTE_ARRAY_DESERIALIZER);
                    return ColumnDefinition.fromGenericType(columnName, byte[].class, Byte.TYPE);
                }
                case "String":
                case "string": {
                    properties.setProperty(deserializerProperty, STRING_DESERIALIZER);
                    return ColumnDefinition.ofString(columnName);
                }
                default: {
                    throw new IllegalArgumentException("Property " + typeProperty + " value " + typeAsString + " not supported");
                }
            }
        }

        if (!properties.containsKey(deserializerProperty)) {
            // Neither a type nor a deserializer was configured: default to String.
            properties.setProperty(deserializerProperty, STRING_DESERIALIZER);
            return ColumnDefinition.ofString(columnName);
        }
        return KafkaTools.columnDefinitionFromDeserializer(properties, deserializerProperty, columnName);
    }

    /**
     * Derives a column definition from the deserializer class configured under {@code deserializerProperty}.
     * <p>
     * Recognizes the short, int, long, float, double, byte-array and String deserializers. Short and float are
     * included so that every primitive deserializer this class installs automatically is also accepted when
     * configured explicitly.
     *
     * @param properties the Kafka consumer properties
     * @param deserializerProperty the name of the property holding the deserializer class name
     * @param columnName the name for the resulting column
     * @return the column definition matching the configured deserializer
     * @throws IllegalArgumentException if the configured deserializer is not one of the recognized classes
     */
    @NotNull
    private static ColumnDefinition<? extends Serializable> columnDefinitionFromDeserializer(@NotNull Properties properties, @NotNull String deserializerProperty, String columnName) {
        final String deserializer = properties.getProperty(deserializerProperty);
        if (SHORT_DESERIALIZER.equals(deserializer)) {
            return ColumnDefinition.ofShort(columnName);
        }
        if (INT_DESERIALIZER.equals(deserializer)) {
            return ColumnDefinition.ofInt(columnName);
        }
        if (LONG_DESERIALIZER.equals(deserializer)) {
            return ColumnDefinition.ofLong(columnName);
        }
        if (FLOAT_DESERIALIZER.equals(deserializer)) {
            return ColumnDefinition.ofFloat(columnName);
        }
        if (DOUBLE_DESERIALIZER.equals(deserializer)) {
            return ColumnDefinition.ofDouble(columnName);
        }
        if (BYTE_ARRAY_DESERIALIZER.equals(deserializer)) {
            return ColumnDefinition.fromGenericType(columnName, byte[].class, Byte.TYPE);
        }
        if (STRING_DESERIALIZER.equals(deserializer)) {
            return ColumnDefinition.ofString(columnName);
        }
        throw new IllegalArgumentException("Deserializer type " + deserializer + " for " + deserializerProperty + " not supported.");
    }

    /**
     * Creates a partition filter that accepts exactly the partitions listed in {@code partitions}.
     * <p>
     * The filter works on a private sorted copy, so the caller's array is neither reordered nor retained;
     * later modifications to it do not affect the returned predicate.
     *
     * @param partitions the partition numbers to accept, in any order
     * @return a predicate that is {@code true} for a partition contained in {@code partitions}
     */
    public static IntPredicate partitionFilterFromArray(int[] partitions) {
        final int[] sortedPartitions = partitions.clone();
        Arrays.sort(sortedPartitions);
        return p -> Arrays.binarySearch(sortedPartitions, p) >= 0;
    }

    /**
     * Creates a partition-to-initial-offset function from two parallel arrays.
     * <p>
     * Partitions not present in {@code partitions} map to {@code KafkaIngester.DONT_SEEK}, the no-entry value
     * of the backing map.
     *
     * @param partitions the partition numbers
     * @param offsets the offset for the partition at the same index in {@code partitions}
     * @return a function returning the configured offset for a partition
     * @throws IllegalArgumentException if the arrays differ in length
     */
    public static IntToLongFunction partitionToOffsetFromParallelArrays(int[] partitions, long[] offsets) {
        final int count = partitions.length;
        if (count != offsets.length) {
            throw new IllegalArgumentException("lengths of array arguments do not match");
        }
        final TIntLongHashMap offsetByPartition = new TIntLongHashMap(count, 0.5f, 0, KafkaIngester.DONT_SEEK);
        int idx = 0;
        while (idx < count) {
            offsetByPartition.put(partitions[idx], offsets[idx]);
            ++idx;
        }
        return offsetByPartition::get;
    }

    /**
     * Adapts a set to a membership predicate.
     *
     * @param set the set to test membership against; may be {@code null}
     * @return {@code null} when {@code set} is {@code null}, otherwise a predicate backed by {@code set.contains}
     */
    public static Predicate<String> predicateFromSet(Set<String> set) {
        if (set == null) {
            return null;
        }
        return set::contains;
    }

    /**
     * Lists the names of the topics visible through a Kafka admin client created from {@code kafkaProperties}.
     * <p>
     * The admin client is always closed before returning. If the calling thread is interrupted while waiting
     * for the result, its interrupt flag is restored before the failure is rethrown.
     *
     * @param kafkaProperties the properties used to create the admin client
     * @return the set of topic names
     * @throws RuntimeException wrapping the {@link InterruptedException} or {@link ExecutionException} raised
     *         while waiting for the topic listing
     */
    public static Set<String> topics(@NotNull Properties kafkaProperties) {
        try (Admin admin = Admin.create(kafkaProperties)) {
            final ListTopicsResult result = admin.listTopics();
            return result.names().get();
        }
        catch (InterruptedException e) {
            // Preserve the interruption for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Failed to list Kafka Topics for " + kafkaProperties, e);
        }
        catch (ExecutionException e) {
            throw new RuntimeException("Failed to list Kafka Topics for " + kafkaProperties, e);
        }
    }

    /**
     * Array-returning convenience form of {@code topics}.
     *
     * @param kafkaProperties the properties used to create the admin client
     * @return the topic names as an array
     */
    public static String[] listTopics(@NotNull Properties kafkaProperties) {
        return KafkaTools.topics(kafkaProperties).toArray(new String[0]);
    }

    /**
     * A {@link KafkaStreamConsumer} that hands consumed records to a
     * {@link ConsumerRecordToStreamPublisherAdapter} and reports failures to a
     * {@link StreamToBlinkTableAdapter}.
     */
    private static class SimpleKafkaStreamConsumer
    implements KafkaStreamConsumer {
        private final ConsumerRecordToStreamPublisherAdapter adapter;
        private final StreamToBlinkTableAdapter streamToBlinkTableAdapter;

        /**
         * @param adapter receives the consumed records
         * @param streamToBlinkTableAdapter receives failures
         */
        public SimpleKafkaStreamConsumer(ConsumerRecordToStreamPublisherAdapter adapter, StreamToBlinkTableAdapter streamToBlinkTableAdapter) {
            this.adapter = adapter;
            this.streamToBlinkTableAdapter = streamToBlinkTableAdapter;
        }

        /**
         * Forwards the records to the adapter.
         *
         * @param consumerRecords the records to consume
         * @return the adapter's result, or {@code 0} when the adapter threw; in that case the exception is
         *         passed to {@link #acceptFailure(Throwable)} rather than propagated
         */
        @Override
        public long consume(List<? extends ConsumerRecord<?, ?>> consumerRecords) {
            long consumed;
            try {
                consumed = this.adapter.consumeRecords(consumerRecords);
            }
            catch (Exception failure) {
                this.acceptFailure(failure);
                consumed = 0L;
            }
            return consumed;
        }

        /**
         * Passes a failure on to the stream-to-blink-table adapter.
         *
         * @param cause the failure
         */
        public void acceptFailure(@NotNull Throwable cause) {
            this.streamToBlinkTableAdapter.acceptFailure(cause);
        }
    }

    /**
     * Mutable holder for the per-key or per-value state assembled while preparing ingestion.
     */
    private static class KeyOrValueIngestData {
        /** Mapping from field path to destination column name; populated for the Avro and JSON formats. */
        public Map<String, String> fieldPathToColumnName;
        /** Column index for the simple format; {@code -1} until assigned. */
        public int simpleColumnIndex;
        /** Mapper applied to object chunks; the identity function unless replaced. */
        public Function<Object, Object> toObjectChunkMapper;
        /** Format-specific extra data (the Avro format stores its schema here). */
        public Object extra;

        private KeyOrValueIngestData() {
            this.simpleColumnIndex = -1;
            this.toObjectChunkMapper = Function.identity();
        }
    }

    /**
     * A partitioned table with an int "Partition" key column and a "Table" constituent column, to which
     * constituents are appended via {@link #enqueueAdd(int, Table)} and made visible to listeners by
     * {@link #run()}.
     */
    private static final class StreamPartitionedTable
    extends PartitionedTableImpl
    implements Runnable {
        private static final String PARTITION_COLUMN_NAME = "Partition";
        private static final String CONSTITUENT_COLUMN_NAME = "Table";
        @ReferentialIntegrity
        private final UpdateSourceCombiner refreshCombiner;
        private final WritableColumnSource<Integer> partitionColumn;
        private final WritableColumnSource<Table> constituentColumn;
        // Row key of the most recently enqueued partition; -1 until the first enqueueAdd.
        // Written by enqueueAdd and read by run.
        private volatile long lastAddedPartitionRowKey = -1L;

        /**
         * Creates the partitioned table over a fresh, empty result table and registers it for refresh.
         *
         * @param constituentDefinition the table definition passed to the superclass for the constituents
         * @param refreshCombiner the combiner this instance manages and registers itself with
         */
        private StreamPartitionedTable(@NotNull TableDefinition constituentDefinition, @NotNull UpdateSourceCombiner refreshCombiner) {
            super(StreamPartitionedTable.makeResultTable(), Set.of(PARTITION_COLUMN_NAME), true, CONSTITUENT_COLUMN_NAME, constituentDefinition, true, false);
            this.refreshCombiner = refreshCombiner;
            this.partitionColumn = (WritableColumnSource)this.table().getColumnSource(PARTITION_COLUMN_NAME, Integer.TYPE);
            this.constituentColumn = (WritableColumnSource)this.table().getColumnSource(CONSTITUENT_COLUMN_NAME, Table.class);
            this.manage((LivenessReferent)refreshCombiner);
            // This instance becomes a source of the combiner, and the combiner a source of the default
            // UpdateGraphProcessor.
            refreshCombiner.addSource((Runnable)this);
            UpdateGraphProcessor.DEFAULT.addSource((Runnable)refreshCombiner);
        }

        /**
         * Inserts the row keys of all partitions enqueued since the row set was last extended and notifies
         * listeners of the underlying table with those rows as additions. Does nothing when the row set's last
         * row key already equals the last enqueued row key.
         */
        @Override
        public void run() {
            // Read the volatile field once so the whole method works against a single snapshot.
            long newLastRowKey = this.lastAddedPartitionRowKey;
            TrackingWritableRowSet rowSet = this.table().getRowSet().writableCast();
            long oldLastRowKey = rowSet.lastRowKey();
            if (newLastRowKey != oldLastRowKey) {
                WritableRowSet added = RowSetFactory.fromRange((long)(oldLastRowKey + 1L), (long)newLastRowKey);
                rowSet.insert((RowSet)added);
                ((BaseTable)this.table()).notifyListeners((TableUpdate)new TableUpdateImpl((RowSet)added, (RowSet)RowSetFactory.empty(), (RowSet)RowSetFactory.empty(), RowSetShiftData.EMPTY, ModifiedColumnSet.EMPTY));
            }
        }

        /**
         * Stores a new partition and its constituent table at the next row key. The row does not join the
         * table's row set until a subsequent {@link #run()}.
         *
         * @param partition the partition number written to the "Partition" column
         * @param partitionTable the table written to the "Table" column
         */
        public synchronized void enqueueAdd(int partition, @NotNull Table partitionTable) {
            long partitionRowKey = this.lastAddedPartitionRowKey + 1L;
            this.partitionColumn.ensureCapacity(partitionRowKey + 1L);
            this.partitionColumn.set(partitionRowKey, partition);
            this.constituentColumn.ensureCapacity(partitionRowKey + 1L);
            this.constituentColumn.set(partitionRowKey, (Object)partitionTable);
            // NOTE(review): the volatile write comes last, presumably so run() only observes a row key after
            // both column values have been written - confirm before reordering.
            this.lastAddedPartitionRowKey = partitionRowKey;
        }

        /**
         * Builds the backing table: an initially empty, flat, refreshing {@link QueryTable} with in-memory
         * "Partition" (int) and "Table" ({@link Table}) column sources.
         *
         * @return the new result table
         */
        private static Table makeResultTable() {
            LinkedHashMap<String, WritableColumnSource> resultSources = new LinkedHashMap<String, WritableColumnSource>(2);
            resultSources.put(PARTITION_COLUMN_NAME, ArrayBackedColumnSource.getMemoryColumnSource(Integer.TYPE, null));
            resultSources.put(CONSTITUENT_COLUMN_NAME, ArrayBackedColumnSource.getMemoryColumnSource(Table.class, null));
            return new QueryTable((TrackingRowSet)RowSetFactory.empty().toTracking(), resultSources){
                {
                    this.setFlat();
                    this.setRefreshing(true);
                }
            };
        }
    }

    /**
     * {@link TableType.Visitor} that converts a blink table into the table shape requested by a
     * {@link TableType}.
     */
    private static class BlinkTableOperation
    implements TableType.Visitor<Table> {
        private final Table blinkTable;

        /**
         * @param blinkTable the source blink table; must not be {@code null}
         * @throws NullPointerException if {@code blinkTable} is {@code null}
         */
        public BlinkTableOperation(Table blinkTable) {
            Objects.requireNonNull(blinkTable);
            this.blinkTable = blinkTable;
        }

        /** Returns the result of {@code RingTableTools.of} for the blink table and the ring's capacity. */
        @Override
        public Table visit(TableType.Ring ring) {
            final int capacity = ring.capacity();
            return RingTableTools.of(blinkTable, capacity);
        }

        /** Returns the result of {@code BlinkTableTools.blinkToAppendOnly} for the blink table. */
        @Override
        public Table visit(TableType.Append append) {
            return BlinkTableTools.blinkToAppendOnly(blinkTable);
        }

        /** Returns the blink table unchanged. */
        @Override
        public Table visit(TableType.Blink blink) {
            return blinkTable;
        }
    }

    /**
     * {@link ResultFactory} producing a {@link PartitionedTable} with one constituent per Kafka partition.
     */
    private static class PartitionedTableResultFactory
    implements ResultFactory<PartitionedTable> {
        private final UpdateSourceCombiner refreshCombiner = new UpdateSourceCombiner();

        private PartitionedTableResultFactory() {
        }

        /** Returns this factory's refresh combiner. */
        @Override
        public UpdateSourceRegistrar getSourceRegistrar() {
            return refreshCombiner;
        }

        /**
         * Creates the partitioned result and a consumer factory. Each call of the consumer factory obtains a
         * fresh adapter pair, sets a shutdown callback that shuts down that partition on the held ingester,
         * converts the adapter's blink table according to {@code tableType}, enqueues it as a new constituent,
         * and returns a consumer for the partition.
         */
        @Override
        public Pair<PartitionedTable, IntFunction<KafkaStreamConsumer>> makeResultAndConsumerFactoryPair(@NotNull TableDefinition tableDefinition, @NotNull TableType tableType, @NotNull Supplier<Pair<StreamToBlinkTableAdapter, ConsumerRecordToStreamPublisherAdapter>> adapterFactory, @NotNull MutableObject<KafkaIngester> kafkaIngesterHolder) {
            final StreamPartitionedTable partitionedResult = new StreamPartitionedTable(tableDefinition, refreshCombiner);
            final IntFunction<KafkaStreamConsumer> consumerFactory = partition -> {
                final Pair<StreamToBlinkTableAdapter, ConsumerRecordToStreamPublisherAdapter> adapters = adapterFactory.get();
                final StreamToBlinkTableAdapter blinkAdapter = adapters.getFirst();
                blinkAdapter.setShutdownCallback(() -> kafkaIngesterHolder.getValue().shutdownPartition(partition));
                final Table constituent = tableType.walk(new BlinkTableOperation(blinkAdapter.table()));
                partitionedResult.enqueueAdd(partition, constituent);
                return new SimpleKafkaStreamConsumer(adapters.getSecond(), blinkAdapter);
            };
            return new Pair<>(partitionedResult, consumerFactory);
        }
    }

    /**
     * {@link ResultFactory} producing a single {@link Table} fed by every consumed partition.
     */
    private static class TableResultFactory
    implements ResultFactory<Table> {
        private TableResultFactory() {
        }

        /** Returns the default {@code UpdateGraphProcessor}. */
        @Override
        public UpdateSourceRegistrar getSourceRegistrar() {
            return UpdateGraphProcessor.DEFAULT;
        }

        /**
         * Obtains one adapter pair up front, converts its blink table according to {@code tableType}, and
         * returns it with a consumer factory. Each call of the consumer factory sets a shutdown callback that
         * shuts down the held ingester and returns a new consumer over the shared adapter pair.
         */
        @Override
        public Pair<Table, IntFunction<KafkaStreamConsumer>> makeResultAndConsumerFactoryPair(@NotNull TableDefinition tableDefinition, @NotNull TableType tableType, @NotNull Supplier<Pair<StreamToBlinkTableAdapter, ConsumerRecordToStreamPublisherAdapter>> adapterFactory, @NotNull MutableObject<KafkaIngester> kafkaIngesterHolder) {
            final Pair<StreamToBlinkTableAdapter, ConsumerRecordToStreamPublisherAdapter> sharedAdapters = adapterFactory.get();
            final Table resultTable = tableType.walk(new BlinkTableOperation(sharedAdapters.getFirst().table()));
            final IntFunction<KafkaStreamConsumer> consumerFactory = partition -> {
                final StreamToBlinkTableAdapter blinkAdapter = sharedAdapters.getFirst();
                blinkAdapter.setShutdownCallback(() -> kafkaIngesterHolder.getValue().shutdown());
                return new SimpleKafkaStreamConsumer(sharedAdapters.getSecond(), blinkAdapter);
            };
            return new Pair<>(resultTable, consumerFactory);
        }
    }

    private static interface ResultFactory<TYPE> {
        public UpdateSourceRegistrar getSourceRegistrar();

        public Pair<TYPE, IntFunction<KafkaStreamConsumer>> makeResultAndConsumerFactoryPair(@NotNull TableDefinition var1, @NotNull TableType var2, @NotNull Supplier<Pair<StreamToBlinkTableAdapter, ConsumerRecordToStreamPublisherAdapter>> var3, @NotNull MutableObject<KafkaIngester> var4);
    }

    /**
     * The kind of table to produce when consuming: blink, append or ring. Instances are obtained from the
     * static factories and dispatched on through {@link #walk(Visitor)}.
     */
    public interface TableType {
        /**
         * @return the blink table type
         */
        static Blink blink() {
            return Blink.of();
        }

        /**
         * @return the append table type
         */
        static Append append() {
            return Append.of();
        }

        /**
         * @param capacity the ring capacity
         * @return a ring table type with the given capacity
         */
        static Ring ring(int capacity) {
            return Ring.of(capacity);
        }

        /**
         * Dispatches to the {@code visit} overload of {@code visitor} matching this table type.
         *
         * @param visitor the visitor
         * @param <T> the visitor's result type
         * @param <V> the visitor type
         * @return the visitor's result
         */
        <T, V extends Visitor<T>> T walk(V visitor);

        /**
         * Visitor over the table types.
         *
         * @param <T> the result type
         */
        interface Visitor<T> {
            T visit(Blink blink);

            T visit(Append append);

            T visit(Ring ring);
        }

        /** The blink table type. */
        @Value.Immutable
        @SimpleStyle
        abstract class Blink
        implements TableType {
            public static Blink of() {
                return ImmutableBlink.of();
            }

            @Override
            public final <T, V extends Visitor<T>> T walk(V visitor) {
                return visitor.visit(this);
            }
        }

        /** The append table type. */
        @Value.Immutable
        @SimpleStyle
        abstract class Append
        implements TableType {
            public static Append of() {
                return ImmutableAppend.of();
            }

            @Override
            public final <T, V extends Visitor<T>> T walk(V visitor) {
                return visitor.visit(this);
            }
        }

        /** The ring table type, parameterized by its capacity. */
        @Value.Immutable
        @SimpleStyle
        abstract class Ring
        implements TableType {
            public static Ring of(int capacity) {
                return ImmutableRing.of(capacity);
            }

            /**
             * @return the ring capacity
             */
            @Value.Parameter
            public abstract int capacity();

            @Override
            public final <T, V extends Visitor<T>> T walk(V visitor) {
                return visitor.visit(this);
            }
        }
    }

    /**
     * Factory methods and specification types describing how table columns map to the key or value side when
     * producing to Kafka.
     */
    public static class Produce {
        /** Shared specification for a key or value that is ignored. */
        public static final KeyOrValueSpec.Ignore IGNORE = new KeyOrValueSpec.Ignore();

        /**
         * @return the shared {@link #IGNORE} specification
         */
        public static KeyOrValueSpec ignoreSpec() {
            return IGNORE;
        }

        /**
         * @param columnName the single column the specification refers to
         * @return a {@link DataFormat#SIMPLE} specification for {@code columnName}
         */
        public static KeyOrValueSpec simpleSpec(String columnName) {
            return new KeyOrValueSpec.Simple(columnName);
        }

        /**
         * Creates a {@link DataFormat#JSON} specification.
         *
         * @param includeColumns explicit column names to include, or null; mutually exclusive with
         *        {@code excludeColumns}
         * @param excludeColumns predicate selecting columns to leave out, or null; mutually exclusive with
         *        {@code includeColumns}
         * @param columnToFieldMapping column name to field name overrides, or null to use column names as field names
         * @param nestedObjectDelimiter stored on the specification; not interpreted here
         * @param outputNulls stored on the specification; not interpreted here
         * @param timestampFieldName stored on the specification; not interpreted here
         * @return the JSON specification
         * @throws IllegalArgumentException if both {@code includeColumns} and {@code excludeColumns} are non-null
         */
        public static KeyOrValueSpec jsonSpec(String[] includeColumns, Predicate<String> excludeColumns, Map<String, String> columnToFieldMapping, String nestedObjectDelimiter, boolean outputNulls, String timestampFieldName) {
            if (includeColumns != null && excludeColumns != null) {
                // Arrays.toString: concatenating the array directly would print its identity hash, not its contents.
                throw new IllegalArgumentException("Both includeColumns (=" + Arrays.toString(includeColumns) + ") and excludeColumns (=" + excludeColumns + ") are not null, at least one of them should be null.");
            }
            return new KeyOrValueSpec.Json(includeColumns, excludeColumns, columnToFieldMapping, nestedObjectDelimiter, outputNulls, timestampFieldName);
        }

        /**
         * Same as the six-argument overload with a null {@code nestedObjectDelimiter}, {@code outputNulls=false} and a
         * null {@code timestampFieldName}.
         */
        public static KeyOrValueSpec jsonSpec(String[] includeColumns, Predicate<String> excludeColumns, Map<String, String> columnToFieldMapping) {
            return Produce.jsonSpec(includeColumns, excludeColumns, columnToFieldMapping, null, false, null);
        }

        /**
         * Creates a {@link DataFormat#AVRO} specification around an already available schema; no schema name or
         * version is recorded and {@code publishSchema} is false.
         *
         * @param schema the Avro schema to use
         * @param fieldToColumnMapping field name to column name overrides, or null
         * @param timestampFieldName name of a schema field that is skipped when deriving column names, or null
         * @param includeOnlyColumns if non-null, only columns accepted by this predicate are kept
         * @param excludeColumns if non-null, columns accepted by this predicate are dropped
         * @return the Avro specification
         */
        public static KeyOrValueSpec avroSpec(Schema schema, Map<String, String> fieldToColumnMapping, String timestampFieldName, Predicate<String> includeOnlyColumns, Predicate<String> excludeColumns) {
            return new KeyOrValueSpec.Avro(schema, null, null, fieldToColumnMapping, timestampFieldName, includeOnlyColumns, excludeColumns, false, null, null);
        }

        /**
         * Creates a {@link DataFormat#AVRO} specification whose schema is resolved later from
         * {@code schemaName}/{@code schemaVersion}, or generated from the table and published when
         * {@code publishSchema} is true.
         *
         * @param schemaName the schema name
         * @param schemaVersion the schema version; must be null or {@code KafkaTools.AVRO_LATEST_VERSION} when
         *        {@code publishSchema} is true
         * @param fieldToColumnMapping field name to column name overrides, or null
         * @param timestampFieldName name of a schema field that is skipped when deriving column names, or null
         * @param includeOnlyColumns if non-null, only columns accepted by this predicate are kept
         * @param excludeColumns if non-null, columns accepted by this predicate are dropped
         * @param publishSchema whether the schema is generated from the table and published
         * @param schemaNamespace namespace passed to schema generation when publishing
         * @param columnProperties properties passed to schema generation when publishing
         * @return the Avro specification
         * @throws IllegalArgumentException if {@code publishSchema} is true and {@code schemaVersion} is neither null
         *         nor {@code KafkaTools.AVRO_LATEST_VERSION}
         */
        public static KeyOrValueSpec avroSpec(String schemaName, String schemaVersion, Map<String, String> fieldToColumnMapping, String timestampFieldName, Predicate<String> includeOnlyColumns, Predicate<String> excludeColumns, boolean publishSchema, String schemaNamespace, Properties columnProperties) {
            return new KeyOrValueSpec.Avro(null, schemaName, schemaVersion, fieldToColumnMapping, timestampFieldName, includeOnlyColumns, excludeColumns, publishSchema, schemaNamespace, columnProperties);
        }

        /** Base type of all produce-side key/value specifications. */
        static abstract class KeyOrValueSpec {
            KeyOrValueSpec() {
            }

            /**
             * @return the {@link DataFormat} this specification represents
             */
            abstract DataFormat dataFormat();

            /** JSON specification: selects table columns and maps them to field names. */
            static final class Json
            extends KeyOrValueSpec {
                final String[] includeColumns;
                final Predicate<String> excludeColumns;
                final Map<String, String> columnNameToFieldName;
                final String nestedObjectDelimiter;
                final boolean outputNulls;
                final String timestampFieldName;

                Json(String[] includeColumns, Predicate<String> excludeColumns, Map<String, String> columnNameToFieldName, String nestedObjectDelimiter, boolean outputNulls, String timestampFieldName) {
                    this.includeColumns = includeColumns;
                    this.excludeColumns = excludeColumns;
                    this.columnNameToFieldName = columnNameToFieldName;
                    this.nestedObjectDelimiter = nestedObjectDelimiter;
                    this.outputNulls = outputNulls;
                    this.timestampFieldName = timestampFieldName;
                }

                @Override
                DataFormat dataFormat() {
                    return DataFormat.JSON;
                }

                /**
                 * Resolves the columns of {@code t} selected by this specification.
                 *
                 * @param t the table whose definition supplies the available column names
                 * @return all table columns when neither filter is set; {@code includeColumns} itself (not a copy)
                 *         when it is set; otherwise the table columns not matched by {@code excludeColumns}
                 * @throws IllegalArgumentException if both filters are set, or if {@code includeColumns} names a
                 *         column the table does not have
                 */
                String[] getColumnNames(Table t) {
                    if (this.excludeColumns != null && this.includeColumns != null) {
                        throw new IllegalArgumentException("Can't have both excludeColumns and includeColumns not null");
                    }
                    String[] tableColumnNames = t.getDefinition().getColumnNamesArray();
                    if (this.excludeColumns == null && this.includeColumns == null) {
                        return tableColumnNames;
                    }
                    if (this.includeColumns != null) {
                        HashSet<String> tableColumnsSet = new HashSet<>(Arrays.asList(tableColumnNames));
                        List<String> missing = Arrays.stream(this.includeColumns)
                                .filter(cn -> !tableColumnsSet.contains(cn))
                                .collect(Collectors.toList());
                        if (!missing.isEmpty()) {
                            throw new IllegalArgumentException("includeColumns contains names not found in table columns: " + missing);
                        }
                        return this.includeColumns;
                    }
                    return Arrays.stream(tableColumnNames)
                            .filter(cn -> !this.excludeColumns.test(cn))
                            .toArray(String[]::new);
                }

                /**
                 * Maps column names to field names, position by position.
                 *
                 * @param columnNames the column names to map
                 * @return a new array where each entry is the mapped field name, or the column name itself when
                 *         there is no mapping (or no mapping table at all)
                 */
                String[] getFieldNames(String[] columnNames) {
                    String[] fieldNames = new String[columnNames.length];
                    for (int i = 0; i < columnNames.length; ++i) {
                        fieldNames[i] = this.columnNameToFieldName == null ? columnNames[i] : this.columnNameToFieldName.getOrDefault(columnNames[i], columnNames[i]);
                    }
                    return fieldNames;
                }
            }

            /** Avro specification; the schema is either supplied up front or resolved lazily by {@link #ensureSchema}. */
            static final class Avro
            extends KeyOrValueSpec {
                // Not final: assigned by ensureSchema when no schema was supplied at construction.
                Schema schema;
                final String schemaName;
                final String schemaVersion;
                final Map<String, String> fieldToColumnMapping;
                final String timestampFieldName;
                final Predicate<String> includeOnlyColumns;
                final Predicate<String> excludeColumns;
                final boolean publishSchema;
                final String schemaNamespace;
                // The holder itself is also handed to columnDefinitionsToAvroSchema (see ensureSchema).
                final MutableObject<Properties> columnProperties;

                Avro(Schema schema, String schemaName, String schemaVersion, Map<String, String> fieldToColumnMapping, String timestampFieldName, Predicate<String> includeOnlyColumns, Predicate<String> excludeColumns, boolean publishSchema, String schemaNamespace, Properties columnProperties) {
                    if (publishSchema && schemaVersion != null && !KafkaTools.AVRO_LATEST_VERSION.equals(schemaVersion)) {
                        throw new IllegalArgumentException(String.format("schemaVersion must be null or \"%s\" when publishSchema=true", KafkaTools.AVRO_LATEST_VERSION));
                    }
                    this.schema = schema;
                    this.schemaName = schemaName;
                    this.schemaVersion = schemaVersion;
                    this.fieldToColumnMapping = fieldToColumnMapping;
                    this.timestampFieldName = timestampFieldName;
                    this.includeOnlyColumns = includeOnlyColumns;
                    this.excludeColumns = excludeColumns;
                    this.publishSchema = publishSchema;
                    this.schemaNamespace = schemaNamespace;
                    this.columnProperties = new MutableObject<>(columnProperties);
                }

                @Override
                DataFormat dataFormat() {
                    return DataFormat.AVRO;
                }

                /**
                 * Makes sure {@link #schema} is set. Does nothing if it already is. Otherwise, when
                 * {@code publishSchema} is true the schema is generated from {@code t} via
                 * {@code KafkaTools.columnDefinitionsToAvroSchema} and stored via {@code KafkaTools.putAvroSchema};
                 * when false it is fetched via {@code KafkaTools.getAvroSchema} using the schema name and version.
                 *
                 * @param t the table used for schema generation when publishing
                 * @param kafkaProperties properties passed through to the schema put/get helpers
                 * @throws UncheckedDeephavenException wrapping a {@link RestClientException} or {@link IOException}
                 *         raised while publishing the schema
                 */
                void ensureSchema(Table t, Properties kafkaProperties) {
                    if (this.schema != null) {
                        return;
                    }
                    if (this.publishSchema) {
                        this.schema = KafkaTools.columnDefinitionsToAvroSchema(t, this.schemaName, this.schemaNamespace, this.columnProperties.getValue(), this.includeOnlyColumns, this.excludeColumns, this.columnProperties);
                        try {
                            KafkaTools.putAvroSchema(kafkaProperties, this.schemaName, this.schema);
                        }
                        catch (RestClientException | IOException e) {
                            throw new UncheckedDeephavenException(e);
                        }
                    } else {
                        this.schema = KafkaTools.getAvroSchema(kafkaProperties, this.schemaName, this.schemaVersion);
                    }
                }

                /**
                 * Derives the column names corresponding to the schema's fields, in schema field order.
                 *
                 * <p>
                 * The timestamp field (if configured) is skipped. Every other field name is translated through
                 * {@code fieldToColumnMapping} (falling back to the field name) and kept unless rejected by
                 * {@code excludeColumns} or not accepted by {@code includeOnlyColumns}.
                 *
                 * @param t passed to {@link #ensureSchema}
                 * @param kafkaProperties passed to {@link #ensureSchema}
                 * @return the selected column names
                 * @throws IllegalArgumentException if {@code timestampFieldName} is set but is not a schema field
                 */
                String[] getColumnNames(Table t, Properties kafkaProperties) {
                    this.ensureSchema(t, kafkaProperties);
                    List<Schema.Field> fields = this.schema.getFields();
                    if (this.timestampFieldName != null) {
                        boolean found = false;
                        for (Schema.Field field : fields) {
                            if (field.name().equals(this.timestampFieldName)) {
                                found = true;
                                break;
                            }
                        }
                        if (!found) {
                            throw new IllegalArgumentException("timestampFieldName=" + this.timestampFieldName + " is not a field name in the provided schema.");
                        }
                    }
                    List<String> columnNames = new ArrayList<>();
                    for (Schema.Field field : fields) {
                        String fieldName = field.name();
                        if (this.timestampFieldName != null && fieldName.equals(this.timestampFieldName)) {
                            continue;
                        }
                        String candidateColumnName = this.fieldToColumnMapping == null ? fieldName : this.fieldToColumnMapping.getOrDefault(fieldName, fieldName);
                        if (this.excludeColumns != null && this.excludeColumns.test(candidateColumnName)) {
                            continue;
                        }
                        if (this.includeOnlyColumns != null && !this.includeOnlyColumns.test(candidateColumnName)) {
                            continue;
                        }
                        columnNames.add(candidateColumnName);
                    }
                    return columnNames.toArray(new String[0]);
                }
            }

            /** Specification referring to a single column. */
            static final class Simple
            extends KeyOrValueSpec {
                final String columnName;

                Simple(String columnName) {
                    this.columnName = columnName;
                }

                @Override
                DataFormat dataFormat() {
                    return DataFormat.SIMPLE;
                }
            }

            /** Specification for a key or value that is ignored. */
            static final class Ignore
            extends KeyOrValueSpec {
                Ignore() {
                }

                @Override
                DataFormat dataFormat() {
                    return DataFormat.IGNORE;
                }
            }
        }
    }

    /**
     * Factory methods and specification types describing how the key or value side is interpreted when consuming
     * from Kafka.
     */
    public static class Consume {
        /** Shared specification for a key or value that is ignored. */
        public static final KeyOrValueSpec.Ignore IGNORE = new KeyOrValueSpec.Ignore();

        /**
         * @return the shared {@link #IGNORE} specification
         */
        public static KeyOrValueSpec ignoreSpec() {
            return IGNORE;
        }

        /**
         * @param columnDefinitions the column definitions recorded on the specification
         * @param fieldToColumnName field name (or JSON pointer starting with {@code "/"}) to column name mapping, or
         *        null; plain field names are converted to JSON pointer strings
         * @return a {@link DataFormat#JSON} specification
         */
        public static KeyOrValueSpec jsonSpec(ColumnDefinition<?>[] columnDefinitions, Map<String, String> fieldToColumnName) {
            return new KeyOrValueSpec.Json(columnDefinitions, fieldToColumnName);
        }

        /** Same as {@link #jsonSpec(ColumnDefinition[], Map)} with a null mapping. */
        public static KeyOrValueSpec jsonSpec(ColumnDefinition<?>[] columnDefinitions) {
            return jsonSpec(columnDefinitions, null);
        }

        /**
         * @param schema the Avro schema to use
         * @param fieldNameToColumnName function recorded on the specification for naming columns
         * @return a {@link DataFormat#AVRO} specification with no schema name or version
         */
        public static KeyOrValueSpec avroSpec(Schema schema, Function<String, String> fieldNameToColumnName) {
            return new KeyOrValueSpec.Avro(schema, fieldNameToColumnName);
        }

        /** Same as {@link #avroSpec(Schema, Function)} using {@code DIRECT_MAPPING}. */
        public static KeyOrValueSpec avroSpec(Schema schema) {
            return avroSpec(schema, DIRECT_MAPPING);
        }

        /**
         * @param schemaName the schema name recorded on the specification
         * @param schemaVersion the schema version recorded on the specification
         * @param fieldNameToColumnName function recorded on the specification for naming columns
         * @return a {@link DataFormat#AVRO} specification with no schema object
         */
        public static KeyOrValueSpec avroSpec(String schemaName, String schemaVersion, Function<String, String> fieldNameToColumnName) {
            return new KeyOrValueSpec.Avro(schemaName, schemaVersion, fieldNameToColumnName);
        }

        /** Same as the three-argument overload using {@code KafkaTools.AVRO_LATEST_VERSION}. */
        public static KeyOrValueSpec avroSpec(String schemaName, Function<String, String> fieldNameToColumnName) {
            return avroSpec(schemaName, KafkaTools.AVRO_LATEST_VERSION, fieldNameToColumnName);
        }

        /** Same as the three-argument overload using {@code DIRECT_MAPPING}. */
        public static KeyOrValueSpec avroSpec(String schemaName, String schemaVersion) {
            return avroSpec(schemaName, schemaVersion, DIRECT_MAPPING);
        }

        /** Same as the three-argument overload using {@code KafkaTools.AVRO_LATEST_VERSION} and {@code DIRECT_MAPPING}. */
        public static KeyOrValueSpec avroSpec(String schemaName) {
            return avroSpec(schemaName, KafkaTools.AVRO_LATEST_VERSION, DIRECT_MAPPING);
        }

        /**
         * @param columnName the column name recorded on the specification
         * @param dataType the data type recorded on the specification; may be null
         * @return a {@link DataFormat#SIMPLE} specification
         */
        public static KeyOrValueSpec simpleSpec(String columnName, Class<?> dataType) {
            return new KeyOrValueSpec.Simple(columnName, dataType);
        }

        /** Same as {@link #simpleSpec(String, Class)} with a null data type. */
        public static KeyOrValueSpec simpleSpec(String columnName) {
            return simpleSpec(columnName, null);
        }

        /** Base type of all consume-side key/value specifications. */
        static abstract class KeyOrValueSpec {
            /** A {@link Simple} specification with neither column name nor data type set. */
            private static final Simple FROM_PROPERTIES = new Simple(null, null);

            KeyOrValueSpec() {
            }

            /**
             * @return the {@link DataFormat} this specification represents
             */
            abstract DataFormat dataFormat();

            /** JSON specification: column definitions plus a JSON-pointer keyed field-to-column mapping. */
            static final class Json
            extends KeyOrValueSpec {
                final ColumnDefinition<?>[] columnDefinitions;
                final Map<String, String> fieldToColumnName;

                private Json(ColumnDefinition<?>[] columnDefinitions, Map<String, String> fieldNameToColumnName) {
                    this.columnDefinitions = columnDefinitions;
                    this.fieldToColumnName = Json.mapNonPointers(fieldNameToColumnName);
                }

                @Override
                DataFormat dataFormat() {
                    return DataFormat.JSON;
                }

                /**
                 * Ensures every key of the mapping is a JSON pointer string.
                 *
                 * @param fieldNameToColumnName the caller's mapping; may be null
                 * @return null for null input; the very same map when every key already starts with {@code "/"};
                 *         otherwise a new map in which the remaining keys were converted with
                 *         {@link #mapFieldNameToJsonPointerStr(String)}
                 */
                private static Map<String, String> mapNonPointers(Map<String, String> fieldNameToColumnName) {
                    if (fieldNameToColumnName == null) {
                        return null;
                    }
                    boolean everyKeyIsPointer = true;
                    for (String fieldName : fieldNameToColumnName.keySet()) {
                        if (!fieldName.startsWith("/")) {
                            everyKeyIsPointer = false;
                            break;
                        }
                    }
                    if (everyKeyIsPointer) {
                        return fieldNameToColumnName;
                    }
                    HashMap<String, String> pointerToColumn = new HashMap<>(fieldNameToColumnName.size());
                    for (Map.Entry<String, String> mapping : fieldNameToColumnName.entrySet()) {
                        String fieldName = mapping.getKey();
                        String pointer = fieldName.startsWith("/") ? fieldName : Json.mapFieldNameToJsonPointerStr(fieldName);
                        pointerToColumn.put(pointer, mapping.getValue());
                    }
                    return pointerToColumn;
                }

                /**
                 * Converts a plain field name into a single-segment JSON pointer string.
                 *
                 * @param key the field name
                 * @return {@code "/"} followed by {@code key} with {@code "~"} replaced by {@code "~0"} and then
                 *         {@code "/"} replaced by {@code "~1"}
                 */
                public static String mapFieldNameToJsonPointerStr(String key) {
                    // Tilde must be escaped first so the "~" introduced by "~1" is not escaped again.
                    String tildeEscaped = key.replace("~", "~0");
                    String fullyEscaped = tildeEscaped.replace("/", "~1");
                    return "/" + fullyEscaped;
                }
            }

            /** Specification referring to a single column with an optional data type. */
            static final class Simple
            extends KeyOrValueSpec {
                final String columnName;
                final Class<?> dataType;

                private Simple(String columnName, Class<?> dataType) {
                    this.columnName = columnName;
                    this.dataType = dataType;
                }

                @Override
                DataFormat dataFormat() {
                    return DataFormat.SIMPLE;
                }
            }

            /** Avro specification: either a schema object, or a schema name and version, never both. */
            static final class Avro
            extends KeyOrValueSpec {
                final Schema schema;
                final String schemaName;
                final String schemaVersion;
                final Function<String, String> fieldPathToColumnName;

                private Avro(Schema schema, Function<String, String> fieldPathToColumnName) {
                    this(schema, null, null, fieldPathToColumnName);
                }

                private Avro(String schemaName, String schemaVersion, Function<String, String> fieldPathToColumnName) {
                    this(null, schemaName, schemaVersion, fieldPathToColumnName);
                }

                // Single place where the fields are assigned; the two constructors above choose which side is null.
                private Avro(Schema schema, String schemaName, String schemaVersion, Function<String, String> fieldPathToColumnName) {
                    this.schema = schema;
                    this.schemaName = schemaName;
                    this.schemaVersion = schemaVersion;
                    this.fieldPathToColumnName = fieldPathToColumnName;
                }

                @Override
                DataFormat dataFormat() {
                    return DataFormat.AVRO;
                }
            }

            /** Specification for a key or value that is ignored. */
            static final class Ignore
            extends KeyOrValueSpec {
                Ignore() {
                }

                @Override
                DataFormat dataFormat() {
                    return DataFormat.IGNORE;
                }
            }
        }
    }

    /**
     * Formats reported by the {@code dataFormat()} method of the key/value specifications.
     */
    public static enum DataFormat {
        /** Reported by the {@code Ignore} specifications. */
        IGNORE,
        /** Reported by the {@code Simple} specifications. */
        SIMPLE,
        /** Reported by the {@code Avro} specifications. */
        AVRO,
        /** Reported by the {@code Json} specifications. */
        JSON
    }

    /**
     * Two-valued selector: {@link #KEY} or {@link #VALUE}.
     * NOTE(review): presumably distinguishes the key side from the value side of a Kafka record; no use is visible
     * in this part of the file, so confirm against callers.
     */
    public static enum KeyOrValue {
        KEY,
        VALUE
    }
}

