package io.confluent.parallelconsumer.internal;

import com.google.common.flogger.FluentLogger;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.state.WorkContainer;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/parallelconsumer/internal/ExternalEngine.class */
public abstract class ExternalEngine<K, V> extends AbstractParallelEoSStreamProcessor<K, V> {
    private static final Logger log = LoggerFactory.getLogger(ExternalEngine.class);
    private static final FluentLogger flog = FluentLogger.forEnclosingClass();

    protected ExternalEngine(ParallelConsumerOptions<K, V> parallelConsumerOptions) {
        super(parallelConsumerOptions);
    }

    @Override // io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor
    protected int calculateQuantityToRequest() {
        int numberRecordsOutForProcessing = this.wm.getNumberRecordsOutForProcessing();
        int maxConcurrency = getOptions().getMaxConcurrency();
        int max = Math.max(0, maxConcurrency - numberRecordsOutForProcessing);
        flog.at(Level.FINE).atMostEvery(1, TimeUnit.SECONDS).log("Target: %s. Out currently: %s. Will request extra: %s", Integer.valueOf(maxConcurrency), Integer.valueOf(numberRecordsOutForProcessing), Integer.valueOf(max));
        return max;
    }

    @Override // io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor
    protected void checkPressure() {
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor
    public ThreadPoolExecutor setupWorkerPool(int i) {
        return super.setupWorkerPool(1);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor
    public void onUserFunctionSuccess(WorkContainer<K, V> workContainer, List<?> list) {
        if (isAsyncFutureWork(list)) {
            log.debug("Reactor creation function success, user's function success");
        } else {
            super.onUserFunctionSuccess(workContainer, list);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor
    public void addToMailBoxOnUserFunctionSuccess(WorkContainer<K, V> workContainer, List<?> list) {
        if (isAsyncFutureWork(list)) {
            log.debug("User function success but not adding vertx vertical to mailbox yet");
        } else {
            super.addToMailBoxOnUserFunctionSuccess(workContainer, list);
        }
    }

    protected abstract boolean isAsyncFutureWork(List<?> list);
}
