/*
 * Decompiled with CFR 0.152.
 */
package io.deephaven.kafka;

import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.SchemaProvider;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.deephaven.UncheckedDeephavenException;
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.util.BigDecimalUtils;
import io.deephaven.kafka.KafkaSchemaUtils;
import io.deephaven.kafka.KafkaTools;
import io.deephaven.kafka.ingest.GenericRecordChunkAdapter;
import io.deephaven.kafka.ingest.KeyOrValueProcessor;
import io.deephaven.kafka.publish.GenericRecordKeyOrValueSerializer;
import io.deephaven.kafka.publish.KeyOrValueSerializer;
import io.deephaven.qst.type.Type;
import io.deephaven.stream.StreamChunkUtils;
import io.deephaven.util.mutable.MutableInt;
import io.deephaven.vector.ByteVector;
import java.io.IOException;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericContainer;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.jetbrains.annotations.NotNull;

class AvroImpl {
    private static final Type<Utf8> utf8Type = Type.find(Utf8.class);

    AvroImpl() {
    }

    static Schema getAvroSchema(SchemaRegistryClient schemaClient, String schemaName, String schemaVersion) {
        try {
            SchemaMetadata schemaMetadata = "latest".equals(schemaVersion) ? schemaClient.getLatestSchemaMetadata(schemaName) : schemaClient.getSchemaMetadata(schemaName, Integer.parseInt(schemaVersion));
            return (Schema)schemaClient.getSchemaById(schemaMetadata.getId()).rawSchema();
        }
        catch (RestClientException | IOException e) {
            throw new UncheckedDeephavenException(e);
        }
    }

    static Schema columnDefinitionsToAvroSchema(Table t, String schemaName, String namespace, Properties colProps, Predicate<String> includeOnly, Predicate<String> exclude, MutableObject<Properties> colPropsOut) {
        SchemaBuilder.FieldAssembler<Schema> fass = ((SchemaBuilder.RecordBuilder)SchemaBuilder.record((String)schemaName).namespace(namespace)).fields();
        List colDefs = t.getDefinition().getColumns();
        colPropsOut.setValue((Object)colProps);
        for (ColumnDefinition colDef : colDefs) {
            if (includeOnly != null && !includeOnly.test(colDef.getName()) || exclude != null && exclude.test(colDef.getName())) continue;
            fass = AvroImpl.addFieldForColDef(t, fass, colDef, colPropsOut);
        }
        return (Schema)fass.endRecord();
    }

    private static SchemaBuilder.FieldAssembler<Schema> addFieldForColDef(Table t, SchemaBuilder.FieldAssembler<Schema> fassIn, ColumnDefinition<?> colDef, MutableObject<Properties> colPropsMu) {
        String logicalTypeName = "logicalType";
        String dhTypeAttribute = "dhType";
        SchemaBuilder.FieldAssembler fass = fassIn;
        Class type = colDef.getDataType();
        String colName = colDef.getName();
        SchemaBuilder.BaseFieldTypeBuilder base = fass.name(colName).type().nullable();
        if (type == Byte.TYPE || type == Character.TYPE || type == Short.TYPE) {
            fass = ((SchemaBuilder.IntDefault)((SchemaBuilder.IntBuilder)base.intBuilder().prop("dhType", type.getName())).endInt()).noDefault();
        } else if (type == Integer.TYPE) {
            fass = base.intType().noDefault();
        } else if (type == Long.TYPE) {
            fass = base.longType().noDefault();
        } else if (type == Float.TYPE) {
            fass = base.floatType().noDefault();
        } else if (type == Double.TYPE) {
            fass = base.doubleType().noDefault();
        } else if (type == String.class) {
            fass = base.stringType().noDefault();
        } else if (type == Instant.class) {
            fass = ((SchemaBuilder.LongDefault)((SchemaBuilder.LongBuilder)base.longBuilder().prop("logicalType", "timestamp-micros")).endLong()).noDefault();
        } else if (type == BigDecimal.class) {
            BigDecimalUtils.PropertyNames propertyNames = new BigDecimalUtils.PropertyNames(colName);
            BigDecimalUtils.PrecisionAndScale values = BigDecimalUtils.getPrecisionAndScaleFromColumnProperties((BigDecimalUtils.PropertyNames)propertyNames, (Properties)((Properties)colPropsMu.getValue()), (boolean)true);
            if (t.isRefreshing()) {
                AvroImpl.validatePrecisionAndScaleForRefreshingTable(propertyNames, values);
            } else {
                AvroImpl.ensurePrecisionAndScaleForStaticTable(colPropsMu, t, propertyNames, values);
            }
            fass = ((SchemaBuilder.BytesDefault)((SchemaBuilder.BytesBuilder)((SchemaBuilder.BytesBuilder)((SchemaBuilder.BytesBuilder)base.bytesBuilder().prop("logicalType", "decimal")).prop("precision", (Object)values.precision)).prop("scale", (Object)values.scale)).endBytes()).noDefault();
        } else {
            fass = ((SchemaBuilder.BytesDefault)((SchemaBuilder.BytesBuilder)base.bytesBuilder().prop("dhType", type.getName())).endBytes()).noDefault();
        }
        return fass;
    }

    private static void validatePrecisionAndScaleForRefreshingTable(BigDecimalUtils.PropertyNames names, BigDecimalUtils.PrecisionAndScale values) {
        String exBaseMsg = "Column " + names.columnName + " of type " + BigDecimal.class.getSimpleName() + " in a refreshing table implies both properties '" + names.precisionProperty + "' and '" + names.scaleProperty + "' should be defined; ";
        if (values.precision == -1 && values.scale == -1) {
            throw new IllegalArgumentException(exBaseMsg + " missing both");
        }
        if (values.precision == -1) {
            throw new IllegalArgumentException(exBaseMsg + " missing '" + names.precisionProperty + "'");
        }
        if (values.scale == -1) {
            throw new IllegalArgumentException(exBaseMsg + " missing '" + names.scaleProperty + "'");
        }
    }

    private static BigDecimalUtils.PrecisionAndScale ensurePrecisionAndScaleForStaticTable(MutableObject<Properties> colPropsMu, Table t, BigDecimalUtils.PropertyNames names, BigDecimalUtils.PrecisionAndScale valuesIn) {
        Properties toSet;
        if (valuesIn.precision != -1 && valuesIn.scale != -1) {
            return valuesIn;
        }
        String exBaseMsg = "Column " + names.columnName + " of type " + BigDecimal.class.getSimpleName() + " in a non refreshing table implies either both properties '" + names.precisionProperty + "' and '" + names.scaleProperty + "' should be defined, or none of them;";
        if (valuesIn.precision != -1) {
            throw new IllegalArgumentException(exBaseMsg + " only '" + names.precisionProperty + "' is defined, missing '" + names.scaleProperty + "'");
        }
        if (valuesIn.scale != -1) {
            throw new IllegalArgumentException(exBaseMsg + " only '" + names.scaleProperty + "' is defined, missing '" + names.precisionProperty + "'");
        }
        BigDecimalUtils.PrecisionAndScale newValues = BigDecimalUtils.computePrecisionAndScale((Table)t, (String)names.columnName);
        Properties colProps = (Properties)colPropsMu.getValue();
        if (colProps == null) {
            toSet = new Properties();
            colPropsMu.setValue((Object)toSet);
        } else {
            toSet = colProps;
        }
        BigDecimalUtils.setProperties((Properties)toSet, (BigDecimalUtils.PropertyNames)names, (BigDecimalUtils.PrecisionAndScale)newValues);
        return newValues;
    }

    static void avroSchemaToColumnDefinitions(List<ColumnDefinition<?>> columnsOut, Map<String, String> fieldPathToColumnNameOut, Schema schema, Function<String, String> requestedFieldPathToColumnName, boolean useUTF8Strings) {
        if (schema.isUnion()) {
            throw new UnsupportedOperationException("Schemas defined as a union of records are not supported");
        }
        Schema.Type type = schema.getType();
        if (type != Schema.Type.RECORD) {
            throw new IllegalArgumentException("The schema is not a toplevel record definition.");
        }
        List fields = schema.getFields();
        for (Schema.Field field : fields) {
            AvroImpl.pushColumnTypesFromAvroField(columnsOut, fieldPathToColumnNameOut, "", field, requestedFieldPathToColumnName, useUTF8Strings);
        }
    }

    private static void pushColumnTypesFromAvroField(List<ColumnDefinition<?>> columnsOut, Map<String, String> fieldPathToColumnNameOut, String fieldNamePrefix, Schema.Field field, Function<String, String> fieldPathToColumnName, boolean useUTF8Strings) {
        Schema fieldSchema = field.schema();
        String fieldName = field.name();
        String mappedNameForColumn = fieldPathToColumnName.apply(fieldNamePrefix + fieldName);
        if (mappedNameForColumn == null) {
            return;
        }
        Schema.Type fieldType = fieldSchema.getType();
        AvroImpl.pushColumnTypesFromAvroField(columnsOut, fieldPathToColumnNameOut, fieldNamePrefix, fieldName, fieldSchema, mappedNameForColumn, fieldType, fieldPathToColumnName, useUTF8Strings);
    }

    private static void pushColumnTypesFromAvroField(List<ColumnDefinition<?>> columnsOut, Map<String, String> fieldPathToColumnNameOut, String fieldNamePrefix, String fieldName, Schema fieldSchema, String mappedNameForColumn, Schema.Type fieldType, Function<String, String> fieldPathToColumnName, boolean useUTF8Strings) {
        block0 : switch (fieldType) {
            case BOOLEAN: {
                columnsOut.add(ColumnDefinition.ofBoolean((String)mappedNameForColumn));
                break;
            }
            case INT: {
                columnsOut.add(ColumnDefinition.ofInt((String)mappedNameForColumn));
                break;
            }
            case LONG: {
                LogicalType logicalType = AvroImpl.getEffectiveLogicalType(fieldName, fieldSchema);
                if (LogicalTypes.timestampMicros().equals(logicalType) || LogicalTypes.timestampMillis().equals(logicalType)) {
                    columnsOut.add(ColumnDefinition.ofTime((String)mappedNameForColumn));
                    break;
                }
                columnsOut.add(ColumnDefinition.ofLong((String)mappedNameForColumn));
                break;
            }
            case FLOAT: {
                columnsOut.add(ColumnDefinition.ofFloat((String)mappedNameForColumn));
                break;
            }
            case DOUBLE: {
                columnsOut.add(ColumnDefinition.ofDouble((String)mappedNameForColumn));
                break;
            }
            case ENUM: 
            case STRING: {
                if (useUTF8Strings) {
                    columnsOut.add(ColumnDefinition.of((String)mappedNameForColumn, utf8Type));
                    break;
                }
                columnsOut.add(ColumnDefinition.ofString((String)mappedNameForColumn));
                break;
            }
            case UNION: {
                Schema effectiveSchema = KafkaSchemaUtils.getEffectiveSchema(fieldName, fieldSchema);
                if (effectiveSchema == fieldSchema) {
                    columnsOut.add(ColumnDefinition.fromGenericType((String)mappedNameForColumn, GenericRecord.class));
                    break;
                }
                AvroImpl.pushColumnTypesFromAvroField(columnsOut, fieldPathToColumnNameOut, fieldNamePrefix, fieldName, effectiveSchema, mappedNameForColumn, effectiveSchema.getType(), fieldPathToColumnName, useUTF8Strings);
                return;
            }
            case RECORD: {
                for (Schema.Field nestedField : fieldSchema.getFields()) {
                    AvroImpl.pushColumnTypesFromAvroField(columnsOut, fieldPathToColumnNameOut, fieldNamePrefix + fieldName + ".", nestedField, fieldPathToColumnName, useUTF8Strings);
                }
                return;
            }
            case BYTES: 
            case FIXED: {
                LogicalType logicalType = AvroImpl.getEffectiveLogicalType(fieldName, fieldSchema);
                if (logicalType instanceof LogicalTypes.Decimal) {
                    columnsOut.add(ColumnDefinition.fromGenericType((String)mappedNameForColumn, BigDecimal.class));
                    break;
                }
                columnsOut.add(ColumnDefinition.ofVector((String)mappedNameForColumn, ByteVector.class));
                break;
            }
            case ARRAY: {
                Schema elementTypeSchema = fieldSchema.getElementType();
                Schema.Type elementTypeType = elementTypeSchema.getType();
                if (elementTypeType.equals((Object)Schema.Type.UNION)) {
                    elementTypeSchema = KafkaSchemaUtils.getEffectiveSchema(fieldName, elementTypeSchema);
                    elementTypeType = elementTypeSchema.getType();
                }
                switch (elementTypeType) {
                    case INT: {
                        columnsOut.add(ColumnDefinition.fromGenericType((String)mappedNameForColumn, int[].class));
                        break block0;
                    }
                    case LONG: {
                        LogicalType logicalType = AvroImpl.getEffectiveLogicalType(fieldName, elementTypeSchema);
                        if (LogicalTypes.timestampMicros().equals(logicalType) || LogicalTypes.timestampMillis().equals(logicalType)) {
                            columnsOut.add(ColumnDefinition.fromGenericType((String)mappedNameForColumn, Instant[].class));
                            break block0;
                        }
                        columnsOut.add(ColumnDefinition.fromGenericType((String)mappedNameForColumn, long[].class));
                        break block0;
                    }
                    case FLOAT: {
                        columnsOut.add(ColumnDefinition.fromGenericType((String)mappedNameForColumn, float[].class));
                        break block0;
                    }
                    case DOUBLE: {
                        columnsOut.add(ColumnDefinition.fromGenericType((String)mappedNameForColumn, double[].class));
                        break block0;
                    }
                    case BOOLEAN: {
                        columnsOut.add(ColumnDefinition.fromGenericType((String)mappedNameForColumn, Boolean[].class));
                        break block0;
                    }
                    case ENUM: 
                    case STRING: {
                        columnsOut.add(ColumnDefinition.fromGenericType((String)mappedNameForColumn, String[].class));
                        break block0;
                    }
                }
                columnsOut.add(ColumnDefinition.fromGenericType((String)mappedNameForColumn, Object[].class));
                break;
            }
            case MAP: {
                columnsOut.add(ColumnDefinition.fromGenericType((String)mappedNameForColumn, Map.class));
                break;
            }
            default: {
                columnsOut.add(ColumnDefinition.fromGenericType((String)mappedNameForColumn, GenericContainer.class));
            }
        }
        if (fieldPathToColumnNameOut != null) {
            fieldPathToColumnNameOut.put(fieldNamePrefix + fieldName, mappedNameForColumn);
        }
    }

    private static LogicalType getEffectiveLogicalType(String fieldName, Schema fieldSchema) {
        Schema effectiveSchema = KafkaSchemaUtils.getEffectiveSchema(fieldName, fieldSchema);
        return effectiveSchema.getLogicalType();
    }

    static final class AvroProduce
    extends KafkaTools.Produce.KeyOrValueSpec {
        private Schema schema;
        private final String schemaName;
        private final String schemaVersion;
        final Map<String, String> fieldToColumnMapping;
        private final String timestampFieldName;
        private final Predicate<String> includeOnlyColumns;
        private final Predicate<String> excludeColumns;
        private final boolean publishSchema;
        private final String schemaNamespace;
        private final MutableObject<Properties> columnProperties;

        AvroProduce(Schema schema, String schemaName, String schemaVersion, Map<String, String> fieldToColumnMapping, String timestampFieldName, Predicate<String> includeOnlyColumns, Predicate<String> excludeColumns, boolean publishSchema, String schemaNamespace, Properties columnProperties) {
            this.schema = schema;
            this.schemaName = schemaName;
            this.schemaVersion = schemaVersion;
            this.fieldToColumnMapping = fieldToColumnMapping;
            this.timestampFieldName = timestampFieldName;
            this.includeOnlyColumns = includeOnlyColumns;
            this.excludeColumns = excludeColumns;
            this.publishSchema = publishSchema;
            this.schemaNamespace = schemaNamespace;
            this.columnProperties = new MutableObject((Object)columnProperties);
            if (publishSchema && schemaVersion != null && !"latest".equals(schemaVersion)) {
                throw new IllegalArgumentException(String.format("schemaVersion must be null or \"%s\" when publishSchema=true", "latest"));
            }
        }

        @Override
        public Optional<SchemaProvider> getSchemaProvider() {
            return Optional.of(new AvroSchemaProvider());
        }

        @Override
        Serializer<?> getSerializer(SchemaRegistryClient schemaRegistryClient, TableDefinition definition) {
            return new KafkaAvroSerializer(Objects.requireNonNull(schemaRegistryClient));
        }

        @Override
        String[] getColumnNames(@NotNull Table t, SchemaRegistryClient schemaRegistryClient) {
            this.ensureSchema(t, schemaRegistryClient);
            List fields = this.schema.getFields();
            if (this.timestampFieldName != null) {
                boolean found = false;
                for (Schema.Field field : fields) {
                    String fieldName = field.name();
                    if (!fieldName.equals(this.timestampFieldName)) continue;
                    found = true;
                    break;
                }
                if (!found) {
                    throw new IllegalArgumentException("timestampFieldName=" + this.timestampFieldName + " is not a field name in the provided schema.");
                }
            }
            boolean timestampFieldCount = this.timestampFieldName != null;
            ArrayList<String> columnNames = new ArrayList<String>();
            for (Schema.Field field : fields) {
                String fieldName = field.name();
                if (fieldName.equals(this.timestampFieldName)) continue;
                String candidateColumnName = this.fieldToColumnMapping == null ? fieldName : this.fieldToColumnMapping.getOrDefault(fieldName, fieldName);
                if (this.excludeColumns != null && this.excludeColumns.test(candidateColumnName) || this.includeOnlyColumns != null && !this.includeOnlyColumns.test(candidateColumnName)) continue;
                columnNames.add(candidateColumnName);
            }
            return columnNames.toArray(new String[columnNames.size()]);
        }

        @Override
        KeyOrValueSerializer<?> getKeyOrValueSerializer(@NotNull Table t, @NotNull String[] columnNames) {
            return new GenericRecordKeyOrValueSerializer(t, this.schema, columnNames, this.timestampFieldName, (Properties)this.columnProperties.getValue());
        }

        void ensureSchema(Table t, SchemaRegistryClient schemaRegistryClient) {
            if (this.schema != null) {
                return;
            }
            if (this.publishSchema) {
                this.schema = AvroImpl.columnDefinitionsToAvroSchema(t, this.schemaName, this.schemaNamespace, (Properties)this.columnProperties.getValue(), this.includeOnlyColumns, this.excludeColumns, this.columnProperties);
                try {
                    schemaRegistryClient.register(this.schemaName, (ParsedSchema)new AvroSchema(this.schema));
                }
                catch (RestClientException | IOException e) {
                    throw new UncheckedDeephavenException(e);
                }
            } else {
                this.schema = AvroImpl.getAvroSchema(schemaRegistryClient, this.schemaName, this.schemaVersion);
            }
        }
    }

    static final class AvroConsume
    extends KafkaTools.Consume.KeyOrValueSpec {
        private static final Pattern NESTED_FIELD_NAME_SEPARATOR_PATTERN = Pattern.compile(Pattern.quote("."));
        private Schema schema;
        private final String schemaName;
        private final String schemaVersion;
        private final Function<String, String> fieldPathToColumnName;
        private final boolean useUTF8Strings;

        AvroConsume(Schema schema, Function<String, String> fieldPathToColumnName) {
            this.schema = Objects.requireNonNull(schema);
            this.schemaName = null;
            this.schemaVersion = null;
            this.fieldPathToColumnName = fieldPathToColumnName;
            this.useUTF8Strings = false;
        }

        AvroConsume(String schemaName, String schemaVersion, Function<String, String> fieldPathToColumnName) {
            this(schemaName, schemaVersion, fieldPathToColumnName, false);
        }

        AvroConsume(String schemaName, String schemaVersion, Function<String, String> fieldPathToColumnName, boolean useUTF8Strings) {
            this.schema = null;
            this.schemaName = schemaName;
            this.schemaVersion = schemaVersion;
            this.fieldPathToColumnName = fieldPathToColumnName;
            this.useUTF8Strings = useUTF8Strings;
        }

        @Override
        public Optional<SchemaProvider> getSchemaProvider() {
            return Optional.of(new AvroSchemaProvider());
        }

        @Override
        protected Deserializer<?> getDeserializer(KafkaTools.KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient, Map<String, ?> configs) {
            this.ensureSchema(schemaRegistryClient);
            return new KafkaAvroDeserializerWithReaderSchema(schemaRegistryClient);
        }

        @Override
        protected KafkaTools.KeyOrValueIngestData getIngestData(KafkaTools.KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient, Map<String, ?> configs, MutableInt nextColumnIndexMut, List<ColumnDefinition<?>> columnDefinitionsOut) {
            this.ensureSchema(schemaRegistryClient);
            KafkaTools.KeyOrValueIngestData data = new KafkaTools.KeyOrValueIngestData();
            data.fieldPathToColumnName = new HashMap<String, String>();
            AvroImpl.avroSchemaToColumnDefinitions(columnDefinitionsOut, data.fieldPathToColumnName, this.schema, this.fieldPathToColumnName, this.useUTF8Strings);
            data.extra = this.schema;
            return data;
        }

        @Override
        protected KeyOrValueProcessor getProcessor(TableDefinition tableDef, KafkaTools.KeyOrValueIngestData data) {
            return GenericRecordChunkAdapter.make(tableDef, ci -> StreamChunkUtils.chunkTypeForColumnIndex((TableDefinition)tableDef, (int)ci), data.fieldPathToColumnName, NESTED_FIELD_NAME_SEPARATOR_PATTERN, (Schema)data.extra, true);
        }

        private void ensureSchema(SchemaRegistryClient schemaRegistryClient) {
            if (this.schema != null) {
                return;
            }
            this.schema = Objects.requireNonNull(AvroImpl.getAvroSchema(schemaRegistryClient, this.schemaName, this.schemaVersion));
        }

        class KafkaAvroDeserializerWithReaderSchema
        extends KafkaAvroDeserializer {
            public KafkaAvroDeserializerWithReaderSchema(SchemaRegistryClient client) {
                super(client);
            }

            public Object deserialize(String topic, byte[] bytes) {
                return super.deserialize(topic, bytes, AvroConsume.this.schema);
            }

            public Object deserialize(String topic, Headers headers, byte[] bytes) {
                return super.deserialize(topic, headers, bytes, AvroConsume.this.schema);
            }
        }
    }
}

