package io.deephaven.stream;

import gnu.trove.list.array.TLongArrayList;
import io.deephaven.UncheckedDeephavenException;
import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.ChunkType;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.engine.rowset.TrackingWritableRowSet;
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.ModifiedColumnSet;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.impl.QueryTable;
import io.deephaven.engine.table.impl.TableUpdateImpl;
import io.deephaven.engine.table.impl.sources.ByteAsBooleanColumnSource;
import io.deephaven.engine.table.impl.sources.LongAsDateTimeColumnSource;
import io.deephaven.engine.table.impl.sources.NullValueColumnSource;
import io.deephaven.engine.table.impl.sources.SwitchColumnSource;
import io.deephaven.engine.table.impl.sources.chunkcolumnsource.ChunkColumnSource;
import io.deephaven.engine.updategraph.UpdateSourceRegistrar;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.time.DateTime;
import io.deephaven.util.MultiException;
import io.deephaven.util.SafeCloseable;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:io/deephaven/stream/StreamToTableAdapter.class */
public class StreamToTableAdapter implements SafeCloseable, StreamConsumer, Runnable {
    /** Logger used for the registration, deregistration and refresh-error messages of this class. */
    private static final Logger log = LoggerFactory.getLogger(StreamToTableAdapter.class);
    /** Column layout; every per-column array in this class is built from it. */
    private final TableDefinition tableDefinition;
    /** Publisher this adapter registers itself with; flushed at the start of every refresh. */
    private final StreamPublisher streamPublisher;
    /** Registrar this adapter is added to in the constructor and removed from on close or refresh failure. */
    private final UpdateSourceRegistrar updateSourceRegistrar;
    /** Suffix appended after the class name in log messages. */
    private final String name;
    /** Table handed out by {@link #table()}; listeners are notified through it on refresh and on error. */
    private final QueryTable table;
    /** Row set given to {@link #table}; doRefresh grows or shrinks it so it covers rows 0..size-1. */
    private final TrackingWritableRowSet rowSet;
    /** One switch source per column; doRefresh re-points each at either a chunk source or a null source. */
    private final SwitchColumnSource<?>[] switchSources;
    /** One null-valued source per column; the initial target of each switch source and the fallback when nothing was buffered. */
    private final NullValueColumnSource<?>[] nullColumnSources;
    /** Sources that accept() appends chunks to; created lazily by accept() and swapped out by doRefresh, both under synchronized(this). */
    private ChunkColumnSource<?>[] bufferChunkSources;
    /** Sources installed into the switch sources by the most recent doRefresh that did work (may be null). */
    private ChunkColumnSource<?>[] currentChunkSources;
    /** Sources that were current one refresh earlier; doRefresh hands them back as the next buffer. */
    private ChunkColumnSource<?>[] prevChunkSources;
    /** Failures recorded by acceptFailure(); when non-null, doRefresh throws instead of flushing. */
    private List<Exception> enqueuedFailure;
    /** Optional hook run once by close(); volatile because it is read without the lock being required of the setter. */
    private volatile Runnable shutdownCallback;
    /** True until close() runs; accept() and acceptFailure() ignore their input once it is false. */
    private volatile boolean alive = true;

    /**
     * Builds the per-column sources and the backing table, then registers this adapter with the stream
     * publisher and the update source registrar.
     *
     * <p>Registration is deliberately the last step: both {@code register} and {@code addSource} hand
     * {@code this} to external code, which may call {@link #accept} or {@link #run()} from another thread.
     * Doing so before {@code rowSet}, {@code switchSources} and {@code table} are assigned would let those
     * callbacks observe null fields.
     *
     * @param tableDefinition column layout of the table to expose
     * @param streamPublisher publisher that will deliver chunks to this adapter
     * @param updateSourceRegistrar registrar that will invoke {@link #run()} to refresh the table
     * @param str name used as a suffix in log messages
     */
    public StreamToTableAdapter(@NotNull TableDefinition tableDefinition, @NotNull StreamPublisher streamPublisher, @NotNull UpdateSourceRegistrar updateSourceRegistrar, @NotNull String str) {
        this.tableDefinition = tableDefinition;
        this.streamPublisher = streamPublisher;
        this.updateSourceRegistrar = updateSourceRegistrar;
        this.name = str;

        // Fully initialise all state before anything outside this constructor can see the instance.
        this.nullColumnSources = makeNullColumnSources(tableDefinition);
        final LinkedHashMap<String, ColumnSource<?>> visibleSources = new LinkedHashMap<>();
        this.switchSources = makeSwitchSources(tableDefinition, this.nullColumnSources, visibleSources);
        this.rowSet = RowSetFactory.empty().toTracking();
        this.table = new QueryTable(this.rowSet, visibleSources) {
            {
                setFlat();
                setRefreshing(true);
                setAttribute("StreamTable", Boolean.TRUE);
                addParentReference(StreamToTableAdapter.this);
            }

            @Override // io.deephaven.engine.table.impl.BaseTable
            public void destroy() {
                // Destroying the table shuts the adapter down.
                StreamToTableAdapter.this.close();
            }
        };

        // Publish `this` only now that every field is assigned.
        streamPublisher.register(this);
        log.info().append("Registering ").append(StreamToTableAdapter.class.getSimpleName()).append('-').append(str).endl();
        updateSourceRegistrar.addSource(this);
    }

    /**
     * Installs the hook that {@link #close()} runs after deregistering this adapter.
     *
     * @param runnable callback to run on close; when null, close() runs no callback
     */
    public void setShutdownCallback(Runnable runnable) {
        shutdownCallback = runnable;
    }

    /**
     * Creates one empty writable chunk per column of this adapter's table definition.
     *
     * @param i capacity passed to each chunk's factory
     * @return the new chunks, in column order
     */
    public WritableChunk[] makeChunksForDefinition(int i) {
        final TableDefinition definition = tableDefinition;
        return makeChunksForDefinition(definition, i);
    }

    /**
     * Looks up the chunk type used for the column at the given position.
     *
     * @param i zero-based index into the table definition's columns
     * @return the chunk type for that column, after any data-type replacement
     */
    public ChunkType chunkTypeForIndex(int i) {
        final ColumnDefinition<?> column = tableDefinition.getColumns()[i];
        return chunkTypeForColumn(column);
    }

    /**
     * Creates one empty writable chunk per column of the supplied definition.
     *
     * @param tableDefinition definition whose columns determine the chunk types
     * @param i capacity passed to each chunk's factory
     * @return the new chunks, in column order
     */
    public static WritableChunk[] makeChunksForDefinition(TableDefinition tableDefinition, int i) {
        return tableDefinition.getColumnStream()
                .map(column -> makeChunk(column, i))
                .toArray(WritableChunk[]::new);
    }

    /**
     * Creates one chunk-backed column source per column of the supplied definition.
     *
     * @param tableDefinition definition whose columns determine the source types
     * @return the new sources, in column order
     */
    @NotNull
    private static ChunkColumnSource<?>[] makeChunkSources(TableDefinition tableDefinition) {
        // The same list instance is handed to every column's source.
        final TLongArrayList sharedList = new TLongArrayList();
        return tableDefinition.getColumnStream()
                .map(column -> makeChunkSourceForColumn(sharedList, column))
                .toArray(ChunkColumnSource[]::new);
    }

    /**
     * Creates the chunk-backed source for a single column. When the column's data type has a
     * replacement type, the source is built for the replacement type with a null component type;
     * otherwise it is built for the column's own data and component types.
     *
     * <p>Declared public here; a decompiler note on this method stated that its access modifier was
     * changed from private.
     *
     * @param tLongArrayList list passed through to the source factory
     * @param columnDefinition column to create a source for
     * @return the new source
     */
    @NotNull
    public static ChunkColumnSource<?> makeChunkSourceForColumn(TLongArrayList tLongArrayList, ColumnDefinition<?> columnDefinition) {
        final Class<?> dataType = columnDefinition.getDataType();
        final Class<?> substitute = replacementType(dataType);
        if (substitute == null) {
            return ChunkColumnSource.make(ChunkType.fromElementType(dataType), dataType, columnDefinition.getComponentType(), tLongArrayList);
        }
        return ChunkColumnSource.make(ChunkType.fromElementType(substitute), substitute, null, tLongArrayList);
    }

    /**
     * Creates a writable chunk of the column's chunk type and sets its size to zero.
     *
     * <p>Declared public here; a decompiler note on this method stated that its access modifier was
     * changed from private.
     *
     * @param columnDefinition column that determines the chunk type
     * @param i capacity passed to the chunk factory
     * @return the new, empty chunk
     */
    @NotNull
    public static WritableChunk<?> makeChunk(ColumnDefinition<?> columnDefinition, int i) {
        final ChunkType type = chunkTypeForColumn(columnDefinition);
        final WritableChunk<?> chunk = type.makeWritableChunk(i);
        chunk.setSize(0);
        return chunk;
    }

    /**
     * Determines the chunk type for a column, using the replacement type of its data type when one
     * exists and the data type itself otherwise.
     *
     * @param columnDefinition column to inspect
     * @return the chunk type for the column
     */
    private static ChunkType chunkTypeForColumn(ColumnDefinition<?> columnDefinition) {
        final Class<?> dataType = columnDefinition.getDataType();
        final Class<?> substitute = replacementType(dataType);
        if (substitute != null) {
            return ChunkType.fromElementType(substitute);
        }
        return ChunkType.fromElementType(dataType);
    }

    /**
     * Creates one null-valued column source per column of the supplied definition.
     *
     * @param tableDefinition definition whose columns determine the source types
     * @return the null-valued sources, in column order
     */
    @NotNull
    private static NullValueColumnSource<?>[] makeNullColumnSources(TableDefinition tableDefinition) {
        return tableDefinition.getColumnStream()
                .map(column -> makeNullValueColumnSourceFromDefinition(column))
                .toArray(NullValueColumnSource[]::new);
    }

    /**
     * Obtains the null-valued source for a single column. When the column's data type has a
     * replacement type, the source is requested for the replacement type with a null component type;
     * otherwise for the column's own data and component types.
     *
     * @param columnDefinition column to obtain a source for
     * @return the null-valued source
     */
    private static NullValueColumnSource<?> makeNullValueColumnSourceFromDefinition(ColumnDefinition<?> columnDefinition) {
        final Class<?> dataType = columnDefinition.getDataType();
        final Class<?> substitute = replacementType(dataType);
        if (substitute == null) {
            return NullValueColumnSource.getInstance(dataType, columnDefinition.getComponentType());
        }
        return NullValueColumnSource.getInstance(substitute, null);
    }

    /**
     * Wraps every null-valued source in a switch source and fills {@code map} with the source that
     * should be visible under each column name.
     *
     * <p>For {@code DateTime} columns the visible source is a {@code LongAsDateTimeColumnSource} around
     * the switch source, for {@code Boolean} columns a {@code ByteAsBooleanColumnSource}; every other
     * column exposes the switch source directly.
     *
     * @param tableDefinition definition supplying column names and data types
     * @param nullValueColumnSourceArr initial target of each switch source, in column order
     * @param map receives column name to visible source, in column order
     * @return the switch sources, in column order
     */
    @NotNull
    @SuppressWarnings("unchecked")
    private static SwitchColumnSource<?>[] makeSwitchSources(TableDefinition tableDefinition, NullValueColumnSource<?>[] nullValueColumnSourceArr, Map<String, ColumnSource<?>> map) {
        final int columnCount = nullValueColumnSourceArr.length;
        final SwitchColumnSource<?>[] result = new SwitchColumnSource[columnCount];
        final ColumnDefinition<?>[] columns = tableDefinition.getColumns();
        for (int col = 0; col < columnCount; col++) {
            final SwitchColumnSource<?> switchSource = new SwitchColumnSource<>(nullValueColumnSourceArr[col], StreamToTableAdapter::maybeClearChunkColumnSource);
            final Class<?> dataType = columns[col].getDataType();
            final ColumnSource<?> visible;
            if (dataType == DateTime.class) {
                // The generic cast has no runtime effect beyond the erased checkcast.
                visible = new LongAsDateTimeColumnSource((ColumnSource<Long>) switchSource);
            } else if (dataType == Boolean.class) {
                visible = new ByteAsBooleanColumnSource((ColumnSource<Byte>) switchSource);
            } else {
                visible = switchSource;
            }
            result[col] = switchSource;
            map.put(columns[col].getName(), visible);
        }
        return result;
    }

    /**
     * Clears the given source when it is a {@code ChunkColumnSource}; any other source is left alone.
     * Passed as the callback argument when the switch sources are constructed.
     *
     * @param columnSource source to clear if applicable
     */
    private static void maybeClearChunkColumnSource(ColumnSource<?> columnSource) {
        if (!(columnSource instanceof ChunkColumnSource)) {
            return;
        }
        ((ChunkColumnSource<?>) columnSource).clear();
    }

    /**
     * Maps a column data type to the primitive type used to store it, if any.
     *
     * @param cls column data type
     * @return {@code long} for {@code DateTime}, {@code byte} for {@code Boolean}, otherwise null
     */
    private static Class<?> replacementType(Class<?> cls) {
        final Class<?> substitute;
        if (cls == DateTime.class) {
            substitute = long.class;
        } else if (cls == Boolean.class) {
            substitute = byte.class;
        } else {
            substitute = null;
        }
        return substitute;
    }

    /**
     * Returns the table created by the constructor.
     *
     * @return the table backed by this adapter
     */
    public Table table() {
        return table;
    }

    /**
     * Shuts the adapter down: marks it not alive, removes it from the update source registrar and runs
     * the shutdown callback if one is set. Only the first call does any work; later calls return at
     * one of the {@code alive} checks.
     */
    public void close() {
        // Unsynchronized early exit; re-checked under the lock below.
        if (!alive) {
            return;
        }
        synchronized (this) {
            if (!alive) {
                return;
            }
            alive = false;
            log.info().append("Deregistering ").append(StreamToTableAdapter.class.getSimpleName()).append('-').append(name).endl();
            updateSourceRegistrar.removeSource(this);
            // Read the volatile field once so the null check and the call use the same value.
            final Runnable callback = shutdownCallback;
            if (callback != null) {
                callback.run();
            }
        }
    }

    /**
     * Performs one refresh. Any exception thrown by the refresh is logged, reported to the table's
     * listeners as an error, and the adapter is removed from the update source registrar.
     *
     * <p>NOTE(review): this failure path does not clear {@code alive} or run the shutdown callback;
     * confirm that is intended.
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            doRefresh();
        } catch (Exception refreshFailure) {
            log.error().append("Error refreshing ").append(StreamToTableAdapter.class.getSimpleName()).append('-').append(name).append(": ").append(refreshFailure).endl();
            table.notifyListenersOnError(refreshFailure, null);
            updateSourceRegistrar.removeSource(this);
        }
    }

    /**
     * Swaps the buffered chunk sources in as the table's current data and notifies listeners.
     *
     * <p>Steps, in order: throw if any failure has been enqueued; flush the publisher; under the lock,
     * take the buffer, hand the previous-cycle sources back as the new buffer, re-point every switch
     * source, rotate current/previous, resize the row set, and publish an update in which rows
     * 0..newSize-1 are added and rows 0..oldSize-1 are removed. Returns without doing anything when
     * both the old and new sizes are zero.
     *
     * @throws UncheckedDeephavenException wrapping the enqueued failures, if any were recorded
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void doRefresh() {
        synchronized (this) {
            // An enqueued failure is reported before the publisher is allowed to flush.
            if (enqueuedFailure != null) {
                final Exception[] failures = enqueuedFailure.toArray(new Exception[0]);
                throw new UncheckedDeephavenException(MultiException.maybeWrapInMultiException("Multiple errors encountered while ingesting stream", failures));
            }
        }
        streamPublisher.flush();
        final long oldSize = rowSet.size();
        synchronized (this) {
            final ChunkColumnSource<?>[] captured = bufferChunkSources;
            // The first column's size is taken as the row count of the buffer.
            final long newSize = captured == null ? 0L : captured[0].getSize();
            if (oldSize == 0 && newSize == 0) {
                return;
            }
            // Recycle the sources from two cycles ago as the next buffer.
            bufferChunkSources = prevChunkSources;

            // Nothing buffered means the table shows the null sources this cycle.
            final ColumnSource<?>[] nextCurrent;
            if (captured == null) {
                nextCurrent = nullColumnSources;
            } else {
                nextCurrent = captured;
            }
            for (int col = 0; col < switchSources.length; col++) {
                // Raw cast: the element types are wildcards, so the pairing cannot be expressed statically.
                switchSources[col].setNewCurrent((ColumnSource) nextCurrent[col]);
            }

            prevChunkSources = currentChunkSources;
            currentChunkSources = captured;

            if (oldSize < newSize) {
                rowSet.insertRange(oldSize, newSize - 1);
            } else if (oldSize > newSize) {
                rowSet.removeRange(newSize, oldSize - 1);
            }
            table.notifyListeners(new TableUpdateImpl(RowSetFactory.flat(newSize), RowSetFactory.flat(oldSize), RowSetFactory.empty(), RowSetShiftData.EMPTY, ModifiedColumnSet.EMPTY));
        }
    }

    /**
     * Appends one chunk per column to the buffer that the next refresh will publish. Does nothing once
     * the adapter has been closed.
     *
     * <p>All chunk sizes are validated before any chunk is appended, so a size mismatch can no longer
     * leave the leading columns one chunk longer than the rest.
     *
     * @param writableChunkArr one chunk per column, in column order, all of the same size
     * @throws IllegalStateException if the number of chunks differs from the number of columns
     */
    @Override // io.deephaven.stream.StreamConsumer
    @SafeVarargs
    public final void accept(@NotNull WritableChunk<Values>... writableChunkArr) {
        if (!alive) {
            return;
        }
        synchronized (this) {
            // The buffer is created on first use.
            if (bufferChunkSources == null) {
                bufferChunkSources = makeChunkSources(tableDefinition);
            }
            final int expectedColumns = bufferChunkSources.length;
            if (writableChunkArr.length != expectedColumns) {
                throw new IllegalStateException("StreamConsumer data length = " + writableChunkArr.length + " chunks, expected " + expectedColumns);
            }
            // Validate every chunk first; nothing is mutated until all sizes are known to agree.
            for (int i = 0; i < writableChunkArr.length; i++) {
                Assert.eq(writableChunkArr[0].size(), "data[0].size()", writableChunkArr[i].size(), "data[ii].size()");
            }
            for (int i = 0; i < writableChunkArr.length; i++) {
                bufferChunkSources[i].addChunk(writableChunkArr[i]);
            }
        }
    }

    /**
     * Records a failure for the next refresh to throw, then closes the adapter. Does nothing once the
     * adapter has already been closed.
     *
     * <p>NOTE(review): close() removes this adapter from the update source registrar; confirm that a
     * refresh still runs afterwards to deliver the recorded failure.
     *
     * @param exc the failure to record
     */
    @Override // io.deephaven.stream.StreamFailureConsumer
    public void acceptFailure(@NotNull Exception exc) {
        if (!alive) {
            return;
        }
        synchronized (this) {
            // The list is created on the first failure.
            if (enqueuedFailure == null) {
                enqueuedFailure = new ArrayList<>();
            }
            enqueuedFailure.add(exc);
        }
        close();
    }
}
