package io.datakernel.datastream.stats;

import io.datakernel.datastream.AbstractStreamConsumer;
import io.datakernel.datastream.AbstractStreamSupplier;
import io.datakernel.datastream.StreamCapability;
import io.datakernel.datastream.StreamConsumer;
import io.datakernel.datastream.StreamDataAcceptor;
import io.datakernel.datastream.StreamSupplier;
import io.datakernel.datastream.processor.StreamTransformer;
import io.datakernel.promise.Promise;
import java.util.Collections;
import java.util.Set;

/* loaded from: input_file:io/datakernel/datastream/stats/StreamStatsForwarder.class */
/**
 * A {@link StreamTransformer} whose input and output carry the same item type and which
 * reports the stream events it observes to a {@link StreamStats} instance.
 *
 * <p>The {@linkplain #getInput() input} side is a consumer and the
 * {@linkplain #getOutput() output} side is a supplier. Produce/suspend requests arriving
 * at the output are relayed to the supplier attached to the input; end-of-stream and
 * errors arriving at one side are propagated to the other.
 *
 * @param <T> type of the items passing through
 */
public class StreamStatsForwarder<T> implements StreamTransformer<T, T> {
    private final Input input = new Input();
    private final Output output = new Output();
    private final StreamStats<T> stats;

    private StreamStatsForwarder(StreamStats<T> streamStats) {
        this.stats = streamStats;
    }

    /**
     * Creates a forwarder that reports to the given stats object.
     *
     * @param streamStats receiver of the stream events
     * @param <T>         type of the items passing through
     * @return a new forwarder
     */
    public static <T> StreamStatsForwarder<T> create(StreamStats<T> streamStats) {
        return new StreamStatsForwarder<>(streamStats);
    }

    /** Returns the consumer side of this forwarder. */
    @Override
    public StreamConsumer<T> getInput() {
        return this.input;
    }

    /** Returns the supplier side of this forwarder. */
    @Override
    public StreamSupplier<T> getOutput() {
        return this.output;
    }

    /** Consumer side: reports start and end-of-stream to the stats and drives the output. */
    private class Input extends AbstractStreamConsumer<T> {
        @Override
        protected void onStarted() {
            StreamStatsForwarder.this.stats.onStarted();
        }

        /** Reports end-of-stream to the stats, then signals it on the output. */
        @Override
        protected Promise<Void> onEndOfStream() {
            StreamStatsForwarder.this.stats.onEndOfStream();
            return StreamStatsForwarder.this.output.sendEndOfStream();
        }

        /**
         * Closes the output with the given error.
         *
         * <p>NOTE(review): the stats object is not notified here; presumably closing the
         * output ends up in {@code Output.onError}, which does notify it - confirm against
         * {@code AbstractStreamSupplier}.
         */
        @Override
        protected void onError(Throwable e) {
            StreamStatsForwarder.this.output.close(e);
        }

        /**
         * Returns the capabilities of the consumer attached to the output, or an empty
         * set when the output reports no consumer.
         */
        @Override
        public Set<StreamCapability> getCapabilities() {
            StreamConsumer<T> downstream = StreamStatsForwarder.this.output.getConsumer();
            if (downstream == null) {
                return Collections.emptySet();
            }
            return downstream.getCapabilities();
        }
    }

    /** Supplier side: relays produce/suspend requests to the supplier attached to the input. */
    private class Output extends AbstractStreamSupplier<T> {
        /**
         * Reports the produce request to the stats, then resumes the input's supplier with
         * the acceptor obtained from {@code stats.createDataAcceptor(dataAcceptor)}.
         */
        @Override
        protected void onProduce(StreamDataAcceptor<T> dataAcceptor) {
            StreamStatsForwarder.this.stats.onProduce();
            StreamDataAcceptor<T> acceptor = StreamStatsForwarder.this.stats.createDataAcceptor(dataAcceptor);
            StreamStatsForwarder.this.input.getSupplier().resume(acceptor);
        }

        /** Reports the suspension to the stats, then suspends the input's supplier. */
        @Override
        protected void onSuspended() {
            StreamStatsForwarder.this.stats.onSuspend();
            StreamStatsForwarder.this.input.getSupplier().suspend();
        }

        /** Reports the error to the stats, then closes the input with it. */
        @Override
        protected void onError(Throwable e) {
            StreamStatsForwarder.this.stats.onError(e);
            StreamStatsForwarder.this.input.close(e);
        }

        /**
         * Returns the capabilities of the supplier attached to the input, or an empty
         * set when the input reports no supplier.
         */
        @Override
        public Set<StreamCapability> getCapabilities() {
            StreamSupplier<T> upstream = StreamStatsForwarder.this.input.getSupplier();
            if (upstream == null) {
                return Collections.emptySet();
            }
            return upstream.getCapabilities();
        }
    }
}
