package org.yamcs.yarch.streamsql;

import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.yamcs.logging.Log;
import org.yamcs.yarch.ExecutionContext;
import org.yamcs.yarch.Stream;
import org.yamcs.yarch.StreamSubscriber;
import org.yamcs.yarch.Tuple;
import org.yamcs.yarch.TupleDefinition;
import org.yamcs.yarch.YarchDatabaseInstance;

/* loaded from: input_file:org/yamcs/yarch/streamsql/SelectTableStatement.class */
/**
 * Statement that selects from a table by turning a {@link SelectExpression} into a {@link Stream}.
 * <p>
 * The tuples of that stream are delivered either by pushing them to a {@link ResultListener}
 * ({@link #execute(YarchDatabaseInstance, ResultListener, long)}) or by letting the caller pull them from a
 * blocking, iterator-style {@link StreamSqlResult} ({@link #execute(YarchDatabaseInstance)}).
 */
public class SelectTableStatement implements StreamSqlStatement {
    /** Sentinel placed on the result queue to mark the end of the data; it is always compared by identity. */
    static final Tuple END_SIGNAL = new Tuple(new TupleDefinition(), Arrays.<Object>asList());

    private final SelectExpression expression;

    /**
     * Pull-style result: it subscribes to the stream, buffers the received tuples in a bounded queue and hands
     * them out through {@link #hasNext()} / {@link #next()}.
     * <p>
     * The subscriber callbacks and the iterator methods are expected to run on different threads (otherwise a
     * blocking queue would not be needed); the iterator methods themselves are meant for a single consumer.
     */
    static class QueueStreamSqlResult implements StreamSqlResult, StreamSubscriber {
        static final Log log = new Log(QueueStreamSqlResult.class);

        /** Maximum number of tuples buffered between the stream and the consumer. */
        private static final int QUEUE_CAPACITY = 1024;

        /** How long a producer waits for free space before re-checking whether the result has been closed. */
        private static final long OFFER_TIMEOUT_MILLIS = 100;

        final Stream stream;
        final ExecutionContext context;
        final BlockingQueue<Tuple> queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);

        /** Look-ahead element fetched by {@link #hasNext()}; {@code null} when nothing has been fetched yet. */
        Tuple next;

        /** Set by {@link #close()}; tells producers to stop enqueueing. */
        private volatile boolean closed;

        QueueStreamSqlResult(ExecutionContext executionContext, Stream stream) {
            this.stream = stream;
            this.context = executionContext;
        }

        /**
         * Blocks until a tuple or the end-of-data marker is available.
         *
         * @return {@code true} if {@link #next()} will return a tuple, {@code false} once the end has been
         *         reached (or the waiting thread has been interrupted)
         */
        @Override
        public boolean hasNext() {
            return lookAhead() != END_SIGNAL;
        }

        /**
         * Returns the next tuple, blocking until one is available.
         *
         * @throws NoSuchElementException
         *             if the end of the data has been reached
         */
        @Override
        public Tuple next() {
            Tuple tuple = lookAhead();
            if (tuple == END_SIGNAL) {
                // the marker is intentionally kept in 'next' so that the end state is sticky
                throw new NoSuchElementException();
            }
            next = null;
            return tuple;
        }

        /** Fetches the look-ahead element if it has not been fetched yet and returns it. */
        private Tuple lookAhead() {
            if (next == null) {
                next = queueTake();
            }
            return next;
        }

        /** Takes one element from the queue; an interrupt is reported to the caller as end of data. */
        private Tuple queueTake() {
            try {
                return queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return END_SIGNAL;
            }
        }

        /**
         * Closes the stream and the execution context and makes sure that a consumer waiting in
         * {@link #hasNext()} / {@link #next()} is woken up with the end-of-data marker.
         */
        @Override
        public void close() {
            // raise the flag first: a producer waiting for queue space gives up instead of blocking forever,
            // and a streamClosed() callback triggered by stream.close() cannot block this thread
            closed = true;
            stream.close();
            context.close();
            if (!queue.offer(END_SIGNAL)) {
                // Queue full: Queue.add() would throw IllegalStateException here. The result is being
                // abandoned, so the buffered tuples are discarded to make room for the marker.
                queue.clear();
                queue.offer(END_SIGNAL);
            }
        }

        @Override
        public void onTuple(Stream stream, Tuple tuple) {
            enqueue(tuple);
        }

        @Override
        public void streamClosed(Stream stream) {
            enqueue(END_SIGNAL);
        }

        /**
         * Adds an element to the queue, waiting for free space when the consumer is slower than the producer.
         * <p>
         * The wait is done in short slices so that {@link #close()} is noticed; once the result is closed the
         * element is dropped (close() itself enqueues the end marker). If the calling thread is interrupted
         * the element is dropped as well and the interrupt flag is restored.
         */
        private void enqueue(Tuple tuple) {
            try {
                while (!closed) {
                    if (queue.offer(tuple, OFFER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                        return;
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        /** Safety net: logs and closes the stream if the result is garbage collected without being closed. */
        @Override
        protected void finalize() {
            if (stream.isClosed()) {
                return;
            }
            log.error("Stream {} left dangling (StreamSqlResult has been discarded before closing)", stream.getName());
            close();
        }
    }

    public SelectTableStatement(SelectExpression selectExpression) {
        this.expression = selectExpression;
    }

    /**
     * Executes the select and pushes the resulting tuples to the listener.
     * <p>
     * The listener receives {@code start} with the stream definition, {@code next} for each tuple and
     * {@code complete} when the stream is closed. The stream is closed as soon as its data count reaches
     * {@code j}.
     *
     * @param yarchDatabaseInstance
     *            database used to create the execution context
     * @param resultListener
     *            receiver of the results, must not be {@code null}
     * @param j
     *            data count at which the stream is closed
     * @throws StreamSqlException
     *             if no listener is given or the stream cannot be created
     */
    @Override
    public void execute(YarchDatabaseInstance yarchDatabaseInstance, final ResultListener resultListener, final long j)
            throws StreamSqlException {
        if (resultListener == null) {
            throw new GenericStreamSqlException("Cannot select without a result listener");
        }
        final ExecutionContext executionContext = new ExecutionContext(yarchDatabaseInstance);
        Stream resultStream = createStreamOrCloseContext(executionContext);
        resultListener.start(resultStream.getDefinition());
        resultStream.addSubscriber(new StreamSubscriber() {
            @Override
            public void onTuple(Stream stream, Tuple tuple) {
                resultListener.next(tuple);
                if (stream.getDataCount() >= j) {
                    stream.close();
                }
            }

            @Override
            public void streamClosed(Stream stream) {
                resultListener.complete();
                executionContext.close();
            }
        });
        resultStream.start();
    }

    /**
     * Executes the select and returns a result from which the tuples can be pulled.
     * <p>
     * The caller is expected to {@link StreamSqlResult#close() close} the returned result.
     *
     * @param yarchDatabaseInstance
     *            database used to create the execution context
     * @return an iterator-style result backed by the running stream
     * @throws StreamSqlException
     *             if the stream cannot be created
     */
    @Override
    public StreamSqlResult execute(YarchDatabaseInstance yarchDatabaseInstance) throws StreamSqlException {
        final ExecutionContext executionContext = new ExecutionContext(yarchDatabaseInstance);
        Stream resultStream = createStreamOrCloseContext(executionContext);
        QueueStreamSqlResult result = new QueueStreamSqlResult(executionContext, resultStream);
        resultStream.addSubscriber(result);
        // Registered before start(): a subscriber added to an already running stream could miss the
        // streamClosed notification, leaving the context open until the result is closed explicitly.
        resultStream.addSubscriber(new StreamSubscriber() {
            @Override
            public void onTuple(Stream stream, Tuple tuple) {
                // data is consumed through the queue result; this subscriber only tracks the end of the stream
            }

            @Override
            public void streamClosed(Stream stream) {
                executionContext.close();
            }
        });
        resultStream.start();
        return result;
    }

    /** Same as {@link #createStream(ExecutionContext)} but closes the context if the creation fails. */
    private Stream createStreamOrCloseContext(ExecutionContext executionContext) throws StreamSqlException {
        boolean created = false;
        try {
            Stream resultStream = createStream(executionContext);
            created = true;
            return resultStream;
        } finally {
            if (!created) {
                // no subscriber exists yet that would close the context on streamClosed
                executionContext.close();
            }
        }
    }

    /**
     * Verifies that the source of the select is an existing table, then binds and executes the expression.
     *
     * @throws StreamSqlException
     *             if the table does not exist, or if binding or executing the expression fails
     */
    Stream createStream(ExecutionContext executionContext) throws StreamSqlException {
        YarchDatabaseInstance db = executionContext.getDb();
        String tableName = expression.tupleSourceExpression.objectName;
        if (db.getTable(tableName) == null) {
            throw new GenericStreamSqlException(
                    String.format("Object %s does not exist or is not a table", tableName));
        }
        expression.bind(executionContext);
        return expression.execute(executionContext);
    }
}
