/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.sql.engine.exec.rel;

import java.util.function.Supplier;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.rel.AbstractNode;
import org.apache.ignite.internal.sql.engine.exec.rel.Downstream;
import org.apache.ignite.internal.sql.engine.exec.rel.SingleNode;
import org.apache.ignite.internal.util.CollectionUtils;
import org.jetbrains.annotations.Nullable;

public class LimitNode<RowT>
extends AbstractNode<RowT>
implements SingleNode<RowT>,
Downstream<RowT> {
    private final int offset;
    private final int fetch;
    private int rowsProcessed;
    @Nullable
    private Supplier<Integer> fetchNode;
    private int waiting;

    public LimitNode(ExecutionContext<RowT> ctx, RelDataType rowType, Supplier<Integer> offsetNode, Supplier<Integer> fetchNode) {
        super(ctx, rowType);
        this.offset = offsetNode == null ? 0 : offsetNode.get();
        this.fetch = fetchNode == null ? 0 : fetchNode.get();
        this.fetchNode = fetchNode;
    }

    @Override
    public void request(int rowsCnt) throws Exception {
        assert (!CollectionUtils.nullOrEmpty(this.sources()) && this.sources().size() == 1);
        assert (rowsCnt > 0);
        if (this.fetchNone()) {
            this.end();
            return;
        }
        if (this.offset > 0 && this.rowsProcessed == 0) {
            rowsCnt = this.offset + rowsCnt;
        }
        this.checkState();
        this.waiting = rowsCnt;
        this.source().request(this.waiting);
    }

    @Override
    public void push(RowT row) throws Exception {
        if (this.waiting == -1) {
            return;
        }
        ++this.rowsProcessed;
        --this.waiting;
        this.checkState();
        if (this.rowsProcessed > this.offset && (this.fetchNode == null || this.rowsProcessed <= this.fetch + this.offset)) {
            this.downstream().push(row);
        }
        if (this.fetch > 0 && this.rowsProcessed == this.fetch + this.offset && this.waiting > 0) {
            this.end();
        }
    }

    @Override
    public void end() throws Exception {
        if (this.waiting == -1) {
            return;
        }
        assert (this.downstream() != null);
        this.waiting = -1;
        this.downstream().end();
    }

    @Override
    protected void rewindInternal() {
        this.rowsProcessed = 0;
    }

    @Override
    protected Downstream<RowT> requestDownstream(int idx) {
        if (idx != 0) {
            throw new IndexOutOfBoundsException();
        }
        return this;
    }

    private boolean fetchNone() {
        return this.fetchNode != null && this.fetch == 0 || this.fetch > 0 && this.rowsProcessed == this.fetch + this.offset;
    }
}

