package io.deephaven.stream;

import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.table.ChunkSource;
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.SharedContext;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.impl.NotificationStepSource;
import io.deephaven.engine.table.impl.remote.ConstructSnapshot;
import io.deephaven.engine.table.impl.sources.ReinterpretUtils;
import io.deephaven.engine.util.input.InputTableStatusListener;
import io.deephaven.engine.util.input.InputTableUpdater;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.SafeCloseableArray;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:io/deephaven/stream/TableStreamPublisherImpl.class */
class TableStreamPublisherImpl implements StreamPublisher {
    private final String name;
    private final TableDefinition definition;
    private final Runnable onFlushCallback;
    private final Runnable onShutdownCallback;
    private final int chunkSize;
    private StreamConsumer consumer;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/deephaven/stream/TableStreamPublisherImpl$FillChunks.class */
    public class FillChunks implements ConstructSnapshot.SnapshotFunction {
        private final Table table;
        private final ColumnSource<?>[] sources;
        private final List<WritableChunk<Values>[]> outstandingChunks;

        public FillChunks(Table table) {
            this.table = (Table) Objects.requireNonNull(table);
            this.sources = new ColumnSource[TableStreamPublisherImpl.this.definition.numColumns()];
            int i = 0;
            for (ColumnDefinition columnDefinition : TableStreamPublisherImpl.this.definition.getColumns()) {
                int i2 = i;
                i++;
                this.sources[i2] = ReinterpretUtils.maybeConvertToPrimitive(table.getColumnSource(columnDefinition.getName(), columnDefinition.getDataType(), columnDefinition.getComponentType()));
            }
            this.outstandingChunks = new ArrayList();
        }

        @Override // io.deephaven.engine.table.impl.remote.ConstructSnapshot.SnapshotFunction
        public boolean call(boolean z, long j) {
            ConstructSnapshot.State state = ConstructSnapshot.state();
            reset();
            RowSet prev = z ? this.table.getRowSet().prev() : this.table.getRowSet();
            long size = prev.size();
            int length = this.sources.length;
            ChunkSource.FillContext[] fillContextArr = new ChunkSource.FillContext[length];
            SharedContext makeSharedContext = length > 1 ? SharedContext.makeSharedContext() : null;
            try {
                SafeCloseableArray safeCloseableArray = new SafeCloseableArray(fillContextArr);
                try {
                    RowSequence.Iterator rowSequenceIterator = prev.getRowSequenceIterator();
                    try {
                        int min = (int) Math.min(TableStreamPublisherImpl.this.chunkSize, size);
                        for (int i = 0; i < length; i++) {
                            fillContextArr[i] = this.sources[i].makeFillContext(min, makeSharedContext);
                        }
                        long j2 = size;
                        while (rowSequenceIterator.hasMore()) {
                            if (makeSharedContext != null) {
                                makeSharedContext.reset();
                            }
                            assertTrue(state, j2 > 0, "remaining > 0");
                            RowSequence nextRowSequenceWithLength = rowSequenceIterator.getNextRowSequenceWithLength(TableStreamPublisherImpl.this.chunkSize);
                            int intSize = nextRowSequenceWithLength.intSize();
                            assertTrue(state, ((long) intSize) == Math.min((long) TableStreamPublisherImpl.this.chunkSize, j2), "rowSeqSize == Math.min(chunkSize, remaining)");
                            j2 -= intSize;
                            WritableChunk<Values>[] makeChunksForDefinition = StreamChunkUtils.makeChunksForDefinition(TableStreamPublisherImpl.this.definition, intSize);
                            this.outstandingChunks.add(makeChunksForDefinition);
                            for (int i2 = 0; i2 < length; i2++) {
                                if (z) {
                                    this.sources[i2].fillPrevChunk(fillContextArr[i2], makeChunksForDefinition[i2], nextRowSequenceWithLength);
                                } else {
                                    this.sources[i2].fillChunk(fillContextArr[i2], makeChunksForDefinition[i2], nextRowSequenceWithLength);
                                }
                                if (state.concurrentAttemptInconsistent()) {
                                    reset();
                                    if (rowSequenceIterator != null) {
                                        rowSequenceIterator.close();
                                    }
                                    safeCloseableArray.close();
                                    if (makeSharedContext != null) {
                                        makeSharedContext.close();
                                    }
                                    return false;
                                }
                            }
                        }
                        assertTrue(state, j2 == 0, "remaining == 0");
                        if (rowSequenceIterator != null) {
                            rowSequenceIterator.close();
                        }
                        safeCloseableArray.close();
                        if (makeSharedContext == null) {
                            return true;
                        }
                        makeSharedContext.close();
                        return true;
                    } catch (Throwable th) {
                        if (rowSequenceIterator != null) {
                            try {
                                rowSequenceIterator.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                } finally {
                }
            } catch (Throwable th3) {
                if (makeSharedContext != null) {
                    try {
                        makeSharedContext.close();
                    } catch (Throwable th4) {
                        th3.addSuppressed(th4);
                    }
                }
                throw th3;
            }
        }

        private void assertTrue(ConstructSnapshot.State state, boolean z, String str) {
            if (z) {
                return;
            }
            reset();
            state.failIfConcurrentAttemptInconsistent();
            throw new Error("Found broken assertion not due to inconsistent attempt: " + str);
        }

        private void reset() {
            SafeCloseable.closeAll(this.outstandingChunks.stream().flatMap((v0) -> {
                return Stream.of(v0);
            }));
            this.outstandingChunks.clear();
        }
    }

    /* loaded from: input_file:io/deephaven/stream/TableStreamPublisherImpl$InputTableAdapter.class */
    private class InputTableAdapter implements InputTableUpdater {
        private InputTableAdapter() {
        }

        @Override // io.deephaven.engine.util.input.InputTableUpdater
        public TableDefinition getTableDefinition() {
            return TableStreamPublisherImpl.this.definition;
        }

        @Override // io.deephaven.engine.util.input.InputTableUpdater
        public void add(Table table) {
            TableStreamPublisherImpl.this.add(table);
        }

        @Override // io.deephaven.engine.util.input.InputTableUpdater
        public void addAsync(Table table, InputTableStatusListener inputTableStatusListener) {
            try {
                TableStreamPublisherImpl.this.add(table);
                inputTableStatusListener.onSuccess();
            } catch (Throwable th) {
                inputTableStatusListener.onError(th);
            }
        }

        @Override // io.deephaven.engine.util.input.InputTableUpdater
        public List<String> getKeyNames() {
            return Collections.emptyList();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public TableStreamPublisherImpl(String str, TableDefinition tableDefinition, Runnable runnable, Runnable runnable2, int i) {
        if (i <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        this.name = (String) Objects.requireNonNull(str);
        this.definition = (TableDefinition) Objects.requireNonNull(tableDefinition);
        this.onFlushCallback = runnable;
        this.onShutdownCallback = runnable2;
        this.chunkSize = i;
    }

    @Override // io.deephaven.stream.StreamPublisher
    public void register(@NotNull StreamConsumer streamConsumer) {
        if (this.consumer != null) {
            throw new IllegalStateException("Can not register multiple StreamConsumers.");
        }
        this.consumer = (StreamConsumer) Objects.requireNonNull(streamConsumer);
    }

    public TableDefinition definition() {
        return this.definition;
    }

    public void add(Table table) {
        FillChunks fillChunks = new FillChunks(table);
        SafeCloseable open = ExecutionContext.getContext().withUpdateGraph(table.getUpdateGraph()).open();
        try {
            ConstructSnapshot.callDataSnapshotFunction(TableStreamPublisherImpl.class.getSimpleName() + "-" + this.name, ConstructSnapshot.makeSnapshotControl(false, table.isRefreshing(), (NotificationStepSource) table), fillChunks);
            if (open != null) {
                open.close();
            }
            this.consumer.accept(fillChunks.outstandingChunks);
        } catch (Throwable th) {
            if (open != null) {
                try {
                    open.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    public void publishFailure(Throwable th) {
        this.consumer.acceptFailure(th);
    }

    @Override // io.deephaven.stream.StreamPublisher
    public void flush() {
        if (this.onFlushCallback != null) {
            this.onFlushCallback.run();
        }
    }

    @Override // io.deephaven.stream.StreamPublisher
    public void shutdown() {
        if (this.onShutdownCallback != null) {
            this.onShutdownCallback.run();
        }
    }

    public InputTableUpdater inputTableUpdater() {
        return new InputTableAdapter();
    }
}
