package org.iworkz.genesis.vertx.common.stream;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.streams.StreamBase;
import io.vertx.core.streams.WriteStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/iworkz/genesis/vertx/common/stream/ReadStreamConsumer.class */
public class ReadStreamConsumer<T> implements WriteStream<T> {
    private static final Logger log = LoggerFactory.getLogger(ReadStreamConsumer.class);
    private final Function<T, Future<Void>> writeHandler;
    private Handler<Throwable> exceptionHandler;
    private Handler<AsyncResult<Void>> endHandler;
    private boolean readStreamEnded;
    private Promise<Void> promise = Promise.promise();
    AtomicLong queueSize = new AtomicLong();

    public ReadStreamConsumer(Function<T, Future<Void>> function) {
        this.writeHandler = function;
    }

    public WriteStream<T> exceptionHandler(Handler<Throwable> handler) {
        this.exceptionHandler = handler;
        return this;
    }

    public Future<Void> write(T t) {
        try {
            this.queueSize.incrementAndGet();
            return this.writeHandler.apply(t).onSuccess(r4 -> {
                itemHandled(null);
            }).onFailure(this::handleException);
        } catch (Exception e) {
            handleException(e);
            return Future.failedFuture(e);
        }
    }

    public void write(T t, Handler<AsyncResult<Void>> handler) {
        try {
            this.queueSize.incrementAndGet();
            this.writeHandler.apply(t).onFailure(this::handleException).onSuccess(r4 -> {
                itemHandled(null);
            }).onComplete(handler);
        } catch (Exception e) {
            handleException(e);
            handler.handle(this.promise.future());
        }
    }

    protected void itemHandled(Throwable th) {
        this.queueSize.decrementAndGet();
        checkEnd(th);
    }

    protected void checkEnd(Throwable th) {
        long j = this.queueSize.get();
        if (th != null) {
            this.promise.tryFail(th);
            if (this.endHandler != null) {
                this.endHandler.handle(this.promise.future());
                return;
            }
            return;
        }
        if (this.readStreamEnded && j == 0) {
            this.promise.tryComplete();
            if (this.endHandler != null) {
                this.endHandler.handle(this.promise.future());
            }
        }
    }

    protected void handleException(Throwable th) {
        log.error("Handling failed", th);
        itemHandled(th);
        if (this.exceptionHandler != null) {
            this.exceptionHandler.handle(th);
        }
    }

    public void end(Handler<AsyncResult<Void>> handler) {
        this.endHandler = handler;
        this.readStreamEnded = true;
        checkEnd(null);
    }

    public WriteStream<T> setWriteQueueMaxSize(int i) {
        return this;
    }

    public boolean writeQueueFull() {
        return false;
    }

    public WriteStream<T> drainHandler(Handler<Void> handler) {
        return this;
    }

    public static <T> Future<Void> forEach(AsyncReadStream<T> asyncReadStream, Function<T, Future<Void>> function) {
        return asyncReadStream.pipeTo(new ReadStreamConsumer(function));
    }

    public static <T> Future<List<T>> toList(AsyncReadStream<T> asyncReadStream) {
        ArrayList arrayList = new ArrayList();
        return asyncReadStream.pipeTo(new ReadStreamConsumer(obj -> {
            arrayList.add(obj);
            return Future.succeededFuture();
        })).map(arrayList);
    }

    /* renamed from: exceptionHandler, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ StreamBase m21exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }
}
