package io.deephaven.kafka.publish;

import io.deephaven.base.verify.Assert;
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.liveness.LivenessArtifact;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.table.ModifiedColumnSet;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableUpdate;
import io.deephaven.engine.table.impl.BlinkTableTools;
import io.deephaven.engine.table.impl.InstrumentedTableUpdateListenerAdapter;
import io.deephaven.engine.table.impl.QueryTable;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.annotations.InternalUseOnly;
import io.deephaven.util.annotations.ReferentialIntegrity;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.Serializer;
import org.jetbrains.annotations.NotNull;

@InternalUseOnly
/* loaded from: input_file:io/deephaven/kafka/publish/PublishToKafka.class */
public class PublishToKafka<K, V> extends LivenessArtifact {
    /**
     * Chunk size read from configuration under the {@code chunkSize} property for this class, defaulting to 2048.
     * NOTE(review): presumably the number of rows serialized per batch while publishing; the consuming code is not
     * visible in this file, so confirm against the publishing logic.
     */
    public static final int CHUNK_SIZE = Configuration.getInstance().getIntegerForClassWithDefault(PublishToKafka.class, "chunkSize", 2048);
    /** Source table whose rows are published. */
    private final Table table;
    /** Kafka producer created in the constructor; closed in the constructor for static tables and in destroy(). */
    private final KafkaProducer<K, V> producer;
    /** Topic name supplied to the constructor. */
    private final String topic;
    /** Key-side chunk serializer supplied to the constructor. */
    private final KeyOrValueSerializer<K> keyChunkSerializer;
    /** Value-side chunk serializer supplied to the constructor. */
    private final KeyOrValueSerializer<V> valueChunkSerializer;

    /** Listener attached to the source table when it is refreshing; {@code null} for a non-refreshing table. */
    @ReferentialIntegrity
    private final PublishToKafka<K, V>.PublishListener publishListener;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/deephaven/kafka/publish/PublishToKafka$PublicationGuard.class */
    public class PublicationGuard implements Callback, SafeCloseable {
        private final AtomicLong sentCount = new AtomicLong();
        private final AtomicLong completedCount = new AtomicLong();
        private final AtomicReference<Exception> sendException = new AtomicReference<>();
        private volatile boolean closed;

        private PublicationGuard() {
        }

        private void reset() {
            this.sentCount.set(0L);
            this.completedCount.set(0L);
            this.sendException.set(null);
            this.closed = false;
        }

        private void onSend(long j) {
            if (this.closed) {
                throw new IllegalStateException("Tried to send using a guard that is no longer open");
            }
            this.sentCount.addAndGet(j);
        }

        public void onCompletion(@NotNull RecordMetadata recordMetadata, Exception exc) {
            this.completedCount.getAndIncrement();
            if (exc != null) {
                this.sendException.compareAndSet(null, exc);
            }
        }

        public void close() {
            this.closed = true;
            try {
                long j = this.sentCount.get();
                if (j == 0) {
                    return;
                }
                try {
                    PublishToKafka.this.producer.flush();
                    Exception exc = this.sendException.get();
                    if (exc != null) {
                        throw new KafkaPublisherException("KafkaProducer reported send failure", exc);
                    }
                    long j2 = this.completedCount.get();
                    if (j != j2) {
                        throw new KafkaPublisherException(String.format("Sent count %d does not match completed count %d", Long.valueOf(j), Long.valueOf(j2)));
                    }
                    reset();
                } catch (Exception e) {
                    throw new KafkaPublisherException("KafkaProducer reported flush failure", e);
                }
            } finally {
                reset();
            }
        }
    }

    /* loaded from: input_file:io/deephaven/kafka/publish/PublishToKafka$PublishListener.class */
    private class PublishListener extends InstrumentedTableUpdateListenerAdapter {
        private final ModifiedColumnSet keysModified;
        private final ModifiedColumnSet valuesModified;
        private final boolean isBlink;
        private final PublishToKafka<K, V>.PublicationGuard guard;

        private PublishListener(@NotNull ModifiedColumnSet modifiedColumnSet, @NotNull ModifiedColumnSet modifiedColumnSet2) {
            super("PublishToKafka", PublishToKafka.this.table, false);
            this.guard = new PublicationGuard();
            this.keysModified = modifiedColumnSet;
            this.valuesModified = modifiedColumnSet2;
            this.isBlink = BlinkTableTools.isBlink(PublishToKafka.this.table);
        }

        public void onUpdate(TableUpdate tableUpdate) {
            Assert.assertion(!this.keysModified.containsAny(tableUpdate.modifiedColumnSet()), "!keysModified.containsAny(upstream.modifiedColumnSet())", "Key columns should never be modified");
            PublishToKafka<K, V>.PublicationGuard publicationGuard = this.guard;
            try {
                if (this.isBlink) {
                    Assert.assertion(tableUpdate.modified().isEmpty(), "upstream.modified.empty()");
                    Assert.assertion(tableUpdate.shifted().empty(), "upstream.shifted.empty()");
                    PublishToKafka.this.publishMessages(tableUpdate.added(), false, true, this.guard);
                    if (publicationGuard != null) {
                        publicationGuard.close();
                        return;
                    }
                    return;
                }
                PublishToKafka.this.publishMessages(tableUpdate.removed(), true, false, this.guard);
                if (this.valuesModified.containsAny(tableUpdate.modifiedColumnSet())) {
                    RowSet union = tableUpdate.added().union(tableUpdate.modified());
                    try {
                        PublishToKafka.this.publishMessages(union, false, true, this.guard);
                        if (union != null) {
                            union.close();
                        }
                    } finally {
                    }
                } else {
                    PublishToKafka.this.publishMessages(tableUpdate.added(), false, true, this.guard);
                }
                if (publicationGuard != null) {
                    publicationGuard.close();
                }
            } catch (Throwable th) {
                if (publicationGuard != null) {
                    try {
                        publicationGuard.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
    }

    public PublishToKafka(Properties properties, Table table, String str, String[] strArr, Serializer<K> serializer, KeyOrValueSerializer<K> keyOrValueSerializer, String[] strArr2, Serializer<V> serializer2, KeyOrValueSerializer<V> keyOrValueSerializer2, boolean z) {
        this.table = table;
        this.producer = new KafkaProducer<>(properties, (Serializer) Objects.requireNonNull(serializer), (Serializer) Objects.requireNonNull(serializer2));
        this.topic = str;
        this.keyChunkSerializer = keyOrValueSerializer;
        this.valueChunkSerializer = keyOrValueSerializer2;
        if (z) {
            PublishToKafka<K, V>.PublicationGuard publicationGuard = new PublicationGuard();
            try {
                publishMessages(table.getRowSet(), false, true, publicationGuard);
                publicationGuard.close();
            } catch (Throwable th) {
                try {
                    publicationGuard.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
                throw th;
            }
        }
        if (!table.isRefreshing()) {
            this.publishListener = null;
            this.producer.close();
        } else {
            PublishToKafka<K, V>.PublishListener publishListener = new PublishListener(getModifiedColumnSet(table, strArr), getModifiedColumnSet(table, strArr2));
            this.publishListener = publishListener;
            table.addUpdateListener(publishListener);
            manage(this.publishListener);
        }
    }

    /**
     * Builds the column set for the given column names.
     *
     * @param table the table owning the columns; cast to {@link QueryTable}
     * @param strArr column names, or {@code null}
     * @return {@link ModifiedColumnSet#EMPTY} when {@code strArr} is {@code null}, otherwise a new column set for
     *         those columns
     */
    private static ModifiedColumnSet getModifiedColumnSet(@NotNull Table table, String[] strArr) {
        if (strArr != null) {
            final QueryTable queryTable = (QueryTable) table;
            return queryTable.newModifiedColumnSet(strArr);
        }
        return ModifiedColumnSet.EMPTY;
    }

    /* JADX WARN: Removed duplicated region for block: B:18:0x006b A[Catch: Throwable -> 0x010d, Throwable -> 0x0139, Throwable -> 0x0165, TryCatch #1 {Throwable -> 0x010d, blocks: (B:16:0x0061, B:18:0x006b, B:20:0x007c, B:23:0x0097, B:25:0x00b0, B:27:0x00bc, B:29:0x00c9, B:32:0x00d9, B:34:0x00e4), top: B:15:0x0061 }] */
    /* JADX WARN: Removed duplicated region for block: B:45:0x0103 A[Catch: Throwable -> 0x0139, Throwable -> 0x0165, TryCatch #5 {Throwable -> 0x0139, blocks: (B:66:0x0049, B:68:0x0050, B:16:0x0061, B:18:0x006b, B:20:0x007c, B:23:0x0097, B:25:0x00b0, B:27:0x00bc, B:29:0x00c9, B:32:0x00d9, B:34:0x00e4, B:45:0x0103, B:61:0x0114, B:59:0x0129, B:64:0x0120), top: B:65:0x0049, outer: #3 }] */
    /* JADX WARN: Removed duplicated region for block: B:48:0x012f A[Catch: Throwable -> 0x0165, TryCatch #3 {Throwable -> 0x0165, blocks: (B:7:0x002d, B:9:0x0034, B:66:0x0049, B:68:0x0050, B:16:0x0061, B:18:0x006b, B:20:0x007c, B:23:0x0097, B:25:0x00b0, B:27:0x00bc, B:29:0x00c9, B:32:0x00d9, B:34:0x00e4, B:45:0x0103, B:48:0x012f, B:61:0x0114, B:59:0x0129, B:64:0x0120, B:75:0x0140, B:73:0x0155, B:78:0x014c), top: B:6:0x002d, inners: #0, #5 }] */
    /* JADX WARN: Removed duplicated region for block: B:51:0x015b  */
    /* JADX WARN: Removed duplicated region for block: B:53:0x0182 A[ORIG_RETURN, RETURN] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Publishes messages for the given rows under the supplied guard.
     *
     * <p>
     * NOTE: the decompiler could not recover this method's body. As written here it is a placeholder that always
     * throws {@link UnsupportedOperationException}; the real logic must be restored from the original sources or
     * bytecode.
     *
     * <p>
     * Call sites in this file pass {@code (removedRows, true, false)} for removals and
     * {@code (addedRows, false, true)} for additions and for the initial snapshot.
     *
     * @param r8 the rows to publish
     * @param r9 NOTE(review): presumably selects previous-value reads for removed rows - confirm against the
     *        original source
     * @param r10 NOTE(review): presumably controls whether values are published - confirm against the original
     *        source
     * @param r11 the guard associated with this publication cycle
     */
    private void publishMessages(@org.jetbrains.annotations.NotNull io.deephaven.engine.rowset.RowSet r8, boolean r9, boolean r10, @org.jetbrains.annotations.NotNull io.deephaven.kafka.publish.PublishToKafka<K, V>.PublicationGuard r11) {
        /*
            Method dump skipped, instructions count: 387
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: io.deephaven.kafka.publish.PublishToKafka.publishMessages(io.deephaven.engine.rowset.RowSet, boolean, boolean, io.deephaven.kafka.publish.PublishToKafka$PublicationGuard):void");
    }

    /**
     * Tears down this publisher: runs the superclass teardown first, then closes the Kafka producer.
     */
    @Override
    protected void destroy() {
        super.destroy();
        producer.close();
    }
}
