package io.deephaven.kafka.publish;

import io.deephaven.api.ColumnName;
import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.IntChunk;
import io.deephaven.chunk.LongChunk;
import io.deephaven.chunk.ObjectChunk;
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.liveness.LivenessArtifact;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.table.ChunkSource;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.ModifiedColumnSet;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableUpdate;
import io.deephaven.engine.table.impl.BlinkTableTools;
import io.deephaven.engine.table.impl.InstrumentedTableUpdateListenerAdapter;
import io.deephaven.engine.table.impl.QueryTable;
import io.deephaven.engine.table.impl.sources.ReinterpretUtils;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.annotations.InternalUseOnly;
import io.deephaven.util.annotations.ReferentialIntegrity;
import java.time.Instant;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.Serializer;
import org.jetbrains.annotations.NotNull;

@InternalUseOnly
/* loaded from: input_file:io/deephaven/kafka/publish/PublishToKafka.class */
public class PublishToKafka<K, V> extends LivenessArtifact {
    public static final int CHUNK_SIZE = Configuration.getInstance().getIntegerForClassWithDefault(PublishToKafka.class, "chunkSize", 2048);
    private final Table table;
    private final KafkaProducer<K, V> producer;
    private final String defaultTopic;
    private final Integer defaultPartition;
    private final KeyOrValueSerializer<K> keyChunkSerializer;
    private final KeyOrValueSerializer<V> valueChunkSerializer;
    private final ColumnSource<CharSequence> topicColumnSource;
    private final ColumnSource<Integer> partitionColumnSource;
    private final ColumnSource<Long> timestampColumnSource;

    @ReferentialIntegrity
    private final PublishToKafka<K, V>.PublishListener publishListener;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/deephaven/kafka/publish/PublishToKafka$PublicationGuard.class */
    public class PublicationGuard implements Callback, SafeCloseable {
        private final AtomicLong sentCount = new AtomicLong();
        private final AtomicLong completedCount = new AtomicLong();
        private final AtomicReference<Exception> sendException = new AtomicReference<>();
        private volatile boolean closed;

        private PublicationGuard() {
        }

        private void reset() {
            this.sentCount.set(0L);
            this.completedCount.set(0L);
            this.sendException.set(null);
            this.closed = false;
        }

        private void onSend(long j) {
            if (this.closed) {
                throw new IllegalStateException("Tried to send using a guard that is no longer open");
            }
            this.sentCount.addAndGet(j);
        }

        public void onCompletion(@NotNull RecordMetadata recordMetadata, Exception exc) {
            this.completedCount.getAndIncrement();
            if (exc != null) {
                this.sendException.compareAndSet(null, exc);
            }
        }

        public void close() {
            this.closed = true;
            try {
                long j = this.sentCount.get();
                if (j == 0) {
                    return;
                }
                try {
                    PublishToKafka.this.producer.flush();
                    Exception exc = this.sendException.get();
                    if (exc != null) {
                        throw new KafkaPublisherException("KafkaProducer reported send failure", exc);
                    }
                    long j2 = this.completedCount.get();
                    if (j != j2) {
                        throw new KafkaPublisherException(String.format("Sent count %d does not match completed count %d", Long.valueOf(j), Long.valueOf(j2)));
                    }
                    reset();
                } catch (Exception e) {
                    throw new KafkaPublisherException("KafkaProducer reported flush failure", e);
                }
            } finally {
                reset();
            }
        }
    }

    /* loaded from: input_file:io/deephaven/kafka/publish/PublishToKafka$PublishListener.class */
    private class PublishListener extends InstrumentedTableUpdateListenerAdapter {
        private final ModifiedColumnSet keysModified;
        private final ModifiedColumnSet valuesModified;
        private final boolean isBlink;
        private final PublishToKafka<K, V>.PublicationGuard guard;

        private PublishListener(@NotNull ModifiedColumnSet modifiedColumnSet, @NotNull ModifiedColumnSet modifiedColumnSet2) {
            super("PublishToKafka", PublishToKafka.this.table, false);
            this.guard = new PublicationGuard();
            this.keysModified = modifiedColumnSet;
            this.valuesModified = modifiedColumnSet2;
            this.isBlink = BlinkTableTools.isBlink(PublishToKafka.this.table);
        }

        public void onUpdate(TableUpdate tableUpdate) {
            Assert.assertion(!this.keysModified.containsAny(tableUpdate.modifiedColumnSet()), "!keysModified.containsAny(upstream.modifiedColumnSet())", "Key columns should never be modified");
            PublishToKafka<K, V>.PublicationGuard publicationGuard = this.guard;
            try {
                if (this.isBlink) {
                    Assert.assertion(tableUpdate.modified().isEmpty(), "upstream.modified.empty()");
                    Assert.assertion(tableUpdate.shifted().empty(), "upstream.shifted.empty()");
                    PublishToKafka.this.publishMessages(tableUpdate.added(), false, true, this.guard);
                    if (publicationGuard != null) {
                        publicationGuard.close();
                        return;
                    }
                    return;
                }
                PublishToKafka.this.publishMessages(tableUpdate.removed(), true, false, this.guard);
                if (this.valuesModified.containsAny(tableUpdate.modifiedColumnSet())) {
                    RowSet union = tableUpdate.added().union(tableUpdate.modified());
                    try {
                        PublishToKafka.this.publishMessages(union, false, true, this.guard);
                        if (union != null) {
                            union.close();
                        }
                    } finally {
                    }
                } else {
                    PublishToKafka.this.publishMessages(tableUpdate.added(), false, true, this.guard);
                }
                if (publicationGuard != null) {
                    publicationGuard.close();
                }
            } catch (Throwable th) {
                if (publicationGuard != null) {
                    try {
                        publicationGuard.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
    }

    @Deprecated(forRemoval = true)
    public PublishToKafka(Properties properties, Table table, String str, String[] strArr, Serializer<K> serializer, KeyOrValueSerializer<K> keyOrValueSerializer, String[] strArr2, Serializer<V> serializer2, KeyOrValueSerializer<V> keyOrValueSerializer2, boolean z) {
        this(properties, table, str, null, strArr, serializer, keyOrValueSerializer, strArr2, serializer2, keyOrValueSerializer2, null, null, null, z);
    }

    public PublishToKafka(Properties properties, Table table, String str, Integer num, String[] strArr, Serializer<K> serializer, KeyOrValueSerializer<K> keyOrValueSerializer, String[] strArr2, Serializer<V> serializer2, KeyOrValueSerializer<V> keyOrValueSerializer2, ColumnName columnName, ColumnName columnName2, ColumnName columnName3, boolean z) {
        Table coalesce = table.coalesce();
        this.table = coalesce;
        this.producer = new KafkaProducer<>(properties, (Serializer) Objects.requireNonNull(serializer), (Serializer) Objects.requireNonNull(serializer2));
        this.defaultTopic = str;
        this.defaultPartition = num;
        this.keyChunkSerializer = keyOrValueSerializer;
        this.valueChunkSerializer = keyOrValueSerializer2;
        this.topicColumnSource = columnName == null ? null : coalesce.getColumnSource(columnName.name(), CharSequence.class);
        this.partitionColumnSource = columnName2 == null ? null : coalesce.getColumnSource(columnName2.name(), Integer.TYPE);
        this.timestampColumnSource = columnName3 == null ? null : ReinterpretUtils.instantToLongSource(coalesce.getColumnSource(columnName3.name(), Instant.class));
        if (z) {
            PublishToKafka<K, V>.PublicationGuard publicationGuard = new PublicationGuard();
            try {
                publishMessages(coalesce.getRowSet(), false, true, publicationGuard);
                publicationGuard.close();
            } catch (Throwable th) {
                try {
                    publicationGuard.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
                throw th;
            }
        }
        if (!coalesce.isRefreshing()) {
            this.publishListener = null;
            this.producer.close();
        } else {
            PublishToKafka<K, V>.PublishListener publishListener = new PublishListener(getModifiedColumnSet(coalesce, strArr), getModifiedColumnSet(coalesce, strArr2));
            this.publishListener = publishListener;
            coalesce.addUpdateListener(publishListener);
            manage(this.publishListener);
        }
    }

    private static ModifiedColumnSet getModifiedColumnSet(@NotNull Table table, String[] strArr) {
        return strArr == null ? ModifiedColumnSet.EMPTY : ((QueryTable) table).newModifiedColumnSet(strArr);
    }

    private String topic(ObjectChunk<CharSequence, ?> objectChunk, int i) {
        CharSequence charSequence;
        if (objectChunk != null && (charSequence = (CharSequence) objectChunk.get(i)) != null) {
            return charSequence.toString();
        }
        return this.defaultTopic;
    }

    private Integer partition(IntChunk<?> intChunk, int i) {
        int i2;
        if (intChunk != null && (i2 = intChunk.get(i)) != Integer.MIN_VALUE) {
            return Integer.valueOf(i2);
        }
        return this.defaultPartition;
    }

    public static Long timestampMillis(LongChunk<?> longChunk, int i) {
        if (longChunk == null) {
            return null;
        }
        long j = longChunk.get(i);
        if (j == Long.MIN_VALUE) {
            return null;
        }
        return Long.valueOf(TimeUnit.NANOSECONDS.toMillis(j));
    }

    private static <T> T object(ObjectChunk<T, ?> objectChunk, int i) {
        if (objectChunk == null) {
            return null;
        }
        return (T) objectChunk.get(i);
    }

    private static ChunkSource.GetContext makeGetContext(ColumnSource<?> columnSource, int i) {
        if (columnSource == null) {
            return null;
        }
        return columnSource.makeGetContext(i);
    }

    /* JADX WARN: Removed duplicated region for block: B:24:0x008c A[Catch: Throwable -> 0x01c2, Throwable -> 0x01ee, Throwable -> 0x021a, Throwable -> 0x0246, Throwable -> 0x0272, Throwable -> 0x029e, TryCatch #1 {Throwable -> 0x021a, blocks: (B:18:0x006c, B:20:0x0077, B:22:0x0082, B:24:0x008c, B:39:0x015b, B:42:0x0170, B:47:0x0139, B:48:0x0156, B:49:0x0149, B:52:0x0108, B:53:0x0125, B:54:0x0118, B:57:0x00d7, B:58:0x00f4, B:59:0x00e7, B:60:0x00ba, B:61:0x00a1, B:65:0x01b8, B:68:0x01e4, B:90:0x01c9, B:88:0x01de, B:93:0x01d5, B:100:0x01f5, B:98:0x020a, B:103:0x0201), top: B:17:0x006c }] */
    /* JADX WARN: Removed duplicated region for block: B:65:0x01b8 A[Catch: Throwable -> 0x01ee, Throwable -> 0x021a, Throwable -> 0x0246, Throwable -> 0x0272, Throwable -> 0x029e, TryCatch #1 {Throwable -> 0x021a, blocks: (B:18:0x006c, B:20:0x0077, B:22:0x0082, B:24:0x008c, B:39:0x015b, B:42:0x0170, B:47:0x0139, B:48:0x0156, B:49:0x0149, B:52:0x0108, B:53:0x0125, B:54:0x0118, B:57:0x00d7, B:58:0x00f4, B:59:0x00e7, B:60:0x00ba, B:61:0x00a1, B:65:0x01b8, B:68:0x01e4, B:90:0x01c9, B:88:0x01de, B:93:0x01d5, B:100:0x01f5, B:98:0x020a, B:103:0x0201), top: B:17:0x006c }] */
    /* JADX WARN: Removed duplicated region for block: B:68:0x01e4 A[Catch: Throwable -> 0x021a, Throwable -> 0x0246, Throwable -> 0x0272, Throwable -> 0x029e, TryCatch #1 {Throwable -> 0x021a, blocks: (B:18:0x006c, B:20:0x0077, B:22:0x0082, B:24:0x008c, B:39:0x015b, B:42:0x0170, B:47:0x0139, B:48:0x0156, B:49:0x0149, B:52:0x0108, B:53:0x0125, B:54:0x0118, B:57:0x00d7, B:58:0x00f4, B:59:0x00e7, B:60:0x00ba, B:61:0x00a1, B:65:0x01b8, B:68:0x01e4, B:90:0x01c9, B:88:0x01de, B:93:0x01d5, B:100:0x01f5, B:98:0x020a, B:103:0x0201), top: B:17:0x006c }] */
    /* JADX WARN: Removed duplicated region for block: B:71:0x0210 A[Catch: Throwable -> 0x0246, Throwable -> 0x0272, Throwable -> 0x029e, TryCatch #9 {Throwable -> 0x0272, blocks: (B:125:0x0049, B:127:0x0050, B:16:0x0061, B:18:0x006c, B:20:0x0077, B:22:0x0082, B:24:0x008c, B:39:0x015b, B:42:0x0170, B:47:0x0139, B:48:0x0156, B:49:0x0149, B:52:0x0108, B:53:0x0125, B:54:0x0118, B:57:0x00d7, B:58:0x00f4, B:59:0x00e7, B:60:0x00ba, B:61:0x00a1, B:65:0x01b8, B:68:0x01e4, B:71:0x0210, B:74:0x023c, B:90:0x01c9, B:88:0x01de, B:93:0x01d5, B:100:0x01f5, B:98:0x020a, B:103:0x0201, B:110:0x0221, B:108:0x0236, B:113:0x022d, B:120:0x024d, B:118:0x0262, B:123:0x0259), top: B:124:0x0049, outer: #5 }] */
    /* JADX WARN: Removed duplicated region for block: B:74:0x023c A[Catch: Throwable -> 0x0272, Throwable -> 0x029e, TryCatch #9 {Throwable -> 0x0272, blocks: (B:125:0x0049, B:127:0x0050, B:16:0x0061, B:18:0x006c, B:20:0x0077, B:22:0x0082, B:24:0x008c, B:39:0x015b, B:42:0x0170, B:47:0x0139, B:48:0x0156, B:49:0x0149, B:52:0x0108, B:53:0x0125, B:54:0x0118, B:57:0x00d7, B:58:0x00f4, B:59:0x00e7, B:60:0x00ba, B:61:0x00a1, B:65:0x01b8, B:68:0x01e4, B:71:0x0210, B:74:0x023c, B:90:0x01c9, B:88:0x01de, B:93:0x01d5, B:100:0x01f5, B:98:0x020a, B:103:0x0201, B:110:0x0221, B:108:0x0236, B:113:0x022d, B:120:0x024d, B:118:0x0262, B:123:0x0259), top: B:124:0x0049, outer: #5 }] */
    /* JADX WARN: Removed duplicated region for block: B:77:0x0268 A[Catch: Throwable -> 0x029e, TryCatch #5 {Throwable -> 0x029e, blocks: (B:7:0x002d, B:9:0x0034, B:125:0x0049, B:127:0x0050, B:16:0x0061, B:18:0x006c, B:20:0x0077, B:22:0x0082, B:24:0x008c, B:39:0x015b, B:42:0x0170, B:47:0x0139, B:48:0x0156, B:49:0x0149, B:52:0x0108, B:53:0x0125, B:54:0x0118, B:57:0x00d7, B:58:0x00f4, B:59:0x00e7, B:60:0x00ba, B:61:0x00a1, B:65:0x01b8, B:68:0x01e4, B:71:0x0210, B:74:0x023c, B:77:0x0268, B:90:0x01c9, B:88:0x01de, B:93:0x01d5, B:100:0x01f5, B:98:0x020a, B:103:0x0201, B:110:0x0221, B:108:0x0236, B:113:0x022d, B:120:0x024d, B:118:0x0262, B:123:0x0259, B:134:0x0279, B:132:0x028e, B:137:0x0285), top: B:6:0x002d, inners: #6, #9 }] */
    /* JADX WARN: Removed duplicated region for block: B:80:0x0294  */
    /* JADX WARN: Removed duplicated region for block: B:82:0x02bb A[ORIG_RETURN, RETURN] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private void publishMessages(@org.jetbrains.annotations.NotNull io.deephaven.engine.rowset.RowSet r10, boolean r11, boolean r12, @org.jetbrains.annotations.NotNull io.deephaven.kafka.publish.PublishToKafka<K, V>.PublicationGuard r13) {
        /*
            Method dump skipped, instructions count: 700
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: io.deephaven.kafka.publish.PublishToKafka.publishMessages(io.deephaven.engine.rowset.RowSet, boolean, boolean, io.deephaven.kafka.publish.PublishToKafka$PublicationGuard):void");
    }

    protected void destroy() {
        super.destroy();
        this.producer.close();
    }
}
