package io.datakernel.datastream.processor;

import io.datakernel.common.Preconditions;
import io.datakernel.datastream.AbstractStreamConsumer;
import io.datakernel.datastream.AbstractStreamSupplier;
import io.datakernel.datastream.StreamConsumer;
import io.datakernel.datastream.StreamDataAcceptor;
import io.datakernel.datastream.StreamInputs;
import io.datakernel.datastream.StreamOutput;
import io.datakernel.datastream.StreamSupplier;
import io.datakernel.datastream.processor.StreamReducers;
import io.datakernel.promise.Promise;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Function;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:io/datakernel/datastream/processor/AbstractStreamReducer.class */
/**
 * Base class for reducers that combine items from several input streams into one output stream.
 *
 * <p>Every input keeps one "head" item plus a queue of further buffered items. {@link #produce()}
 * repeatedly takes the input whose head key is smallest according to the comparator given to the
 * constructor (ties are broken by input creation order) and passes the head item to that input's
 * {@link StreamReducers.Reducer}. Consecutive items whose keys are {@code equals} share a single
 * accumulator; when the key changes, or when all inputs are exhausted, the reducer of the most
 * recently processed input is asked to complete the accumulator.
 *
 * <p>NOTE(review): the algorithm presumably expects each input to deliver its items already
 * ordered by the comparator - confirm against callers.
 *
 * @param <K> type of the keys extracted from input items
 * @param <O> type of the items sent to the output
 * @param <A> type of the accumulator carried between items that have equal keys
 */
public abstract class AbstractStreamReducer<K, O, A> implements StreamInputs, StreamOutput<O> {
    public static final int DEFAULT_BUFFER_SIZE = 2000;

    /** Input whose item was processed most recently; its reducer completes the current accumulator. */
    @Nullable
    private Input<?> lastInput;

    /** Accumulator for the current {@link #key}. */
    @Nullable
    private A accumulator;

    /** Inputs that currently hold a head item, ordered by head key and then by input index. */
    private final PriorityQueue<Input<?>> priorityQueue;

    /** Number of inputs that hold no head item and have not reported end of stream. */
    private int streamsAwaiting;

    /** Number of inputs that have not reported end of stream yet. */
    private int streamsOpen;

    private final List<Input<?>> inputs = new ArrayList<>();
    private int bufferSize = DEFAULT_BUFFER_SIZE;

    /** Key of the group that is being accumulated, or {@code null} if no group has been started. */
    @Nullable
    private K key = null;
    private final Output output = new Output();

    /**
     * Consumer side of one input stream. Stores the head item, its key and a queue of items that
     * were received while a head item was already present.
     *
     * @param <I> type of the items received by this input
     */
    public final class Input<I> extends AbstractStreamConsumer<I> implements StreamDataAcceptor<I> {
        /** Next item of this input to be reduced; {@code null} when nothing is pending. */
        private I headItem;
        /** Key of {@link #headItem}; only meaningful while {@code headItem} is not {@code null}. */
        private K headKey;
        /** Position of this input in the list of inputs, used to break ties between equal keys. */
        private final int index;
        private final PriorityQueue<Input<?>> priorityQueue;
        private final ArrayDeque<I> deque;
        private final int bufferSize;
        private final Function<I, K> keyFunction;
        private final StreamReducers.Reducer<K, I, O, A> reducer;

        private Input(int index, PriorityQueue<Input<?>> priorityQueue, Function<I, K> keyFunction,
                StreamReducers.Reducer<K, I, O, A> reducer, int bufferSize) {
            this.deque = new ArrayDeque<>();
            this.index = index;
            this.priorityQueue = priorityQueue;
            this.keyFunction = keyFunction;
            this.reducer = reducer;
            this.bufferSize = bufferSize;
        }

        @Override // io.datakernel.datastream.AbstractStreamConsumer
        protected void onStarted() {
            getSupplier().resume(this);
        }

        /**
         * Receives one item. If this input has no head item, the item becomes the head, the input
         * is put into the priority queue and stops counting as awaiting. Otherwise the item is
         * queued; when the queue size reaches exactly {@code bufferSize} the supplier is suspended
         * and a reduction pass is started.
         */
        @Override // io.datakernel.datastream.StreamDataAcceptor
        public void accept(I i) {
            if (this.headItem == null) {
                this.headItem = i;
                this.headKey = this.keyFunction.apply(i);
                this.priorityQueue.offer(this);
                AbstractStreamReducer.this.streamsAwaiting--;
                return;
            }
            this.deque.offer(i);
            if (this.deque.size() == this.bufferSize) {
                getSupplier().suspend();
                AbstractStreamReducer.this.produce();
            }
        }

        /**
         * Marks this input as closed, runs a reduction pass and returns the acknowledgement of the
         * consumer attached to the reducer output.
         */
        @Override // io.datakernel.datastream.AbstractStreamConsumer
        protected Promise<Void> onEndOfStream() {
            AbstractStreamReducer.this.streamsOpen--;
            if (this.headItem == null) {
                // An input without a head item was counted as awaiting; it will never deliver now.
                AbstractStreamReducer.this.streamsAwaiting--;
            }
            AbstractStreamReducer.this.produce();
            return AbstractStreamReducer.this.output.getConsumer().getAcknowledgement();
        }

        /** Closes the reducer output with the error of this input. */
        @Override // io.datakernel.datastream.AbstractStreamConsumer
        protected void onError(Throwable th) {
            AbstractStreamReducer.this.output.close(th);
        }
    }

    /** Supplier side of the reducer: the stream that reduced items are sent to. */
    public final class Output extends AbstractStreamSupplier<O> {
        private Output() {
        }

        /** Closes every input with the error of the output. */
        @Override // io.datakernel.datastream.AbstractStreamSupplier
        protected void onError(Throwable th) {
            for (Input<?> input : AbstractStreamReducer.this.inputs) {
                input.close(th);
            }
        }

        /** Delegates to {@link AbstractStreamReducer#produce()}; the controller is not used. */
        @Override // io.datakernel.datastream.AbstractStreamSupplier
        protected void produce(AbstractStreamSupplier<O>.AsyncProduceController asyncProduceController) {
            AbstractStreamReducer.this.produce();
        }
    }

    /**
     * Creates a reducer without inputs.
     *
     * @param comparator ordering of keys; inputs whose head keys compare as equal are ordered by
     *                   the order in which the inputs were created
     */
    public AbstractStreamReducer(@NotNull Comparator<K> comparator) {
        this.priorityQueue = new PriorityQueue<>(1, (left, right) -> {
            int byKey = comparator.compare(left.headKey, right.headKey);
            return byKey != 0 ? byKey : Integer.compare(left.index, right.index);
        });
    }

    /**
     * Sets the per-input queue size at which an input suspends its supplier. Only inputs created
     * after this call use the new value, because each input copies the size when it is created.
     *
     * <p>With a size of 0 the queue never matches the threshold checked in
     * {@link Input#accept(Object)}, so inputs are never suspended.
     *
     * @param i new buffer size, must not be negative
     * @return this reducer
     */
    public AbstractStreamReducer<K, O, A> withBufferSize(int i) {
        Preconditions.checkArgument(i >= 0, "bufferSize must be non-negative, got %s", i);
        this.bufferSize = i;
        return this;
    }

    /**
     * Registers a new input stream.
     *
     * @param function extracts the key from an item of this input
     * @param reducer  reducer applied to the items of this input
     * @param <I>      type of the items of this input
     * @return consumer that the input stream has to be streamed to
     */
    public <I> StreamConsumer<I> newInput(Function<I, K> function, StreamReducers.Reducer<K, I, O, A> reducer) {
        Input<I> input = new Input<I>(this.inputs.size(), this.priorityQueue, function, reducer, this.bufferSize);
        this.inputs.add(input);
        // A fresh input has no head item and is open.
        this.streamsAwaiting++;
        this.streamsOpen++;
        return input;
    }

    @Override // io.datakernel.datastream.StreamInputs
    public List<? extends StreamConsumer<?>> getInputs() {
        return this.inputs;
    }

    @Override // io.datakernel.datastream.StreamOutput
    public StreamSupplier<O> getOutput() {
        return this.output;
    }

    /**
     * Runs one reduction pass. Does nothing when the output has no current data acceptor.
     * Otherwise, while no input is awaiting data, the input with the smallest head key is taken
     * from the priority queue and reduced. Afterwards every input whose queue holds at most
     * {@code bufferSize / 2} items gets its supplier resumed, and if all inputs are closed and the
     * priority queue is empty the last accumulator is completed and end of stream is sent.
     */
    public void produce() {
        StreamDataAcceptor<O> acceptor = this.output.getCurrentDataAcceptor();
        if (acceptor == null) {
            return;
        }
        while (this.streamsAwaiting == 0) {
            Input<?> input = this.priorityQueue.poll();
            if (input == null) {
                break;
            }
            reduceHead(input, acceptor);
            if (input.headItem == null && !input.getEndOfStream().isResult()) {
                // The input ran dry but is still open: its next key is unknown, so merging has to
                // stop until it delivers another item or ends.
                this.streamsAwaiting++;
                break;
            }
        }
        for (Input<?> input : this.inputs) {
            if (input.deque.size() <= this.bufferSize / 2) {
                resumeInput(input);
            }
        }
        if (this.streamsOpen == 0 && this.priorityQueue.isEmpty()) {
            if (this.lastInput != null) {
                this.lastInput.reducer.onComplete(acceptor, this.key, this.accumulator);
                this.lastInput = null;
                this.key = null;
                this.accumulator = null;
            }
            this.output.sendEndOfStream();
        }
    }

    /**
     * Feeds the head item of {@code input} to its reducer and then advances the input to its next
     * buffered item, putting it back into the priority queue if such an item exists.
     */
    private <I> void reduceHead(Input<I> input, StreamDataAcceptor<O> acceptor) {
        if (this.key == null || !input.headKey.equals(this.key)) {
            // A new key starts: finish the previous group first.
            if (this.lastInput != null) {
                this.lastInput.reducer.onComplete(acceptor, this.key, this.accumulator);
            }
            this.key = input.headKey;
            this.accumulator = input.reducer.onFirstItem(acceptor, this.key, input.headItem);
        } else {
            this.accumulator = input.reducer.onNextItem(acceptor, this.key, input.headItem, this.accumulator);
        }
        input.headItem = input.deque.poll();
        this.lastInput = input;
        if (input.headItem != null) {
            input.headKey = input.keyFunction.apply(input.headItem);
            this.priorityQueue.offer(input);
        }
    }

    /** Resumes the supplier of {@code input}, with the input itself as the data acceptor. */
    private <I> void resumeInput(Input<I> input) {
        input.getSupplier().resume(input);
    }
}
