/*
 * Decompiled with CFR 0.152.
 */
package io.deephaven.kafka.publish;

import io.deephaven.api.ColumnName;
import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.IntChunk;
import io.deephaven.chunk.LongChunk;
import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.liveness.LivenessArtifact;
import io.deephaven.engine.liveness.LivenessReferent;
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.table.ChunkSource;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.ModifiedColumnSet;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableUpdate;
import io.deephaven.engine.table.TableUpdateListener;
import io.deephaven.engine.table.impl.BlinkTableTools;
import io.deephaven.engine.table.impl.InstrumentedTableUpdateListenerAdapter;
import io.deephaven.engine.table.impl.QueryTable;
import io.deephaven.engine.table.impl.sources.ReinterpretUtils;
import io.deephaven.kafka.publish.KafkaPublisherException;
import io.deephaven.kafka.publish.KeyOrValueSerializer;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.annotations.InternalUseOnly;
import io.deephaven.util.annotations.ReferentialIntegrity;
import java.time.Instant;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.Serializer;
import org.jetbrains.annotations.NotNull;

@InternalUseOnly
public class PublishToKafka<K, V>
extends LivenessArtifact {
    public static final int CHUNK_SIZE = Configuration.getInstance().getIntegerForClassWithDefault(PublishToKafka.class, "chunkSize", 2048);
    private final Table table;
    private final KafkaProducer<K, V> producer;
    private final String defaultTopic;
    private final Integer defaultPartition;
    private final KeyOrValueSerializer<K> keyChunkSerializer;
    private final KeyOrValueSerializer<V> valueChunkSerializer;
    private final ColumnSource<CharSequence> topicColumnSource;
    private final ColumnSource<Integer> partitionColumnSource;
    private final ColumnSource<Long> timestampColumnSource;
    @ReferentialIntegrity
    private final PublishListener publishListener;

    @Deprecated(forRemoval=true)
    public PublishToKafka(Properties props, Table table, String topic, String[] keyColumns, Serializer<K> kafkaKeySerializer, KeyOrValueSerializer<K> keyChunkSerializer, String[] valueColumns, Serializer<V> kafkaValueSerializer, KeyOrValueSerializer<V> valueChunkSerializer, boolean publishInitial) {
        this(props, table, topic, null, keyColumns, kafkaKeySerializer, keyChunkSerializer, valueColumns, kafkaValueSerializer, valueChunkSerializer, null, null, null, publishInitial);
    }

    public PublishToKafka(Properties props, Table table, String defaultTopic, Integer defaultPartition, String[] keyColumns, Serializer<K> kafkaKeySerializer, KeyOrValueSerializer<K> keyChunkSerializer, String[] valueColumns, Serializer<V> kafkaValueSerializer, KeyOrValueSerializer<V> valueChunkSerializer, ColumnName topicColumn, ColumnName partitionColumn, ColumnName timestampColumn, boolean publishInitial) {
        this.table = table = table.coalesce();
        this.producer = new KafkaProducer(props, Objects.requireNonNull(kafkaKeySerializer), Objects.requireNonNull(kafkaValueSerializer));
        this.defaultTopic = defaultTopic;
        this.defaultPartition = defaultPartition;
        this.keyChunkSerializer = keyChunkSerializer;
        this.valueChunkSerializer = valueChunkSerializer;
        this.topicColumnSource = topicColumn == null ? null : table.getColumnSource(topicColumn.name(), CharSequence.class);
        this.partitionColumnSource = partitionColumn == null ? null : table.getColumnSource(partitionColumn.name(), Integer.TYPE);
        ColumnSource<Long> columnSource = this.timestampColumnSource = timestampColumn == null ? null : ReinterpretUtils.instantToLongSource((ColumnSource)table.getColumnSource(timestampColumn.name(), Instant.class));
        if (publishInitial) {
            try (PublicationGuard guard = new PublicationGuard();){
                this.publishMessages((RowSet)table.getRowSet(), false, true, guard);
            }
        }
        if (table.isRefreshing()) {
            this.publishListener = new PublishListener(PublishToKafka.getModifiedColumnSet(table, keyColumns), PublishToKafka.getModifiedColumnSet(table, valueColumns));
            table.addUpdateListener((TableUpdateListener)this.publishListener);
            this.manage((LivenessReferent)this.publishListener);
        } else {
            this.publishListener = null;
            this.producer.close();
        }
    }

    private static ModifiedColumnSet getModifiedColumnSet(@NotNull Table table, String[] columns) {
        return columns == null ? ModifiedColumnSet.EMPTY : ((QueryTable)table).newModifiedColumnSet(columns);
    }

    private String topic(ObjectChunk<CharSequence, ?> topicChunk, int index) {
        if (topicChunk == null) {
            return this.defaultTopic;
        }
        CharSequence charSequence = (CharSequence)topicChunk.get(index);
        return charSequence == null ? this.defaultTopic : charSequence.toString();
    }

    private Integer partition(IntChunk<?> partitionChunk, int index) {
        if (partitionChunk == null) {
            return this.defaultPartition;
        }
        int partition = partitionChunk.get(index);
        return partition == Integer.MIN_VALUE ? this.defaultPartition : Integer.valueOf(partition);
    }

    public static Long timestampMillis(LongChunk<?> nanosChunk, int index) {
        if (nanosChunk == null) {
            return null;
        }
        long nanos = nanosChunk.get(index);
        return nanos == Long.MIN_VALUE ? null : Long.valueOf(TimeUnit.NANOSECONDS.toMillis(nanos));
    }

    private static <T> T object(ObjectChunk<T, ?> chunk, int index) {
        return (T)(chunk == null ? null : chunk.get(index));
    }

    private static ChunkSource.GetContext makeGetContext(ColumnSource<?> source, int chunkSize) {
        return source == null ? null : source.makeGetContext(chunkSize);
    }

    private void publishMessages(@NotNull RowSet rowsToPublish, boolean usePrevious, boolean publishValues, @NotNull PublicationGuard guard) {
        if (rowsToPublish.isEmpty()) {
            return;
        }
        guard.onSend(rowsToPublish.size());
        int chunkSize = (int)Math.min((long)CHUNK_SIZE, rowsToPublish.size());
        try (RowSequence.Iterator rowsIterator = rowsToPublish.getRowSequenceIterator();
             KeyOrValueSerializer.Context keyContext = this.keyChunkSerializer != null ? this.keyChunkSerializer.makeContext(chunkSize) : null;
             KeyOrValueSerializer.Context valueContext = publishValues && this.valueChunkSerializer != null ? this.valueChunkSerializer.makeContext(chunkSize) : null;
             ChunkSource.GetContext topicContext = PublishToKafka.makeGetContext(this.topicColumnSource, chunkSize);
             ChunkSource.GetContext partitionContext = PublishToKafka.makeGetContext(this.partitionColumnSource, chunkSize);
             ChunkSource.GetContext timestampContext = PublishToKafka.makeGetContext(this.timestampColumnSource, chunkSize);){
            while (rowsIterator.hasMore()) {
                IntChunk partitionChunk;
                ObjectChunk topicChunk;
                ObjectChunk<V, Values> valueChunk;
                RowSequence chunkRowKeys = rowsIterator.getNextRowSequenceWithLength((long)chunkSize);
                ObjectChunk<K, Values> keyChunk = keyContext == null ? null : this.keyChunkSerializer.handleChunk(keyContext, chunkRowKeys, usePrevious);
                ObjectChunk<V, Values> objectChunk = valueChunk = valueContext == null ? null : this.valueChunkSerializer.handleChunk(valueContext, chunkRowKeys, usePrevious);
                ObjectChunk objectChunk2 = topicContext == null ? null : (topicChunk = (usePrevious ? this.topicColumnSource.getPrevChunk(topicContext, chunkRowKeys) : this.topicColumnSource.getChunk(topicContext, chunkRowKeys)).asObjectChunk());
                IntChunk intChunk = partitionContext == null ? null : (partitionChunk = (usePrevious ? this.partitionColumnSource.getPrevChunk(partitionContext, chunkRowKeys) : this.partitionColumnSource.getChunk(partitionContext, chunkRowKeys)).asIntChunk());
                LongChunk timestampChunk = timestampContext == null ? null : (usePrevious ? this.timestampColumnSource.getPrevChunk(timestampContext, chunkRowKeys) : this.timestampColumnSource.getChunk(timestampContext, chunkRowKeys)).asLongChunk();
                int numRecords = chunkRowKeys.intSize();
                for (int ii = 0; ii < numRecords; ++ii) {
                    ProducerRecord record = new ProducerRecord(this.topic(topicChunk, ii), this.partition(partitionChunk, ii), PublishToKafka.timestampMillis(timestampChunk, ii), PublishToKafka.object(keyChunk, ii), PublishToKafka.object(valueChunk, ii));
                    this.producer.send(record, (Callback)guard);
                }
            }
        }
    }

    protected void destroy() {
        super.destroy();
        this.producer.close();
    }

    private class PublishListener
    extends InstrumentedTableUpdateListenerAdapter {
        private final ModifiedColumnSet keysModified;
        private final ModifiedColumnSet valuesModified;
        private final boolean isBlink;
        private final PublicationGuard guard;

        private PublishListener(@NotNull ModifiedColumnSet keysModified, ModifiedColumnSet valuesModified) {
            super("PublishToKafka", PublishToKafka.this.table, false);
            this.guard = new PublicationGuard();
            this.keysModified = keysModified;
            this.valuesModified = valuesModified;
            this.isBlink = BlinkTableTools.isBlink((Table)PublishToKafka.this.table);
        }

        public void onUpdate(TableUpdate upstream) {
            block15: {
                Assert.assertion((!this.keysModified.containsAny(upstream.modifiedColumnSet()) ? 1 : 0) != 0, (String)"!keysModified.containsAny(upstream.modifiedColumnSet())", (String)"Key columns should never be modified");
                try (PublicationGuard ignored = this.guard;){
                    if (this.isBlink) {
                        Assert.assertion((boolean)upstream.modified().isEmpty(), (String)"upstream.modified.empty()");
                        Assert.assertion((boolean)upstream.shifted().empty(), (String)"upstream.shifted.empty()");
                        PublishToKafka.this.publishMessages(upstream.added(), false, true, this.guard);
                        return;
                    }
                    PublishToKafka.this.publishMessages(upstream.removed(), true, false, this.guard);
                    if (this.valuesModified.containsAny(upstream.modifiedColumnSet())) {
                        try (WritableRowSet addedAndModified = upstream.added().union(upstream.modified());){
                            PublishToKafka.this.publishMessages((RowSet)addedAndModified, false, true, this.guard);
                            break block15;
                        }
                    }
                    PublishToKafka.this.publishMessages(upstream.added(), false, true, this.guard);
                }
            }
        }
    }

    private class PublicationGuard
    implements Callback,
    SafeCloseable {
        private final AtomicLong sentCount = new AtomicLong();
        private final AtomicLong completedCount = new AtomicLong();
        private final AtomicReference<Exception> sendException = new AtomicReference();
        private volatile boolean closed;

        private PublicationGuard() {
        }

        private void reset() {
            this.sentCount.set(0L);
            this.completedCount.set(0L);
            this.sendException.set(null);
            this.closed = false;
        }

        private void onSend(long messagesToSend) {
            if (this.closed) {
                throw new IllegalStateException("Tried to send using a guard that is no longer open");
            }
            this.sentCount.addAndGet(messagesToSend);
        }

        public void onCompletion(@NotNull RecordMetadata metadata, Exception exception) {
            this.completedCount.getAndIncrement();
            if (exception != null) {
                this.sendException.compareAndSet(null, exception);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void close() {
            this.closed = true;
            try {
                long localSentCount = this.sentCount.get();
                if (localSentCount == 0L) {
                    return;
                }
                try {
                    PublishToKafka.this.producer.flush();
                }
                catch (Exception e) {
                    throw new KafkaPublisherException("KafkaProducer reported flush failure", e);
                }
                Exception localSendException = this.sendException.get();
                if (localSendException != null) {
                    throw new KafkaPublisherException("KafkaProducer reported send failure", localSendException);
                }
                long localCompletedCount = this.completedCount.get();
                if (localSentCount != localCompletedCount) {
                    throw new KafkaPublisherException(String.format("Sent count %d does not match completed count %d", localSentCount, localCompletedCount));
                }
            }
            finally {
                this.reset();
            }
        }
    }
}

