/*
 * Decompiled with CFR 0.152.
 */
package io.deephaven.kafka;

import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Message;
import com.google.protobuf.Parser;
import io.confluent.kafka.schemaregistry.SchemaProvider;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import io.deephaven.UncheckedDeephavenException;
import io.deephaven.api.ColumnName;
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.function.ToObjectFunction;
import io.deephaven.function.TypedFunction;
import io.deephaven.kafka.DhNullableTypeTransform;
import io.deephaven.kafka.KafkaTools;
import io.deephaven.kafka.KeyOrValueProcessorImpl;
import io.deephaven.kafka.ProtobufDeserializers;
import io.deephaven.kafka.ToChunkTypeTransform;
import io.deephaven.kafka.UnboxTransform;
import io.deephaven.kafka.ingest.FieldCopier;
import io.deephaven.kafka.ingest.FieldCopierAdapter;
import io.deephaven.kafka.ingest.KeyOrValueProcessor;
import io.deephaven.kafka.ingest.MultiFieldChunkAdapter;
import io.deephaven.kafka.protobuf.DescriptorMessageClass;
import io.deephaven.kafka.protobuf.DescriptorProvider;
import io.deephaven.kafka.protobuf.DescriptorSchemaRegistry;
import io.deephaven.kafka.protobuf.ProtobufConsumeOptions;
import io.deephaven.protobuf.FieldPath;
import io.deephaven.protobuf.ProtobufDescriptorParser;
import io.deephaven.protobuf.ProtobufDescriptorParserOptions;
import io.deephaven.protobuf.ProtobufFunction;
import io.deephaven.protobuf.ProtobufFunctions;
import io.deephaven.qst.type.GenericType;
import io.deephaven.qst.type.Type;
import io.deephaven.util.annotations.VisibleForTesting;
import io.deephaven.util.mutable.MutableInt;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.common.serialization.Deserializer;

class ProtobufImpl {
    ProtobufImpl() {
    }

    @VisibleForTesting
    static ProtobufFunctions simple(Descriptors.Descriptor descriptor, ProtobufDescriptorParserOptions options) {
        return ProtobufImpl.withMostAppropriateType(ProtobufDescriptorParser.parse((Descriptors.Descriptor)descriptor, (ProtobufDescriptorParserOptions)options));
    }

    private static ProtobufFunctions withMostAppropriateType(ProtobufFunctions functions) {
        ProtobufFunctions.Builder builder = ProtobufFunctions.builder();
        for (ProtobufFunction f : functions.functions()) {
            builder.addFunctions(ProtobufFunction.of((FieldPath)f.path(), ProtobufImpl.withMostAppropriateType(f.function())));
        }
        return builder.build();
    }

    private static <X> TypedFunction<X> withMostAppropriateType(TypedFunction<X> f) {
        TypedFunction<X> f2 = DhNullableTypeTransform.of(f);
        TypedFunction<X> unboxed = UnboxTransform.of(f2).orElse(null);
        return unboxed != null ? unboxed : f2;
    }

    private static Descriptors.Descriptor descriptor(Class<? extends Message> clazz) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
        Method getDescriptor = clazz.getMethod("getDescriptor", new Class[0]);
        return (Descriptors.Descriptor)getDescriptor.invoke(null, new Object[0]);
    }

    private static Descriptors.Descriptor descriptor(SchemaRegistryClient registry, DescriptorSchemaRegistry dsr) throws RestClientException, IOException {
        SchemaMetadata metadata;
        SchemaMetadata schemaMetadata = metadata = dsr.version().isPresent() ? registry.getSchemaMetadata(dsr.subject(), dsr.version().getAsInt()) : registry.getLatestSchemaMetadata(dsr.subject());
        if (!"PROTOBUF".equals(metadata.getSchemaType())) {
            throw new IllegalStateException(String.format("Expected schema type %s but was %s", "PROTOBUF", metadata.getSchemaType()));
        }
        ProtobufSchema protobufSchema = (ProtobufSchema)registry.getSchemaBySubjectAndId(dsr.subject(), metadata.getId());
        return dsr.messageName().isPresent() ? protobufSchema.toDescriptor(dsr.messageName().get()) : protobufSchema.toDescriptor();
    }

    private static <T extends Message> Parser<T> parser(Class<T> clazz) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
        Method parser = clazz.getMethod("parser", new Class[0]);
        return (Parser)parser.invoke(null, new Object[0]);
    }

    static final class ProtobufConsumeImpl
    extends KafkaTools.Consume.KeyOrValueSpec {
        private static final ToObjectFunction<Object, Message> PROTOBUF_MESSAGE_OBJ = ToObjectFunction.identity((GenericType)Type.ofCustom(Message.class));
        private final ProtobufConsumeOptions specs;
        private Descriptors.Descriptor descriptor;

        ProtobufConsumeImpl(ProtobufConsumeOptions specs) {
            this.specs = Objects.requireNonNull(specs);
        }

        @Override
        public Optional<SchemaProvider> getSchemaProvider() {
            return this.specs.descriptorProvider() instanceof DescriptorSchemaRegistry ? Optional.of(new ProtobufSchemaProvider()) : Optional.empty();
        }

        protected Deserializer<? extends Message> getDeserializer(KafkaTools.KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient, Map<String, ?> configs) {
            DescriptorProvider dp = this.specs.descriptorProvider();
            if (dp instanceof DescriptorMessageClass) {
                this.setDescriptor((DescriptorMessageClass)dp);
                return this.deserializer((DescriptorMessageClass)dp);
            }
            if (dp instanceof DescriptorSchemaRegistry) {
                this.setDescriptor(schemaRegistryClient, (DescriptorSchemaRegistry)dp);
                return this.deserializer((DescriptorSchemaRegistry)dp);
            }
            throw new IllegalStateException("Unexpected descriptor provider: " + dp);
        }

        private void setDescriptor(DescriptorMessageClass<?> dmc) {
            try {
                this.descriptor = ProtobufImpl.descriptor(dmc.clazz());
            }
            catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
                throw new UncheckedDeephavenException((Throwable)e);
            }
        }

        private void setDescriptor(SchemaRegistryClient schemaRegistryClient, DescriptorSchemaRegistry dsr) {
            try {
                this.descriptor = ProtobufImpl.descriptor(schemaRegistryClient, dsr);
            }
            catch (RestClientException | IOException e) {
                throw new UncheckedDeephavenException(e);
            }
        }

        private Deserializer<? extends Message> deserializer(DescriptorMessageClass<?> dmc) {
            Parser<?> parser;
            try {
                parser = ProtobufImpl.parser(dmc.clazz());
            }
            catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
                throw new UncheckedDeephavenException((Throwable)e);
            }
            return ProtobufDeserializers.of(this.specs.protocol(), parser);
        }

        private Deserializer<DynamicMessage> deserializer(DescriptorSchemaRegistry dsr) {
            return ProtobufDeserializers.of(this.specs.protocol(), this.descriptor);
        }

        @Override
        protected KafkaTools.KeyOrValueIngestData getIngestData(KafkaTools.KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient, Map<String, ?> configs, MutableInt nextColumnIndexMut, List<ColumnDefinition<?>> columnDefinitionsOut) {
            ProtobufFunctions functions = ProtobufImpl.simple(this.descriptor, this.specs.parserOptions());
            ArrayList<FieldCopier> fieldCopiers = new ArrayList<FieldCopier>(functions.functions().size());
            KafkaTools.KeyOrValueIngestData data = new KafkaTools.KeyOrValueIngestData();
            data.fieldPathToColumnName = new LinkedHashMap<String, String>();
            ProtobufConsumeOptions.FieldPathToColumnName fieldPathToColumnName = this.specs.pathToColumnName();
            HashMap<FieldPath, Integer> indices = new HashMap<FieldPath, Integer>();
            for (ProtobufFunction f : functions.functions()) {
                int ix = indices.compute(f.path(), (fieldPath, i) -> i == null ? 0 : i + 1);
                ColumnName columnName = fieldPathToColumnName.columnName(f.path(), ix);
                this.add(columnName, (TypedFunction<Message>)f.function(), data, columnDefinitionsOut, fieldCopiers);
            }
            data.extra = fieldCopiers;
            return data;
        }

        private void add(ColumnName columnName, TypedFunction<Message> function, KafkaTools.KeyOrValueIngestData data, List<ColumnDefinition<?>> columnDefinitionsOut, List<FieldCopier> fieldCopiersOut) {
            data.fieldPathToColumnName.put(columnName.name(), columnName.name());
            columnDefinitionsOut.add(ColumnDefinition.of((String)columnName.name(), (Type)function.returnType()));
            fieldCopiersOut.add(FieldCopierAdapter.of((TypedFunction<Object>)PROTOBUF_MESSAGE_OBJ.map(ToChunkTypeTransform.of(function))));
        }

        @Override
        protected KeyOrValueProcessor getProcessor(TableDefinition tableDef, KafkaTools.KeyOrValueIngestData data) {
            return new KeyOrValueProcessorImpl(MultiFieldChunkAdapter.chunkOffsets(tableDef, data.fieldPathToColumnName), (List)data.extra, false);
        }
    }
}

