package org.iworkz.genesis.vertx.common.queue;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/iworkz/genesis/vertx/common/queue/AbstractAsyncQueue.class */
/**
 * Base class for a work queue whose items are processed asynchronously with a
 * bounded number of items in flight.
 *
 * <p>Items are submitted with {@link #add(Object)} and handed to
 * {@link #processAsync(Object)}. A failed item is put back into the queue until
 * it has failed {@code maxRetries} times; at that point the whole queue fails
 * and the future returned by {@link #done()} is failed with the item's cause.
 * Once {@link #setFinished()} has been called and no item is queued or in
 * flight, the {@link #done()} future completes successfully.
 *
 * <p>NOTE(review): the parallelism limit in {@link #checkQueue()} is a
 * check-then-act on {@code processingCount}; if the queue is driven from more
 * than one thread at the same time the limit may be exceeded briefly.
 *
 * @param <T> the item type; it is used as a {@link HashMap} key for retry
 *            bookkeeping, so {@code equals}/{@code hashCode} decide which
 *            items share a retry counter
 */
public abstract class AbstractAsyncQueue<T> {
    private static final Logger log = LoggerFactory.getLogger(AbstractAsyncQueue.class);

    /** Per-item processing timeout in milliseconds used by the three-argument constructor. */
    private static final long DEFAULT_PROCESSING_TIMEOUT = 30_000L;

    /** Value returned by {@link #createProcessingTimeoutTimer(Promise)} when no timer was armed. */
    private static final long NO_TIMER = -1L;

    private final Vertx vertx;
    private final int maxParallelCount;
    private final int maxRetries;
    private final long processingTimeout;
    private final AtomicInteger totalCount = new AtomicInteger();
    private final AtomicInteger finishedCount = new AtomicInteger();
    private final AtomicInteger processingCount = new AtomicInteger();

    // volatile: these flags are written by completion callbacks and read by
    // add()/checkQueue(), which are not guaranteed to run on the same thread.
    private volatile boolean failed = false;
    private volatile boolean finished = false;

    private final Promise<Void> promise = Promise.promise();
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();

    /** Number of failures seen per item; always accessed while holding its own monitor. */
    private final Map<T, Integer> retryMap = new HashMap<>();

    /**
     * Creates a queue with the default processing timeout of 30 seconds per item.
     *
     * @param vertx            the Vert.x instance used to schedule timeout timers
     * @param maxParallelCount maximum number of items processed at the same time
     * @param maxRetries       number of failures of a single item after which the queue fails
     */
    protected AbstractAsyncQueue(Vertx vertx, int maxParallelCount, int maxRetries) {
        this(vertx, maxParallelCount, maxRetries, DEFAULT_PROCESSING_TIMEOUT);
    }

    /**
     * Creates a queue.
     *
     * @param vertx             the Vert.x instance used to schedule timeout timers
     * @param maxParallelCount  maximum number of items processed at the same time
     * @param maxRetries        number of failures of a single item after which the queue fails
     * @param processingTimeout per-item timeout in milliseconds; a value {@code <= 0}
     *                          disables the timeout
     */
    protected AbstractAsyncQueue(Vertx vertx, int maxParallelCount, int maxRetries, long processingTimeout) {
        this.maxParallelCount = maxParallelCount;
        this.maxRetries = maxRetries;
        this.vertx = vertx;
        this.processingTimeout = processingTimeout;
    }

    /**
     * Marks the queue as finished, i.e. signals that no further completion
     * should be awaited beyond what is queued or in flight, and re-checks the
     * queue so that {@link #done()} can complete.
     */
    public void setFinished() {
        this.finished = true;
        continueProcessing();
    }

    /**
     * @return the future that completes when the queue has drained after
     *         {@link #setFinished()}, or fails when the queue failed
     */
    public Future<Void> done() {
        return this.promise.future();
    }

    /**
     * Submits an item for processing. The item is silently ignored if the
     * queue has already failed.
     *
     * @param item the item to process
     */
    public void add(T item) {
        if (this.failed) {
            return;
        }
        this.totalCount.incrementAndGet();
        addItem(item);
        continueProcessing();
    }

    /**
     * Stores an item in the backing queue. Also used to re-enqueue an item for retry.
     *
     * @param item the item to enqueue
     */
    protected void addItem(T item) {
        this.queue.add(item);
    }

    /**
     * Starts asynchronous processing of one item, guarded by the optional
     * processing timeout, and reports the outcome to
     * {@link #itemFinished(Object, AsyncResult)}. A {@code null} item is ignored.
     *
     * @param item the item taken from the queue, may be {@code null}
     */
    protected void handleNextItem(T item) {
        if (item == null) {
            return;
        }
        long timerId = NO_TIMER;
        try {
            Promise<Void> itemPromise = Promise.promise();
            timerId = createProcessingTimeoutTimer(itemPromise);
            final long armedTimerId = timerId;
            processAsync(item).onComplete(result -> {
                cancelTimeoutTimer(armedTimerId);
                // try* variants: the timeout may already have failed the promise.
                if (result.succeeded()) {
                    itemPromise.tryComplete(result.result());
                } else {
                    itemPromise.tryFail(result.cause());
                }
            });
            itemPromise.future().onComplete(result -> itemFinished(item, result));
        } catch (Exception e) {
            log.error("Exception occured during async processing: {}", e.getMessage(), e);
            // processAsync threw (or returned null) after the timer was armed:
            // nobody else will cancel it, so do it here.
            cancelTimeoutTimer(timerId);
            itemFinished(item, Future.failedFuture(e));
        }
    }

    /**
     * Arms a timer that fails the given promise once the processing timeout elapses.
     *
     * @param promise the per-item promise to fail on timeout
     * @return the Vert.x timer id, or {@code -1} if the timeout is disabled
     */
    protected long createProcessingTimeoutTimer(Promise<Void> promise) {
        if (this.processingTimeout <= 0) {
            return NO_TIMER;
        }
        return this.vertx.setTimer(this.processingTimeout,
                timerId -> promise.tryFail("Item processing timed out"));
    }

    /** Cancels a timeout timer unless the id denotes "no timer". */
    private void cancelTimeoutTimer(long timerId) {
        if (timerId > NO_TIMER) {
            this.vertx.cancelTimer(timerId);
        }
    }

    /**
     * Takes the next item from the queue and counts it as being processed.
     *
     * @return the next item, or {@code null} if the queue is empty or has failed
     */
    protected T retrieveNextItem() {
        try {
            if (this.failed) {
                return null;
            }
            T next = this.queue.poll();
            if (next != null) {
                this.processingCount.incrementAndGet();
            }
            return next;
        } catch (Exception e) {
            failQueue(e);
            return null;
        }
    }

    /**
     * Handles the outcome of one item: on success the finished counter is
     * incremented, on failure the item is either re-queued for retry or, once
     * it has failed {@code maxRetries} times, the whole queue is failed.
     * Nothing besides the in-flight bookkeeping happens if the queue already failed.
     *
     * @param item   the processed item
     * @param result the outcome of processing the item
     */
    protected void itemFinished(T item, AsyncResult<Void> result) {
        try {
            this.processingCount.decrementAndGet();
            if (this.failed) {
                return;
            }
            if (result.succeeded()) {
                this.finishedCount.incrementAndGet();
                continueProcessing();
                return;
            }
            log.error("Item failed", result.cause());
            if (failedFinallyAfterRetries(item)) {
                log.info("Failed to process item, after {} retries", this.maxRetries, result.cause());
                failQueue(result.cause());
            } else {
                log.info("Add to queue for retry");
                addItem(item);
                continueProcessing();
            }
        } catch (Exception e) {
            log.error("Failed to finish async item processing", e);
        }
    }

    /** Runs {@link #checkQueue()} and fails the queue if it throws. */
    protected void continueProcessing() {
        try {
            checkQueue();
        } catch (Exception e) {
            failQueue(e);
        }
    }

    /**
     * Completes the queue if it is finished and drained; otherwise starts the
     * next queued item when fewer than {@code maxParallelCount} items are in
     * flight. At most one item is started per call.
     */
    public void checkQueue() {
        if (this.failed) {
            log.debug("Skip queue-check because already failed");
            return;
        }
        int processing = this.processingCount.get();
        if (this.finished && processing <= 0 && this.queue.isEmpty()) {
            log.debug("Item finished successfully.");
            this.promise.tryComplete();
            return;
        }
        if (processing >= this.maxParallelCount) {
            return;
        }
        T next = retrieveNextItem();
        if (next == null) {
            log.info("Ready for next item and queue is empty: finished={}, processingCount={}, totalCount={}, queueSize={}, finishedCount={}, retryCount={}",
                    this.finished, processing, this.totalCount, this.queue.size(), this.finishedCount, retryCount());
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("Start processing {} of {} (currently processing: {})",
                    this.finishedCount.get() + processing + 1, this.totalCount.get(), processing);
        }
        handleNextItem(next);
    }

    /**
     * Marks the queue as failed and fails the {@link #done()} future with the
     * given cause (only the first failure takes effect on the future).
     *
     * @param cause the reason the queue failed
     */
    protected void failQueue(Throwable cause) {
        this.failed = true;
        log.info("Failed to process item, after {} retries", this.maxRetries, cause);
        if (this.promise.tryFail(cause)) {
            log.error("Queue done with fail", cause);
        }
    }

    /**
     * Records one more failure for the item and tells whether it is exhausted.
     *
     * @param item the item that just failed
     * @return {@code true} if the item has now failed at least {@code maxRetries} times
     */
    protected boolean failedFinallyAfterRetries(T item) {
        synchronized (this.retryMap) {
            int failures = this.retryMap.merge(item, 1, Integer::sum);
            return failures >= this.maxRetries;
        }
    }

    /** Returns the number of distinct items that have failed at least once. */
    private int retryCount() {
        synchronized (this.retryMap) {
            return this.retryMap.size();
        }
    }

    /**
     * Processes a single item asynchronously.
     *
     * @param item the item to process
     * @return a future that completes when the item has been processed
     */
    protected abstract Future<Void> processAsync(T item);
}
