/*
 * Decompiled with CFR 0.152.
 */
package io.datakernel.stream.processor;

import io.datakernel.stream.AbstractStreamConsumer;
import io.datakernel.stream.AbstractStreamProducer;
import io.datakernel.stream.HasInputs;
import io.datakernel.stream.HasOutput;
import io.datakernel.stream.StreamConsumer;
import io.datakernel.stream.StreamDataReceiver;
import io.datakernel.stream.StreamProducer;
import io.datakernel.stream.StreamStatus;
import io.datakernel.util.Preconditions;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

public final class StreamJoin<K, L, R, V>
implements HasOutput<V>,
HasInputs {
    private final Comparator<K> keyComparator;
    private final Input<L> left;
    private final Input<R> right;
    private final Output output;
    private final ArrayDeque<L> leftDeque = new ArrayDeque();
    private final ArrayDeque<R> rightDeque = new ArrayDeque();
    private final Function<L, K> leftKeyFunction;
    private final Function<R, K> rightKeyFunction;
    private final Joiner<K, L, R, V> joiner;

    private StreamJoin(Comparator<K> keyComparator, Function<L, K> leftKeyFunction, Function<R, K> rightKeyFunction, Joiner<K, L, R, V> joiner) {
        this.keyComparator = (Comparator)Preconditions.checkNotNull(keyComparator);
        this.joiner = (Joiner)Preconditions.checkNotNull(joiner);
        this.left = new Input<L>(this.leftDeque);
        this.right = new Input<R>(this.rightDeque);
        this.leftKeyFunction = (Function)Preconditions.checkNotNull(leftKeyFunction);
        this.rightKeyFunction = (Function)Preconditions.checkNotNull(rightKeyFunction);
        this.output = new Output();
    }

    public static <K, L, R, V> StreamJoin<K, L, R, V> create(Comparator<K> keyComparator, Function<L, K> leftKeyFunction, Function<R, K> rightKeyFunction, Joiner<K, L, R, V> joiner) {
        return new StreamJoin<K, L, R, V>(keyComparator, leftKeyFunction, rightKeyFunction, joiner);
    }

    public StreamConsumer<L> getLeft() {
        return this.left;
    }

    public StreamConsumer<R> getRight() {
        return this.right;
    }

    @Override
    public List<? extends StreamConsumer<?>> getInputs() {
        return Arrays.asList(this.left, this.right);
    }

    @Override
    public StreamProducer<V> getOutput() {
        return this.output;
    }

    protected final class Output
    extends AbstractStreamProducer<V> {
        protected Output() {
        }

        @Override
        protected void onSuspended() {
            StreamJoin.this.left.getProducer().suspend();
            StreamJoin.this.right.getProducer().suspend();
        }

        @Override
        protected void onError(Throwable t) {
            StreamJoin.this.left.closeWithError(t);
            StreamJoin.this.right.closeWithError(t);
        }

        @Override
        protected void produce() {
            if (this.isReceiverReady() && !StreamJoin.this.leftDeque.isEmpty() && !StreamJoin.this.rightDeque.isEmpty()) {
                Object leftValue = StreamJoin.this.leftDeque.peek();
                Object leftKey = StreamJoin.this.leftKeyFunction.apply(leftValue);
                Object rightValue = StreamJoin.this.rightDeque.peek();
                Object rightKey = StreamJoin.this.rightKeyFunction.apply(rightValue);
                while (true) {
                    int compare;
                    if ((compare = StreamJoin.this.keyComparator.compare(leftKey, rightKey)) < 0) {
                        StreamJoin.this.joiner.onLeftJoin(leftKey, leftValue, this.getCurrentDataReceiver());
                        StreamJoin.this.leftDeque.poll();
                        if (StreamJoin.this.leftDeque.isEmpty()) break;
                        leftValue = StreamJoin.this.leftDeque.peek();
                        leftKey = StreamJoin.this.leftKeyFunction.apply(leftValue);
                        continue;
                    }
                    if (compare > 0) {
                        StreamJoin.this.rightDeque.poll();
                        if (StreamJoin.this.rightDeque.isEmpty()) break;
                        rightValue = StreamJoin.this.rightDeque.peek();
                        rightKey = StreamJoin.this.rightKeyFunction.apply(rightValue);
                        continue;
                    }
                    StreamJoin.this.joiner.onInnerJoin(leftKey, leftValue, rightValue, this.getCurrentDataReceiver());
                    StreamJoin.this.leftDeque.poll();
                    if (StreamJoin.this.leftDeque.isEmpty() || !this.isReceiverReady()) break;
                    leftValue = StreamJoin.this.leftDeque.peek();
                    leftKey = StreamJoin.this.leftKeyFunction.apply(leftValue);
                }
            }
            if (this.isReceiverReady()) {
                if (StreamJoin.this.left.getStatus() == StreamStatus.END_OF_STREAM && StreamJoin.this.right.getStatus() == StreamStatus.END_OF_STREAM) {
                    this.sendEndOfStream();
                } else {
                    StreamJoin.this.left.getProducer().produce(StreamJoin.this.left);
                    StreamJoin.this.right.getProducer().produce(StreamJoin.this.right);
                }
            }
        }
    }

    protected final class Input<I>
    extends AbstractStreamConsumer<I>
    implements StreamDataReceiver<I> {
        private final ArrayDeque<I> deque;

        public Input(ArrayDeque<I> deque) {
            this.deque = deque;
        }

        @Override
        public void onData(I item) {
            this.deque.add(item);
            StreamJoin.this.output.produce();
        }

        @Override
        protected void onStarted() {
            StreamJoin.this.output.produce();
        }

        @Override
        protected void onEndOfStream() {
            StreamJoin.this.output.produce();
        }

        @Override
        protected void onError(Throwable t) {
            StreamJoin.this.output.closeWithError(t);
        }
    }

    public static abstract class ValueJoiner<K, L, R, V>
    implements Joiner<K, L, R, V> {
        public abstract V doInnerJoin(K var1, L var2, R var3);

        public V doLeftJoin(K key, L left) {
            return null;
        }

        @Override
        public final void onInnerJoin(K key, L left, R right, StreamDataReceiver<V> output) {
            V result = this.doInnerJoin(key, left, right);
            if (result != null) {
                output.onData(result);
            }
        }

        @Override
        public final void onLeftJoin(K key, L left, StreamDataReceiver<V> output) {
            V result = this.doLeftJoin(key, left);
            if (result != null) {
                output.onData(result);
            }
        }
    }

    public static abstract class InnerJoiner<K, L, R, V>
    implements Joiner<K, L, R, V> {
        @Override
        public void onLeftJoin(K key, L left, StreamDataReceiver<V> output) {
        }
    }

    public static interface Joiner<K, L, R, V> {
        public void onInnerJoin(K var1, L var2, R var3, StreamDataReceiver<V> var4);

        public void onLeftJoin(K var1, L var2, StreamDataReceiver<V> var3);
    }
}

