package io.datakernel.aggregation;

import io.datakernel.aggregation.ot.AggregationStructure;
import io.datakernel.aggregation.util.PartitionPredicate;
import io.datakernel.async.SettableStage;
import io.datakernel.async.Stage;
import io.datakernel.async.StagesAccumulator;
import io.datakernel.codegen.DefiningClassLoader;
import io.datakernel.stream.ForwardingStreamConsumer;
import io.datakernel.stream.ForwardingStreamProducer;
import io.datakernel.stream.StreamConsumerSwitcher;
import io.datakernel.stream.StreamConsumerWithResult;
import io.datakernel.stream.StreamDataReceiver;
import io.datakernel.stream.StreamProducer;
import java.util.ArrayList;
import java.util.List;

/* loaded from: input_file:io/datakernel/aggregation/AggregationChunker.class */
public final class AggregationChunker<T> extends ForwardingStreamConsumer<T> implements StreamConsumerWithResult<T, List<AggregationChunk>> {
    private final StreamConsumerSwitcher<T> switcher;
    private final SettableStage<List<AggregationChunk>> result;
    private final AggregationStructure aggregation;
    private final List<String> fields;
    private final Class<T> recordClass;
    private final PartitionPredicate<T> partitionPredicate;
    private final AggregationChunkStorage storage;
    private final StagesAccumulator<List<AggregationChunk>> chunksAccumulator;
    private final DefiningClassLoader classLoader;
    private final int chunkSize;

    /* loaded from: input_file:io/datakernel/aggregation/AggregationChunker$ChunkWriter.class */
    private class ChunkWriter extends ForwardingStreamConsumer<T> implements StreamConsumerWithResult<T, AggregationChunk>, StreamDataReceiver<T> {
        private final SettableStage<AggregationChunk> result;
        private final long chunkId;
        private final int chunkSize;
        private final PartitionPredicate<T> partitionPredicate;
        private StreamDataReceiver<T> dataReceiver;
        private T first;
        private T last;
        private int count;
        boolean switched;

        public ChunkWriter(StreamConsumerWithResult<T, Void> streamConsumerWithResult, long j, int i, PartitionPredicate<T> partitionPredicate) {
            super(streamConsumerWithResult);
            this.result = SettableStage.create();
            this.chunkId = j;
            this.chunkSize = i;
            this.partitionPredicate = partitionPredicate;
            Stage thenApply = streamConsumerWithResult.getResult().thenApply(r10 -> {
                if (this.count == 0) {
                    return null;
                }
                return AggregationChunk.create(j, AggregationChunker.this.fields, PrimaryKey.ofObject(this.first, AggregationChunker.this.aggregation.getKeys()), PrimaryKey.ofObject(this.last, AggregationChunker.this.aggregation.getKeys()), this.count);
            });
            SettableStage<AggregationChunk> settableStage = this.result;
            settableStage.getClass();
            thenApply.whenComplete((v1, v2) -> {
                r1.trySet(v1, v2);
            });
            Stage endOfStream = getEndOfStream();
            SettableStage<AggregationChunk> settableStage2 = this.result;
            settableStage2.getClass();
            endOfStream.whenException(settableStage2::trySetException);
        }

        public void setProducer(StreamProducer<T> streamProducer) {
            super.setProducer(new ForwardingStreamProducer<T>(streamProducer) { // from class: io.datakernel.aggregation.AggregationChunker.ChunkWriter.1
                public void produce(StreamDataReceiver<T> streamDataReceiver) {
                    ChunkWriter.this.dataReceiver = streamDataReceiver;
                    super.produce(ChunkWriter.this);
                }
            });
        }

        public void onData(T t) {
            if (this.first == null) {
                this.first = t;
            }
            this.last = t;
            this.dataReceiver.onData(t);
            int i = this.count + 1;
            this.count = i;
            if ((i == this.chunkSize || !(this.partitionPredicate == null || this.partitionPredicate.isSamePartition(this.last, t))) && !this.switched) {
                this.switched = true;
                AggregationChunker.this.startNewChunk();
            }
        }

        public Stage<AggregationChunk> getResult() {
            return this.result;
        }
    }

    private AggregationChunker(StreamConsumerSwitcher<T> streamConsumerSwitcher, AggregationStructure aggregationStructure, List<String> list, Class<T> cls, PartitionPredicate<T> partitionPredicate, AggregationChunkStorage aggregationChunkStorage, DefiningClassLoader definingClassLoader, int i) {
        super(streamConsumerSwitcher);
        this.result = SettableStage.create();
        this.switcher = streamConsumerSwitcher;
        this.aggregation = aggregationStructure;
        this.fields = list;
        this.recordClass = cls;
        this.partitionPredicate = partitionPredicate;
        this.storage = aggregationChunkStorage;
        this.classLoader = definingClassLoader;
        this.chunksAccumulator = StagesAccumulator.create(new ArrayList()).withStage(streamConsumerSwitcher.getEndOfStream(), (list2, r2) -> {
        });
        this.chunkSize = i;
        Stage stage = this.chunksAccumulator.get();
        SettableStage<List<AggregationChunk>> settableStage = this.result;
        settableStage.getClass();
        stage.whenComplete((v1, v2) -> {
            r1.trySet(v1, v2);
        });
        Stage endOfStream = getEndOfStream();
        SettableStage<List<AggregationChunk>> settableStage2 = this.result;
        settableStage2.getClass();
        endOfStream.whenException(settableStage2::trySetException);
    }

    public static <T> AggregationChunker<T> create(AggregationStructure aggregationStructure, List<String> list, Class<T> cls, PartitionPredicate<T> partitionPredicate, AggregationChunkStorage aggregationChunkStorage, DefiningClassLoader definingClassLoader, int i) {
        AggregationChunker<T> aggregationChunker = new AggregationChunker<>(StreamConsumerSwitcher.create(), aggregationStructure, list, cls, partitionPredicate, aggregationChunkStorage, definingClassLoader, i);
        aggregationChunker.startNewChunk();
        return aggregationChunker;
    }

    public Stage<List<AggregationChunk>> getResult() {
        return this.result;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void startNewChunk() {
        StreamConsumerWithResult ofStage = StreamConsumerWithResult.ofStage(this.storage.createId().thenCompose(l -> {
            return this.storage.write(this.aggregation, this.fields, this.recordClass, l.longValue(), this.classLoader).thenApply(streamConsumerWithResult -> {
                return new ChunkWriter(streamConsumerWithResult, l.longValue(), this.chunkSize, this.partitionPredicate).withLateBinding();
            });
        }));
        this.switcher.switchTo(ofStage);
        this.chunksAccumulator.addStage(ofStage.getResult(), (list, aggregationChunk) -> {
            if (aggregationChunk == null || aggregationChunk.getCount() == 0) {
                return;
            }
            list.add(aggregationChunk);
        });
    }
}
