package io.datakernel.datastream.processor;

import io.datakernel.datastream.AbstractStreamConsumer;
import io.datakernel.datastream.AbstractStreamSupplier;
import io.datakernel.datastream.StreamConsumer;
import io.datakernel.datastream.StreamDataAcceptor;
import io.datakernel.datastream.StreamInput;
import io.datakernel.datastream.StreamOutputs;
import io.datakernel.datastream.StreamSupplier;
import io.datakernel.promise.Promise;
import io.datakernel.promise.Promises;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/* loaded from: input_file:io/datakernel/datastream/processor/StreamSharder.class */
public final class StreamSharder<T> implements StreamInput<T>, StreamOutputs, StreamDataAcceptor<T> {
    private final Sharder<T> sharder;
    private final List<StreamSharder<T>.Output> outputs = new ArrayList();
    private StreamDataAcceptor<T>[] dataAcceptors = new StreamDataAcceptor[0];
    private int suspended = 0;
    private final StreamSharder<T>.InputConsumer input = new InputConsumer();

    /* loaded from: input_file:io/datakernel/datastream/processor/StreamSharder$InputConsumer.class */
    protected final class InputConsumer extends AbstractStreamConsumer<T> {
        protected InputConsumer() {
        }

        @Override // io.datakernel.datastream.AbstractStreamConsumer
        protected Promise<Void> onEndOfStream() {
            return Promises.all(StreamSharder.this.outputs.stream().map((v0) -> {
                return v0.sendEndOfStream();
            }));
        }

        @Override // io.datakernel.datastream.AbstractStreamConsumer
        protected void onError(Throwable th) {
            StreamSharder.this.outputs.forEach(output -> {
                output.close(th);
            });
        }
    }

    /* loaded from: input_file:io/datakernel/datastream/processor/StreamSharder$Output.class */
    protected final class Output extends AbstractStreamSupplier<T> {
        private final int index;

        Output(int i) {
            this.index = i;
        }

        @Override // io.datakernel.datastream.AbstractStreamSupplier
        protected void onSuspended() {
            StreamSharder.access$108(StreamSharder.this);
            StreamSharder.this.input.getSupplier().suspend();
        }

        @Override // io.datakernel.datastream.AbstractStreamSupplier
        protected void onProduce(StreamDataAcceptor<T> streamDataAcceptor) {
            StreamSharder.this.dataAcceptors[this.index] = streamDataAcceptor;
            if (StreamSharder.access$106(StreamSharder.this) == 0) {
                StreamSharder.this.input.getSupplier().resume(StreamSharder.this);
            }
        }

        @Override // io.datakernel.datastream.AbstractStreamSupplier
        protected void onError(Throwable th) {
            StreamSharder.this.input.close(th);
        }
    }

    private StreamSharder(Sharder<T> sharder) {
        this.sharder = sharder;
    }

    public static <T> StreamSharder<T> create(Sharder<T> sharder) {
        return new StreamSharder<>(sharder);
    }

    public StreamSupplier<T> newOutput() {
        StreamSharder<T>.Output output = new Output(this.outputs.size());
        this.dataAcceptors = (StreamDataAcceptor[]) Arrays.copyOf(this.dataAcceptors, this.dataAcceptors.length + 1);
        this.suspended++;
        this.outputs.add(output);
        return output;
    }

    @Override // io.datakernel.datastream.StreamInput
    public StreamConsumer<T> getInput() {
        return this.input;
    }

    @Override // io.datakernel.datastream.StreamOutputs
    public List<? extends StreamSupplier<T>> getOutputs() {
        return this.outputs;
    }

    @Override // io.datakernel.datastream.StreamDataAcceptor
    public void accept(T t) {
        this.dataAcceptors[this.sharder.shard(t)].accept(t);
    }

    static /* synthetic */ int access$108(StreamSharder streamSharder) {
        int i = streamSharder.suspended;
        streamSharder.suspended = i + 1;
        return i;
    }

    static /* synthetic */ int access$106(StreamSharder streamSharder) {
        int i = streamSharder.suspended - 1;
        streamSharder.suspended = i;
        return i;
    }
}
