package io.activej.cube.aggregation;

import io.activej.async.AsyncAccumulator;
import io.activej.codegen.DefiningClassLoader;
import io.activej.cube.AggregationStructure;
import io.activej.cube.aggregation.util.PartitionPredicate;
import io.activej.datastream.consumer.ForwardingStreamConsumer;
import io.activej.datastream.consumer.StreamConsumer;
import io.activej.datastream.consumer.StreamConsumers;
import io.activej.datastream.consumer.SwitcherStreamConsumer;
import io.activej.datastream.supplier.StreamDataAcceptor;
import io.activej.promise.Promise;
import io.activej.promise.SettablePromise;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/* loaded from: input_file:io/activej/cube/aggregation/AggregationChunker.class */
public final class AggregationChunker<T> extends ForwardingStreamConsumer<T> {
    private final SwitcherStreamConsumer<T> switcher;
    private final SettablePromise<List<AggregationChunk>> result;
    private final AggregationStructure aggregation;
    private final List<String> fields;
    private final Class<T> recordClass;
    private final PartitionPredicate<T> partitionPredicate;
    private final IAggregationChunkStorage storage;
    private final AsyncAccumulator<List<AggregationChunk>> chunksAccumulator;
    private final DefiningClassLoader classLoader;
    private final int chunkSize;

    /* loaded from: input_file:io/activej/cube/aggregation/AggregationChunker$ChunkWriter.class */
    public class ChunkWriter extends ForwardingStreamConsumer<T> implements StreamDataAcceptor<T> {
        private final SettablePromise<AggregationChunk> result;
        private final int chunkSize;
        private final PartitionPredicate<T> partitionPredicate;
        private StreamDataAcceptor<T> dataAcceptor;
        private T first;
        private T last;
        private int count;

        public ChunkWriter(StreamConsumer<T> streamConsumer, long j, int i, PartitionPredicate<T> partitionPredicate) {
            super(streamConsumer);
            this.result = new SettablePromise<>();
            this.chunkSize = i;
            this.partitionPredicate = partitionPredicate;
            Promise map = streamConsumer.getAcknowledgement().map(r10 -> {
                if (this.count == 0) {
                    return null;
                }
                return AggregationChunk.create(j, AggregationChunker.this.fields, PrimaryKey.ofObject(this.first, AggregationChunker.this.aggregation.getKeys()), PrimaryKey.ofObject(this.last, AggregationChunker.this.aggregation.getKeys()), this.count);
            });
            SettablePromise<AggregationChunk> settablePromise = this.result;
            Objects.requireNonNull(settablePromise);
            map.whenComplete((v1, v2) -> {
                r1.trySet(v1, v2);
            });
        }

        public StreamDataAcceptor<T> getDataAcceptor() {
            this.dataAcceptor = super.getDataAcceptor();
            if (this.dataAcceptor != null) {
                return this;
            }
            return null;
        }

        public void accept(T t) {
            if (this.first == null) {
                this.first = t;
            }
            this.last = t;
            this.dataAcceptor.accept(t);
            int i = this.count + 1;
            this.count = i;
            if (i == this.chunkSize || !(this.partitionPredicate == null || this.partitionPredicate.isSamePartition(this.last, t))) {
                AggregationChunker.this.startNewChunk();
            }
        }

        public Promise<AggregationChunk> getResult() {
            return this.result;
        }
    }

    private AggregationChunker(SwitcherStreamConsumer<T> switcherStreamConsumer, AggregationStructure aggregationStructure, List<String> list, Class<T> cls, PartitionPredicate<T> partitionPredicate, IAggregationChunkStorage iAggregationChunkStorage, DefiningClassLoader definingClassLoader, int i) {
        super(switcherStreamConsumer);
        this.result = new SettablePromise<>();
        this.switcher = switcherStreamConsumer;
        this.aggregation = aggregationStructure;
        this.fields = list;
        this.recordClass = cls;
        this.partitionPredicate = partitionPredicate;
        this.storage = iAggregationChunkStorage;
        this.classLoader = definingClassLoader;
        this.chunkSize = i;
        AsyncAccumulator<List<AggregationChunk>> create = AsyncAccumulator.create(new ArrayList());
        this.chunksAccumulator = create;
        Promise run = create.run(getAcknowledgement());
        SettablePromise<List<AggregationChunk>> settablePromise = this.result;
        Objects.requireNonNull(settablePromise);
        run.whenComplete((v1, v2) -> {
            r1.trySet(v1, v2);
        });
    }

    public static <T> AggregationChunker<T> create(AggregationStructure aggregationStructure, List<String> list, Class<T> cls, PartitionPredicate<T> partitionPredicate, IAggregationChunkStorage iAggregationChunkStorage, DefiningClassLoader definingClassLoader, int i) {
        AggregationChunker<T> aggregationChunker = new AggregationChunker<>(SwitcherStreamConsumer.create(), aggregationStructure, list, cls, partitionPredicate, iAggregationChunkStorage, definingClassLoader, i);
        aggregationChunker.startNewChunk();
        return aggregationChunker;
    }

    public Promise<List<AggregationChunk>> getResult() {
        return this.result;
    }

    private void startNewChunk() {
        this.switcher.switchTo(StreamConsumers.ofPromise(this.storage.createId().then(l -> {
            return this.storage.write(this.aggregation, this.fields, this.recordClass, l.longValue(), this.classLoader).map(streamConsumer -> {
                return new ChunkWriter(streamConsumer, l.longValue(), this.chunkSize, this.partitionPredicate);
            }).whenResult(chunkWriter -> {
                this.chunksAccumulator.addPromise(chunkWriter.getResult(), (list, aggregationChunk) -> {
                    if (aggregationChunk == null || aggregationChunk.getCount() == 0) {
                        return;
                    }
                    list.add(aggregationChunk);
                });
            });
        })));
    }
}
