package org.iworkz.genesis.vertx.common.stream;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.StreamBase;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.iworkz.common.exception.GenesisException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/iworkz/genesis/vertx/common/stream/AbstractAsyncReadStream.class */
/**
 * Skeleton {@link AsyncReadStream} that stores the item, end and exception handlers and offers
 * dispatch helpers ({@link #invokeTargetItemHandler}, {@link #handleEnd}, {@link #handleException}).
 *
 * <p>Readiness is exposed through {@link #available()}, which is backed by {@link #readyPromise};
 * this class only ever fails that promise (see {@link #fail}). NOTE(review): completing it
 * successfully is presumably the job of subclasses - confirm against the implementations.
 *
 * <p>{@link #handleException} and {@link #handleEnd} synchronize on {@code this}; the remaining
 * methods are not synchronized.
 *
 * @param <T> type of the items delivered to the item handler
 */
public abstract class AbstractAsyncReadStream<T> implements AsyncReadStream<T> {
    private static final Logger log = LoggerFactory.getLogger(AbstractAsyncReadStream.class);

    /** Handler that currently receives items; {@code null} until set, or after it was cleared. */
    private Handler<T> targetItemHandler;

    /** Last non-null item handler that was cleared; used as fallback when an item arrives late. */
    private Handler<T> lastTargetItemHandler;

    private Handler<Void> endHandler;
    private Handler<Throwable> exceptionHandler;

    /** Extra item handlers registered through {@link #withItem}; created lazily. */
    private List<Handler<T>> additionalHandlers;

    protected final Promise<AsyncReadStream<T>> readyPromise = Promise.promise();

    /** Set by {@link #handleException}; once {@code true}, {@link #handleEnd} does nothing. */
    protected boolean failed = false;

    /**
     * Sets (or, with {@code null}, clears) the handler that receives the items.
     *
     * <p>When an existing handler is cleared it is remembered, so that
     * {@link #invokeTargetItemHandler} can fall back to it.
     *
     * @param handler the new item handler, or {@code null} to clear the current one
     * @return this stream
     */
    public ReadStream<T> handler(Handler<T> handler) {
        if (handler == null && this.targetItemHandler != null) {
            log.debug("Clear target item handler in stream {}", this);
            this.lastTargetItemHandler = this.targetItemHandler;
        }
        this.targetItemHandler = handler;
        return this;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * @return the currently configured item handler, or {@code null} if none is set
     */
    public Handler<T> getTargetItemHandler() {
        return this.targetItemHandler;
    }

    /**
     * Checks that an item handler is available. If none is set and none was cleared before, an
     * {@link IllegalStateException} is routed through {@link #handleException}.
     *
     * <p>This implementation performs only that check. NOTE(review): subclasses presumably
     * override it to actually resume the source - confirm.
     *
     * @return this stream
     */
    @Override // 
    /* renamed from: resume, reason: merged with bridge method [inline-methods] */
    public AbstractAsyncReadStream<T> mo16resume() {
        if (this.targetItemHandler == null && !isTargetItemHandlerCleared()) {
            log.error("Illegal state: target item handler needs to be set when stream is resumed");
            handleException(new IllegalStateException("Target item handler is not defined"));
        }
        return this;
    }

    /**
     * @return {@code true} if an item handler has been cleared at least once via {@link #handler}
     */
    protected boolean isTargetItemHandlerCleared() {
        return this.lastTargetItemHandler != null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Delivers an item to the item handler and then to every handler added via {@link #withItem}.
     *
     * <p>If no item handler is set but one was cleared earlier, that handler is restored and used.
     * The restored handler is invoked directly rather than by re-entering this method, so the
     * fallback cannot recurse even if {@link #getTargetItemHandler()} is overridden.
     *
     * @param t the item to deliver
     * @throws GenesisException if there is neither a current nor a previously cleared handler
     */
    public void invokeTargetItemHandler(T t) {
        Handler<T> itemHandler = getTargetItemHandler();
        if (itemHandler == null) {
            if (this.lastTargetItemHandler == null) {
                log.error("Failed to handle item in stream {}, because handler is null, item = {}", this, t);
                throw new GenesisException("Stream handler is not defined");
            }
            log.warn("Fall back to last known handler in stream = {}, item = {}", this, t);
            this.targetItemHandler = this.lastTargetItemHandler;
            itemHandler = this.lastTargetItemHandler;
        }
        itemHandler.handle(t);
        if (this.additionalHandlers != null) {
            for (Handler<T> additionalHandler : this.additionalHandlers) {
                additionalHandler.handle(t);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Forwards a source item to {@link #invokeTargetItemHandler}.
     *
     * <p>The item is cast to {@code T} without a runtime check; a type mismatch only surfaces
     * later, in whatever handler touches the item.
     *
     * @param e   the item produced by the source
     * @param <E> static type of the source item
     */
    @SuppressWarnings("unchecked") // E and T are both erased; callers are trusted to pass a T.
    public <E> void handleSourceItem(E e) {
        invokeTargetItemHandler((T) e);
    }

    /**
     * Registers an additional handler that is invoked for each item after the item handler.
     *
     * @param handler the handler to add
     * @return this stream
     */
    @Override // org.iworkz.genesis.vertx.common.stream.AsyncReadStream
    public AsyncReadStream<T> withItem(Handler<T> handler) {
        if (this.additionalHandlers == null) {
            this.additionalHandlers = new ArrayList<>();
        }
        this.additionalHandlers.add(handler);
        return this;
    }

    /**
     * Sets the handler invoked by {@link #handleEnd}.
     *
     * @param handler the end handler, may be {@code null}
     * @return this stream
     */
    @Override // org.iworkz.genesis.vertx.common.stream.AsyncReadStream
    public AsyncReadStream<T> endHandler(Handler<Void> handler) {
        this.endHandler = handler;
        return this;
    }

    /**
     * Sets the handler invoked by {@link #handleException}.
     *
     * @param handler the exception handler, may be {@code null}
     * @return this stream
     */
    public ReadStream<T> exceptionHandler(Handler<Throwable> handler) {
        this.exceptionHandler = handler;
        return this;
    }

    /**
     * @return the configured end handler, or {@code null}
     */
    public Handler<Void> getEndHandler() {
        return this.endHandler;
    }

    /**
     * @return the configured exception handler, or {@code null}
     */
    public Handler<Throwable> getExceptionHandler() {
        return this.exceptionHandler;
    }

    /**
     * @return the future backed by {@link #readyPromise}
     */
    @Override // org.iworkz.genesis.vertx.common.stream.AsyncReadStream
    public Future<AsyncReadStream<T>> available() {
        return this.readyPromise.future();
    }

    /**
     * Reports a failure of this stream.
     *
     * <p>If {@link #readyPromise} is still pending it is failed with {@code th}. If it was already
     * completed, the failure is routed through {@link #handleException} instead.
     *
     * @param th the cause of the failure
     */
    public void fail(Throwable th) {
        // tryFail returns false when the promise was already completed, which makes the
        // "pending or not" decision atomic instead of a separate isComplete() check.
        if (!this.readyPromise.tryFail(th)) {
            handleException(th);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Marks the stream as failed, notifies the exception handler (if any) and then calls
     * {@link #onException}. A failed future returned by {@code onException} is logged.
     *
     * @param th the exception to handle
     */
    public void handleException(Throwable th) {
        synchronized (this) {
            this.failed = true;
            Handler<Throwable> handler = getExceptionHandler();
            if (handler != null) {
                handler.handle(th);
            }
            logFailure(onException(th), "Failed to handle exception");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Handles the end of the stream; does nothing if the stream has already failed.
     *
     * <p>Waits for {@link #finishItemHandling()}; on success the end handler (if any) is invoked
     * and then {@link #onEnd()} is called, logging a failed result. If {@code finishItemHandling}
     * fails or throws, the problem is routed through {@link #handleException}.
     *
     * @param r5 value passed on to the end handler
     */
    public void handleEnd(Void r5) {
        synchronized (this) {
            if (this.failed) {
                return;
            }
            try {
                finishItemHandling()
                        .onSuccess(done -> {
                            Handler<Void> handler = getEndHandler();
                            if (handler != null) {
                                handler.handle(r5);
                            }
                            logFailure(onEnd(), "Failed to handle end");
                        })
                        .onFailure(this::handleException);
            } catch (Exception e) {
                handleException(e);
            }
        }
    }

    /**
     * Hook awaited by {@link #handleEnd} before the end is signalled.
     *
     * @return an already succeeded future in this base implementation
     */
    protected Future<Void> finishItemHandling() {
        return Future.succeededFuture();
    }

    /**
     * Hook called by {@link #handleEnd} after the end handler (if any) was invoked.
     *
     * @return an already succeeded future in this base implementation
     */
    protected Future<Void> onEnd() {
        return Future.succeededFuture();
    }

    /**
     * Hook called by {@link #handleException} after the exception handler (if any) was invoked.
     * This base implementation logs the exception.
     *
     * @param th the exception being handled
     * @return a succeeded future carrying {@code th} in this base implementation
     */
    protected Future<Throwable> onException(Throwable th) {
        log.error("Streaming exection occured", th);
        return Future.succeededFuture(th);
    }

    /** Logs {@code message} with the cause if {@code outcome} fails; ignores a {@code null} future. */
    private static void logFailure(Future<?> outcome, String message) {
        if (outcome != null) {
            outcome.onFailure(cause -> log.error(message, cause));
        }
    }

    /** Decompiler-emitted bridge that delegates to {@link #endHandler(Handler)}. */
    @Override // org.iworkz.genesis.vertx.common.stream.AsyncReadStream
    /* renamed from: endHandler, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ ReadStream mo15endHandler(Handler handler) {
        return endHandler((Handler<Void>) handler);
    }

    /** Decompiler-emitted bridge that delegates to {@link #exceptionHandler(Handler)}. */
    /* renamed from: exceptionHandler, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ StreamBase m17exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }
}
