package io.datakernel.aggregation;

import io.datakernel.aggregation.ot.AggregationStructure;
import io.datakernel.aggregation.util.PartitionPredicate;
import io.datakernel.async.process.AsyncCollector;
import io.datakernel.codegen.DefiningClassLoader;
import io.datakernel.datastream.AbstractStreamConsumer;
import io.datakernel.datastream.StreamDataAcceptor;
import io.datakernel.datastream.StreamSupplier;
import io.datakernel.promise.Promise;
import java.lang.Comparable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datakernel/aggregation/AggregationGroupReducer.class */
/**
 * Stream consumer that groups incoming records by key and folds them into
 * per-key accumulators held in memory.
 *
 * <p>Each record's key is computed with {@code keyFunction}. The first record
 * seen for a key creates an accumulator via {@link Aggregate#createAccumulator},
 * later records with the same key are merged into it via
 * {@link Aggregate#accumulate}. Once the number of distinct buffered keys reaches
 * {@code chunkSize} (or on {@link #flush()} / end of stream) the accumulators are
 * sorted by key, the buffer is emptied, and the sorted accumulators are streamed
 * into an {@link AggregationChunker} created with the configured storage. The
 * chunks reported by every chunker are gathered into one list, available through
 * {@link #getResult()}.
 *
 * @param <C> type parameter of the {@link AggregationChunkStorage} in use
 *            (presumably the chunk id type - confirm against the storage API)
 * @param <T> type of the records accepted by this consumer
 * @param <K> type of the grouping key; keys are ordered with their own
 *            {@code compareTo} when a buffer is flushed
 */
public final class AggregationGroupReducer<C, T, K extends Comparable> extends AbstractStreamConsumer<T> implements StreamDataAcceptor<T> {
    private static final Logger logger = LoggerFactory.getLogger(AggregationGroupReducer.class);

    /** The supplier is suspended while more than this many collector promises are still active. */
    private static final int MAX_ACTIVE_PROMISES = 2;

    private final AggregationChunkStorage<C> storage;
    private final AggregationStructure aggregation;
    private final List<String> measures;
    private final PartitionPredicate<T> partitionPredicate;
    private final Class<T> recordClass;
    private final Function<T, K> keyFunction;
    private final Aggregate<T, Object> aggregate;
    private final DefiningClassLoader classLoader;
    private final int chunkSize;

    /** Accumulators of the current, not yet flushed, buffer, indexed by grouping key. */
    private final Map<K, Object> map = new HashMap<>();

    /** Gathers the chunk lists produced by every flush into a single list. */
    private final AsyncCollector<List<AggregationChunk>> chunksCollector = AsyncCollector.create(new ArrayList<>());

    /**
     * Creates a reducer; all arguments are stored as-is and used on every flush.
     *
     * @param aggregationChunkStorage storage handed to each {@link AggregationChunker}
     * @param aggregationStructure    aggregation structure handed to each chunker; its keys are shown in {@link #toString()}
     * @param list                    measure names handed to each chunker
     * @param cls                     record class handed to each chunker
     * @param partitionPredicate      partition predicate handed to each chunker
     * @param function                extracts the grouping key from an incoming record
     * @param aggregate               creates and updates the per-key accumulators
     * @param i                       number of distinct buffered keys that triggers a flush; also passed to each chunker
     * @param definingClassLoader     class loader handed to each chunker
     */
    public AggregationGroupReducer(@NotNull AggregationChunkStorage<C> aggregationChunkStorage, @NotNull AggregationStructure aggregationStructure, @NotNull List<String> list, @NotNull Class<T> cls, @NotNull PartitionPredicate<T> partitionPredicate, @NotNull Function<T, K> function, @NotNull Aggregate<T, Object> aggregate, int i, @NotNull DefiningClassLoader definingClassLoader) {
        this.storage = aggregationChunkStorage;
        this.measures = list;
        this.partitionPredicate = partitionPredicate;
        this.recordClass = cls;
        this.keyFunction = function;
        this.aggregate = aggregate;
        this.chunkSize = i;
        this.aggregation = aggregationStructure;
        this.classLoader = definingClassLoader;
    }

    /**
     * Returns the collector's promise of the chunks produced by all flushes.
     *
     * @return promise of the accumulated chunk list
     */
    public Promise<List<AggregationChunk>> getResult() {
        return this.chunksCollector.get();
    }

    /**
     * Folds one record into the buffer.
     *
     * <p>A record whose key is already buffered is merged into the existing
     * accumulator. Otherwise a new accumulator is created and, if that brings the
     * number of buffered keys to exactly {@code chunkSize}, the buffer is flushed.
     *
     * @param t the incoming record
     */
    public void accept(T t) {
        K key = this.keyFunction.apply(t);
        Object accumulator = this.map.get(key);
        if (accumulator != null) {
            this.aggregate.accumulate(accumulator, t);
            return;
        }
        this.map.put(key, this.aggregate.createAccumulator(t));
        // Only a newly added key can grow the buffer, so the size check lives on this path.
        if (this.map.size() == this.chunkSize) {
            doFlush();
        }
    }

    /**
     * Start hook: asks the supplier to begin delivering records to this acceptor.
     * (Presumably invoked by the base stream consumer - confirm against AbstractStreamConsumer.)
     */
    protected void onStarted() {
        getSupplier().resume(this);
    }

    /**
     * Sorts the buffered accumulators by key, empties the buffer and streams the
     * sorted accumulators into a fresh {@link AggregationChunker}. The chunker's
     * result is registered with the collector, and back-pressure is re-evaluated
     * both before the flush and once the registered promise yields a result.
     * Does nothing when the buffer is empty.
     */
    // The chunker and its supplier are used as raw types, exactly as in the
    // original call; key comparison goes through the raw Comparable bound of K.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void doFlush() {
        if (this.map.isEmpty()) {
            return;
        }
        suspendOrResume();

        // Snapshot the entries before clearing: the buffer is reused for the next group.
        List<Map.Entry<K, Object>> entries = new ArrayList<>(this.map.entrySet());
        this.map.clear();
        entries.sort((left, right) -> left.getKey().compareTo(right.getKey()));

        List<Object> accumulators = new ArrayList<>(entries.size());
        for (Map.Entry<K, Object> entry : entries) {
            accumulators.add(entry.getValue());
        }

        StreamSupplier supplier = StreamSupplier.ofIterable(accumulators);
        AggregationChunker chunker = AggregationChunker.create(this.aggregation, this.measures, this.recordClass, this.partitionPredicate, this.storage, this.classLoader, this.chunkSize);
        this.chunksCollector.addPromise(
                supplier.streamTo(chunker).then(ignored -> chunker.getResult()),
                // The promise is raw, so the chunk list's element type is restored explicitly.
                (collected, chunks) -> collected.addAll((List<AggregationChunk>) chunks))
            .whenResult(ignored -> suspendOrResume());
    }

    /**
     * Applies back-pressure: suspends the supplier while more than
     * {@link #MAX_ACTIVE_PROMISES} collector promises are active, otherwise
     * resumes delivery to this acceptor.
     */
    private void suspendOrResume() {
        if (this.chunksCollector.getActivePromises() > MAX_ACTIVE_PROMISES) {
            logger.trace("Suspend group reduce: {}", this);
            getSupplier().suspend();
        } else {
            logger.trace("Resume group reduce: {}", this);
            getSupplier().resume(this);
        }
    }

    /**
     * End-of-stream hook: flushes whatever is still buffered, then runs the
     * collector and returns its promise converted to {@code Promise<Void>}.
     *
     * @return the collector's result promise, converted with {@code toVoid()}
     */
    protected Promise<Void> onEndOfStream() {
        doFlush();
        return this.chunksCollector.run().get().toVoid();
    }

    /**
     * Error hook: closes the chunk collector with the given error.
     *
     * @param th the error passed to the collector
     */
    protected void onError(Throwable th) {
        this.chunksCollector.close(th);
    }

    /** Flushes the current buffer immediately; a no-op when nothing is buffered. */
    public void flush() {
        doFlush();
    }

    /**
     * Returns the number of distinct keys currently buffered.
     *
     * @return size of the in-memory accumulator map
     */
    public int getBufferSize() {
        return this.map.size();
    }

    /**
     * Describes this reducer by its aggregation keys, measures, chunk size and
     * current buffer size.
     */
    @Override
    public String toString() {
        return "AggregationGroupReducer{keys=" + this.aggregation.getKeys() + ", measures=" + this.measures + ", chunkSize=" + this.chunkSize + ", map.size=" + this.map.size() + '}';
    }
}
