/*
 * Decompiled with CFR 0.152.
 */
package io.datakernel.stream.processor;

import io.datakernel.stream.AbstractStreamConsumer;
import io.datakernel.stream.AbstractStreamProducer;
import io.datakernel.stream.HasInput;
import io.datakernel.stream.HasOutputs;
import io.datakernel.stream.StreamConsumer;
import io.datakernel.stream.StreamDataReceiver;
import io.datakernel.stream.StreamProducer;
import io.datakernel.stream.processor.Sharder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Splits one input stream into several output streams.
 *
 * <p>Every item received through {@link #onData(Object)} is handed to the
 * {@link Sharder}, and the returned index selects which output's data receiver
 * gets the item. Outputs are created with {@link #newOutput()} and are indexed
 * in creation order, starting at zero.
 *
 * <p>Flow control is tracked with a single counter of outputs that are not
 * currently able to accept data: the upstream producer is asked to produce only
 * when that counter drops to zero, and is asked to suspend whenever any output
 * is suspended.
 *
 * @param <T> type of the items flowing through the sharder
 */
public final class StreamSharder<T>
implements HasInput<T>,
HasOutputs,
StreamDataReceiver<T> {
    private final Sharder<T> sharder;
    private final InputConsumer input;
    private final List<Output> outputs = new ArrayList<>();

    // Generic arrays cannot be created directly; the raw array is only ever
    // filled with StreamDataReceiver<T> instances (see Output#onProduce).
    @SuppressWarnings("unchecked")
    private StreamDataReceiver<T>[] dataReceivers = new StreamDataReceiver[0];

    /**
     * Number of outputs that have not (yet) supplied a data receiver or have been
     * suspended since. Incremented in {@link #newOutput()} and
     * {@code Output#onSuspended()}, decremented in {@code Output#onProduce}.
     */
    private int suspended = 0;

    private StreamSharder(Sharder<T> sharder) {
        this.sharder = sharder;
        this.input = new InputConsumer();
    }

    /**
     * Creates a sharder with no outputs.
     *
     * @param sharder function that maps an item to the index of the output it goes to
     * @param <T> type of the items flowing through the sharder
     * @return a new sharder instance
     */
    public static <T> StreamSharder<T> create(Sharder<T> sharder) {
        return new StreamSharder<>(sharder);
    }

    /**
     * Adds a new output whose index is the current number of outputs.
     *
     * <p>The receiver array grows by one (the new slot is {@code null} until the
     * output's {@code onProduce} is invoked) and the new output is counted as
     * suspended.
     *
     * <p>NOTE(review): this method does not suspend the upstream producer. If it is
     * called while data is already flowing, {@link #onData(Object)} could reach the
     * still-empty slot - confirm callers only add outputs before streaming starts.
     *
     * @return the newly created output
     */
    public StreamProducer<T> newOutput() {
        Output output = new Output(this.outputs.size());
        this.dataReceivers = Arrays.copyOf(this.dataReceivers, this.dataReceivers.length + 1);
        this.suspended++;
        this.outputs.add(output);
        return output;
    }

    /**
     * @return the consumer that feeds this sharder
     */
    @Override
    public StreamConsumer<T> getInput() {
        return this.input;
    }

    /**
     * Returns the outputs in creation order.
     *
     * <p>The returned list is the live internal list, not a copy; outputs added
     * later through {@link #newOutput()} show up in it.
     *
     * @return the outputs of this sharder
     */
    public List<? extends StreamProducer<T>> getOutputs() {
        return this.outputs;
    }

    /**
     * Forwards {@code item} to the data receiver of the output selected by the
     * sharder. The index returned by the sharder is used directly, without range
     * checking.
     *
     * @param item the item to route
     */
    @Override
    public void onData(T item) {
        int shard = this.sharder.shard(item);
        this.dataReceivers[shard].onData(item);
    }

    /**
     * One output of the sharder, identified by its position in the receiver array.
     */
    protected final class Output
    extends AbstractStreamProducer<T> {
        private final int index;

        protected Output(int index) {
            this.index = index;
        }

        /**
         * Counts this output as suspended and asks the upstream producer to suspend.
         */
        @Override
        protected void onSuspended() {
            StreamSharder.this.suspended++;
            StreamSharder.this.input.getProducer().suspend();
        }

        /**
         * Stores the receiver for this output and, once no output is left
         * suspended, asks the upstream producer to push data into the sharder.
         *
         * @param dataReceiver receiver that accepts the items routed to this output
         */
        @Override
        protected void onProduce(StreamDataReceiver<T> dataReceiver) {
            // Explicit outer qualification keeps the access unambiguous even if
            // the superclass declares a member with the same name.
            StreamSharder.this.dataReceivers[this.index] = dataReceiver;
            if (--StreamSharder.this.suspended == 0) {
                StreamSharder.this.input.getProducer().produce(StreamSharder.this);
            }
        }

        /**
         * Propagates a downstream failure by closing the input with the same error.
         */
        @Override
        protected void onError(Throwable t) {
            StreamSharder.this.input.closeWithError(t);
        }
    }

    /**
     * The single input of the sharder; relays end-of-stream and errors to every output.
     */
    protected final class InputConsumer
    extends AbstractStreamConsumer<T> {
        protected InputConsumer() {
        }

        /**
         * Signals end of stream on every output.
         */
        @Override
        protected void onEndOfStream() {
            StreamSharder.this.outputs.forEach(AbstractStreamProducer::sendEndOfStream);
        }

        /**
         * Closes every output with the error received from upstream.
         */
        @Override
        protected void onError(Throwable t) {
            StreamSharder.this.outputs.forEach(output -> output.closeWithError(t));
        }
    }
}

