package org.iworkz.genesis.vertx.common.stream;

import io.vertx.core.Future;
import org.iworkz.genesis.vertx.common.stream.aggregator.StreamAggregator;

/* loaded from: input_file:org/iworkz/genesis/vertx/common/stream/AggregatingStream.class */
/**
 * Mapping stream that hands every incoming source item to a {@link StreamAggregator}
 * and forwards the aggregator's created results to the target item handler.
 *
 * <p>The grouping rules themselves (when a group counts as finished, what
 * {@code create()} builds) live entirely in the aggregator; this class only
 * drives the call sequence.
 *
 * @param <S> first type argument passed on to {@link AbstractMappingStream} and
 *            {@link StreamAggregator} (presumably the source item type - confirm
 *            against the base class)
 * @param <T> second type argument passed on to {@link AbstractMappingStream} and
 *            {@link StreamAggregator} (presumably the emitted target item type -
 *            confirm against the base class)
 */
public class AggregatingStream<S, T> extends AbstractMappingStream<S, T> {
    /** Collaborator that accumulates source items and builds the emitted results. */
    private final StreamAggregator<S, T> aggregator;

    /**
     * Creates a stream driven by the given aggregator.
     *
     * @param streamAggregator aggregator used for every item handled by this stream;
     *                         stored as-is, without validation
     */
    public AggregatingStream(StreamAggregator<S, T> streamAggregator) {
        this.aggregator = streamAggregator;
    }

    /**
     * Feeds one source item to the aggregator.
     *
     * <p>The item is first registered through {@code setNext}. When the aggregator
     * then reports itself finished, its created result is passed to the target item
     * handler and {@code startNew()} is called; otherwise {@code collect()} is called.
     *
     * @param e   the incoming source item
     * @param <E> static type of the incoming item
     */
    @Override
    protected <E> void handleSourceItem(E e) {
        aggregator.setNext(e);

        if (aggregator.isFinished()) {
            // Emit the completed result before the aggregator is asked to start over.
            invokeTargetItemHandler(aggregator.create());
            aggregator.startNew();
            return;
        }

        aggregator.collect();
    }

    /**
     * Emits whatever the aggregator still holds once item handling is finishing.
     *
     * <p>Nothing is emitted when the aggregator reports itself empty.
     *
     * @return an already-succeeded future
     */
    @Override
    protected Future<Void> finishItemHandling() {
        if (aggregator.isEmpty()) {
            return Future.succeededFuture();
        }

        invokeTargetItemHandler(aggregator.create());
        return Future.succeededFuture();
    }
}
