package io.datakernel.stream.processor;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.stream.AbstractStreamConsumer;
import io.datakernel.stream.AbstractStreamTransformer_M_1;
import io.datakernel.stream.StreamConsumer;
import io.datakernel.stream.StreamDataReceiver;
import io.datakernel.stream.processor.StreamReducers;
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.PriorityQueue;

/* loaded from: input_file:io/datakernel/stream/processor/AbstractStreamReducer.class */
public abstract class AbstractStreamReducer<K, O, A> extends AbstractStreamTransformer_M_1<O> implements AbstractStreamReducerMBean {
    public static final int BUFFER_SIZE = 1024;
    private final int bufferSize;
    private AbstractStreamReducer<K, O, A>.InternalConsumer<?> lastInput;
    private K key;
    private A accumulator;
    private final PriorityQueue<InternalConsumer> priorityQueue;
    private int streamsAwaiting;
    private int jmxInputItems;
    private int jmxOnFirst;
    private int jmxOnNext;
    private int jmxOnComplete;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/datakernel/stream/processor/AbstractStreamReducer$InternalConsumer.class */
    public class InternalConsumer<I> extends AbstractStreamConsumer<I> implements StreamDataReceiver<I> {
        private final int index;
        private final PriorityQueue<InternalConsumer> priorityQueue;
        private final ArrayDeque<I> deque;
        private final Function<I, K> keyFunction;
        private final StreamReducers.Reducer<K, I, O, A> reducer;
        private K headKey;
        private I headItem;
        static final /* synthetic */ boolean $assertionsDisabled;

        private InternalConsumer(Eventloop eventloop, PriorityQueue<InternalConsumer> priorityQueue, Function<I, K> function, StreamReducers.Reducer<K, I, O, A> reducer) {
            super(eventloop);
            this.index = AbstractStreamReducer.this.inputs.size();
            this.deque = new ArrayDeque<>();
            this.priorityQueue = priorityQueue;
            this.keyFunction = function;
            this.reducer = reducer;
        }

        @Override // io.datakernel.stream.StreamDataReceiver
        public void onData(I i) {
            if (!$assertionsDisabled && AbstractStreamReducer.this.jmxInputItems == AbstractStreamReducer.access$704(AbstractStreamReducer.this)) {
                throw new AssertionError();
            }
            if (this.headItem == null) {
                this.headItem = i;
                this.headKey = (K) this.keyFunction.apply(this.headItem);
                this.priorityQueue.offer(this);
                AbstractStreamReducer.access$810(AbstractStreamReducer.this);
            } else {
                this.deque.offer(i);
            }
            if (this.deque.size() == AbstractStreamReducer.this.bufferSize && AbstractStreamReducer.this.streamsAwaiting == 0) {
                AbstractStreamReducer.this.produce();
                if (AbstractStreamReducer.this.status != 0) {
                    AbstractStreamReducer.this.suspendAllUpstreams();
                }
            }
        }

        @Override // io.datakernel.stream.StreamConsumer
        public void onEndOfStream() {
            if (this.headItem == null) {
                AbstractStreamReducer.access$810(AbstractStreamReducer.this);
            }
            AbstractStreamReducer.this.produce();
        }

        @Override // io.datakernel.stream.AbstractStreamConsumer, io.datakernel.stream.StreamConsumer
        public void onError(Exception exc) {
            this.upstreamProducer.closeWithError(exc);
            AbstractStreamReducer.this.closeWithError(exc);
        }

        @Override // io.datakernel.stream.StreamConsumer
        public StreamDataReceiver<I> getDataReceiver() {
            return this;
        }

        static {
            $assertionsDisabled = !AbstractStreamReducer.class.desiredAssertionStatus();
        }
    }

    public AbstractStreamReducer(Eventloop eventloop, final Comparator<K> comparator, int i) {
        super(eventloop);
        this.key = null;
        Preconditions.checkArgument(i >= 0, "bufferSize must be positive value, got %s", new Object[]{Integer.valueOf(i)});
        this.bufferSize = i;
        this.priorityQueue = new PriorityQueue<>(1, new Comparator<InternalConsumer>() { // from class: io.datakernel.stream.processor.AbstractStreamReducer.1
            @Override // java.util.Comparator
            public int compare(InternalConsumer internalConsumer, InternalConsumer internalConsumer2) {
                int compare = comparator.compare(internalConsumer.headKey, internalConsumer2.headKey);
                return compare != 0 ? compare : internalConsumer.index - internalConsumer2.index;
            }
        });
    }

    public AbstractStreamReducer(Eventloop eventloop, Comparator<K> comparator) {
        this(eventloop, comparator, BUFFER_SIZE);
    }

    @Override // io.datakernel.stream.AbstractStreamProducer
    protected void onProducerStarted() {
        if (this.inputs.isEmpty()) {
            sendEndOfStream();
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // io.datakernel.stream.AbstractStreamProducer
    protected void doProduce() {
        AbstractStreamReducer<K, O, A>.InternalConsumer<?> poll;
        while (true) {
            if (this.status != 0 || this.streamsAwaiting != 0 || (poll = this.priorityQueue.poll()) == null) {
                break;
            }
            if (this.key == null || !((InternalConsumer) poll).headKey.equals(this.key)) {
                if (this.lastInput != null) {
                    if (!$assertionsDisabled) {
                        int i = this.jmxOnComplete;
                        int i2 = this.jmxOnComplete + 1;
                        this.jmxOnComplete = i2;
                        if (i == i2) {
                            throw new AssertionError();
                        }
                    }
                    ((InternalConsumer) this.lastInput).reducer.onComplete(this.downstreamDataReceiver, this.key, this.accumulator);
                }
                this.key = (K) ((InternalConsumer) poll).headKey;
                if (!$assertionsDisabled) {
                    int i3 = this.jmxOnFirst;
                    int i4 = this.jmxOnFirst + 1;
                    this.jmxOnFirst = i4;
                    if (i3 == i4) {
                        throw new AssertionError();
                    }
                }
                this.accumulator = (A) ((InternalConsumer) poll).reducer.onFirstItem(this.downstreamDataReceiver, this.key, ((InternalConsumer) poll).headItem);
            } else {
                if (!$assertionsDisabled) {
                    int i5 = this.jmxOnNext;
                    int i6 = this.jmxOnNext + 1;
                    this.jmxOnNext = i6;
                    if (i5 == i6) {
                        throw new AssertionError();
                    }
                }
                this.accumulator = (A) ((InternalConsumer) poll).reducer.onNextItem(this.downstreamDataReceiver, this.key, ((InternalConsumer) poll).headItem, this.accumulator);
            }
            ((InternalConsumer) poll).headItem = ((InternalConsumer) poll).deque.poll();
            this.lastInput = poll;
            if (((InternalConsumer) poll).headItem != null) {
                ((InternalConsumer) poll).headKey = ((InternalConsumer) poll).keyFunction.apply(((InternalConsumer) poll).headItem);
                this.priorityQueue.offer(poll);
            } else if (poll.getUpstreamStatus() < 2) {
                this.streamsAwaiting++;
                break;
            }
        }
        if (this.status == 0) {
            resumeAllUpstreams();
        }
        if (this.status == 0 && this.priorityQueue.isEmpty() && this.streamsAwaiting == 0) {
            if (this.lastInput != null) {
                if (!$assertionsDisabled) {
                    int i7 = this.jmxOnComplete;
                    int i8 = this.jmxOnComplete + 1;
                    this.jmxOnComplete = i8;
                    if (i7 == i8) {
                        throw new AssertionError();
                    }
                }
                ((InternalConsumer) this.lastInput).reducer.onComplete(this.downstreamDataReceiver, this.key, this.accumulator);
                this.lastInput = null;
                this.key = null;
                this.accumulator = null;
            }
            sendEndOfStream();
        }
    }

    @Override // io.datakernel.stream.AbstractStreamProducer
    public void onResumed() {
        resumeProduce();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public <I> StreamConsumer<I> newInput(Function<I, K> function, StreamReducers.Reducer<K, I, O, A> reducer) {
        InternalConsumer internalConsumer = new InternalConsumer(this.eventloop, this.priorityQueue, function, reducer);
        addInput(internalConsumer);
        this.streamsAwaiting++;
        return internalConsumer;
    }

    @Override // io.datakernel.stream.processor.AbstractStreamReducerMBean
    public int getInputItems() {
        return this.jmxInputItems;
    }

    @Override // io.datakernel.stream.processor.AbstractStreamReducerMBean
    public int getOnFirst() {
        return this.jmxOnFirst;
    }

    @Override // io.datakernel.stream.processor.AbstractStreamReducerMBean
    public int getOnNext() {
        return this.jmxOnNext;
    }

    @Override // io.datakernel.stream.processor.AbstractStreamReducerMBean
    public int getOnComplete() {
        return this.jmxOnComplete;
    }

    @Override // io.datakernel.stream.AbstractStreamProducer
    public String toString() {
        String str = "?";
        String str2 = "?";
        String str3 = "?";
        String str4 = "?";
        if (!$assertionsDisabled) {
            String str5 = "" + this.jmxInputItems;
            str = str5;
            if (str5 == null) {
                throw new AssertionError();
            }
        }
        if (!$assertionsDisabled) {
            String str6 = "" + this.jmxOnNext;
            str2 = str6;
            if (str6 == null) {
                throw new AssertionError();
            }
        }
        if (!$assertionsDisabled) {
            String str7 = "" + this.jmxOnFirst;
            str3 = str7;
            if (str7 == null) {
                throw new AssertionError();
            }
        }
        if (!$assertionsDisabled) {
            String str8 = "" + this.jmxOnComplete;
            str4 = str8;
            if (str8 == null) {
                throw new AssertionError();
            }
        }
        return '{' + super.toString() + " items:" + str + " onNext:" + str2 + " onFirst:" + str3 + " onComplete:" + str4 + '}';
    }

    static /* synthetic */ int access$704(AbstractStreamReducer abstractStreamReducer) {
        int i = abstractStreamReducer.jmxInputItems + 1;
        abstractStreamReducer.jmxInputItems = i;
        return i;
    }

    static /* synthetic */ int access$810(AbstractStreamReducer abstractStreamReducer) {
        int i = abstractStreamReducer.streamsAwaiting;
        abstractStreamReducer.streamsAwaiting = i - 1;
        return i;
    }

    static {
        $assertionsDisabled = !AbstractStreamReducer.class.desiredAssertionStatus();
    }
}
