/*
 * Decompiled with CFR 0.152.
 */
package io.deephaven.kafka.ingest;

import io.deephaven.base.Pair;
import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.ChunkType;
import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.WritableObjectChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.table.impl.util.unboxer.ChunkUnboxer;
import io.deephaven.kafka.StreamPublisherImpl;
import io.deephaven.kafka.ingest.ConsumerRecordToStreamPublisherAdapter;
import io.deephaven.kafka.ingest.KeyOrValueProcessor;
import io.deephaven.time.DateTimeUtils;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.record.TimestampType;
import org.jetbrains.annotations.NotNull;

public class KafkaStreamPublisher
implements ConsumerRecordToStreamPublisherAdapter {
    private final StreamPublisherImpl publisher;
    private final int kafkaPartitionColumnIndex;
    private final int offsetColumnIndex;
    private final int timestampColumnIndex;
    private final int simpleKeyColumnIndex;
    private final int simpleValueColumnIndex;
    private final boolean keyIsSimpleObject;
    private final boolean valueIsSimpleObject;
    final KeyOrValueProcessor keyProcessor;
    final KeyOrValueProcessor valueProcessor;
    final Function<Object, Object> keyToChunkObjectMapper;
    final Function<Object, Object> valueToChunkObjectMapper;

    private KafkaStreamPublisher(StreamPublisherImpl publisher, int kafkaPartitionColumnIndex, int offsetColumnIndex, int timestampColumnIndex, KeyOrValueProcessor keyProcessor, KeyOrValueProcessor valueProcessor, int simpleKeyColumnIndex, int simpleValueColumnIndex, Function<Object, Object> keyToChunkObjectMapper, Function<Object, Object> valueToChunkObjectMapper) {
        this.publisher = publisher;
        this.kafkaPartitionColumnIndex = kafkaPartitionColumnIndex;
        this.offsetColumnIndex = offsetColumnIndex;
        this.timestampColumnIndex = timestampColumnIndex;
        this.simpleKeyColumnIndex = simpleKeyColumnIndex;
        this.simpleValueColumnIndex = simpleValueColumnIndex;
        this.keyProcessor = keyProcessor;
        this.valueProcessor = valueProcessor;
        this.keyToChunkObjectMapper = keyToChunkObjectMapper;
        this.valueToChunkObjectMapper = valueToChunkObjectMapper;
        boolean bl = this.keyIsSimpleObject = this.simpleKeyColumnIndex >= 0;
        if (this.keyIsSimpleObject && keyProcessor != null) {
            throw new IllegalArgumentException("Simple Key Column Index can not be set when a keyProcessor is set");
        }
        boolean bl2 = this.valueIsSimpleObject = this.simpleValueColumnIndex >= 0;
        if (this.valueIsSimpleObject && valueProcessor != null) {
            throw new IllegalArgumentException("Simple Value Column Index can not be set when a valueProcessor is set");
        }
    }

    public static ConsumerRecordToStreamPublisherAdapter make(StreamPublisherImpl publisher, int kafkaPartitionColumnIndex, int offsetColumnIndex, int timestampColumnIndex, KeyOrValueProcessor keyProcessorArg, KeyOrValueProcessor valueProcessorArg, int simpleKeyColumnIndexArg, int simpleValueColumnIndexArg, Function<Object, Object> keyToChunkObjectMapper, Function<Object, Object> valueToChunkObjectMapper) {
        int simpleValueColumnIndex;
        KeyOrValueProcessor valueProcessor;
        int simpleKeyColumnIndex;
        KeyOrValueProcessor keyProcessor;
        if (keyProcessorArg != null && simpleKeyColumnIndexArg != -1) {
            throw new IllegalArgumentException("Either keyProcessor != null or simpleKeyColumnIndex != -1");
        }
        if (valueProcessorArg != null && simpleValueColumnIndexArg != -1) {
            throw new IllegalArgumentException("Either valueProcessor != null or simpleValueColumnIndex != -1");
        }
        if (simpleKeyColumnIndexArg == -1) {
            keyProcessor = keyProcessorArg;
            simpleKeyColumnIndex = -1;
        } else {
            Pair<KeyOrValueProcessor, Integer> keyPair = KafkaStreamPublisher.getProcessorAndSimpleIndex(simpleKeyColumnIndexArg, publisher.chunkType(simpleKeyColumnIndexArg));
            keyProcessor = (KeyOrValueProcessor)keyPair.first;
            simpleKeyColumnIndex = (Integer)keyPair.second;
        }
        if (simpleValueColumnIndexArg == -1) {
            valueProcessor = valueProcessorArg;
            simpleValueColumnIndex = -1;
        } else {
            Pair<KeyOrValueProcessor, Integer> valuePair = KafkaStreamPublisher.getProcessorAndSimpleIndex(simpleValueColumnIndexArg, publisher.chunkType(simpleValueColumnIndexArg));
            valueProcessor = (KeyOrValueProcessor)valuePair.first;
            simpleValueColumnIndex = (Integer)valuePair.second;
        }
        return new KafkaStreamPublisher(publisher, kafkaPartitionColumnIndex, offsetColumnIndex, timestampColumnIndex, keyProcessor, valueProcessor, simpleKeyColumnIndex, simpleValueColumnIndex, keyToChunkObjectMapper, valueToChunkObjectMapper);
    }

    @NotNull
    private static Pair<KeyOrValueProcessor, Integer> getProcessorAndSimpleIndex(int columnIndex, ChunkType chunkType) {
        int simpleIndex;
        SimpleKeyOrValueProcessor processor;
        boolean isSimpleObject;
        boolean bl = isSimpleObject = chunkType == ChunkType.Object;
        if (!isSimpleObject) {
            processor = new SimpleKeyOrValueProcessor(columnIndex, ChunkUnboxer.getEmptyUnboxer((ChunkType)chunkType));
            simpleIndex = -1;
        } else {
            processor = null;
            simpleIndex = columnIndex;
        }
        return new Pair((Object)processor, (Object)simpleIndex);
    }

    @Override
    public long consumeRecords(List<? extends ConsumerRecord<?, ?>> records) {
        return this.publisher.doLocked(() -> this.doConsumeRecords(records));
    }

    private boolean haveKey() {
        return !this.keyIsSimpleObject && this.keyProcessor != null;
    }

    private boolean haveValue() {
        return !this.valueIsSimpleObject && this.valueProcessor != null;
    }

    private long doConsumeRecords(List<? extends ConsumerRecord<?, ?>> records) {
        WritableChunk[] chunks = this.publisher.getChunks();
        this.checkChunkSizes(chunks);
        int remaining = chunks[0].capacity() - chunks[0].size();
        int chunkSize = Math.min(records.size(), chunks[0].capacity());
        long bytesProcessed = 0L;
        try (WritableObjectChunk keyChunkCloseable = this.haveKey() ? WritableObjectChunk.makeWritableChunk((int)chunkSize) : null;
             WritableObjectChunk valueChunkCloseable = this.haveValue() ? WritableObjectChunk.makeWritableChunk((int)chunkSize) : null;){
            Object valueChunk;
            Object keyChunk;
            if (keyChunkCloseable != null) {
                keyChunkCloseable.setSize(0);
                keyChunk = keyChunkCloseable;
            } else {
                keyChunk = this.keyIsSimpleObject ? chunks[this.simpleKeyColumnIndex].asWritableObjectChunk() : null;
            }
            if (valueChunkCloseable != null) {
                valueChunkCloseable.setSize(0);
                valueChunk = valueChunkCloseable;
            } else {
                valueChunk = this.valueIsSimpleObject ? chunks[this.simpleValueColumnIndex].asWritableObjectChunk() : null;
            }
            Object partitionChunk = this.kafkaPartitionColumnIndex >= 0 ? chunks[this.kafkaPartitionColumnIndex].asWritableIntChunk() : null;
            Object offsetChunk = this.offsetColumnIndex >= 0 ? chunks[this.offsetColumnIndex].asWritableLongChunk() : null;
            Object timestampChunk = this.timestampColumnIndex >= 0 ? chunks[this.timestampColumnIndex].asWritableLongChunk() : null;
            for (ConsumerRecord<?, ?> record : records) {
                if (--remaining == 0) {
                    if (keyChunk != null) {
                        this.flushKeyChunk((WritableObjectChunk<Object, Values>)keyChunk, chunks);
                    }
                    if (valueChunk != null) {
                        this.flushValueChunk((WritableObjectChunk<Object, Values>)valueChunk, chunks);
                    }
                    this.checkChunkSizes(chunks);
                    this.publisher.flush();
                    chunks = this.publisher.getChunks();
                    this.checkChunkSizes(chunks);
                    remaining = chunks[0].capacity() - chunks[0].size();
                    Assert.gtZero((int)remaining, (String)"remaining");
                    partitionChunk = this.kafkaPartitionColumnIndex >= 0 ? chunks[this.kafkaPartitionColumnIndex].asWritableIntChunk() : null;
                    offsetChunk = this.offsetColumnIndex >= 0 ? chunks[this.offsetColumnIndex].asWritableLongChunk() : null;
                    timestampChunk = this.timestampColumnIndex >= 0 ? chunks[this.timestampColumnIndex].asWritableLongChunk() : null;
                    if (this.keyIsSimpleObject) {
                        keyChunk = chunks[this.simpleKeyColumnIndex].asWritableObjectChunk();
                    }
                    if (this.valueIsSimpleObject) {
                        valueChunk = chunks[this.simpleValueColumnIndex].asWritableObjectChunk();
                    }
                }
                if (partitionChunk != null) {
                    partitionChunk.add(record.partition());
                }
                if (offsetChunk != null) {
                    offsetChunk.add(record.offset());
                }
                if (timestampChunk != null) {
                    long timestamp = record.timestamp();
                    if (record.timestampType() == TimestampType.NO_TIMESTAMP_TYPE) {
                        timestampChunk.add(Long.MIN_VALUE);
                    } else {
                        timestampChunk.add(DateTimeUtils.millisToNanos((long)timestamp));
                    }
                }
                if (keyChunk != null) {
                    keyChunk.add(this.keyToChunkObjectMapper.apply(record.key()));
                    int keyBytes = record.serializedKeySize();
                    if (keyBytes > 0) {
                        bytesProcessed += (long)keyBytes;
                    }
                }
                if (valueChunk == null) continue;
                valueChunk.add(this.valueToChunkObjectMapper.apply(record.value()));
                int valueBytes = record.serializedValueSize();
                if (valueBytes <= 0) continue;
                bytesProcessed += (long)valueBytes;
            }
            if (keyChunk != null) {
                this.flushKeyChunk((WritableObjectChunk<Object, Values>)keyChunk, chunks);
            }
            if (valueChunk != null) {
                this.flushValueChunk((WritableObjectChunk<Object, Values>)valueChunk, chunks);
            }
            this.checkChunkSizes(chunks);
        }
        return bytesProcessed;
    }

    private void checkChunkSizes(WritableChunk[] chunks) {
        for (int cc = 1; cc < chunks.length; ++cc) {
            if (chunks[cc].size() == chunks[0].size()) continue;
            throw new IllegalStateException("Publisher chunks have size mismatch: " + Arrays.stream(chunks).map(c -> Integer.toString(c.size())).collect(Collectors.joining(", ")));
        }
    }

    void flushKeyChunk(WritableObjectChunk<Object, Values> objectChunk, WritableChunk<Values>[] publisherChunks) {
        if (this.keyIsSimpleObject) {
            return;
        }
        this.keyProcessor.handleChunk((ObjectChunk<Object, Values>)objectChunk, publisherChunks);
        objectChunk.setSize(0);
    }

    void flushValueChunk(WritableObjectChunk<Object, Values> objectChunk, WritableChunk<Values>[] publisherChunks) {
        if (this.valueIsSimpleObject) {
            return;
        }
        this.valueProcessor.handleChunk((ObjectChunk<Object, Values>)objectChunk, publisherChunks);
        objectChunk.setSize(0);
    }

    static class SimpleKeyOrValueProcessor
    implements KeyOrValueProcessor {
        final int offset;
        final ChunkUnboxer.UnboxerKernel unboxer;

        SimpleKeyOrValueProcessor(int offset, ChunkUnboxer.UnboxerKernel unboxer) {
            this.offset = offset;
            this.unboxer = unboxer;
        }

        @Override
        public void handleChunk(ObjectChunk<Object, Values> inputChunk, WritableChunk<Values>[] publisherChunks) {
            WritableChunk<Values> publisherChunk = publisherChunks[this.offset];
            int existingSize = publisherChunk.size();
            publisherChunk.setSize(existingSize + inputChunk.size());
            this.unboxer.unboxTo(inputChunk, publisherChunk, 0, existingSize);
        }
    }
}

