/*
 * Decompiled with CFR 0.152.
 */
package io.datakernel.stream.processor;

import io.datakernel.async.Stage;
import io.datakernel.async.StagesAccumulator;
import io.datakernel.stream.AbstractStreamConsumer;
import io.datakernel.stream.DataStreams;
import io.datakernel.stream.StreamConsumer;
import io.datakernel.stream.StreamDataReceiver;
import io.datakernel.stream.StreamProducer;
import io.datakernel.stream.StreamProducerWithResult;
import io.datakernel.stream.processor.StreamMerger;
import io.datakernel.stream.processor.StreamSorterStorage;
import io.datakernel.stream.processor.StreamTransformer;
import io.datakernel.util.Preconditions;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

public final class StreamSorter<K, T>
implements StreamTransformer<T, T> {
    private final StagesAccumulator<List<Integer>> temporaryStreams = StagesAccumulator.create(new ArrayList());
    private final StreamSorterStorage<T> storage;
    private final Comparator<T> itemComparator;
    private final int itemsInMemory;
    private Input input;
    private StreamProducer<T> output;

    private StreamSorter(StreamSorterStorage<T> storage, Function<T, K> keyFunction, Comparator<K> keyComparator, boolean deduplicate, int itemsInMemory) {
        Preconditions.checkArgument((itemsInMemory > 0 ? 1 : 0) != 0, (String)"itemsInMemorySize must be positive value, got %s", (Object[])new Object[]{itemsInMemory});
        Preconditions.checkNotNull(keyComparator);
        Preconditions.checkNotNull(keyFunction);
        Preconditions.checkNotNull(storage);
        this.itemsInMemory = itemsInMemory;
        this.itemComparator = (item1, item2) -> {
            Object key1 = keyFunction.apply(item1);
            Object key2 = keyFunction.apply(item2);
            return keyComparator.compare(key1, key2);
        };
        this.storage = storage;
        this.input = new Input();
        this.temporaryStreams.addStage(this.input.getEndOfStream(), (accumulator, $) -> {});
        Stage outputStreamStage = this.temporaryStreams.get().thenApply(streamIds -> {
            this.input.list.sort(this.itemComparator);
            StreamProducer listProducer = StreamProducer.ofIterable(this.input.list);
            if (streamIds.isEmpty()) {
                return listProducer;
            }
            StreamMerger streamMerger = StreamMerger.create(keyFunction, keyComparator, deduplicate);
            DataStreams.bind(listProducer, streamMerger.newInput());
            streamIds.forEach(streamId -> DataStreams.bind(StreamProducerWithResult.ofStage(storage.read((int)streamId)), streamMerger.newInput()));
            return streamMerger.getOutput().withLateBinding();
        });
        this.output = StreamProducer.ofStage(outputStreamStage);
    }

    public static <K, T> StreamSorter<K, T> create(StreamSorterStorage<T> storage, Function<T, K> keyFunction, Comparator<K> keyComparator, boolean deduplicate, int itemsInMemorySize) {
        return new StreamSorter<K, T>(storage, keyFunction, keyComparator, deduplicate, itemsInMemorySize);
    }

    @Override
    public StreamConsumer<T> getInput() {
        return this.input;
    }

    @Override
    public StreamProducer<T> getOutput() {
        return this.output;
    }

    private final class Input
    extends AbstractStreamConsumer<T>
    implements StreamDataReceiver<T> {
        private ArrayList<T> list = new ArrayList();

        private Input() {
        }

        @Override
        protected void onStarted() {
            this.getProducer().produce(this);
        }

        @Override
        public void onData(T item) {
            this.list.add(item);
            if (this.list.size() >= StreamSorter.this.itemsInMemory) {
                this.list.sort(StreamSorter.this.itemComparator);
                this.writeToTemporaryStorage(this.list).thenRun(this::suspendOrResume);
                this.suspendOrResume();
                this.list = new ArrayList(StreamSorter.this.itemsInMemory);
            }
        }

        private Stage<Integer> writeToTemporaryStorage(List<T> sortedList) {
            return StreamSorter.this.temporaryStreams.addStage(StreamSorter.this.storage.write().thenCompose(consumer -> StreamProducer.ofIterable(sortedList).streamTo(consumer).getConsumerResult()), List::add);
        }

        private void suspendOrResume() {
            if (StreamSorter.this.temporaryStreams.getActiveStages() > 2) {
                this.getProducer().suspend();
            } else {
                this.getProducer().produce(this);
            }
        }

        @Override
        protected void onEndOfStream() {
        }

        @Override
        protected void onError(Throwable t) {
        }
    }
}

