package io.datakernel.stream.processor;

import io.datakernel.stream.AbstractStreamConsumer;
import io.datakernel.stream.AbstractStreamProducer;
import io.datakernel.stream.HasInput;
import io.datakernel.stream.HasOutputs;
import io.datakernel.stream.StreamConsumer;
import io.datakernel.stream.StreamDataReceiver;
import io.datakernel.stream.StreamProducer;
import io.datakernel.util.Preconditions;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/* loaded from: input_file:io/datakernel/stream/processor/StreamSplitter.class */
public final class StreamSplitter<T> implements HasInput<T>, HasOutputs, StreamDataReceiver<T> {
    private final List<StreamSplitter<T>.Output> outputs = new ArrayList();
    private StreamDataReceiver<T>[] dataReceivers = new StreamDataReceiver[0];
    private int suspended = 0;
    private final StreamSplitter<T>.Input input = new Input();

    /* loaded from: input_file:io/datakernel/stream/processor/StreamSplitter$Input.class */
    protected final class Input extends AbstractStreamConsumer<T> {
        protected Input() {
        }

        @Override // io.datakernel.stream.AbstractStreamConsumer
        protected void onStarted() {
            Preconditions.checkState(!StreamSplitter.this.outputs.isEmpty(), "Splitter has no outputs");
        }

        @Override // io.datakernel.stream.AbstractStreamConsumer
        protected void onEndOfStream() {
            StreamSplitter.this.outputs.forEach((v0) -> {
                v0.sendEndOfStream();
            });
        }

        @Override // io.datakernel.stream.AbstractStreamConsumer
        protected void onError(Throwable th) {
            StreamSplitter.this.outputs.forEach(output -> {
                output.closeWithError(th);
            });
        }
    }

    /* loaded from: input_file:io/datakernel/stream/processor/StreamSplitter$Output.class */
    protected final class Output extends AbstractStreamProducer<T> {
        private final int index;
        static final /* synthetic */ boolean $assertionsDisabled;

        protected Output(int i) {
            this.index = i;
        }

        @Override // io.datakernel.stream.AbstractStreamProducer
        protected void onStarted() {
            Preconditions.checkState(StreamSplitter.this.input.getProducer() != null, "Splitter has no input");
        }

        @Override // io.datakernel.stream.AbstractStreamProducer
        protected void onSuspended() {
            StreamSplitter.access$208(StreamSplitter.this);
            if (!$assertionsDisabled && StreamSplitter.this.input.getProducer() == null) {
                throw new AssertionError();
            }
            StreamSplitter.this.input.getProducer().suspend();
        }

        @Override // io.datakernel.stream.AbstractStreamProducer
        protected void onProduce(StreamDataReceiver<T> streamDataReceiver) {
            StreamSplitter.this.dataReceivers[this.index] = streamDataReceiver;
            if (StreamSplitter.access$206(StreamSplitter.this) == 0) {
                if (!$assertionsDisabled && StreamSplitter.this.input.getProducer() == null) {
                    throw new AssertionError();
                }
                StreamSplitter.this.input.getProducer().produce(StreamSplitter.this);
            }
        }

        @Override // io.datakernel.stream.AbstractStreamProducer
        protected void onError(Throwable th) {
            StreamSplitter.this.input.closeWithError(th);
        }

        static {
            $assertionsDisabled = !StreamSplitter.class.desiredAssertionStatus();
        }
    }

    private StreamSplitter() {
    }

    public static <T> StreamSplitter<T> create() {
        return new StreamSplitter<>();
    }

    public StreamProducer<T> newOutput() {
        StreamSplitter<T>.Output output = new Output(this.outputs.size());
        this.dataReceivers = (StreamDataReceiver[]) Arrays.copyOf(this.dataReceivers, this.dataReceivers.length + 1);
        this.suspended++;
        this.outputs.add(output);
        return output;
    }

    @Override // io.datakernel.stream.HasInput
    public StreamConsumer<T> getInput() {
        return this.input;
    }

    @Override // io.datakernel.stream.HasOutputs
    public List<? extends StreamProducer<T>> getOutputs() {
        return this.outputs;
    }

    @Override // io.datakernel.stream.StreamDataReceiver
    public void onData(T t) {
        for (int i = 0; i < this.dataReceivers.length; i++) {
            this.dataReceivers[i].onData(t);
        }
    }

    static /* synthetic */ int access$208(StreamSplitter streamSplitter) {
        int i = streamSplitter.suspended;
        streamSplitter.suspended = i + 1;
        return i;
    }

    static /* synthetic */ int access$206(StreamSplitter streamSplitter) {
        int i = streamSplitter.suspended - 1;
        streamSplitter.suspended = i;
        return i;
    }
}
