package io.datakernel.datastream.processor;

import io.datakernel.async.process.AsyncCollector;
import io.datakernel.datastream.AbstractStreamConsumer;
import io.datakernel.datastream.ForwardingStreamSupplier;
import io.datakernel.datastream.StreamConsumer;
import io.datakernel.datastream.StreamDataAcceptor;
import io.datakernel.datastream.StreamSupplier;
import io.datakernel.promise.Promise;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:io/datakernel/datastream/processor/StreamSorter.class */
public final class StreamSorter<K, T> implements StreamTransformer<T, T> {
    private final AsyncCollector<? extends List<Integer>> temporaryStreamsCollector;
    private final StreamSorterStorage<T> storage;
    private final Function<T, K> keyFunction;
    private final Comparator<K> keyComparator;
    private final Comparator<T> itemComparator;
    private final boolean distinct;
    private final int itemsInMemory;
    private final StreamSorter<K, T>.Input input = new Input();
    private final StreamSupplier<T> output;
    private StreamConsumer<T> outputConsumer;

    /* loaded from: input_file:io/datakernel/datastream/processor/StreamSorter$DistinctIterator.class */
    private static final class DistinctIterator<K, T> implements Iterator<T> {
        private final ArrayList<T> sortedList;
        private final Function<T, K> keyFunction;
        private final Comparator<K> keyComparator;
        int i;

        private DistinctIterator(ArrayList<T> arrayList, Function<T, K> function, Comparator<K> comparator) {
            this.i = 0;
            this.sortedList = arrayList;
            this.keyFunction = function;
            this.keyComparator = comparator;
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            return this.i < this.sortedList.size();
        }

        /* JADX WARN: Multi-variable type inference failed */
        @Override // java.util.Iterator
        public T next() {
            ArrayList<T> arrayList = this.sortedList;
            int i = this.i;
            this.i = i + 1;
            T t = arrayList.get(i);
            K apply = this.keyFunction.apply(t);
            while (this.i < this.sortedList.size() && this.keyComparator.compare(apply, this.keyFunction.apply(this.sortedList.get(this.i))) == 0) {
                this.i++;
            }
            return t;
        }
    }

    /* loaded from: input_file:io/datakernel/datastream/processor/StreamSorter$Input.class */
    private final class Input extends AbstractStreamConsumer<T> implements StreamDataAcceptor<T> {
        private ArrayList<T> list;

        private Input() {
            this.list = new ArrayList<>();
        }

        @Override // io.datakernel.datastream.AbstractStreamConsumer
        protected void onStarted() {
            getSupplier().resume(this);
        }

        @Override // io.datakernel.datastream.StreamDataAcceptor
        public void accept(T t) {
            this.list.add(t);
            if (this.list.size() >= StreamSorter.this.itemsInMemory) {
                this.list.sort(StreamSorter.this.itemComparator);
                writeToTemporaryStorage(!StreamSorter.this.distinct ? StreamSorter.this.input.list.iterator() : new DistinctIterator<>(StreamSorter.this.input.list, StreamSorter.this.keyFunction, StreamSorter.this.keyComparator)).whenResult(num -> {
                    suspendOrResume();
                });
                suspendOrResume();
                this.list = new ArrayList<>(StreamSorter.this.itemsInMemory);
            }
        }

        private Promise<Integer> writeToTemporaryStorage(Iterator<T> it) {
            return StreamSorter.this.temporaryStreamsCollector.addPromise(StreamSorter.this.storage.newPartitionId().then(num -> {
                return StreamSorter.this.storage.write(num.intValue()).then(streamConsumer -> {
                    return StreamSupplier.ofIterator(it).streamTo(streamConsumer).map(r3 -> {
                        return num;
                    });
                });
            }), (v0, v1) -> {
                v0.add(v1);
            });
        }

        private void suspendOrResume() {
            if (StreamSorter.this.temporaryStreamsCollector.getActivePromises() > 2) {
                getSupplier().suspend();
            } else {
                getSupplier().resume(this);
            }
        }

        @Override // io.datakernel.datastream.AbstractStreamConsumer
        protected Promise<Void> onEndOfStream() {
            return StreamSorter.this.outputConsumer.getAcknowledgement();
        }

        @Override // io.datakernel.datastream.AbstractStreamConsumer
        protected void onError(Throwable th) {
        }
    }

    private StreamSorter(StreamSorterStorage<T> streamSorterStorage, Function<T, K> function, Comparator<K> comparator, boolean z, int i) {
        this.storage = streamSorterStorage;
        this.keyFunction = function;
        this.keyComparator = comparator;
        this.itemComparator = (obj, obj2) -> {
            return comparator.compare(function.apply(obj), function.apply(obj2));
        };
        this.distinct = z;
        this.itemsInMemory = i;
        AsyncCollector<? extends List<Integer>> create = AsyncCollector.create(new ArrayList());
        this.temporaryStreamsCollector = create;
        this.output = new ForwardingStreamSupplier<T>(StreamSupplier.ofPromise(create.run(this.input.getEndOfStream()).get().map(list -> {
            ((Input) this.input).list.sort(this.itemComparator);
            StreamSupplier ofIterator = StreamSupplier.ofIterator(!z ? ((Input) this.input).list.iterator() : new DistinctIterator(((Input) this.input).list, function, comparator));
            if (list.isEmpty()) {
                return ofIterator;
            }
            StreamMerger create2 = StreamMerger.create(function, comparator, z);
            ofIterator.streamTo(create2.newInput());
            list.forEach(num -> {
                StreamSupplier.ofPromise(streamSorterStorage.read(num.intValue())).streamTo(create2.newInput());
            });
            return create2.getOutput().withLateBinding();
        }))) { // from class: io.datakernel.datastream.processor.StreamSorter.1
            @Override // io.datakernel.datastream.ForwardingStreamSupplier, io.datakernel.datastream.StreamSupplier
            public void setConsumer(@NotNull StreamConsumer<T> streamConsumer) {
                super.setConsumer(streamConsumer);
                StreamSorter.this.outputConsumer = streamConsumer;
            }
        };
    }

    public static <K, T> StreamSorter<K, T> create(StreamSorterStorage<T> streamSorterStorage, Function<T, K> function, Comparator<K> comparator, boolean z, int i) {
        return new StreamSorter<>(streamSorterStorage, function, comparator, z, i);
    }

    @Override // io.datakernel.datastream.StreamInput
    public StreamConsumer<T> getInput() {
        return this.input;
    }

    @Override // io.datakernel.datastream.StreamOutput
    public StreamSupplier<T> getOutput() {
        return this.output;
    }
}
