package org.iworkz.genesis.vertx.common.stream;

import io.vertx.core.Future;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.WriteStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/iworkz/genesis/vertx/common/stream/AbstractMappingStream.class */
public abstract class AbstractMappingStream<S, T> extends AbstractAsyncReadStream<T> {
    private static final Logger log = LoggerFactory.getLogger(AbstractMappingStream.class);
    private ReadStream<S> readStream;
    private boolean readStreamHandlerIsSet;

    public void mapStream(ReadStream<S> readStream) {
        if (readStream instanceof AsyncReadStream) {
            AsyncReadStream asyncReadStream = (AsyncReadStream) readStream;
            asyncReadStream.available().map(asyncReadStream2 -> {
                mapStreamHandlers(asyncReadStream);
                this.readyPromise.complete(this);
                return asyncReadStream2;
            }).onFailure(this::fail);
        } else {
            mapStreamHandlers(readStream);
            this.readyPromise.complete(this);
        }
    }

    protected void mapStreamHandlers(ReadStream<S> readStream) {
        if (this.readStream != null) {
            throw new IllegalStateException("Source stream already set");
        }
        if (readStream == null) {
            throw new IllegalArgumentException("Source stream must not be null");
        }
        this.readStream = readStream;
        this.readStream.endHandler(this::handleEnd);
        this.readStream.exceptionHandler(this::handleException);
    }

    /* renamed from: pause, reason: merged with bridge method [inline-methods] */
    public AbstractMappingStream<S, T> m18pause() {
        this.readStream.pause();
        return this;
    }

    @Override // org.iworkz.genesis.vertx.common.stream.AbstractAsyncReadStream
    /* renamed from: resume, reason: merged with bridge method [inline-methods] */
    public AbstractMappingStream<S, T> mo14resume() {
        checkAndSetReadStreamHandlerIfNotSet();
        super.mo14resume();
        this.readStream.resume();
        return this;
    }

    protected void checkAndSetReadStreamHandlerIfNotSet() {
        if (this.readStreamHandlerIsSet) {
            return;
        }
        synchronized (this) {
            if (!this.readStreamHandlerIsSet) {
                this.readStream.handler(this::invokeHandleItem);
                this.readStreamHandlerIsSet = true;
            }
        }
    }

    /* renamed from: fetch, reason: merged with bridge method [inline-methods] */
    public AbstractMappingStream<S, T> m17fetch(long j) {
        this.readStream.fetch(j);
        return this;
    }

    protected void checkStream() {
        if (this.readStream == null) {
            log.error("Mapped stream not available yet");
        }
    }

    @Override // org.iworkz.genesis.vertx.common.stream.AsyncReadStream
    public Future<Void> pipeTo(WriteStream<T> writeStream) {
        return pipeTo((WriteStream) writeStream, true);
    }

    protected void invokeHandleItem(S s) {
        try {
            handleSourceItem(s);
        } catch (Exception e) {
            handleException(e);
        }
    }

    @Override // org.iworkz.genesis.vertx.common.stream.AbstractAsyncReadStream
    protected abstract <E> void handleSourceItem(E e);
}
