/*
 * Decompiled with CFR 0.152.
 */
package io.questdb.griffin.engine.groupby.vect;

import io.questdb.MessageBus;
import io.questdb.cairo.sql.Function;
import io.questdb.cairo.sql.NoRandomAccessRecordCursor;
import io.questdb.cairo.sql.PageFrame;
import io.questdb.cairo.sql.PageFrameCursor;
import io.questdb.cairo.sql.Record;
import io.questdb.cairo.sql.RecordCursor;
import io.questdb.cairo.sql.RecordCursorFactory;
import io.questdb.cairo.sql.RecordMetadata;
import io.questdb.cairo.sql.VirtualRecordNoRowid;
import io.questdb.griffin.SqlExecutionContext;
import io.questdb.griffin.engine.groupby.vect.VectorAggregateEntry;
import io.questdb.griffin.engine.groupby.vect.VectorAggregateFunction;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.mp.RingQueue;
import io.questdb.mp.SOUnboundedCountDownLatch;
import io.questdb.mp.Sequence;
import io.questdb.std.Misc;
import io.questdb.std.ObjList;
import io.questdb.std.ObjectPool;
import io.questdb.tasks.VectorAggregateTask;

/**
 * Record cursor factory for a group-by without keys that is computed with
 * vector aggregate functions. The result set always consists of exactly one row
 * whose columns are the aggregate functions themselves.
 * <p>
 * Aggregation is performed eagerly inside {@link #getCursor(SqlExecutionContext)}:
 * for every page frame of the base factory and every aggregate function a task is
 * either published to the vector aggregate queue of the message bus or, when the
 * publisher sequence does not hand out a slot, executed inline on the calling thread.
 * <p>
 * The factory reuses one cursor instance, one entry pool and one latch across calls,
 * so a single instance is not suitable for concurrent {@code getCursor} calls.
 */
public class GroupByNotKeyedVectorRecordCursorFactory
implements RecordCursorFactory {
    private static final Log LOG = LogFactory.getLog(GroupByNotKeyedVectorRecordCursorFactory.class);
    private final RecordCursorFactory base;
    private final ObjList<VectorAggregateFunction> vafList;
    private final ObjectPool<VectorAggregateEntry> entryPool = new ObjectPool<VectorAggregateEntry>(VectorAggregateEntry::new, 1024);
    private final ObjList<VectorAggregateEntry> activeEntries = new ObjList<VectorAggregateEntry>(1024);
    private final SOUnboundedCountDownLatch doneLatch = new SOUnboundedCountDownLatch();
    private final RecordMetadata metadata;
    private final GroupByNotKeyedVectorRecordCursor cursor;

    /**
     * @param base     factory providing the page frames to aggregate
     * @param metadata metadata describing the single result row
     * @param vafList  aggregate functions, one per result column
     */
    public GroupByNotKeyedVectorRecordCursorFactory(RecordCursorFactory base, RecordMetadata metadata, ObjList<VectorAggregateFunction> vafList) {
        this.base = base;
        this.metadata = metadata;
        this.vafList = vafList;
        this.cursor = new GroupByNotKeyedVectorRecordCursor(vafList);
    }

    /**
     * Releases the aggregate functions and the base factory this factory was built from.
     */
    @Override
    public void close() {
        for (int i = 0, n = this.vafList.size(); i < n; i++) {
            Misc.free(this.vafList.getQuick(i));
        }
        Misc.free(this.base);
    }

    /**
     * Aggregates all page frames of the base factory and returns a cursor over the
     * single result row. The returned cursor takes ownership of the page frame cursor
     * and closes it when the cursor itself is closed.
     * <p>
     * If aggregation fails, every task that was already queued is allowed to complete
     * and the page frame cursor is closed before the failure is propagated.
     *
     * @param executionContext execution context; its message bus must not be null
     * @return the shared single-row cursor of this factory
     */
    @Override
    public RecordCursor getCursor(SqlExecutionContext executionContext) {
        final MessageBus bus = executionContext.getMessageBus();
        assert (bus != null);
        final PageFrameCursor pageFrameCursor = this.base.getPageFrameCursor(executionContext);
        // number of entries recorded in activeEntries for publishing; declared outside
        // of the try block because the failure path has to wait for that many completions
        int queuedCount = 0;
        try {
            final int vafCount = this.vafList.size();
            // reset the state left behind by a previous execution
            for (int i = 0; i < vafCount; i++) {
                this.vafList.getQuick(i).clear();
            }
            final RingQueue<VectorAggregateTask> queue = bus.getVectorAggregateQueue();
            final Sequence pubSeq = bus.getVectorAggregatePubSequence();
            this.entryPool.clear();
            this.activeEntries.clear();
            int ownCount = 0;
            int total = 0;
            this.doneLatch.reset();

            PageFrame frame;
            while ((frame = pageFrameCursor.next()) != null) {
                for (int i = 0; i < vafCount; i++) {
                    final VectorAggregateFunction vaf = this.vafList.getQuick(i);
                    final int columnIndex = vaf.getColumnIndex();
                    final long pageAddress = frame.getPageAddress(columnIndex);
                    final long pageValueCount = frame.getPageValueCount(columnIndex);
                    final long seq = pubSeq.next();
                    if (seq < 0L) {
                        // no slot was handed out (presumably the queue is full or contended):
                        // aggregate this page on the calling thread instead
                        vaf.aggregate(pageAddress, pageValueCount);
                        ownCount++;
                        continue;
                    }
                    final VectorAggregateEntry entry = this.entryPool.next();
                    entry.of(queuedCount, vaf, pageAddress, pageValueCount, this.doneLatch);
                    this.activeEntries.add(entry);
                    // count the entry only once it is on our own checklist, so that the
                    // number we wait for never exceeds the number of entries we can run
                    queuedCount++;
                    queue.get((long)seq).entry = entry;
                    pubSeq.done(seq);
                }
                // counts page frames, not individual tasks
                total++;
            }

            final int reclaimed = this.runWhatsLeft(queuedCount);
            LOG.info().$("waiting for parts [queuedCount=").$(queuedCount).$(']').$();
            this.doneLatch.await(queuedCount);
            LOG.info().$("done [total=").$(total).$(", ownCount=").$(ownCount).$(", reclaimed=").$(reclaimed).$(", queuedCount=").$(queuedCount).$(']').$();
            return this.cursor.of(pageFrameCursor);
        } catch (Throwable e) {
            // Queued entries reference page addresses obtained from the frame cursor and
            // live in a pool that the next getCursor() call recycles. Let all of them
            // finish before the cursor is closed, otherwise a consumer of the queue could
            // still be working on them after this method has returned.
            this.runWhatsLeft(queuedCount);
            this.doneLatch.await(queuedCount);
            Misc.free(pageFrameCursor);
            throw e;
        }
    }

    @Override
    public RecordMetadata getMetadata() {
        return this.metadata;
    }

    @Override
    public boolean recordCursorSupportsRandomAccess() {
        return false;
    }

    /**
     * Runs on the calling thread the entries of this execution that nobody has run yet.
     * Entries are visited from the most recently queued one backwards and the loop stops
     * as soon as the latch reports that all queued entries have completed.
     *
     * @param queuedCount number of entries queued by the current execution
     * @return number of entries for which {@code run()} returned true, i.e. entries
     * executed by the calling thread
     */
    private int runWhatsLeft(int queuedCount) {
        int reclaimed = 0;
        // the latch count is compared against the negated number of queued entries,
        // i.e. it is expected to go down by one for every completed entry
        for (int i = this.activeEntries.size() - 1; i > -1 && this.doneLatch.getCount() > -queuedCount; i--) {
            if (this.activeEntries.getQuick(i).run()) {
                reclaimed++;
            }
        }
        return reclaimed;
    }

    /**
     * Cursor over the single result row. The row is a virtual record backed by the
     * aggregate functions, the cursor itself only tracks whether that row has been
     * handed out since the last {@link #toTop()}.
     */
    private static class GroupByNotKeyedVectorRecordCursor
    implements NoRandomAccessRecordCursor {
        private final Record recordA;
        private int countDown = 1;
        private PageFrameCursor pageFrameCursor;

        public GroupByNotKeyedVectorRecordCursor(ObjList<? extends Function> functions) {
            this.recordA = new VirtualRecordNoRowid(functions);
        }

        /**
         * Closes the page frame cursor this cursor was last initialised with.
         * Calling this method more than once has no further effect.
         */
        @Override
        public void close() {
            Misc.free(this.pageFrameCursor);
            // drop the reference so that a repeated close() does not free the same cursor twice
            this.pageFrameCursor = null;
        }

        @Override
        public Record getRecord() {
            return this.recordA;
        }

        /**
         * @return true exactly once after construction or {@link #toTop()}, false afterwards
         */
        @Override
        public boolean hasNext() {
            // decrement only while positive: the counter cannot drift and wrap around
            // no matter how many times the method is called past the end
            if (this.countDown > 0) {
                this.countDown--;
                return true;
            }
            return false;
        }

        @Override
        public void toTop() {
            this.countDown = 1;
        }

        @Override
        public long size() {
            return 1L;
        }

        /**
         * Takes ownership of the given page frame cursor and rewinds to the single row.
         */
        private GroupByNotKeyedVectorRecordCursor of(PageFrameCursor pageFrameCursor) {
            this.pageFrameCursor = pageFrameCursor;
            this.toTop();
            return this;
        }
    }
}

