package org.iworkz.genesis.vertx.common.stream;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.streams.StreamBase;
import io.vertx.core.streams.WriteStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/iworkz/genesis/vertx/common/stream/ReadStreamConsumer.class */
/**
 * A {@link WriteStream} that passes every written item to a caller-supplied asynchronous
 * function instead of writing it to an underlying resource.
 *
 * <p>The consumer counts the items whose handling future has not completed yet. That counter
 * drives {@link #writeQueueFull()} / the drain handler, and it decides when the handler given
 * to {@link #end(Handler)} is notified: either when a handling failure is reported after
 * {@code end} was called, or when the stream has been ended and no item is pending any more.
 *
 * <p>NOTE(review): apart from the pending-item counter, all state is held in plain fields;
 * presumably every call is expected to arrive on the same Vert.x context - confirm.
 *
 * @param <T> type of the items written to this stream
 */
public class ReadStreamConsumer<T> implements WriteStream<T> {
    private static final Logger log = LoggerFactory.getLogger(ReadStreamConsumer.class);

    /** Function invoked once per written item; its future signals when the item is handled. */
    private final Function<T, Future<Void>> writeHandler;
    private Handler<Throwable> exceptionHandler;
    private Handler<AsyncResult<Void>> endHandler;
    private Handler<Void> drainHandler;
    /** Pending-item count at which the stream reports itself as full; {@code 0} disables the limit. */
    private int maxQueueSize;
    /** While greater than zero the stream is "full"; the drain handler fires once pending items fall to this value. */
    private int drainingStartQueueSize;
    private boolean readStreamEnded;
    private boolean handlingFinished;
    /** Guards against notifying the current end handler more than once. */
    private boolean endHandlerNotified;
    /** Overall outcome: failed with the first reported error, or completed when everything was handled. */
    private final Promise<Void> promise = Promise.promise();
    /** Number of written items whose handling has not finished yet. */
    private final AtomicInteger queueSize = new AtomicInteger();

    /**
     * Creates a consumer that handles each written item with the given function.
     *
     * @param function invoked for every written item; returns a future that completes when the
     *                 item has been handled
     */
    public ReadStreamConsumer(Function<T, Future<Void>> function) {
        this.writeHandler = function;
    }

    /**
     * Sets the handler that is invoked with every handling failure.
     *
     * @param handler the exception handler, may be {@code null}
     * @return this stream
     */
    @Override
    public WriteStream<T> exceptionHandler(Handler<Throwable> handler) {
        this.exceptionHandler = handler;
        return this;
    }

    /**
     * Hands the item to the write function.
     *
     * @param t the item to handle
     * @return a future that succeeds when the item has been handled, or fails with the error
     *         thrown or reported by the write function
     */
    @Override
    public Future<Void> write(T t) {
        return handleItem(t);
    }

    /**
     * Callback variant of {@link #write(Object)}.
     *
     * @param t       the item to handle
     * @param handler notified with the outcome of handling this item; ignored when {@code null}
     */
    @Override
    public void write(T t, Handler<AsyncResult<Void>> handler) {
        Future<Void> written = handleItem(t);
        if (handler != null) {
            // The callback receives the outcome of THIS item (including a synchronous failure
            // of the write function), never the state of the shared end promise.
            written.onComplete(handler);
        }
    }

    /** Counts the item as pending, applies the write function and wires up the bookkeeping. */
    private Future<Void> handleItem(T t) {
        try {
            if (this.handlingFinished) {
                log.error("Handling already finished {}", this);
            }
            this.queueSize.incrementAndGet();
            return this.writeHandler.apply(t).map(result -> {
                itemHandled(null);
                return result;
            }).onFailure(this::handleException);
        } catch (Exception e) {
            // Covers a write function that throws synchronously or returns null.
            handleException(e);
            return Future.failedFuture(e);
        }
    }

    /**
     * Records that one pending item is done, fires the drain handler when the pending count has
     * fallen to the drain threshold, and re-evaluates the end condition.
     *
     * @param th the failure of the item, or {@code null} when it was handled successfully
     */
    protected void itemHandled(Throwable th) {
        int remaining = this.queueSize.decrementAndGet();
        if (this.drainingStartQueueSize > 0 && remaining <= this.drainingStartQueueSize) {
            // Reset first so writeQueueFull() reports "not full" while the drain handler runs.
            this.drainingStartQueueSize = 0;
            if (this.drainHandler != null) {
                this.drainHandler.handle(null);
            }
        }
        checkEnd(th);
    }

    /**
     * Settles the overall promise and notifies the end handler when appropriate.
     *
     * <p>A non-null failure fails the promise (first failure wins) and notifies the end handler
     * if one is registered. Without a failure, the promise is completed and the end handler is
     * notified once the stream has been ended and no item is pending.
     *
     * @param th the failure to report, or {@code null}
     */
    protected void checkEnd(Throwable th) {
        if (th != null) {
            this.promise.tryFail(th);
            notifyEndHandler();
            return;
        }
        if (this.readStreamEnded && this.queueSize.get() == 0) {
            this.handlingFinished = true;
            this.promise.tryComplete();
            notifyEndHandler();
        }
    }

    /** Passes the overall outcome to the current end handler, at most once per handler. */
    private void notifyEndHandler() {
        if (this.endHandler == null || this.endHandlerNotified) {
            return;
        }
        this.endHandlerNotified = true;
        this.endHandler.handle(this.promise.future());
    }

    /**
     * Logs a handling failure, accounts for the failed item and informs the exception handler.
     *
     * @param th the failure
     */
    protected void handleException(Throwable th) {
        log.error("Handling failed", th);
        itemHandled(th);
        if (this.exceptionHandler != null) {
            this.exceptionHandler.handle(th);
        }
    }

    /**
     * Marks the stream as ended. The handler is notified immediately when no item is pending,
     * otherwise when the last pending item finishes or a later failure is reported.
     *
     * @param handler receives the overall outcome
     */
    @Override
    public void end(Handler<AsyncResult<Void>> handler) {
        this.endHandler = handler;
        // A newly registered handler has not seen the outcome yet.
        this.endHandlerNotified = false;
        this.readStreamEnded = true;
        checkEnd(null);
    }

    /**
     * Sets the number of pending items at which {@link #writeQueueFull()} starts returning
     * {@code true}; values {@code <= 0} disable the limit.
     *
     * @param i the maximum number of pending items
     * @return this stream
     */
    @Override
    public ReadStreamConsumer<T> setWriteQueueMaxSize(int i) {
        this.maxQueueSize = i;
        return this;
    }

    /* renamed from: setWriteQueueMaxSize, reason: merged with bridge method [inline-methods] */
    /**
     * Decompiler-generated alias of {@link #setWriteQueueMaxSize(int)}, kept for callers that
     * reference the renamed method.
     *
     * @param i the maximum number of pending items
     * @return this stream
     */
    public ReadStreamConsumer<T> m22setWriteQueueMaxSize(int i) {
        return setWriteQueueMaxSize(i);
    }

    /**
     * Reports whether writers should pause. Once the pending count reaches the configured
     * maximum the stream stays "full" until the count has dropped to half of the maximum
     * (at least one), at which point the drain handler is invoked.
     *
     * @return {@code true} while the stream is waiting to drain
     */
    @Override
    public boolean writeQueueFull() {
        if (this.maxQueueSize > 0 && this.drainingStartQueueSize == 0 && queueSizeExceedsLimit()) {
            this.drainingStartQueueSize = Math.max(1, this.maxQueueSize / 2);
        }
        return this.drainingStartQueueSize > 0;
    }

    /**
     * @return {@code true} when the number of pending items has reached the configured maximum
     */
    protected boolean queueSizeExceedsLimit() {
        return this.queueSize.get() >= this.maxQueueSize;
    }

    /**
     * Sets the handler invoked when the stream is ready to accept items again.
     *
     * @param handler the drain handler, may be {@code null}
     * @return this stream
     */
    @Override
    public ReadStreamConsumer<T> drainHandler(Handler<Void> handler) {
        this.drainHandler = handler;
        return this;
    }

    /**
     * Pipes the stream into a new consumer that applies {@code function} to every item.
     *
     * @param asyncReadStream the source stream
     * @param function        handles one item and returns a future signalling completion
     * @param <T>             item type
     * @return the future returned by {@code asyncReadStream.pipeTo(...)}
     */
    public static <T> Future<Void> forEach(AsyncReadStream<T> asyncReadStream, Function<T, Future<Void>> function) {
        return asyncReadStream.pipeTo(new ReadStreamConsumer<T>(function));
    }

    /**
     * Pipes the stream into a new consumer that collects every item into a list.
     *
     * @param asyncReadStream the source stream
     * @param <T>             item type
     * @return a future that maps the result of {@code asyncReadStream.pipeTo(...)} to the
     *         collected items
     */
    public static <T> Future<List<T>> toList(AsyncReadStream<T> asyncReadStream) {
        List<T> items = new ArrayList<>();
        return asyncReadStream.pipeTo(new ReadStreamConsumer<T>(item -> {
            items.add(item);
            return Future.succeededFuture();
        })).map(items);
    }

    /* renamed from: drainHandler, reason: collision with other method in class */
    /** Decompiler rendering of the compiler-generated bridge for {@link #drainHandler(Handler)}. */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw signature is what the erased bridge method declares
    public /* bridge */ /* synthetic */ WriteStream m21drainHandler(Handler handler) {
        return drainHandler((Handler<Void>) handler);
    }

    /* renamed from: exceptionHandler, reason: collision with other method in class */
    /** Decompiler rendering of the compiler-generated bridge for {@link #exceptionHandler(Handler)}. */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw signature is what the erased bridge method declares
    public /* bridge */ /* synthetic */ StreamBase m23exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }
}
