/*
 * Decompiled with CFR 0.152.
 */
package io.datakernel.stream.processor;

import io.datakernel.stream.AbstractStreamConsumer;
import io.datakernel.stream.AbstractStreamProducer;
import io.datakernel.stream.HasInputs;
import io.datakernel.stream.HasOutput;
import io.datakernel.stream.StreamConsumer;
import io.datakernel.stream.StreamDataReceiver;
import io.datakernel.stream.StreamProducer;
import io.datakernel.stream.StreamStatus;
import io.datakernel.util.Preconditions;
import java.util.ArrayList;
import java.util.List;

public final class StreamUnion<T>
implements HasOutput<T>,
HasInputs {
    private final List<Input> inputs = new ArrayList<Input>();
    private final Output output = new Output();

    private StreamUnion() {
    }

    public static <T> StreamUnion<T> create() {
        return new StreamUnion<T>();
    }

    @Override
    public List<? extends StreamConsumer<?>> getInputs() {
        return this.inputs;
    }

    @Override
    public StreamProducer<T> getOutput() {
        return this.output;
    }

    public StreamConsumer<T> newInput() {
        Preconditions.checkState((boolean)this.output.getStatus().isOpen());
        Input input = new Input();
        this.inputs.add(input);
        return input;
    }

    private final class Output
    extends AbstractStreamProducer<T> {
        private Output() {
        }

        @Override
        protected void onSuspended() {
            for (int i = 0; i < StreamUnion.this.inputs.size(); ++i) {
                ((Input)StreamUnion.this.inputs.get(i)).getProducer().suspend();
            }
        }

        @Override
        protected void onProduce(StreamDataReceiver<T> dataReceiver) {
            if (!StreamUnion.this.inputs.isEmpty()) {
                for (int i = 0; i < StreamUnion.this.inputs.size(); ++i) {
                    ((Input)StreamUnion.this.inputs.get(i)).getProducer().produce(dataReceiver);
                }
            } else {
                this.eventloop.post(this::sendEndOfStream);
            }
        }

        @Override
        protected void onError(Throwable t) {
            StreamUnion.this.inputs.forEach(input -> input.closeWithError(t));
        }
    }

    private final class Input
    extends AbstractStreamConsumer<T> {
        private Input() {
        }

        @Override
        protected void onEndOfStream() {
            if (StreamUnion.this.inputs.stream().allMatch(input -> input.getStatus() == StreamStatus.END_OF_STREAM)) {
                StreamUnion.this.output.sendEndOfStream();
            }
        }

        @Override
        protected void onError(Throwable t) {
            StreamUnion.this.output.closeWithError(t);
        }
    }
}

