/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.sql.engine.exec.rel;

import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
import org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulator;
import org.apache.ignite.internal.sql.engine.exec.exp.agg.AccumulatorWrapper;
import org.apache.ignite.internal.sql.engine.exec.exp.agg.AggregateType;
import org.apache.ignite.internal.sql.engine.exec.rel.AbstractNode;
import org.apache.ignite.internal.sql.engine.exec.rel.Downstream;
import org.apache.ignite.internal.sql.engine.exec.rel.SingleNode;
import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.util.CollectionUtils;

/**
 * Execution node that aggregates rows of an input that arrives ordered by the grouping comparator.
 *
 * <p>Consecutive input rows for which {@code comp} returns {@code 0} belong to the same group. As soon as a row
 * compares non-zero to its predecessor, the current group is finished: its result row is appended to the output
 * buffer and a new group is started from the incoming row.
 *
 * <p>Flow-control state:
 * <ul>
 *     <li>{@code requested} - rows downstream asked for and has not received yet;</li>
 *     <li>{@code waiting} - rows still expected from the current source batch; {@code 0} means no batch is
 *     outstanding, a negative value means the source has signalled its end.</li>
 * </ul>
 *
 * @param <RowT> Row type.
 */
public class SortAggregateNode<RowT> extends AbstractNode<RowT> implements SingleNode<RowT>, Downstream<RowT> {
    /** Number of rows requested from the source per batch; also the initial capacity of the output buffer. */
    private static final int INPUT_BATCH_SIZE = 512;

    /** Aggregation phase; selects how rows are accumulated and how result rows are built. */
    private final AggregateType type;

    /** Supplies a fresh list of accumulator wrappers for every group; {@code null} when there are no aggregates. */
    private final Supplier<List<AccumulatorWrapper<RowT>>> accFactory;

    /** Factory used to create result rows and to read grouping columns. */
    private final RowHandler.RowFactory<RowT> rowFactory;

    /** Indexes of the grouping columns in an input row. */
    private final ImmutableBitSet grpSet;

    /** Comparator that decides whether two consecutive rows belong to the same group. */
    private final Comparator<RowT> comp;

    /** Result rows of finished groups that were not delivered downstream yet. */
    private final Deque<RowT> outBuf = new ArrayDeque<>(INPUT_BATCH_SIZE);

    /** Previously received input row, compared against the next one to detect a group boundary. */
    private RowT prevRow;

    /** Group currently being accumulated, or {@code null} if no row was received yet. */
    private Group grp;

    /** Rows requested by downstream and not delivered yet. */
    private int requested;

    /** Rows still expected from the source; {@code 0} - nothing outstanding, negative - source has ended. */
    private int waiting;

    /** First non-zero comparison result seen; used only to assert that the input keeps one sort direction. */
    private int cmpRes;

    /**
     * Creates the node.
     *
     * @param ctx Execution context.
     * @param rowType Output row type.
     * @param type Aggregation phase.
     * @param grpSet Indexes of the grouping columns.
     * @param accFactory Accumulator factory, may be {@code null} when there is nothing to accumulate.
     * @param rowFactory Row factory.
     * @param comp Comparator defining group boundaries; must not be {@code null}.
     */
    public SortAggregateNode(
            ExecutionContext<RowT> ctx,
            RelDataType rowType,
            AggregateType type,
            ImmutableBitSet grpSet,
            Supplier<List<AccumulatorWrapper<RowT>>> accFactory,
            RowHandler.RowFactory<RowT> rowFactory,
            Comparator<RowT> comp
    ) {
        super(ctx, rowType);

        assert Objects.nonNull(comp);

        this.type = type;
        this.accFactory = accFactory;
        this.rowFactory = rowFactory;
        this.grpSet = grpSet;
        this.comp = comp;
    }

    /**
     * Handles a request for {@code rowsCnt} rows from downstream.
     *
     * <p>Already buffered result rows are delivered first. If no source batch is outstanding a new one is
     * requested. If the source has already ended, the end is propagated downstream only when the request is
     * still unsatisfied, which guarantees that the output buffer has been fully drained.
     *
     * @param rowsCnt Number of rows requested; must be positive.
     * @throws Exception If pushing rows downstream or requesting rows from the source fails.
     */
    @Override
    public void request(int rowsCnt) throws Exception {
        assert !CollectionUtils.nullOrEmpty(sources()) && sources().size() == 1;
        assert rowsCnt > 0 && requested == 0;

        checkState();

        requested = rowsCnt;

        if (!outBuf.isEmpty()) {
            doPush();
        }

        if (waiting == 0) {
            waiting = INPUT_BATCH_SIZE;

            source().request(INPUT_BATCH_SIZE);
        } else if (waiting < 0 && requested > 0) {
            // doPush() stops either when the request is satisfied or when the buffer is empty, so
            // "requested > 0" here means every buffered row has been delivered. Without this check the
            // end would be signalled while finished groups are still sitting in the buffer.
            downstream().end();
        }
    }

    /**
     * Accepts the next input row: either adds it to the current group or, on a group boundary, finishes the
     * current group and starts a new one.
     *
     * @param row Input row.
     * @throws Exception If pushing a result row downstream fails.
     */
    @Override
    public void push(RowT row) throws Exception {
        assert downstream() != null;
        assert waiting > 0;

        checkState();

        waiting--;

        if (grp == null) {
            grp = newGroup(row);
        } else {
            int cmp = comp.compare(row, prevRow);

            if (cmp == 0) {
                grp.add(row);
            } else {
                // Remember the direction of the first group change; every later change must have the same sign.
                if (cmpRes == 0) {
                    cmpRes = cmp;
                } else {
                    assert Integer.signum(cmp) == Integer.signum(cmpRes) : "Input not sorted";
                }

                outBuf.add(grp.row());

                grp = newGroup(row);

                doPush();
            }
        }

        prevRow = row;

        // The batch is exhausted but downstream still wants rows: ask the source for the next batch.
        if (waiting == 0 && requested > 0) {
            waiting = INPUT_BATCH_SIZE;

            context().execute(() -> source().request(INPUT_BATCH_SIZE), this::onError);
        }
    }

    /**
     * Handles the end of the input: flushes the last group and, if downstream still has an unsatisfied request,
     * propagates the end downstream.
     *
     * @throws Exception If pushing rows or signalling the end downstream fails.
     */
    @Override
    public void end() throws Exception {
        assert downstream() != null;
        assert waiting > 0;

        checkState();

        waiting = -1;

        if (grp != null) {
            outBuf.add(grp.row());

            doPush();
        }

        // If the request is already satisfied, the end is reported on the next request() call instead.
        if (requested > 0) {
            downstream().end();
        }

        grp = null;
        prevRow = null;
    }

    /** Resets all per-run state, including undelivered result rows, so the node can be executed again. */
    @Override
    protected void rewindInternal() {
        requested = 0;
        waiting = 0;
        cmpRes = 0;
        grp = null;
        prevRow = null;

        // Rows buffered by the previous run must not be delivered to the next one.
        outBuf.clear();
    }

    /**
     * Returns the downstream for the given source index; this node has a single source and is its own downstream.
     *
     * @param idx Source index; only {@code 0} is valid.
     * @return This node.
     * @throws IndexOutOfBoundsException If {@code idx} is not {@code 0}.
     */
    @Override
    protected Downstream<RowT> requestDownstream(int idx) {
        if (idx != 0) {
            throw new IndexOutOfBoundsException();
        }

        return this;
    }

    /** Returns {@code true} if an accumulator factory was supplied. */
    private boolean hasAccumulators() {
        return accFactory != null;
    }

    /**
     * Starts a new group from the given row: extracts the grouping key values and accumulates the row.
     *
     * @param r First row of the group.
     * @return New group that already contains {@code r}.
     */
    private Group newGroup(RowT r) {
        List<Integer> fldIdxs = grpSet.asList();
        RowHandler<RowT> rowHandler = rowFactory.handler();

        Object[] grpKeys = new Object[grpSet.cardinality()];

        for (int i = 0; i < grpKeys.length; i++) {
            grpKeys[i] = rowHandler.get(fldIdxs.get(i), r);
        }

        Group newGrp = new Group(grpKeys);

        newGrp.add(r);

        return newGrp;
    }

    /**
     * Delivers buffered result rows downstream while there is an unsatisfied request.
     *
     * @throws Exception If downstream fails to accept a row.
     */
    private void doPush() throws Exception {
        while (requested > 0 && !outBuf.isEmpty()) {
            requested--;

            downstream().push(outBuf.poll());
        }
    }

    /** Accumulated state of one group: its key values and one accumulator wrapper per aggregate. */
    private class Group {
        /** Accumulator wrappers of this group; empty when the node has no accumulator factory. */
        private final List<AccumulatorWrapper<RowT>> accumWrps;

        /** Row handler taken from the execution context. */
        private final RowHandler<RowT> handler;

        /** Values of the grouping columns. */
        private final Object[] grpKeys;

        /**
         * Creates an empty group.
         *
         * @param grpKeys Values of the grouping columns.
         */
        private Group(Object[] grpKeys) {
            this.grpKeys = grpKeys;

            accumWrps = hasAccumulators() ? accFactory.get() : Collections.emptyList();
            handler = SortAggregateNode.this.context().rowHandler();
        }

        /**
         * Accumulates a row: on the {@code REDUCE} phase the row's accumulators are merged, otherwise the row
         * itself is fed to the accumulators.
         *
         * @param row Input row.
         */
        private void add(RowT row) {
            if (type == AggregateType.REDUCE) {
                addOnReducer(row);
            } else {
                addOnMapper(row);
            }
        }

        /**
         * Builds the result row of the group: on the {@code MAP} phase the accumulators themselves are emitted,
         * otherwise their final values.
         *
         * @return Result row.
         */
        private RowT row() {
            return type == AggregateType.MAP ? rowOnMapper() : rowOnReducer();
        }

        /** Feeds the row to every accumulator wrapper. */
        private void addOnMapper(RowT row) {
            for (AccumulatorWrapper<RowT> wrapper : accumWrps) {
                wrapper.add(row);
            }
        }

        /** Merges the accumulators stored in the last column of the row into this group's accumulators. */
        private void addOnReducer(RowT row) {
            if (!hasAccumulators()) {
                return;
            }

            // The last column is read as a list of accumulators, positionally matching accumWrps.
            @SuppressWarnings("unchecked")
            List<Accumulator> accums = (List<Accumulator>) handler.get(handler.columnCount(row) - 1, row);

            for (int i = 0; i < accums.size(); i++) {
                accumWrps.get(i).apply(accums.get(i));
            }
        }

        /** Builds a row of the grouping keys followed, if there are accumulators, by one column holding them. */
        private RowT rowOnMapper() {
            Object[] fields = new Object[grpSet.cardinality() + (hasAccumulators() ? 1 : 0)];

            int i = 0;

            for (Object grpKey : grpKeys) {
                fields[i++] = grpKey;
            }

            if (hasAccumulators()) {
                fields[i] = Commons.transform(accumWrps, AccumulatorWrapper::accumulator);
            }

            return rowFactory.create(fields);
        }

        /** Builds a row of the grouping keys followed by the final value of every accumulator. */
        private RowT rowOnReducer() {
            Object[] fields = new Object[grpSet.cardinality() + accumWrps.size()];

            int i = 0;

            for (Object grpKey : grpKeys) {
                fields[i++] = grpKey;
            }

            for (AccumulatorWrapper<RowT> wrapper : accumWrps) {
                fields[i++] = wrapper.end();
            }

            return rowFactory.create(fields);
        }
    }
}

