package org.yamcs.yarch;

import java.io.IOException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.yamcs.utils.TimeInterval;
import org.yamcs.yarch.streamsql.ColumnExpression;
import org.yamcs.yarch.streamsql.RelOp;
import org.yamcs.yarch.streamsql.StreamSqlException;

/* loaded from: input_file:org/yamcs/yarch/HistogramReaderStream.class */
public class HistogramReaderStream extends AbstractStream implements Runnable, DbReaderStream {
    private final ColumnSerializer<?> histoColumnSerializer;
    HistogramIterator iter;
    final TableDefinition tblDef;
    final String histoColumnName;
    TimeInterval interval;
    long mergeTime;
    static AtomicInteger count = new AtomicInteger(0);
    volatile boolean quit;
    private ColumnDefinition histoColumnDefinition;

    public HistogramReaderStream(YarchDatabaseInstance yarchDatabaseInstance, TableDefinition tableDefinition, String str, TupleDefinition tupleDefinition) throws YarchException {
        super(yarchDatabaseInstance, tableDefinition.getName() + "_histo_" + count.getAndIncrement(), tupleDefinition);
        this.interval = new TimeInterval();
        this.mergeTime = 2000L;
        this.quit = false;
        this.histoColumnSerializer = tableDefinition.getColumnSerializer(str);
        this.histoColumnDefinition = tableDefinition.getColumnDefinition(str);
        this.tblDef = tableDefinition;
        this.histoColumnName = str;
    }

    @Override // org.yamcs.yarch.AbstractStream, org.yamcs.yarch.Stream
    public void start() {
        new Thread(this, "HistogramReader[" + getName() + "]").start();
    }

    @Override // java.lang.Runnable
    public void run() {
        if (this.log.isDebugEnabled()) {
            this.log.debug("starting a histogram stream for interval {}, mergeTime: {})", this.interval.toStringEncoded(), Long.valueOf(this.mergeTime));
        }
        try {
            this.iter = this.ydb.getStorageEngine(this.tblDef).getHistogramIterator(this.ydb, this.tblDef, this.histoColumnName, this.interval, this.mergeTime);
            while (!this.quit && this.iter.hasNext()) {
                emit(this.iter.next());
            }
        } catch (Exception e) {
            this.log.error("got exception ", e);
            e.printStackTrace();
        } finally {
            close();
        }
    }

    private void emit(HistogramRecord histogramRecord) throws IOException {
        emitTuple(new Tuple(getDefinition(), new Object[]{this.histoColumnSerializer.fromByteArray(histogramRecord.columnv, this.histoColumnDefinition), Long.valueOf(histogramRecord.start), Long.valueOf(histogramRecord.stop), Integer.valueOf(histogramRecord.num)}));
    }

    @Override // org.yamcs.yarch.DbReaderStream
    public boolean addRelOpFilter(ColumnExpression columnExpression, RelOp relOp, Object obj) throws StreamSqlException {
        String name = columnExpression.getName();
        if (!"first".equals(name) && !"last".equals(name)) {
            return false;
        }
        try {
            long longValue = ((Long) DataType.castAs(DataType.TIMESTAMP, obj)).longValue();
            switch (relOp) {
                case GREATER:
                case GREATER_OR_EQUAL:
                    this.interval.setStart(longValue);
                    return true;
                case LESS:
                case LESS_OR_EQUAL:
                    this.interval.setEnd(longValue);
                    return true;
                case EQUAL:
                    this.interval.setStart(longValue);
                    this.interval.setEnd(longValue);
                    return true;
                case NOT_EQUAL:
                default:
                    return false;
            }
        } catch (IllegalArgumentException e) {
            throw new StreamSqlException(StreamSqlException.ErrCode.ERROR, e.getMessage());
        }
    }

    @Override // org.yamcs.yarch.DbReaderStream
    public boolean addInFilter(ColumnExpression columnExpression, Set<Object> set) {
        return false;
    }

    public void setMergeTime(long j) {
        this.mergeTime = j;
    }

    @Override // org.yamcs.yarch.AbstractStream
    public void doClose() {
        this.iter.close();
        this.quit = true;
    }
}
