package org.yamcs.yarch.streamsql;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yamcs.utils.MutableLong;
import org.yamcs.yarch.DataType;
import org.yamcs.yarch.ExecutionContext;
import org.yamcs.yarch.Stream;
import org.yamcs.yarch.StreamSubscriber;
import org.yamcs.yarch.TableDefinition;
import org.yamcs.yarch.TableWriter;
import org.yamcs.yarch.Tuple;
import org.yamcs.yarch.TupleDefinition;
import org.yamcs.yarch.YarchDatabaseInstance;
import org.yamcs.yarch.YarchException;

/* loaded from: input_file:org/yamcs/yarch/streamsql/InsertStatement.class */
public class InsertStatement extends SimpleStreamSqlStatement {
    String name;
    StreamExpression expression;
    TableWriter.InsertMode insertMode;
    static Logger log = LoggerFactory.getLogger(InsertStatement.class.getName());
    private static final TupleDefinition TDEF = new TupleDefinition();

    public InsertStatement(String str, StreamExpression streamExpression, TableWriter.InsertMode insertMode) {
        this.name = str;
        this.expression = streamExpression;
        this.insertMode = insertMode;
    }

    @Override // org.yamcs.yarch.streamsql.SimpleStreamSqlStatement
    protected void execute(ExecutionContext executionContext, Consumer<Tuple> consumer) throws StreamSqlException {
        YarchDatabaseInstance db = executionContext.getDb();
        TableDefinition table = db.getTable(this.name);
        final Stream stream = table == null ? db.getStream(this.name) : null;
        if (table == null && stream == null) {
            throw new ResourceNotFoundException(this.name);
        }
        this.expression.bind(executionContext);
        Stream execute = this.expression.execute(executionContext);
        if (table != null) {
            try {
                TableWriter newTableWriter = db.getStorageEngine(table).newTableWriter(db, table, this.insertMode);
                execute.addSubscriber(newTableWriter);
                newTableWriter.closeFuture().thenAccept(r5 -> {
                    execute.removeSubscriber(newTableWriter);
                });
            } catch (YarchException e) {
                log.warn("Exception while inserting into table", e);
                throw new GenericStreamSqlException(e.getMessage());
            }
        } else {
            execute.addSubscriber(new StreamSubscriber() { // from class: org.yamcs.yarch.streamsql.InsertStatement.1
                @Override // org.yamcs.yarch.StreamSubscriber
                public void streamClosed(Stream stream2) {
                    InsertStatement.log.debug("InputStream {} closed", stream2.getName());
                }

                @Override // org.yamcs.yarch.StreamSubscriber
                public void onTuple(Stream stream2, Tuple tuple) {
                    stream.emitTuple(tuple);
                }
            });
        }
        Long l = null;
        if (this.expression.isFinite()) {
            final Semaphore semaphore = new Semaphore(0);
            final MutableLong mutableLong = new MutableLong(0L);
            execute.addSubscriber(new StreamSubscriber() { // from class: org.yamcs.yarch.streamsql.InsertStatement.2
                @Override // org.yamcs.yarch.StreamSubscriber
                public void onTuple(Stream stream2, Tuple tuple) {
                    mutableLong.increment();
                }

                @Override // org.yamcs.yarch.StreamSubscriber
                public void streamClosed(Stream stream2) {
                    semaphore.release();
                }
            });
            execute.start();
            try {
                semaphore.acquire();
                l = Long.valueOf(mutableLong.getLong());
            } catch (InterruptedException e2) {
                throw new GenericStreamSqlException("Interrupted");
            }
        } else {
            execute.start();
        }
        TupleDefinition tupleDefinition = new TupleDefinition();
        tupleDefinition.addColumn("inserted", DataType.LONG);
        consumer.accept(new Tuple(tupleDefinition, (List<Object>) Arrays.asList(l)));
    }

    @Override // org.yamcs.yarch.streamsql.SimpleStreamSqlStatement
    protected TupleDefinition getResultDefinition() {
        return TDEF;
    }

    static {
        TDEF.addColumn("inserted", DataType.LONG);
    }
}
