/*
 * Decompiled with CFR 0.152.
 */
package io.datakernel.stream.processor;

import io.datakernel.stream.AbstractStreamConsumer;
import io.datakernel.stream.AbstractStreamProducer;
import io.datakernel.stream.HasInputs;
import io.datakernel.stream.HasOutput;
import io.datakernel.stream.StreamConsumer;
import io.datakernel.stream.StreamDataReceiver;
import io.datakernel.stream.StreamProducer;
import io.datakernel.stream.processor.StreamReducers;
import io.datakernel.util.Preconditions;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Function;

public abstract class AbstractStreamReducer<K, O, A>
implements HasOutput<O>,
HasInputs {
    public static final int DEFAULT_BUFFER_SIZE = 2000;
    private final List<Input> inputs = new ArrayList<Input>();
    private final Output output = new Output();
    private int bufferSize = 2000;
    private Input<?> lastInput;
    private K key = null;
    private A accumulator;
    private final PriorityQueue<Input> priorityQueue = new PriorityQueue(1, (o1, o2) -> {
        int compare = keyComparator.compare(((Input)o1).headKey, ((Input)o2).headKey);
        if (compare != 0) {
            return compare;
        }
        return ((Input)o1).index - ((Input)o2).index;
    });
    private int streamsAwaiting;
    private int streamsOpen;

    public AbstractStreamReducer(Comparator<K> keyComparator) {
    }

    protected AbstractStreamReducer<K, O, A> withBufferSize(int bufferSize) {
        Preconditions.checkArgument((bufferSize >= 0 ? 1 : 0) != 0, (String)"bufferSize must be positive value, got %s", (Object[])new Object[]{bufferSize});
        this.bufferSize = bufferSize;
        return this;
    }

    protected <I> StreamConsumer<I> newInput(Function<I, K> keyFunction, StreamReducers.Reducer<K, I, O, A> reducer) {
        Input input = new Input(this.inputs.size(), this.priorityQueue, keyFunction, reducer, this.bufferSize);
        this.inputs.add(input);
        ++this.streamsAwaiting;
        ++this.streamsOpen;
        return input;
    }

    @Override
    public List<? extends StreamConsumer<?>> getInputs() {
        return this.inputs;
    }

    @Override
    public StreamProducer<O> getOutput() {
        return this.output;
    }

    private void produce() {
        Input input;
        StreamDataReceiver dataReceiver = this.output.getCurrentDataReceiver();
        if (dataReceiver == null) {
            return;
        }
        while (this.streamsAwaiting == 0 && (input = this.priorityQueue.poll()) != null) {
            if (this.key != null && input.headKey.equals(this.key)) {
                this.accumulator = input.reducer.onNextItem(dataReceiver, this.key, input.headItem, this.accumulator);
            } else {
                if (this.lastInput != null) {
                    ((Input)this.lastInput).reducer.onComplete(dataReceiver, this.key, this.accumulator);
                }
                this.key = input.headKey;
                this.accumulator = input.reducer.onFirstItem(dataReceiver, this.key, input.headItem);
            }
            input.headItem = input.deque.poll();
            this.lastInput = input;
            if (input.headItem != null) {
                input.headKey = input.keyFunction.apply(input.headItem);
                this.priorityQueue.offer(input);
                continue;
            }
            if (!input.getStatus().isOpen()) continue;
            ++this.streamsAwaiting;
            break;
        }
        for (Input input2 : this.inputs) {
            if (input2.deque.size() > this.bufferSize / 2) continue;
            input2.getProducer().produce(input2);
        }
        if (this.streamsOpen == 0 && this.priorityQueue.isEmpty()) {
            if (this.lastInput != null) {
                ((Input)this.lastInput).reducer.onComplete(dataReceiver, this.key, this.accumulator);
                this.lastInput = null;
                this.key = null;
                this.accumulator = null;
            }
            this.output.sendEndOfStream();
        }
    }

    private final class Output
    extends AbstractStreamProducer<O> {
        private Output() {
        }

        @Override
        protected void onError(Throwable t) {
            AbstractStreamReducer.this.inputs.forEach(input -> input.closeWithError(t));
        }

        @Override
        protected void produce() {
            AbstractStreamReducer.this.produce();
        }
    }

    private final class Input<I>
    extends AbstractStreamConsumer<I>
    implements StreamDataReceiver<I> {
        private I headItem;
        private K headKey;
        private final int index;
        private final PriorityQueue<Input> priorityQueue;
        private final ArrayDeque<I> deque = new ArrayDeque();
        private final int bufferSize;
        private final Function<I, K> keyFunction;
        private final StreamReducers.Reducer<K, I, O, A> reducer;

        private Input(int index, PriorityQueue<Input> priorityQueue, Function<I, K> keyFunction, StreamReducers.Reducer<K, I, O, A> reducer, int bufferSize) {
            this.index = index;
            this.priorityQueue = priorityQueue;
            this.keyFunction = keyFunction;
            this.reducer = reducer;
            this.bufferSize = bufferSize;
        }

        @Override
        protected void onWired() {
            super.onWired();
        }

        @Override
        protected void onStarted() {
            this.getProducer().produce(this);
        }

        @Override
        public void onData(I item) {
            if (this.headItem == null) {
                this.headItem = item;
                this.headKey = this.keyFunction.apply(this.headItem);
                this.priorityQueue.offer(this);
                AbstractStreamReducer.this.streamsAwaiting--;
            } else {
                this.deque.offer(item);
                if (this.deque.size() == this.bufferSize) {
                    this.getProducer().suspend();
                    AbstractStreamReducer.this.produce();
                }
            }
        }

        @Override
        protected void onEndOfStream() {
            AbstractStreamReducer.this.streamsOpen--;
            if (this.headItem == null) {
                AbstractStreamReducer.this.streamsAwaiting--;
            }
            AbstractStreamReducer.this.produce();
        }

        @Override
        protected void onError(Throwable t) {
            AbstractStreamReducer.this.output.closeWithError(t);
        }
    }
}

