/*
 * Decompiled with CFR 0.152.
 */
package io.deephaven.kafka;

import io.confluent.kafka.schemaregistry.SchemaProvider;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.deephaven.UncheckedDeephavenException;
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.kafka.KafkaTools;
import io.deephaven.kafka.ingest.KeyOrValueProcessor;
import io.deephaven.kafka.publish.KeyOrValueSerializer;
import io.deephaven.kafka.publish.SimpleKeyOrValueSerializer;
import io.deephaven.qst.type.ArrayType;
import io.deephaven.qst.type.BooleanType;
import io.deephaven.qst.type.BoxedType;
import io.deephaven.qst.type.ByteType;
import io.deephaven.qst.type.CharType;
import io.deephaven.qst.type.CustomType;
import io.deephaven.qst.type.DoubleType;
import io.deephaven.qst.type.FloatType;
import io.deephaven.qst.type.GenericType;
import io.deephaven.qst.type.InstantType;
import io.deephaven.qst.type.IntType;
import io.deephaven.qst.type.LongType;
import io.deephaven.qst.type.PrimitiveType;
import io.deephaven.qst.type.ShortType;
import io.deephaven.qst.type.StringType;
import io.deephaven.qst.type.Type;
import io.deephaven.util.annotations.VisibleForTesting;
import io.deephaven.util.mutable.MutableInt;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteBufferDeserializer;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.DoubleDeserializer;
import org.apache.kafka.common.serialization.FloatDeserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.ShortDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.UUIDDeserializer;
import org.apache.kafka.common.utils.Bytes;
import org.jetbrains.annotations.NotNull;

class SimpleImpl {
    /**
     * Maps the fully-qualified class name of each supported Kafka deserializer to the column type it yields.
     *
     * <p>{@link StringDeserializer} is included so that this lookup agrees with the serde visitor in this class,
     * which supplies {@code Serdes.String()} for the string type. {@code Map.of} accepts at most ten pairs, which
     * is exactly the number used here; adding another entry requires switching to {@code Map.ofEntries}.
     */
    @VisibleForTesting
    static final Map<String, Type<?>> DESER_NAME_TO_TYPE = Map.of(
            ShortDeserializer.class.getName(), Type.shortType(),
            IntegerDeserializer.class.getName(), Type.intType(),
            LongDeserializer.class.getName(), Type.longType(),
            FloatDeserializer.class.getName(), Type.floatType(),
            DoubleDeserializer.class.getName(), Type.doubleType(),
            ByteArrayDeserializer.class.getName(), Type.byteType().arrayType(),
            StringDeserializer.class.getName(), Type.stringType(),
            UUIDDeserializer.class.getName(), Type.ofCustom(UUID.class),
            ByteBufferDeserializer.class.getName(), Type.ofCustom(ByteBuffer.class),
            BytesDeserializer.class.getName(), Type.ofCustom(Bytes.class));

    /**
     * Package-private no-argument constructor with an empty body.
     */
    SimpleImpl() {
    }

    /**
     * Looks up the Kafka serializer to use for the given type.
     *
     * @param type the type to resolve
     * @return the serializer of the serde that {@link SerDeserVisitor} selects for {@code type}, or an empty
     *         optional when the visitor yields no serde
     */
    @VisibleForTesting
    static Optional<Serializer<?>> serializer(Type<?> type) {
        final Type.Visitor<Serde<?>> visitor = SerDeserVisitor.INSTANCE;
        final Serde<?> serde = type.walk(visitor);
        if (serde == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(serde.serializer());
    }

    /**
     * Looks up the Kafka deserializer to use for the given type.
     *
     * @param type the type to resolve
     * @return the deserializer of the serde that {@link SerDeserVisitor} selects for {@code type}, or an empty
     *         optional when the visitor yields no serde
     */
    @VisibleForTesting
    static Optional<Deserializer<?>> deserializer(Type<?> type) {
        final Type.Visitor<Serde<?>> visitor = SerDeserVisitor.INSTANCE;
        final Serde<?> serde = type.walk(visitor);
        if (serde == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(serde.deserializer());
    }

    private static enum SerDeserVisitor implements Type.Visitor<Serde<?>>,
    PrimitiveType.Visitor<Serde<?>>,
    GenericType.Visitor<Serde<?>>
    {
        INSTANCE;


        public Serde<?> visit(PrimitiveType<?> primitiveType) {
            return (Serde)primitiveType.walk((PrimitiveType.Visitor)this);
        }

        public Serde<?> visit(GenericType<?> genericType) {
            return (Serde)genericType.walk((GenericType.Visitor)this);
        }

        public Serde<?> visit(BooleanType booleanType) {
            return null;
        }

        public Serde<?> visit(ByteType byteType) {
            return null;
        }

        public Serde<?> visit(CharType charType) {
            return null;
        }

        public Serde<?> visit(ShortType shortType) {
            return Serdes.Short();
        }

        public Serde<?> visit(IntType intType) {
            return Serdes.Integer();
        }

        public Serde<?> visit(LongType longType) {
            return Serdes.Long();
        }

        public Serde<?> visit(FloatType floatType) {
            return Serdes.Float();
        }

        public Serde<?> visit(DoubleType doubleType) {
            return Serdes.Double();
        }

        public Serde<?> visit(BoxedType<?> boxedType) {
            return (Serde)boxedType.primitiveType().walk((PrimitiveType.Visitor)this);
        }

        public Serde<?> visit(StringType stringType) {
            return Serdes.String();
        }

        public Serde<?> visit(InstantType instantType) {
            return null;
        }

        public Serde<?> visit(ArrayType<?, ?> arrayType) {
            if (Type.byteType().arrayType().equals(arrayType)) {
                return Serdes.ByteArray();
            }
            return null;
        }

        public Serde<?> visit(CustomType<?> customType) {
            if (customType.clazz() == UUID.class) {
                return Serdes.UUID();
            }
            if (customType.clazz() == ByteBuffer.class) {
                return Serdes.ByteBuffer();
            }
            if (customType.clazz() == Bytes.class) {
                return Serdes.Bytes();
            }
            return null;
        }
    }

    /**
     * Produce-side key-or-value spec that publishes a single table column, serialized with the serializer that
     * {@link SimpleImpl#serializer(Type)} selects for the column's data type.
     */
    static final class SimpleProduce
    extends KafkaTools.Produce.KeyOrValueSpec {
        private final String columnName;

        /**
         * @param columnName name of the table column to publish
         */
        SimpleProduce(String columnName) {
            this.columnName = columnName;
        }

        /**
         * @return always empty; this spec supplies no schema provider
         */
        @Override
        public Optional<SchemaProvider> getSchemaProvider() {
            return Optional.empty();
        }

        /**
         * Resolves the serializer for the configured column from its data type in {@code definition}.
         *
         * @param schemaRegistryClient not used by this spec
         * @param definition the definition of the table being published
         * @return the serializer for the column's data type
         * @throws UncheckedDeephavenException if the column is not present in {@code definition} or no serializer
         *         is available for its data type
         */
        @Override
        Serializer<?> getSerializer(SchemaRegistryClient schemaRegistryClient, TableDefinition definition) {
            final ColumnDefinition<?> column = definition.getColumn(this.columnName);
            if (column == null) {
                // NOTE(review): assumes getColumn returns null for an unknown column; without this guard such a
                // lookup would surface as a bare NullPointerException on the next line.
                throw new UncheckedDeephavenException(
                        String.format("Column %s not found in table definition", this.columnName));
            }
            final Class<?> dataType = column.getDataType();
            return SimpleImpl.serializer(Type.find(dataType))
                    .orElseThrow(() -> new UncheckedDeephavenException(String.format(
                            "Serializer not found for column %s, type %s", this.columnName, dataType.getName())));
        }

        /**
         * @return a one-element array holding the configured column name
         */
        @Override
        String[] getColumnNames(@NotNull Table t, SchemaRegistryClient schemaRegistryClient) {
            return new String[]{this.columnName};
        }

        /**
         * @return a {@link SimpleKeyOrValueSerializer} built from {@code t} and the configured column name;
         *         {@code columnNames} is not consulted
         */
        @Override
        KeyOrValueSerializer<?> getKeyOrValueSerializer(@NotNull Table t, @NotNull String[] columnNames) {
            return new SimpleKeyOrValueSerializer(t, this.columnName);
        }
    }

    /**
     * Consume-side key-or-value spec for a single column.
     *
     * <p>Both the column name and the data type may be {@code null}. A {@code null} column name is resolved from
     * the {@code deephaven.(key|value).column.name} configuration entry, falling back to {@code KafkaKey} /
     * {@code KafkaValue}. A {@code null} data type is resolved from the {@code deephaven.(key|value).column.type}
     * entry, and failing that from the {@code (key|value).deserializer} entry.
     */
    static final class SimpleConsume
    extends KafkaTools.Consume.KeyOrValueSpec {
        private final String columnName;
        private final Class<?> dataType;

        /**
         * @param columnName the column name, or {@code null} to resolve it from the configuration
         * @param dataType the column data type, or {@code null} to resolve it from the configuration
         */
        SimpleConsume(String columnName, Class<?> dataType) {
            this.columnName = columnName;
            this.dataType = dataType;
        }

        /**
         * @return always empty; this spec supplies no schema provider
         */
        @Override
        public Optional<SchemaProvider> getSchemaProvider() {
            return Optional.empty();
        }

        /**
         * Resolves the deserializer for this spec's type.
         *
         * @param keyOrValue whether this spec describes the key or the value
         * @param schemaRegistryClient not used by this spec
         * @param configs the Kafka configuration entries
         * @return the deserializer selected by {@link SimpleImpl#deserializer(Type)} for the resolved type
         * @throws UncheckedDeephavenException if no deserializer is available for the resolved type
         */
        @Override
        protected Deserializer<?> getDeserializer(
                KafkaTools.KeyOrValue keyOrValue,
                SchemaRegistryClient schemaRegistryClient,
                Map<String, ?> configs) {
            final Type<?> type = this.getType(keyOrValue, configs);
            final Deserializer<?> deserializer = SimpleImpl.deserializer(type).orElse(null);
            if (deserializer != null) {
                return deserializer;
            }
            // dataType is null when the type came from the configuration; report the resolved type in that case
            // rather than printing "null".
            final Object reportedType = this.dataType != null ? this.dataType : type;
            throw new UncheckedDeephavenException(String.format(
                    "Deserializer for %s not set in kafka consumer properties and can't automatically set it for type %s",
                    this, reportedType));
        }

        /**
         * Takes the next column index for this spec and appends the column's definition to
         * {@code columnDefinitionsOut}.
         *
         * @param keyOrValue whether this spec describes the key or the value
         * @param schemaRegistryClient not used by this spec
         * @param configs the Kafka configuration entries
         * @param nextColumnIndexMut counter supplying the column index; incremented by one
         * @param columnDefinitionsOut receives the column definition for this spec
         * @return ingest data whose {@code simpleColumnIndex} is the index taken from the counter
         */
        @Override
        protected KafkaTools.KeyOrValueIngestData getIngestData(
                KafkaTools.KeyOrValue keyOrValue,
                SchemaRegistryClient schemaRegistryClient,
                Map<String, ?> configs,
                MutableInt nextColumnIndexMut,
                List<ColumnDefinition<?>> columnDefinitionsOut) {
            final KafkaTools.KeyOrValueIngestData data = new KafkaTools.KeyOrValueIngestData();
            data.simpleColumnIndex = nextColumnIndexMut.getAndIncrement();
            final ColumnDefinition<?> colDef = ColumnDefinition.of(
                    this.getColumnName(keyOrValue, configs), this.getType(keyOrValue, configs));
            columnDefinitionsOut.add(colDef);
            return data;
        }

        /**
         * @return always {@code null}; this spec supplies no processor
         */
        @Override
        protected KeyOrValueProcessor getProcessor(TableDefinition tableDef, KafkaTools.KeyOrValueIngestData data) {
            return null;
        }

        /**
         * Resolves the column name: the constructor argument if set, otherwise the
         * {@code deephaven.(key|value).column.name} configuration entry, otherwise {@code KafkaKey} /
         * {@code KafkaValue}.
         *
         * @param keyOrValue whether this spec describes the key or the value
         * @param configs the Kafka configuration entries
         * @return the resolved column name
         */
        String getColumnName(KafkaTools.KeyOrValue keyOrValue, Map<String, ?> configs) {
            if (this.columnName != null) {
                return this.columnName;
            }
            final boolean isKey = keyOrValue == KafkaTools.KeyOrValue.KEY;
            final Object configured =
                    configs.get(isKey ? "deephaven.key.column.name" : "deephaven.value.column.name");
            if (configured != null) {
                return (String) configured;
            }
            // Absent entry, or an entry explicitly mapped to null: use the default name.
            return isKey ? "KafkaKey" : "KafkaValue";
        }

        /**
         * Resolves the column type: the constructor's data type if set, otherwise the
         * {@code deephaven.(key|value).column.type} entry, otherwise the {@code (key|value).deserializer} entry.
         *
         * @param keyOrValue whether this spec describes the key or the value
         * @param configs the Kafka configuration entries
         * @return the resolved type
         * @throws UncheckedDeephavenException if none of the sources yields a type
         * @throws IllegalArgumentException if a configuration entry is present but its value is not supported
         */
        @VisibleForTesting
        Type<?> getType(KafkaTools.KeyOrValue keyOrValue, Map<String, ?> configs) {
            if (this.dataType != null) {
                return Type.find(this.dataType);
            }
            final Type<?> typeFromProperty = this.getTypeFromDhProperty(keyOrValue, configs);
            if (typeFromProperty != null) {
                return typeFromProperty;
            }
            final Type<?> typeFromDeserializer = this.getTypeFromDeserializerProperty(keyOrValue, configs);
            if (typeFromDeserializer != null) {
                return typeFromDeserializer;
            }
            final String specName = keyOrValue == KafkaTools.KeyOrValue.KEY ? "key_spec" : "value_spec";
            throw new UncheckedDeephavenException(String.format(
                    "Unable to find the type for column '%s' (%s). Please explicitly set the data type in the constructor, or through the kafka configuration '%s' or '%s'.",
                    this.getColumnName(keyOrValue, configs),
                    specName,
                    SimpleConsume.dhProperty(keyOrValue),
                    SimpleConsume.kafkaDeserializerProperty(keyOrValue)));
        }

        /**
         * Reads the type from the {@code deephaven.(key|value).column.type} entry.
         *
         * @return the type named by the entry, or {@code null} if the entry is absent or mapped to {@code null}
         * @throws IllegalArgumentException if the entry's value is not one of the supported type names
         */
        private Type<?> getTypeFromDhProperty(KafkaTools.KeyOrValue keyOrValue, Map<String, ?> configs) {
            final String typeProperty = SimpleConsume.dhProperty(keyOrValue);
            final Object configured = configs.get(typeProperty);
            if (configured == null) {
                // Same treatment as the deserializer entry: a null value counts as "not configured".
                return null;
            }
            if (configured instanceof String) {
                switch ((String) configured) {
                    case "short":
                        return Type.shortType();
                    case "int":
                        return Type.intType();
                    case "long":
                        return Type.longType();
                    case "float":
                        return Type.floatType();
                    case "double":
                        return Type.doubleType();
                    case "byte[]":
                        return Type.byteType().arrayType();
                    case "String":
                    case "string":
                        return Type.stringType();
                    default:
                        break;
                }
            }
            throw new IllegalArgumentException(
                    String.format("Property %s value %s not supported", typeProperty, configured));
        }

        /**
         * Reads the type from the {@code (key|value).deserializer} entry via {@code DESER_NAME_TO_TYPE}.
         *
         * <p>The entry may be a class name or a {@link Class} object; a {@code Class} is looked up by its name.
         *
         * @return the type mapped to the configured deserializer, or {@code null} if the entry is absent
         * @throws IllegalArgumentException if the configured deserializer has no mapped type
         */
        private Type<?> getTypeFromDeserializerProperty(KafkaTools.KeyOrValue keyOrValue, Map<String, ?> configs) {
            final String deserializerProperty = SimpleConsume.kafkaDeserializerProperty(keyOrValue);
            final Object configured = configs.get(deserializerProperty);
            if (configured == null) {
                return null;
            }
            final String deserializer = configured instanceof Class
                    ? ((Class<?>) configured).getName()
                    : String.valueOf(configured);
            final Type<?> type = DESER_NAME_TO_TYPE.get(deserializer);
            if (type != null) {
                return type;
            }
            throw new IllegalArgumentException(
                    String.format("Deserializer type %s for %s not supported.", deserializer, deserializerProperty));
        }

        /** Returns the name of the configuration entry that carries the column type for the key or the value. */
        private static String dhProperty(KafkaTools.KeyOrValue keyOrValue) {
            return keyOrValue == KafkaTools.KeyOrValue.KEY ? "deephaven.key.column.type" : "deephaven.value.column.type";
        }

        /** Returns the name of the Kafka configuration entry that carries the key or value deserializer. */
        private static String kafkaDeserializerProperty(KafkaTools.KeyOrValue keyOrValue) {
            return keyOrValue == KafkaTools.KeyOrValue.KEY ? "key.deserializer" : "value.deserializer";
        }
    }
}

