/*
 * Decompiled with CFR 0.152.
 */
package io.datakernel.stream.stats;

import io.datakernel.stream.AbstractStreamConsumer;
import io.datakernel.stream.AbstractStreamProducer;
import io.datakernel.stream.StreamConsumer;
import io.datakernel.stream.StreamDataReceiver;
import io.datakernel.stream.StreamProducer;
import io.datakernel.stream.processor.StreamTransformer;
import io.datakernel.stream.stats.StreamStats;

/**
 * A {@link StreamTransformer} with identical input and output item types that
 * reports stream events to a {@link StreamStats} instance.
 *
 * <p>This class never handles data items itself. When the output side is asked
 * to produce, the downstream receiver is passed to
 * {@code StreamStats.createDataReceiver} and the result is handed to the
 * producer attached to the input side.
 *
 * @param <T> type of the items travelling through the forwarder
 */
public class StreamStatsForwarder<T> implements StreamTransformer<T, T> {
    private final Input input;
    private final Output output;
    private final StreamStats stats;

    /**
     * Stores the stats object, then creates the input and output endpoints.
     * Instances are obtained through {@link #create(StreamStats)}.
     *
     * @param stats receiver of the stream event notifications
     */
    private StreamStatsForwarder(StreamStats stats) {
        // The stats reference is assigned before either endpoint is created.
        this.stats = stats;
        this.input = new Input();
        this.output = new Output();
    }

    /**
     * Creates a forwarder that reports to the given stats object.
     *
     * @param stats receiver of the stream event notifications
     * @param <T>   type of the forwarded items
     * @return a new forwarder instance
     */
    public static <T> StreamStatsForwarder<T> create(StreamStats stats) {
        return new StreamStatsForwarder<T>(stats);
    }

    /**
     * @return the consumer side of this forwarder
     */
    @Override
    public StreamConsumer<T> getInput() {
        return input;
    }

    /**
     * @return the producer side of this forwarder
     */
    @Override
    public StreamProducer<T> getOutput() {
        return output;
    }

    /**
     * Consumer endpoint: reports start and end-of-stream to the stats object
     * and relays end-of-stream and errors to the output endpoint.
     */
    private class Input extends AbstractStreamConsumer<T> {
        /** Reports the start event to the stats object. */
        @Override
        protected void onStarted() {
            stats.onStarted();
        }

        /** Reports end-of-stream to the stats object, then sends it to the output. */
        @Override
        protected void onEndOfStream() {
            stats.onEndOfStream();
            output.sendEndOfStream();
        }

        /**
         * Closes the output endpoint with the given error.
         *
         * @param cause the error this endpoint was notified about
         */
        @Override
        protected void onError(Throwable cause) {
            // No stats.onError call on this side.
            // NOTE(review): presumably closing the output invokes Output.onError,
            // which records the error - confirm against AbstractStreamProducer.
            output.closeWithError(cause);
        }
    }

    /**
     * Producer endpoint: reports produce, suspend and error events to the stats
     * object and relays each of them to the input side.
     */
    private class Output extends AbstractStreamProducer<T> {
        /**
         * Reports the produce event, then asks the input's producer to produce
         * into the receiver returned by {@code stats.createDataReceiver}.
         *
         * @param downstreamReceiver receiver supplied for this endpoint's items
         */
        @Override
        protected void onProduce(StreamDataReceiver<T> downstreamReceiver) {
            stats.onProduce();
            input.getProducer().produce(stats.createDataReceiver(downstreamReceiver));
        }

        /** Reports the suspend event, then suspends the input's producer. */
        @Override
        protected void onSuspended() {
            stats.onSuspend();
            input.getProducer().suspend();
        }

        /**
         * Reports the error to the stats object, then closes the input endpoint
         * with the same error.
         *
         * @param cause the error this endpoint was notified about
         */
        @Override
        protected void onError(Throwable cause) {
            stats.onError(cause);
            input.closeWithError(cause);
        }
    }
}

