/*
 * Decompiled with CFR 0.152.
 */
package io.deephaven.kafka;

import io.confluent.kafka.schemaregistry.SchemaProvider;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.kafka.KafkaTools;
import io.deephaven.kafka.ingest.KeyOrValueProcessor;
import io.deephaven.kafka.ingest.MultiFieldChunkAdapter;
import io.deephaven.processor.NamedObjectProcessor;
import io.deephaven.qst.type.Type;
import io.deephaven.util.mutable.MutableInt;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import org.apache.kafka.common.serialization.Deserializer;

class KeyOrValueSpecObjectProcessorImpl<T>
extends KafkaTools.Consume.KeyOrValueSpec {
    private final Deserializer<? extends T> deserializer;
    private final NamedObjectProcessor<? super T> processor;

    KeyOrValueSpecObjectProcessorImpl(Deserializer<? extends T> deserializer, NamedObjectProcessor<? super T> processor) {
        this.deserializer = Objects.requireNonNull(deserializer);
        this.processor = Objects.requireNonNull(processor);
    }

    @Override
    public Optional<SchemaProvider> getSchemaProvider() {
        return Optional.empty();
    }

    protected Deserializer<? extends T> getDeserializer(KafkaTools.KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient, Map<String, ?> configs) {
        return this.deserializer;
    }

    @Override
    protected KafkaTools.KeyOrValueIngestData getIngestData(KafkaTools.KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient, Map<String, ?> configs, MutableInt nextColumnIndexMut, List<ColumnDefinition<?>> columnDefinitionsOut) {
        KafkaTools.KeyOrValueIngestData data = new KafkaTools.KeyOrValueIngestData();
        data.fieldPathToColumnName = new LinkedHashMap<String, String>();
        List names = this.processor.names();
        List types = this.processor.processor().outputTypes();
        int L = names.size();
        for (int i = 0; i < L; ++i) {
            String columnName = (String)names.get(i);
            Type type = (Type)types.get(i);
            data.fieldPathToColumnName.put(columnName, columnName);
            columnDefinitionsOut.add(ColumnDefinition.of((String)columnName, (Type)type));
        }
        return data;
    }

    @Override
    protected KeyOrValueProcessor getProcessor(TableDefinition tableDef, KafkaTools.KeyOrValueIngestData data) {
        return new KeyOrValueProcessorImpl(KeyOrValueSpecObjectProcessorImpl.offsetsFunction(MultiFieldChunkAdapter.chunkOffsets(tableDef, data.fieldPathToColumnName)));
    }

    private static <T> Function<T[], List<T>> offsetsFunction(int[] offsets) {
        return offsets.length == 0 ? array -> Collections.emptyList() : (KeyOrValueSpecObjectProcessorImpl.isInOrder(offsets) ? array -> Arrays.asList(array).subList(offsets[0], offsets[0] + offsets.length) : array -> KeyOrValueSpecObjectProcessorImpl.reorder(array, offsets));
    }

    private static boolean isInOrder(int[] offsets) {
        for (int i = 1; i < offsets.length; ++i) {
            if (offsets[i - 1] + 1 == offsets[i]) continue;
            return false;
        }
        return true;
    }

    private static <T> List<T> reorder(T[] array, int[] offsets) {
        ArrayList<T> out = new ArrayList<T>(offsets.length);
        for (int offset : offsets) {
            out.add(array[offset]);
        }
        return out;
    }

    private class KeyOrValueProcessorImpl
    implements KeyOrValueProcessor {
        private final Function<WritableChunk<?>[], List<WritableChunk<?>>> offsetsAdapter;

        private KeyOrValueProcessorImpl(Function<WritableChunk<?>[], List<WritableChunk<?>>> offsetsAdapter) {
            this.offsetsAdapter = Objects.requireNonNull(offsetsAdapter);
        }

        @Override
        public void handleChunk(ObjectChunk<Object, Values> inputChunk, WritableChunk<Values>[] publisherChunks) {
            ObjectChunk<Object, Values> in = inputChunk;
            KeyOrValueSpecObjectProcessorImpl.this.processor.processor().processAll(in, this.offsetsAdapter.apply(publisherChunks));
        }
    }
}

