package io.deephaven.kafka.ingest;

import io.deephaven.base.Pair;
import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.ChunkType;
import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.WritableIntChunk;
import io.deephaven.chunk.WritableLongChunk;
import io.deephaven.chunk.WritableObjectChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.table.impl.util.unboxer.ChunkUnboxer;
import io.deephaven.kafka.StreamPublisherImpl;
import io.deephaven.time.DateTimeUtils;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.record.TimestampType;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:io/deephaven/kafka/ingest/KafkaStreamPublisher.class */
public class KafkaStreamPublisher implements ConsumerRecordToStreamPublisherAdapter {
    private final StreamPublisherImpl publisher;
    private final int kafkaPartitionColumnIndex;
    private final int offsetColumnIndex;
    private final int timestampColumnIndex;
    private final int simpleKeyColumnIndex;
    private final int simpleValueColumnIndex;
    private final boolean keyIsSimpleObject;
    private final boolean valueIsSimpleObject;
    final KeyOrValueProcessor keyProcessor;
    final KeyOrValueProcessor valueProcessor;
    final Function<Object, Object> keyToChunkObjectMapper;
    final Function<Object, Object> valueToChunkObjectMapper;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/deephaven/kafka/ingest/KafkaStreamPublisher$SimpleKeyOrValueProcessor.class */
    /**
     * {@link KeyOrValueProcessor} for a key or value that maps to exactly one non-Object column: the boxed objects
     * in the incoming chunk are unboxed and appended to the single destination chunk at {@link #offset}.
     */
    public static class SimpleKeyOrValueProcessor implements KeyOrValueProcessor {
        /** Index, within the publisher's chunk array, of the destination column. */
        final int offset;
        /** Kernel used to convert the boxed input objects into the destination chunk's element type. */
        final ChunkUnboxer.UnboxerKernel unboxer;

        /**
         * @param i index of the destination chunk in the array handed to {@link #handleChunk}
         * @param unboxerKernel kernel that unboxes into the destination chunk
         */
        SimpleKeyOrValueProcessor(int i, ChunkUnboxer.UnboxerKernel unboxerKernel) {
            this.offset = i;
            this.unboxer = unboxerKernel;
        }

        /**
         * Appends the unboxed contents of {@code objectChunk} to the destination chunk.
         *
         * @param objectChunk boxed values to append
         * @param writableChunkArr the publisher's chunks; only the entry at {@link #offset} is written
         */
        @Override // io.deephaven.kafka.ingest.KeyOrValueProcessor
        public void handleChunk(ObjectChunk<Object, Values> objectChunk, WritableChunk<Values>[] writableChunkArr) {
            final WritableChunk<Values> destination = writableChunkArr[this.offset];
            final int appendPosition = destination.size();
            // Grow the destination first, then unbox into the newly exposed tail starting at the old size.
            destination.setSize(appendPosition + objectChunk.size());
            this.unboxer.unboxTo(objectChunk, destination, 0, appendPosition);
        }
    }

    /**
     * Creates the adapter; callers go through {@link #make}.
     *
     * @param publisher the stream publisher whose chunks receive the consumed records
     * @param partitionIndex column index for the Kafka partition, negative to omit
     * @param offsetIndex column index for the Kafka offset, negative to omit
     * @param timestampIndex column index for the record timestamp, negative to omit
     * @param keyProcessor processor for keys, or null
     * @param valueProcessor processor for values, or null
     * @param simpleKeyIndex column index that receives keys directly as objects, negative if unused
     * @param simpleValueIndex column index that receives values directly as objects, negative if unused
     * @param keyMapper applied to every record key before it is stored
     * @param valueMapper applied to every record value before it is stored
     * @throws IllegalArgumentException if a simple column index and the matching processor are both supplied
     */
    private KafkaStreamPublisher(StreamPublisherImpl publisher, int partitionIndex, int offsetIndex, int timestampIndex, KeyOrValueProcessor keyProcessor, KeyOrValueProcessor valueProcessor, int simpleKeyIndex, int simpleValueIndex, Function<Object, Object> keyMapper, Function<Object, Object> valueMapper) {
        this.publisher = publisher;

        this.kafkaPartitionColumnIndex = partitionIndex;
        this.offsetColumnIndex = offsetIndex;
        this.timestampColumnIndex = timestampIndex;

        this.simpleKeyColumnIndex = simpleKeyIndex;
        this.simpleValueColumnIndex = simpleValueIndex;

        this.keyProcessor = keyProcessor;
        this.valueProcessor = valueProcessor;

        this.keyToChunkObjectMapper = keyMapper;
        this.valueToChunkObjectMapper = valueMapper;

        // A non-negative simple index means the key is written straight into that Object column.
        this.keyIsSimpleObject = simpleKeyIndex >= 0;
        if (keyProcessor != null && this.keyIsSimpleObject) {
            throw new IllegalArgumentException("Simple Key Column Index can not be set when a keyProcessor is set");
        }

        // Same rule for the value side.
        this.valueIsSimpleObject = simpleValueIndex >= 0;
        if (valueProcessor != null && this.valueIsSimpleObject) {
            throw new IllegalArgumentException("Simple Value Column Index can not be set when a valueProcessor is set");
        }
    }

    /**
     * Builds an adapter that writes consumed Kafka records into the chunks of {@code streamPublisherImpl}.
     *
     * <p>
     * For each of key and value, the caller supplies either a processor or a simple column index (or neither).
     * When a simple column index is given, the column's chunk type decides how it is handled: an Object column
     * is written to directly, any other type gets a {@link SimpleKeyOrValueProcessor} that unboxes into it.
     *
     * @param streamPublisherImpl the publisher that owns the destination chunks
     * @param i column index for the Kafka partition, negative to omit
     * @param i2 column index for the Kafka offset, negative to omit
     * @param i3 column index for the record timestamp, negative to omit
     * @param keyOrValueProcessor key processor, or null
     * @param keyOrValueProcessor2 value processor, or null
     * @param i4 simple key column index, or -1 if unused
     * @param i5 simple value column index, or -1 if unused
     * @param function mapper applied to each record key before storing
     * @param function2 mapper applied to each record value before storing
     * @return the configured adapter
     * @throws IllegalArgumentException if both a processor and a simple column index are given for key or value
     */
    public static ConsumerRecordToStreamPublisherAdapter make(StreamPublisherImpl streamPublisherImpl, int i, int i2, int i3, KeyOrValueProcessor keyOrValueProcessor, KeyOrValueProcessor keyOrValueProcessor2, int i4, int i5, Function<Object, Object> function, Function<Object, Object> function2) {
        if (keyOrValueProcessor != null && i4 != -1) {
            throw new IllegalArgumentException("Either keyProcessor != null or simpleKeyColumnIndex != -1");
        }
        if (keyOrValueProcessor2 != null && i5 != -1) {
            throw new IllegalArgumentException("Either valueProcessor != null or simpleValueColumnIndex != -1");
        }

        // Default: pass the supplied processor through and mark the simple index as unused.
        KeyOrValueProcessor resolvedKeyProcessor = keyOrValueProcessor;
        int resolvedSimpleKeyIndex = -1;
        if (i4 != -1) {
            final Pair<KeyOrValueProcessor, Integer> keyResolution =
                    getProcessorAndSimpleIndex(i4, streamPublisherImpl.chunkType(i4));
            resolvedKeyProcessor = keyResolution.first;
            resolvedSimpleKeyIndex = keyResolution.second;
        }

        KeyOrValueProcessor resolvedValueProcessor = keyOrValueProcessor2;
        int resolvedSimpleValueIndex = -1;
        if (i5 != -1) {
            final Pair<KeyOrValueProcessor, Integer> valueResolution =
                    getProcessorAndSimpleIndex(i5, streamPublisherImpl.chunkType(i5));
            resolvedValueProcessor = valueResolution.first;
            resolvedSimpleValueIndex = valueResolution.second;
        }

        return new KafkaStreamPublisher(streamPublisherImpl, i, i2, i3, resolvedKeyProcessor, resolvedValueProcessor,
                resolvedSimpleKeyIndex, resolvedSimpleValueIndex, function, function2);
    }

    /**
     * Decides how a single-column key or value is written, based on the destination column's chunk type.
     *
     * @param columnIndex index of the destination column
     * @param columnType chunk type of the destination column
     * @return for an Object column, {@code (null, columnIndex)} so objects are stored directly; otherwise
     *         {@code (processor, -1)} where the processor unboxes into the column
     */
    @NotNull
    private static Pair<KeyOrValueProcessor, Integer> getProcessorAndSimpleIndex(int columnIndex, ChunkType columnType) {
        if (columnType == ChunkType.Object) {
            // Objects need no conversion, so no processor is created.
            return new Pair<>(null, Integer.valueOf(columnIndex));
        }
        final SimpleKeyOrValueProcessor unboxingProcessor =
                new SimpleKeyOrValueProcessor(columnIndex, ChunkUnboxer.getEmptyUnboxer(columnType));
        return new Pair<>(unboxingProcessor, Integer.valueOf(-1));
    }

    /**
     * Writes the given records into the publisher's chunks, running the work through the publisher's
     * {@code doLocked}.
     *
     * @param list the records to consume, in order
     */
    @Override // io.deephaven.kafka.ingest.ConsumerRecordToStreamPublisherAdapter
    public void consumeRecords(List<? extends ConsumerRecord<?, ?>> list) {
        this.publisher.doLocked(() -> doConsumeRecords(list));
    }

    /**
     * @return true when keys go through {@link #keyProcessor}, i.e. a processor is present and the key is not
     *         stored directly as a simple object
     */
    private boolean haveKey() {
        return !this.keyIsSimpleObject && this.keyProcessor != null;
    }

    /**
     * @return true when values go through {@link #valueProcessor}, i.e. a processor is present and the value is
     *         not stored directly as a simple object
     */
    private boolean haveValue() {
        return !this.valueIsSimpleObject && this.valueProcessor != null;
    }

    /**
     * Appends every record in {@code records} to the publisher's chunks, flushing the publisher whenever the
     * current chunks have no room left.
     *
     * <p>
     * Partition, offset and timestamp are written directly to their columns (when the corresponding index is
     * non-negative). Keys and values are either written directly to an Object column (the "simple object" case) or
     * staged in a temporary object chunk that is handed to the key/value processor on each flush and at the end
     * of the batch.
     *
     * <p>
     * Room is checked <em>before</em> a record is appended and the counter is decremented <em>after</em>, so the
     * counter always equals {@code capacity - size} of the current chunks. In particular, if the chunks are
     * already full on entry (for example because the previous batch filled them exactly), they are flushed before
     * the first append instead of being written past capacity.
     *
     * @param records the records to append, in order
     */
    private void doConsumeRecords(List<? extends ConsumerRecord<?, ?>> records) {
        WritableChunk<Values>[] chunks = this.publisher.getChunks();
        checkChunkSizes(chunks);
        int remaining = chunks[0].capacity() - chunks[0].size();

        // No more than one chunk's worth of records can be staged between two flushes.
        final int stagingCapacity = Math.min(records.size(), chunks[0].capacity());

        // Staging chunks exist only when a processor consumes them; null resources are skipped on close.
        try (WritableObjectChunk<Object, Values> keyStaging =
                haveKey() ? WritableObjectChunk.makeWritableChunk(stagingCapacity) : null;
                WritableObjectChunk<Object, Values> valueStaging =
                        haveValue() ? WritableObjectChunk.makeWritableChunk(stagingCapacity) : null) {

            WritableObjectChunk<Object, Values> keyChunk;
            if (keyStaging != null) {
                keyStaging.setSize(0);
                keyChunk = keyStaging;
            } else if (this.keyIsSimpleObject) {
                keyChunk = chunks[this.simpleKeyColumnIndex].asWritableObjectChunk();
            } else {
                keyChunk = null;
            }

            WritableObjectChunk<Object, Values> valueChunk;
            if (valueStaging != null) {
                valueStaging.setSize(0);
                valueChunk = valueStaging;
            } else if (this.valueIsSimpleObject) {
                valueChunk = chunks[this.simpleValueColumnIndex].asWritableObjectChunk();
            } else {
                valueChunk = null;
            }

            WritableIntChunk<Values> partitionChunk = intColumnOrNull(chunks, this.kafkaPartitionColumnIndex);
            WritableLongChunk<Values> offsetChunk = longColumnOrNull(chunks, this.offsetColumnIndex);
            WritableLongChunk<Values> timestampChunk = longColumnOrNull(chunks, this.timestampColumnIndex);

            for (ConsumerRecord<?, ?> record : records) {
                if (remaining <= 0) {
                    // Current chunks are full: push staged keys/values into them, publish, and start fresh ones.
                    if (keyChunk != null) {
                        flushKeyChunk(keyChunk, chunks);
                    }
                    if (valueChunk != null) {
                        flushValueChunk(valueChunk, chunks);
                    }
                    checkChunkSizes(chunks);
                    this.publisher.flush();

                    chunks = this.publisher.getChunks();
                    checkChunkSizes(chunks);
                    remaining = chunks[0].capacity() - chunks[0].size();
                    Assert.gtZero(remaining, "remaining");

                    // Every reference into the old chunk array is stale after a flush; re-resolve them.
                    partitionChunk = intColumnOrNull(chunks, this.kafkaPartitionColumnIndex);
                    offsetChunk = longColumnOrNull(chunks, this.offsetColumnIndex);
                    timestampChunk = longColumnOrNull(chunks, this.timestampColumnIndex);
                    if (this.keyIsSimpleObject) {
                        keyChunk = chunks[this.simpleKeyColumnIndex].asWritableObjectChunk();
                    }
                    if (this.valueIsSimpleObject) {
                        valueChunk = chunks[this.simpleValueColumnIndex].asWritableObjectChunk();
                    }
                }

                if (partitionChunk != null) {
                    partitionChunk.add(record.partition());
                }
                if (offsetChunk != null) {
                    offsetChunk.add(record.offset());
                }
                if (timestampChunk != null) {
                    final long timestampMillis = record.timestamp();
                    if (record.timestampType() == TimestampType.NO_TIMESTAMP_TYPE) {
                        // NOTE(review): Long.MIN_VALUE is presumably the engine's null-long sentinel - confirm.
                        timestampChunk.add(Long.MIN_VALUE);
                    } else {
                        timestampChunk.add(DateTimeUtils.millisToNanos(timestampMillis));
                    }
                }
                if (keyChunk != null) {
                    keyChunk.add(this.keyToChunkObjectMapper.apply(record.key()));
                }
                if (valueChunk != null) {
                    valueChunk.add(this.valueToChunkObjectMapper.apply(record.value()));
                }

                // Count the slot only after it has been used, so the record written right after a flush is
                // accounted for as well.
                --remaining;
            }

            // Hand whatever is still staged to the processors; the publisher itself is not flushed here.
            if (keyChunk != null) {
                flushKeyChunk(keyChunk, chunks);
            }
            if (valueChunk != null) {
                flushValueChunk(valueChunk, chunks);
            }
            checkChunkSizes(chunks);
        }
    }

    /** Returns the int chunk at {@code columnIndex}, or null when the index is negative (column not requested). */
    private static WritableIntChunk<Values> intColumnOrNull(WritableChunk<Values>[] chunks, int columnIndex) {
        return columnIndex >= 0 ? chunks[columnIndex].asWritableIntChunk() : null;
    }

    /** Returns the long chunk at {@code columnIndex}, or null when the index is negative (column not requested). */
    private static WritableLongChunk<Values> longColumnOrNull(WritableChunk<Values>[] chunks, int columnIndex) {
        return columnIndex >= 0 ? chunks[columnIndex].asWritableLongChunk() : null;
    }

    /**
     * Verifies that every chunk in the array has the same size as the first one.
     *
     * @param writableChunkArr the publisher's chunks
     * @throws IllegalStateException if any chunk's size differs from that of chunk 0; the message lists all sizes
     */
    private void checkChunkSizes(WritableChunk[] writableChunkArr) {
        if (writableChunkArr.length < 2) {
            // Nothing to compare against.
            return;
        }
        final int expectedSize = writableChunkArr[0].size();
        for (int idx = 1; idx < writableChunkArr.length; idx++) {
            if (writableChunkArr[idx].size() == expectedSize) {
                continue;
            }
            final String allSizes = Arrays.stream(writableChunkArr)
                    .map(chunk -> Integer.toString(chunk.size()))
                    .collect(Collectors.joining(", "));
            throw new IllegalStateException("Publisher chunks have size mismatch: " + allSizes);
        }
    }

    /**
     * Hands staged keys to the key processor and empties the staging chunk. Does nothing when keys are stored
     * directly as simple objects, since in that case the chunk passed in is the destination column itself.
     *
     * @param writableObjectChunk the chunk holding the keys collected so far
     * @param writableChunkArr the publisher's chunks that the processor writes into
     */
    void flushKeyChunk(WritableObjectChunk<Object, Values> writableObjectChunk, WritableChunk<Values>[] writableChunkArr) {
        if (!this.keyIsSimpleObject) {
            this.keyProcessor.handleChunk(writableObjectChunk, writableChunkArr);
            // Reset so the staging chunk can be reused for the next run of records.
            writableObjectChunk.setSize(0);
        }
    }

    /**
     * Hands staged values to the value processor and empties the staging chunk. Does nothing when values are
     * stored directly as simple objects, since in that case the chunk passed in is the destination column itself.
     *
     * @param writableObjectChunk the chunk holding the values collected so far
     * @param writableChunkArr the publisher's chunks that the processor writes into
     */
    void flushValueChunk(WritableObjectChunk<Object, Values> writableObjectChunk, WritableChunk<Values>[] writableChunkArr) {
        if (!this.valueIsSimpleObject) {
            this.valueProcessor.handleChunk(writableObjectChunk, writableChunkArr);
            // Reset so the staging chunk can be reused for the next run of records.
            writableObjectChunk.setSize(0);
        }
    }
}
