package io.datakernel.stream.processor;

import io.datakernel.async.Stage;
import io.datakernel.async.StagesAccumulator;
import io.datakernel.stream.AbstractStreamConsumer;
import io.datakernel.stream.StreamConsumer;
import io.datakernel.stream.StreamDataReceiver;
import io.datakernel.stream.StreamProducer;
import io.datakernel.stream.StreamProducerWithResult;
import io.datakernel.util.Preconditions;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

/**
 * Stream transformer that emits the items of its input ordered by a key.
 *
 * <p>Incoming items are buffered in memory. Every time the buffer reaches
 * {@code itemsInMemory} items it is sorted and written to the supplied
 * {@link StreamSorterStorage} as one temporary stream. Once the input has ended and
 * all pending writes have completed, the remaining in-memory items are sorted and,
 * if any temporary streams were written, merged with them through a
 * {@link StreamMerger}; otherwise they are streamed directly.
 *
 * @param <K> type of the key items are ordered by
 * @param <T> type of the items
 */
public final class StreamSorter<K, T> implements StreamTransformer<T, T> {
    /** Collects the ids of the temporary streams written so far and tracks unfinished writes. */
    private final StagesAccumulator<List<Integer>> temporaryStreams = StagesAccumulator.create(new ArrayList<>());
    private final StreamSorterStorage<T> storage;
    /** Orders items by comparing their extracted keys. */
    private final Comparator<T> itemComparator;
    private final int itemsInMemory;
    private final Input input;
    private final StreamProducer<T> output;

    /** Consumer side of the sorter: buffers items and spills full, sorted buffers to storage. */
    private final class Input extends AbstractStreamConsumer<T> implements StreamDataReceiver<T> {
        /** Items received since the last spill; read by the output once the input has ended. */
        private List<T> list = new ArrayList<>();

        @Override
        protected void onStarted() {
            getProducer().produce(this);
        }

        @Override
        public void onData(T item) {
            list.add(item);
            if (list.size() < itemsInMemory) {
                return;
            }
            list.sort(itemComparator);
            // Re-evaluate back-pressure both when this write finishes and right now.
            writeToTemporaryStorage(list).thenRun(this::suspendOrResume);
            suspendOrResume();
            // The spilled list now belongs to the pending write; continue with a fresh buffer.
            list = new ArrayList<>(itemsInMemory);
        }

        /**
         * Streams an already sorted chunk into a new temporary stream of the storage.
         *
         * @param sortedList the chunk to write
         * @return the stage returned by the accumulator for this write; its result is
         *         added to the accumulated list of temporary stream ids
         */
        private Stage<Integer> writeToTemporaryStorage(List<T> sortedList) {
            return temporaryStreams.addStage(
                    storage.write().thenCompose(consumer ->
                            StreamProducer.ofIterable(sortedList).streamTo(consumer).getConsumerResult()),
                    (streamIds, streamId) -> streamIds.add(streamId));
        }

        /**
         * Suspends the upstream producer while more than two stages are active in the
         * accumulator and resumes it otherwise.
         * NOTE(review): the end-of-stream stage registered in the constructor presumably
         * counts as active too, which would make the limit two concurrent writes - confirm.
         */
        private void suspendOrResume() {
            if (temporaryStreams.getActiveStages() > 2) {
                getProducer().suspend();
            } else {
                getProducer().produce(this);
            }
        }

        @Override
        protected void onEndOfStream() {
            // Nothing to do here: the output is built from the end-of-stream stage
            // registered with the accumulator in the constructor.
        }

        @Override
        protected void onError(Throwable th) {
            // Intentionally empty.
            // NOTE(review): presumably the failure reaches the output through the
            // end-of-stream stage held by the accumulator - confirm.
        }
    }

    private StreamSorter(StreamSorterStorage<T> storage, Function<T, K> keyFunction, Comparator<K> keyComparator, boolean distinct, int itemsInMemory) {
        Preconditions.checkArgument(itemsInMemory > 0, "itemsInMemorySize must be positive value, got %s", itemsInMemory);
        Preconditions.checkNotNull(keyComparator);
        Preconditions.checkNotNull(keyFunction);
        Preconditions.checkNotNull(storage);
        this.itemsInMemory = itemsInMemory;
        this.itemComparator = (left, right) ->
                keyComparator.compare(keyFunction.apply(left), keyFunction.apply(right));
        this.storage = storage;
        this.input = new Input();
        // Registering the end of the input as a stage keeps the accumulator from
        // completing before the input has ended; it contributes no stream id.
        this.temporaryStreams.addStage(this.input.getEndOfStream(), (streamIds, ignored) -> {
        });
        this.output = StreamProducer.ofStage(this.temporaryStreams.get().thenApply(streamIds -> {
            List<T> remaining = this.input.list;
            remaining.sort(this.itemComparator);
            if (distinct) {
                // Without this step duplicates survived whenever the whole input fitted
                // in memory, because the flag was only honoured by the merger below.
                remaining = distinctByKey(remaining, keyFunction, keyComparator);
            }
            StreamProducer<T> listProducer = StreamProducer.ofIterable(remaining);
            if (streamIds.isEmpty()) {
                return listProducer;
            }
            StreamMerger<K, T> merger = StreamMerger.create(keyFunction, keyComparator, distinct);
            listProducer.streamTo(merger.newInput());
            streamIds.forEach(streamId ->
                    StreamProducerWithResult.ofStage(storage.read(streamId)).streamTo(merger.newInput()));
            return merger.getOutput().withLateBinding();
        }));
    }

    /**
     * Creates a sorter.
     *
     * @param storage       storage used for the temporary sorted streams
     * @param keyFunction   extracts the sort key from an item
     * @param keyComparator orders the keys
     * @param distinct      passed to {@link StreamMerger#create}; when {@code true},
     *                      in-memory items with equal keys are also collapsed to the first one
     * @param itemsInMemory number of buffered items that triggers a write to storage; must be positive
     * @return the new sorter
     */
    public static <K, T> StreamSorter<K, T> create(StreamSorterStorage<T> storage, Function<T, K> keyFunction, Comparator<K> keyComparator, boolean distinct, int itemsInMemory) {
        return new StreamSorter<>(storage, keyFunction, keyComparator, distinct, itemsInMemory);
    }

    /**
     * Returns a copy of {@code sorted} that keeps only the first item of every run of
     * consecutive items whose keys compare as equal.
     *
     * @param sorted        items already ordered by key
     * @param keyFunction   extracts the key from an item
     * @param keyComparator decides whether two keys are equal
     * @return a new list; {@code sorted} itself is not modified
     */
    private static <K, T> List<T> distinctByKey(List<T> sorted, Function<T, K> keyFunction, Comparator<K> keyComparator) {
        List<T> result = new ArrayList<>(sorted.size());
        boolean first = true;
        K previousKey = null;
        for (T item : sorted) {
            K key = keyFunction.apply(item);
            if (first || keyComparator.compare(previousKey, key) != 0) {
                result.add(item);
                previousKey = key;
                first = false;
            }
        }
        return result;
    }

    /** Returns the consumer that receives the unsorted items. */
    @Override
    public StreamConsumer<T> getInput() {
        return this.input;
    }

    /** Returns the producer that emits the sorted items once the input has ended. */
    @Override
    public StreamProducer<T> getOutput() {
        return this.output;
    }
}
