/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.sql.engine.exec.rel;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
import org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulator;
import org.apache.ignite.internal.sql.engine.exec.exp.agg.AccumulatorWrapper;
import org.apache.ignite.internal.sql.engine.exec.exp.agg.AggregateType;
import org.apache.ignite.internal.sql.engine.exec.exp.agg.GroupKey;
import org.apache.ignite.internal.sql.engine.exec.rel.AbstractNode;
import org.apache.ignite.internal.sql.engine.exec.rel.Downstream;
import org.apache.ignite.internal.sql.engine.exec.rel.SingleNode;
import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.lang.IgniteInternalException;

public class HashAggregateNode<RowT>
extends AbstractNode<RowT>
implements SingleNode<RowT>,
Downstream<RowT> {
    private final AggregateType type;
    private final Supplier<List<AccumulatorWrapper<RowT>>> accFactory;
    private final RowHandler.RowFactory<RowT> rowFactory;
    private final ImmutableBitSet grpSet;
    private final List<Grouping> groupings;
    private int requested;
    private int waiting;
    private boolean inLoop;

    public HashAggregateNode(ExecutionContext<RowT> ctx, RelDataType rowType, AggregateType type, List<ImmutableBitSet> grpSets, Supplier<List<AccumulatorWrapper<RowT>>> accFactory, RowHandler.RowFactory<RowT> rowFactory) {
        super(ctx, rowType);
        this.type = type;
        this.accFactory = accFactory;
        this.rowFactory = rowFactory;
        ImmutableBitSet.Builder b = ImmutableBitSet.builder();
        if (grpSets.size() > 127) {
            throw new IgniteInternalException("Too many groups");
        }
        this.groupings = new ArrayList<Grouping>(grpSets.size());
        for (byte i = 0; i < grpSets.size(); i = (byte)(i + 1)) {
            ImmutableBitSet grpFields = grpSets.get(i);
            this.groupings.add(new Grouping(i, grpFields));
            b.addAll(grpFields);
        }
        this.grpSet = b.build();
    }

    @Override
    public void request(int rowsCnt) throws Exception {
        assert (!CollectionUtils.nullOrEmpty(this.sources()) && this.sources().size() == 1);
        assert (rowsCnt > 0 && this.requested == 0);
        assert (this.waiting <= 0);
        this.checkState();
        this.requested = rowsCnt;
        if (this.waiting == 0) {
            this.waiting = 512;
            this.source().request(512);
        } else if (!this.inLoop) {
            this.context().execute(this::flush, this::onError);
        }
    }

    @Override
    public void push(RowT row) throws Exception {
        assert (this.downstream() != null);
        assert (this.waiting > 0);
        this.checkState();
        --this.waiting;
        for (Grouping grouping : this.groupings) {
            grouping.add(row);
        }
        if (this.waiting == 0) {
            this.waiting = 512;
            this.source().request(512);
        }
    }

    @Override
    public void end() throws Exception {
        assert (this.downstream() != null);
        assert (this.waiting > 0);
        this.checkState();
        this.waiting = -1;
        this.flush();
    }

    @Override
    protected void rewindInternal() {
        this.requested = 0;
        this.waiting = 0;
        this.groupings.forEach(grouping -> grouping.groups.clear());
    }

    @Override
    protected Downstream<RowT> requestDownstream(int idx) {
        if (idx != 0) {
            throw new IndexOutOfBoundsException();
        }
        return this;
    }

    private boolean hasAccumulators() {
        return this.accFactory != null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void flush() throws Exception {
        if (this.isClosed()) {
            return;
        }
        this.checkState();
        assert (this.waiting == -1);
        int processed = 0;
        ArrayDeque<Grouping> groupingsQueue = this.groupingsQueue();
        this.inLoop = true;
        try {
            while (this.requested > 0 && !groupingsQueue.isEmpty()) {
                Grouping grouping = groupingsQueue.peek();
                int toSnd = Math.min(this.requested, 512 - processed);
                for (Object row : grouping.getRows(toSnd)) {
                    this.checkState();
                    --this.requested;
                    this.downstream().push(row);
                    ++processed;
                }
                if (processed >= 512 && this.requested > 0) {
                    this.context().execute(this::flush, this::onError);
                    return;
                }
                if (!grouping.isEmpty()) continue;
                groupingsQueue.remove();
            }
        }
        finally {
            this.inLoop = false;
        }
        if (this.requested > 0) {
            this.requested = 0;
            this.downstream().end();
        }
    }

    private ArrayDeque<Grouping> groupingsQueue() {
        return this.groupings.stream().filter(Commons.negate(rec$ -> ((Grouping)rec$).isEmpty())).collect(Collectors.toCollection(ArrayDeque::new));
    }

    private class Grouping {
        private final byte grpId;
        private final ImmutableBitSet grpFields;
        private final Map<GroupKey, List<AccumulatorWrapper<RowT>>> groups = new HashMap();
        private final RowHandler<RowT> handler;

        private Grouping(byte grpId, ImmutableBitSet grpFields) {
            this.grpId = grpId;
            this.grpFields = grpFields;
            this.handler = HashAggregateNode.this.context().rowHandler();
            if (grpFields.isEmpty() && (HashAggregateNode.this.type == AggregateType.REDUCE || HashAggregateNode.this.type == AggregateType.SINGLE)) {
                this.groups.put(GroupKey.EMPTY_GRP_KEY, this.create(GroupKey.EMPTY_GRP_KEY));
            }
        }

        private void add(RowT row) {
            if (HashAggregateNode.this.type == AggregateType.REDUCE) {
                this.addOnReducer(row);
            } else {
                this.addOnMapper(row);
            }
        }

        private List<RowT> getRows(int cnt) {
            if (CollectionUtils.nullOrEmpty(this.groups)) {
                return Collections.emptyList();
            }
            if (HashAggregateNode.this.type == AggregateType.MAP) {
                return this.getOnMapper(cnt);
            }
            return this.getOnReducer(cnt);
        }

        private void addOnMapper(RowT row) {
            GroupKey.Builder b = GroupKey.builder(this.grpFields.cardinality());
            for (Integer field : this.grpFields) {
                b.add(this.handler.get(field, row));
            }
            GroupKey grpKey = b.build();
            List wrappers = this.groups.computeIfAbsent(grpKey, this::create);
            for (AccumulatorWrapper wrapper : wrappers) {
                wrapper.add(row);
            }
        }

        private void addOnReducer(RowT row) {
            byte targetGrpId = (Byte)this.handler.get(0, row);
            if (targetGrpId != this.grpId) {
                return;
            }
            GroupKey grpKey = (GroupKey)this.handler.get(1, row);
            List wrappers = this.groups.computeIfAbsent(grpKey, this::create);
            List accums = HashAggregateNode.this.hasAccumulators() ? (List)this.handler.get(2, row) : Collections.emptyList();
            for (int i = 0; i < wrappers.size(); ++i) {
                AccumulatorWrapper wrapper = (AccumulatorWrapper)wrappers.get(i);
                Accumulator accum = (Accumulator)accums.get(i);
                wrapper.apply(accum);
            }
        }

        private List<RowT> getOnMapper(int cnt) {
            Iterator it = this.groups.entrySet().iterator();
            int amount = Math.min(cnt, this.groups.size());
            ArrayList res = new ArrayList(amount);
            for (int i = 0; i < amount; ++i) {
                Map.Entry entry = it.next();
                GroupKey grpKey = entry.getKey();
                List<Accumulator> accums = Commons.transform(entry.getValue(), AccumulatorWrapper::accumulator);
                Object row = HashAggregateNode.this.hasAccumulators() ? HashAggregateNode.this.rowFactory.create(this.grpId, grpKey, accums) : HashAggregateNode.this.rowFactory.create(this.grpId, grpKey);
                res.add(row);
                it.remove();
            }
            return res;
        }

        private List<RowT> getOnReducer(int cnt) {
            Iterator it = this.groups.entrySet().iterator();
            int amount = Math.min(cnt, this.groups.size());
            ArrayList res = new ArrayList(amount);
            for (int i = 0; i < amount; ++i) {
                Map.Entry entry = it.next();
                GroupKey grpKey = entry.getKey();
                List wrappers = entry.getValue();
                Object[] fields = new Object[HashAggregateNode.this.grpSet.cardinality() + wrappers.size()];
                int j = 0;
                int k = 0;
                for (Integer field : HashAggregateNode.this.grpSet) {
                    fields[j++] = this.grpFields.get(field.intValue()) ? grpKey.field(k++) : null;
                }
                for (AccumulatorWrapper wrapper : wrappers) {
                    fields[j++] = wrapper.end();
                }
                res.add(HashAggregateNode.this.rowFactory.create(fields));
                it.remove();
            }
            return res;
        }

        private List<AccumulatorWrapper<RowT>> create(GroupKey key) {
            if (HashAggregateNode.this.accFactory == null) {
                return Collections.emptyList();
            }
            return HashAggregateNode.this.accFactory.get();
        }

        private boolean isEmpty() {
            return this.groups.isEmpty();
        }
    }
}

