package io.confluent.parallelconsumer;

import io.confluent.csid.utils.BackportUtils;
import io.confluent.csid.utils.StringUtils;
import io.confluent.csid.utils.WallClock;
import io.confluent.parallelconsumer.DrainingCloseable;
import io.confluent.parallelconsumer.ParallelConsumer;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import java.io.Closeable;
import java.lang.reflect.Field;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import pl.tlinkowski.unij.api.UniLists;

/* loaded from: input_file:io/confluent/parallelconsumer/ParallelEoSStreamProcessor.class */
public class ParallelEoSStreamProcessor<K, V> implements ParallelStreamProcessor<K, V>, ConsumerRebalanceListener, Closeable {
    /** Class logger. */
    private static final Logger log = LoggerFactory.getLogger(ParallelEoSStreamProcessor.class);

    /** MDC key under which the optional instance id ({@link #myId}) is published. */
    public static final String MDC_INSTANCE_ID = "pcId";

    /** Options this processor was constructed with; validated in the constructor. */
    private final ParallelConsumerOptions options;

    /** Default number of milliseconds between offset commit attempts (see {@link #timeBetweenCommits}). */
    private static final int KAFKA_DEFAULT_AUTO_COMMIT_FREQUENCY = 5000;

    /** Present only when a producer was supplied in the options. */
    private final Optional<ProducerManager<K, V>> producerManager;

    /** The Kafka consumer taken from the options; subscriptions are made through this class. */
    private final Consumer<K, V> consumer;

    /** Pool on which user functions are executed. */
    private final ThreadPoolExecutor workerPool;

    /** Tracks work; consulted for work to run and notified of rebalance events and results. */
    protected WorkManager<K, V> wm;

    /** Broker polling subsystem, started by the supervisor loop and supervised on every control loop pass. */
    private final BrokerPollSystem<K, V> brokerPollSubsystem;

    /** Thread running the control loop; interrupted to wake it on state transitions. Null until the loop starts. */
    private Thread blockableControlThread;

    /** Performs offset commits: the producer manager when a transactional producer is used, else the poll subsystem. */
    private final OffsetCommitter committer;

    /** Set when the control loop terminates because of an exception; null otherwise. */
    private Exception failureReason;

    /** Running count maintained by the rebalance callbacks. */
    private int numberOfAssignedPartitions;

    /** Source of "now" used when measuring time since the last commit. */
    private WallClock clock = new WallClock();

    /** Minimum time to wait between scheduled offset commits. */
    private Duration timeBetweenCommits = Duration.ofMillis(KAFKA_DEFAULT_AUTO_COMMIT_FREQUENCY);

    /** Instant of the most recent commit (initialised to construction time). */
    private Instant lastCommit = Instant.now();

    /** Future of the control task; empty until the supervisor loop has been started. */
    private Optional<Future<Boolean>> controlThreadFuture = Optional.empty();

    /** Queue of work containers that the control loop polls and drains for result handling. */
    private final BlockingQueue<WorkContainer<K, V>> workMailBox = new LinkedBlockingQueue<>();

    /** Hooks run once at the end of every control loop iteration. */
    private final List<Runnable> controlLoopHooks = new ArrayList<>();

    /** True while the control thread is in a blocking poll on {@link #workMailBox}. */
    private final AtomicBoolean currentlyPollingWorkCompleteMailBox = new AtomicBoolean();

    /** Commit request flag; not read in the visible part of the class - presumably by isCommandedToCommit(), TODO confirm. */
    private final AtomicBoolean commitCommand = new AtomicBoolean(false);

    /** Multiplier applied to the pool queue target; stepped up by the pressure check. */
    private final DynamicLoadFactor dynamicExtraLoadFactor = new DynamicLoadFactor();

    /** Current lifecycle state. */
    private State state = State.unused;

    /** Optional user listener that rebalance events are forwarded to. */
    private Optional<ConsumerRebalanceListener> usersConsumerRebalanceListener = Optional.empty();

    /** Throttles the periodic queue statistics log line. */
    private final RateLimiter queueStatsLimiter = new RateLimiter();

    /** Whether the last request for work returned at least as many items as were requested. */
    private boolean lastWorkRequestWasFulfilled = false;

    /** Optional instance id placed into the MDC on the control and worker threads. */
    Optional<String> myId = Optional.empty();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/confluent/parallelconsumer/ParallelEoSStreamProcessor$State.class */
    /**
     * Lifecycle states of the processor. The constant order is part of the compiled enum and
     * must not be changed.
     */
    public enum State {
        /** Initial state; the supervisor loop refuses to start from any other state. */
        unused,
        /** Set when the supervisor loop is started. */
        running,
        /** Set by a draining close; the control loop moves on to closing once drain() decides nothing is pending. */
        draining,
        /** Set when closing was requested from a state other than unused; the control loop then performs the close. */
        closing,
        /** Terminal state; set at the end of the close procedure, or directly when closing an unused instance. */
        closed
    }

    /**
     * Reports whether this processor has reached the closed state, or whether its control task
     * has finished (normally, exceptionally, or by cancellation).
     *
     * @return {@code true} if closed or if the control task future is done or cancelled
     */
    public boolean isClosedOrFailed() {
        if (this.state == State.closed) {
            return true;
        }
        return this.controlThreadFuture
                .map(controlFuture -> controlFuture.isDone() || controlFuture.isCancelled())
                .orElse(false);
    }

    /**
     * @return the exception recorded when the control loop failed, or {@code null} if no failure
     *         has been recorded
     */
    public Exception getFailureCause() {
        return failureReason;
    }

    /**
     * Builds a processor from the supplied options: validates them, checks the consumer's
     * configuration, and wires up the worker pool, work manager, broker poll subsystem and
     * (when a producer is supplied) the producer manager.
     *
     * @param parallelConsumerOptions the options; must not be {@code null}
     * @throws NullPointerException if the options are {@code null}
     */
    public ParallelEoSStreamProcessor(ParallelConsumerOptions parallelConsumerOptions) {
        Objects.requireNonNull(parallelConsumerOptions, "Options must be supplied");
        log.info("Confluent Parallel Consumer initialise... Options: {}", parallelConsumerOptions);

        this.options = parallelConsumerOptions;
        this.options.validate();

        this.consumer = this.options.getConsumer();
        checkGroupIdConfigured(this.consumer);
        checkNotSubscribed(this.consumer);
        checkAutoCommitIsDisabled(this.consumer);

        this.workerPool = setupWorkerPool(parallelConsumerOptions.getMaxConcurrency());
        this.wm = new WorkManager<>(parallelConsumerOptions, this.consumer, this.dynamicExtraLoadFactor);

        ConsumerManager consumerMgr = new ConsumerManager(this.consumer);
        this.brokerPollSubsystem = new BrokerPollSystem<>(consumerMgr, this.wm, this, parallelConsumerOptions);

        if (this.options.isProducerSupplied()) {
            this.producerManager = Optional.of(new ProducerManager(this.options.getProducer(), consumerMgr, this.wm, this.options));
            // Only a transactional producer takes over committing; otherwise the poll subsystem commits.
            if (this.options.isUsingTransactionalProducer()) {
                this.committer = this.producerManager.get();
            } else {
                this.committer = this.brokerPollSubsystem;
            }
        } else {
            this.producerManager = Optional.empty();
            this.committer = this.brokerPollSubsystem;
        }
    }

    /**
     * Verifies that group metadata can be obtained from the consumer.
     *
     * @param consumer the consumer to check
     * @throws IllegalArgumentException if requesting the group metadata fails with a runtime exception
     */
    private void checkGroupIdConfigured(Consumer<K, V> consumer) {
        try {
            consumer.groupMetadata();
        } catch (RuntimeException cause) {
            String message = "Error validating Consumer configuration - no group metadata - missing a configured GroupId on your Consumer?";
            throw new IllegalArgumentException(message, cause);
        }
    }

    /**
     * Creates the fixed-size pool on which user functions run. Threads come from the default
     * thread factory with their names prefixed by {@code "pc-"}; the work queue is unbounded and
     * rejected submissions abort with an exception.
     *
     * @param i the number of threads (used as both core and maximum pool size)
     * @return the new executor
     */
    protected ThreadPoolExecutor setupWorkerPool(int i) {
        ThreadFactory delegate = Executors.defaultThreadFactory();
        ThreadFactory prefixingFactory = task -> {
            Thread worker = delegate.newThread(task);
            worker.setName("pc-" + worker.getName());
            return worker;
        };
        return new ThreadPoolExecutor(
                i,
                i,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(), // typed queue instead of the raw type
                prefixingFactory,
                new ThreadPoolExecutor.AbortPolicy());
    }

    /**
     * Ensures the supplied consumer has no existing subscription or assignment, because
     * subscriptions are made through this class. {@link MockConsumer} instances are exempt.
     *
     * @param consumer the consumer to check
     * @throws IllegalStateException if the consumer is already subscribed or assigned
     */
    private void checkNotSubscribed(Consumer<K, V> consumer) {
        if (consumer instanceof MockConsumer) {
            return;
        }
        Set<String> subscription = consumer.subscription();
        Set<TopicPartition> assignment = consumer.assignment();
        boolean alreadyInUse = !subscription.isEmpty() || !assignment.isEmpty();
        if (alreadyInUse) {
            throw new IllegalStateException("Consumer subscription must be managed by this class. Use " + getClass().getName() + "#subscribe methods instead.");
        }
    }

    /**
     * Subscribes the underlying consumer to the given topics, registering this instance as the
     * rebalance listener.
     *
     * @param collection the topic names to subscribe to
     */
    @Override // io.confluent.parallelconsumer.ParallelConsumer
    public void subscribe(Collection<String> collection) {
        log.debug("Subscribing to {}", collection);
        consumer.subscribe(collection, this);
    }

    /**
     * Subscribes the underlying consumer to all topics matching the pattern, registering this
     * instance as the rebalance listener.
     *
     * @param pattern the topic name pattern to subscribe to
     */
    @Override // io.confluent.parallelconsumer.ParallelConsumer
    public void subscribe(Pattern pattern) {
        log.debug("Subscribing to {}", pattern);
        consumer.subscribe(pattern, this);
    }

    /**
     * Subscribes the underlying consumer to the given topics. This instance is registered as the
     * rebalance listener and forwards rebalance events to the supplied user listener.
     *
     * @param collection                the topic names to subscribe to
     * @param consumerRebalanceListener the user listener to forward rebalance events to; must not be {@code null}
     */
    @Override // io.confluent.parallelconsumer.ParallelConsumer
    public void subscribe(Collection<String> collection, ConsumerRebalanceListener consumerRebalanceListener) {
        log.debug("Subscribing to {}", collection);
        usersConsumerRebalanceListener = Optional.of(consumerRebalanceListener);
        consumer.subscribe(collection, this);
    }

    /**
     * Subscribes the underlying consumer to all topics matching the pattern. This instance is
     * registered as the rebalance listener and forwards rebalance events to the supplied user
     * listener.
     *
     * @param pattern                   the topic name pattern to subscribe to
     * @param consumerRebalanceListener the user listener to forward rebalance events to; must not be {@code null}
     */
    @Override // io.confluent.parallelconsumer.ParallelConsumer
    public void subscribe(Pattern pattern, ConsumerRebalanceListener consumerRebalanceListener) {
        log.debug("Subscribing to {}", pattern);
        usersConsumerRebalanceListener = Optional.of(consumerRebalanceListener);
        consumer.subscribe(pattern, this);
    }

    /**
     * Rebalance callback for revoked partitions: decrements the assigned-partition count, commits
     * whatever offsets are ready, then informs the work manager and the user's listener (if any).
     *
     * @param collection the revoked partitions
     * @throws InternalRuntimeError wrapping any exception raised while handling the event
     */
    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        log.debug("Partitions revoked {}, state: {}", collection, this.state);
        this.numberOfAssignedPartitions -= collection.size();
        try {
            commitOffsetsThatAreReady();
            this.wm.onPartitionsRevoked(collection);
            this.usersConsumerRebalanceListener.ifPresent(listener -> listener.onPartitionsRevoked(collection));
        } catch (Exception cause) {
            throw new InternalRuntimeError("onPartitionsRevoked event error", cause);
        }
    }

    /**
     * Rebalance callback for newly assigned partitions: increments the assigned-partition count,
     * informs the work manager and the user's listener (if any), then signals that new work may
     * be available.
     *
     * @param collection the newly assigned partitions
     */
    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        int newlyAssigned = collection.size();
        this.numberOfAssignedPartitions += newlyAssigned;
        log.info("Assigned {} total ({} new) partition(s) {}", this.numberOfAssignedPartitions, newlyAssigned, collection);
        this.wm.onPartitionsAssigned(collection);
        this.usersConsumerRebalanceListener.ifPresent(listener -> listener.onPartitionsAssigned(collection));
        notifyNewWorkRegistered();
    }

    /**
     * Rebalance callback for lost partitions: decrements the assigned-partition count and informs
     * the work manager and the user's listener (if any). No commit is attempted here.
     *
     * @param collection the lost partitions
     */
    public void onPartitionsLost(Collection<TopicPartition> collection) {
        int lostCount = collection.size();
        this.numberOfAssignedPartitions -= lostCount;
        this.wm.onPartitionsLost(collection);
        this.usersConsumerRebalanceListener.ifPresent(listener -> listener.onPartitionsLost(collection));
    }

    /**
     * Uses reflection to verify that auto commit is disabled on a real {@link KafkaConsumer}.
     * Other consumer implementations are not inspected.
     *
     * <p>The private fields are looked up on the declaring classes ({@code KafkaConsumer} and
     * {@code ConsumerCoordinator}) rather than on the runtime class, because
     * {@link Class#getDeclaredField(String)} does not search superclasses and would therefore
     * fail for subclasses.
     *
     * @param consumer the consumer to check
     * @throws IllegalArgumentException if auto commit is enabled
     * @throws IllegalStateException    if the coordinator is {@code null} or the reflective lookup fails
     */
    private void checkAutoCommitIsDisabled(Consumer<K, V> consumer) {
        if (!(consumer instanceof KafkaConsumer)) {
            return;
        }
        try {
            Field coordinatorField = KafkaConsumer.class.getDeclaredField("coordinator");
            coordinatorField.setAccessible(true);
            ConsumerCoordinator coordinator = (ConsumerCoordinator) coordinatorField.get(consumer);
            if (coordinator == null) {
                throw new IllegalStateException("Coordinator for Consumer is null - missing GroupId? Reflection broken?");
            }
            Field autoCommitField = ConsumerCoordinator.class.getDeclaredField("autoCommitEnabled");
            autoCommitField.setAccessible(true);
            boolean autoCommitEnabled = (Boolean) autoCommitField.get(coordinator);
            if (autoCommitEnabled) {
                throw new IllegalArgumentException("Consumer auto commit must be disabled, as commits are handled by the library.");
            }
        } catch (ReflectiveOperationException e) {
            // Checked reflection failures (missing field, illegal access) must not escape undeclared.
            throw new IllegalStateException("Unable to inspect the Consumer's auto commit setting via reflection", e);
        }
    }

    /**
     * Starts the supervisor loop with a user function that consumes records without producing
     * any result; the per-result callback only logs.
     *
     * @param consumer the user function invoked for each consumed record
     */
    @Override // io.confluent.parallelconsumer.ParallelConsumer
    public void poll(java.util.function.Consumer<ConsumerRecord<K, V>> consumer) {
        supervisorLoop(polled -> {
            log.trace("asyncPoll - Consumed a record ({}), executing void function...", polled.offset());
            UserFunctions.carefullyRun((java.util.function.Consumer<ConsumerRecord>) consumer, polled);
            log.trace("asyncPoll - user function finished ok.");
            // A void user function yields no results.
            return UniLists.of();
        }, ignoredResult -> log.trace("Void callback applied."));
    }

    /**
     * Starts the supervisor loop with a user function that maps each consumed record to a list of
     * records to produce. Every returned record is sent through the producer manager, and one
     * {@link ParallelStreamProcessor.ConsumeProduceResult} per produced record is passed to the
     * callback.
     *
     * @param function the user function turning a consumed record into records to produce
     * @param consumer the callback receiving each consume/produce result
     * @throws IllegalArgumentException if no producer was supplied in the options
     */
    @Override // io.confluent.parallelconsumer.ParallelStreamProcessor
    public void pollAndProduceMany(Function<ConsumerRecord<K, V>, List<ProducerRecord<K, V>>> function, java.util.function.Consumer<ParallelStreamProcessor.ConsumeProduceResult<K, V, K, V>> consumer) {
        if (!this.options.isProducerSupplied()) {
            throw new IllegalArgumentException("To use the produce flows you must supply a Producer in the options");
        }
        supervisorLoop(consumerRecord -> {
            // No cast needed: the function's own generic types drive inference.
            List<ProducerRecord<K, V>> toProduce = UserFunctions.carefullyRun(function, consumerRecord);
            if (toProduce.isEmpty()) {
                log.debug("No result returned from function to send.");
            }
            log.trace("asyncPoll and Stream - Consumed a record ({}), and returning a derivative result record to be produced: {}", consumerRecord, toProduce);
            List<ParallelStreamProcessor.ConsumeProduceResult<K, V, K, V>> results = new ArrayList<>(toProduce.size());
            log.trace("Producing {} messages in result...", toProduce.size());
            for (ProducerRecord<K, V> outgoing : toProduce) {
                log.trace("Producing {}", outgoing);
                results.add(new ParallelStreamProcessor.ConsumeProduceResult<>(consumerRecord, outgoing, this.producerManager.get().produceMessage(outgoing)));
            }
            return results;
        }, consumer);
    }

    /**
     * Variant of the two-argument overload whose result callback only logs.
     *
     * @param function the user function turning a consumed record into records to produce
     */
    @Override // io.confluent.parallelconsumer.ParallelStreamProcessor
    public void pollAndProduceMany(Function<ConsumerRecord<K, V>, List<ProducerRecord<K, V>>> function) {
        pollAndProduceMany(function, ignoredResult -> log.trace("No-op user callback"));
    }

    /**
     * Variant of the two-argument overload whose result callback only logs.
     *
     * @param function the user function turning a consumed record into a single record to produce
     */
    @Override // io.confluent.parallelconsumer.ParallelStreamProcessor
    public void pollAndProduce(Function<ConsumerRecord<K, V>, ProducerRecord<K, V>> function) {
        pollAndProduce(function, ignoredResult -> log.trace("No-op user callback"));
    }

    /**
     * Adapts a one-record-in, one-record-out user function to the many-record flow by wrapping
     * its result in a single-element list.
     *
     * @param function the user function turning a consumed record into a single record to produce
     * @param consumer the callback receiving each consume/produce result
     */
    @Override // io.confluent.parallelconsumer.ParallelStreamProcessor
    public void pollAndProduce(Function<ConsumerRecord<K, V>, ProducerRecord<K, V>> function, java.util.function.Consumer<ParallelStreamProcessor.ConsumeProduceResult<K, V, K, V>> consumer) {
        // The function already returns ProducerRecord<K, V>; no raw cast is required.
        pollAndProduceMany(consumerRecord -> UniLists.of(function.apply(consumerRecord)), consumer);
    }

    /**
     * Closes without draining first, allowing twice the default timeout.
     */
    @Override // io.confluent.parallelconsumer.DrainingCloseable, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        Duration doubledDefaultTimeout = DrainingCloseable.DEFAULT_TIMEOUT.multipliedBy(2L);
        closeDontDrainFirst(doubledDefaultTimeout);
    }

    /**
     * Requests shutdown in the given draining mode and waits for the closed state, then checks
     * the control task future so that a failure of the control thread surfaces to the caller.
     *
     * <p>NOTE(review): the future/wait calls raise checked exceptions that this method neither
     * catches nor declares - presumably the original source relied on sneaky throws; confirm.
     *
     * @param duration     how long to wait for the close to complete
     * @param drainingMode whether in-flight work should be allowed to finish first
     */
    @Override // io.confluent.parallelconsumer.DrainingCloseable
    public void close(Duration duration, DrainingCloseable.DrainingMode drainingMode) {
        if (this.state != State.closed) {
            log.info("Signaling to close...");
            switch (drainingMode) {
                case DRAIN:
                    log.info("Will wait for all in flight to complete before");
                    transitionToDraining();
                    break;
                case DONT_DRAIN:
                    log.info("Not waiting for in flight to complete, will transition directly to closing");
                    transitionToClosing();
                    break;
            }
            waitForClose(duration);
        } else {
            log.info("Already closed, checking end state..");
        }

        if (this.controlThreadFuture.isPresent()) {
            log.debug("Checking for control thread exception...");
            Future<Boolean> controlFuture = this.controlThreadFuture.get();
            controlFuture.get(BackportUtils.toSeconds(duration), TimeUnit.SECONDS);
        }
        log.info("Close complete.");
    }

    /**
     * Blocks until the processor reaches the closed state, waiting on the control task future.
     *
     * <p>An interruption while waiting is logged and the wait is retried on the next loop pass;
     * the interrupt is deliberately not propagated so that the close can still complete.
     *
     * @param duration the maximum time to wait on the control task per attempt
     * @throws TimeoutException   if the control task does not finish in time, or finishes
     *                            reporting an unsuccessful result
     * @throws ExecutionException if the control task terminated with an exception
     */
    private void waitForClose(Duration duration) throws TimeoutException, ExecutionException {
        log.info("Waiting on closed state...");
        while (!this.state.equals(State.closed)) {
            try {
                Future<Boolean> future = this.controlThreadFuture.get();
                log.debug("Blocking on control future");
                // The blocking get must sit inside the try so that interruption is handled here.
                boolean signaled = future.get(BackportUtils.toSeconds(duration), TimeUnit.SECONDS);
                if (!signaled) {
                    throw new TimeoutException("Timeout waiting for system to close (" + duration + ")");
                }
            } catch (InterruptedException e) {
                log.trace("Interrupted", e);
            } catch (ExecutionException | TimeoutException e) {
                log.error("Execution or timeout exception while waiting for the control thread to close cleanly (state was {}). Try increasing your time-out to allow the system to drain, or close without draining.", this.state, e);
                throw e;
            }
            log.trace("Still waiting for system to close...");
        }
    }

    /**
     * Performs the shutdown sequence: closes the broker poll subsystem, closes the consumer when
     * this instance is responsible for it, closes the producer manager (if any), shuts the worker
     * pool down and waits for it to terminate, and finally moves to the closed state.
     *
     * @param duration the timeout handed to the producer manager's close
     * @throws TimeoutException   declared for callers; not raised directly by the visible code
     * @throws ExecutionException declared for callers; not raised directly by the visible code
     */
    private void doClose(Duration duration) throws TimeoutException, ExecutionException {
        log.debug("Doing closing state: {}...", this.state);
        log.debug("Closing and waiting for broker poll system...");
        this.brokerPollSubsystem.closeAndWait();
        maybeCloseConsumer();
        this.producerManager.ifPresent(manager -> manager.close(duration));

        log.debug("Shutting down execution pool...");
        List<Runnable> neverStarted = this.workerPool.shutdownNow();
        if (!neverStarted.isEmpty()) {
            log.warn("Threads not done count: {}", neverStarted.size());
        }

        log.trace("Awaiting worker pool termination...");
        boolean interrupted;
        do {
            interrupted = false;
            try {
                boolean terminatedInTime = this.workerPool.awaitTermination(BackportUtils.toSeconds(DrainingCloseable.DEFAULT_TIMEOUT), TimeUnit.SECONDS);
                if (!terminatedInTime) {
                    log.warn("Thread execution pool termination await timeout! Were any processing jobs dead locked or otherwise stuck?");
                    // Report the pool state instead of querying it and discarding the answer.
                    log.debug("Worker pool state after await timeout - shutdown: {}, terminated: {}", this.workerPool.isShutdown(), this.workerPool.isTerminated());
                }
            } catch (InterruptedException e) {
                log.error("InterruptedException", e);
                // Only now is the wait genuinely being retried because of an interruption.
                log.debug("Still interrupted");
                interrupted = true;
            }
        } while (interrupted);

        log.debug("Close complete.");
        this.state = State.closed;
    }

    /**
     * Closes the consumer, but only when {@link #isResponsibleForCommits()} reports true.
     */
    private void maybeCloseConsumer() {
        if (!isResponsibleForCommits()) {
            return;
        }
        this.consumer.close();
    }

    /**
     * @return {@code true} when the configured committer is the producer manager
     */
    private boolean isResponsibleForCommits() {
        return committer instanceof ProducerManager;
    }

    /**
     * Polls every 100 ms until the work manager reports no records awaiting commit, failing with
     * a {@link TimeoutException} once the given duration has elapsed.
     *
     * <p>NOTE(review): the sleep's {@code InterruptedException} and the {@code TimeoutException}
     * are checked but neither caught nor declared - presumably the original source relied on
     * sneaky throws; confirm.
     *
     * @param duration the maximum time to wait
     */
    public void waitForProcessedNotCommitted(Duration duration) {
        log.debug("Waiting processed but not committed...");
        Timer deadline = Time.SYSTEM.timer(duration);
        while (this.wm.isRecordsAwaitingToBeCommitted()) {
            log.trace("Waiting for no in processing...");
            Thread.sleep(100L);
            deadline.update();
            boolean outOfTime = deadline.isExpired();
            if (outOfTime) {
                throw new TimeoutException("Waiting for no more records in processing");
            }
        }
        log.debug("No longer anything in flight.");
    }

    /**
     * Combines the work manager's "records awaiting processing" answer with whether the control
     * task has finished; both values are always evaluated and logged.
     *
     * @return {@code true} if the work manager reports waiting records or the control task is done
     */
    private boolean isRecordsAwaitingProcessing() {
        boolean recordsWaiting = this.wm.isRecordsAwaitingProcessing();
        boolean threadsDone = areMyThreadsDone();
        log.trace("isRecordsAwaitingProcessing {} || threadsDone {}", recordsWaiting, threadsDone);
        if (recordsWaiting) {
            return true;
        }
        return threadsDone;
    }

    /**
     * Moves to the draining state and interrupts the control thread so it notices the change.
     */
    private void transitionToDraining() {
        log.debug("Transitioning to draining...");
        state = State.draining;
        interruptControlThread();
    }

    /**
     * Interrupts the control thread, if one has been registered, in case it is blocked waiting
     * for work.
     */
    private void interruptControlThread() {
        if (this.blockableControlThread == null) {
            return;
        }
        log.debug("Interrupting {} thread in case it's waiting for work", this.blockableControlThread.getName());
        this.blockableControlThread.interrupt();
    }

    /**
     * @return {@code false} if the control task was never started, otherwise whether its future
     *         is done
     */
    private boolean areMyThreadsDone() {
        boolean neverStarted = BackportUtils.isEmpty(this.controlThreadFuture);
        return neverStarted ? false : this.controlThreadFuture.get().isDone();
    }

    /**
     * Starts the processor: moves from unused to running, starts the broker poll subsystem and
     * submits the control task, which repeatedly runs the control loop until the closed state is
     * reached.
     *
     * <p>If the control loop throws, the failure is recorded (see {@link #getFailureCause()}), a
     * controlled shutdown is attempted and the failure is rethrown from the task, so that it
     * surfaces through the control task future.
     *
     * @param function the wrapped user function producing results for each record
     * @param consumer the callback receiving each result
     * @param <R>      the result type
     * @throws IllegalStateException if this instance has already been used
     */
    protected <R> void supervisorLoop(Function<ConsumerRecord<K, V>, List<R>> function, java.util.function.Consumer<R> consumer) {
        log.info("Control loop starting up...");
        if (this.state != State.unused) {
            throw new IllegalStateException(StringUtils.msg("Invalid state - the consumer cannot be used more than once (current state is {})", this.state));
        }
        this.state = State.running;

        Callable<Boolean> controlTask = () -> {
            Thread controlThread = Thread.currentThread();
            addInstanceMDC();
            controlThread.setName("pc-control");
            log.trace("Control task scheduled");
            this.blockableControlThread = controlThread;
            while (this.state != State.closed) {
                try {
                    controlLoop(function, consumer);
                } catch (Exception e) {
                    log.error("Error from poll control thread, will attempt controlled shutdown, then rethrow. Error: " + e.getMessage(), e);
                    // Record the cause first so it is not lost should the shutdown itself fail.
                    this.failureReason = new RuntimeException("Error from poll control thread: " + e.getMessage(), e);
                    doClose(DrainingCloseable.DEFAULT_TIMEOUT);
                    throw this.failureReason;
                }
            }
            log.info("Control loop ending clean (state:{})...", this.state);
            return true;
        };

        this.brokerPollSubsystem.start();
        java.util.concurrent.ExecutorService controlExecutor = Executors.newSingleThreadExecutor();
        try {
            this.controlThreadFuture = Optional.of(controlExecutor.submit(controlTask));
        } finally {
            // An orderly shutdown still runs the submitted task, and lets the executor's thread
            // exit once the task finishes instead of lingering.
            controlExecutor.shutdown();
        }
    }

    /**
     * Publishes this instance's id, when one is set, into the MDC of the calling thread under
     * {@link #MDC_INSTANCE_ID}.
     */
    private void addInstanceMDC() {
        this.myId.ifPresent(instanceId -> MDC.put(MDC_INSTANCE_ID, instanceId));
    }

    /**
     * One iteration of the control loop. In order: hands new work to the pool, wakes the broker
     * poller if it is paused while under-loaded, processes completed work from the mailbox,
     * possibly commits offsets, runs the registered loop hooks, acts on a draining/closing state,
     * supervises the broker poll subsystem and finally yields briefly.
     *
     * @param function the wrapped user function producing results for each record
     * @param consumer the callback receiving each result
     * @param <R>      the result type
     */
    private <R> void controlLoop(Function<ConsumerRecord<K, V>, List<R>> function, java.util.function.Consumer<R> consumer) throws TimeoutException, ExecutionException, InterruptedException {
        handleWork(function, consumer);
        if (this.state == State.running) {
            // Non-short-circuit '&': both conditions are always evaluated.
            if ((!this.wm.isSufficientlyLoaded()) & this.brokerPollSubsystem.isPaused()) {
                log.debug("Found Poller paused with not enough front loaded messages, ensuring poller is awake (mail: {} vs concurrency: {})", this.wm.getWorkQueuedInMailboxCount(), Integer.valueOf(this.options.getMaxConcurrency()));
                this.brokerPollSubsystem.wakeupIfPaused();
            }
        }
        log.trace("Loop: Process mailbox");
        processWorkCompleteMailBox();
        // Scheduled commits only happen while running.
        if (this.state == State.running) {
            log.trace("Loop: Maybe commit");
            commitOffsetsMaybe();
        }
        log.trace("Loop: Running {} loop end plugin(s)", Integer.valueOf(this.controlLoopHooks.size()));
        this.controlLoopHooks.forEach((v0) -> {
            v0.run();
        });
        log.trace("Current state: {}", this.state);
        // Shutdown transitions are driven from here, on the control thread.
        switch (this.state) {
            case draining:
                drain();
                break;
            case closing:
                doClose(DrainingCloseable.DEFAULT_TIMEOUT);
                break;
        }
        this.brokerPollSubsystem.supervise();
        try {
            // Brief 1 ms pause between iterations; an interrupt here is only logged.
            Thread.sleep(Duration.ofMillis(1L).toMillis());
        } catch (InterruptedException e) {
            log.trace("Woke up", e);
        }
        log.trace("End of control loop, waiting processing {}, remaining in partition queues: {}, out for processing: {}. In state: {}", new Object[]{Integer.valueOf(this.wm.getTotalWorkWaitingProcessing()), Integer.valueOf(this.wm.getNumberOfEntriesInPartitionQueues()), Integer.valueOf(this.wm.getNumberRecordsOutForProcessing()), this.state});
    }

    /**
     * Runs the pressure check, then - while running or draining - requests enough work from the
     * work manager to bring the pool queue up to its loaded target and submits it to the pool.
     * Queue statistics are logged subject to the rate limiter.
     *
     * @param function the wrapped user function producing results for each record
     * @param consumer the callback receiving each result
     * @param <R>      the result type
     * @return the number of work items obtained in this call (0 when not accepting work)
     */
    private <R> int handleWork(Function<ConsumerRecord<K, V>, List<R>> function, java.util.function.Consumer<R> consumer) {
        checkPressure();

        int gotWorkCount = 0;
        boolean acceptingWork = this.state == State.running || this.state == State.draining;
        if (acceptingWork) {
            int target = getQueueTargetLoaded();
            int currentlyQueued = this.workerPool.getQueue().size();
            int toRequest = target - currentlyQueued;
            log.debug("Loop: Will try to get work - target: {}, current queue size: {}, requesting: {}, loading factor: {}", target, currentlyQueued, toRequest, this.dynamicExtraLoadFactor.getCurrentFactor());

            List<WorkContainer<K, V>> newWork = this.wm.maybeGetWork(toRequest);
            gotWorkCount = newWork.size();
            this.lastWorkRequestWasFulfilled = gotWorkCount >= toRequest;

            log.trace("Loop: Submit to pool");
            submitWorkToPool(function, consumer, newWork);
        }

        this.queueStatsLimiter.performIfNotLimited(() -> {
            int queueSize = getWorkerQueueSize();
            log.debug("Stats: \n- pool active: {} queued:{} \n- queue size: {} target: {} loading factor: {}", this.workerPool.getActiveCount(), queueSize, queueSize, getPoolQueueTarget(), this.dynamicExtraLoadFactor.getCurrentFactor());
        });
        return gotWorkCount;
    }

    /**
     * @return the pool queue target multiplied by the current dynamic load factor
     */
    private int getQueueTargetLoaded() {
        int baseTarget = getPoolQueueTarget();
        return baseTarget * this.dynamicExtraLoadFactor.getCurrentFactor();
    }

    /**
     * Steps the dynamic load factor up when the pool queue is low, the warm-up period is over,
     * more work is queued in the mailboxes than the configured concurrency, and the previous work
     * request was fully satisfied. Warns when the factor cannot be stepped up because the maximum
     * has been reached.
     */
    private void checkPressure() {
        boolean moreWorkQueuedThanConcurrency = this.wm.getWorkQueuedInMailboxCount().intValue() > this.options.getMaxConcurrency();
        if (log.isTraceEnabled()) {
            log.trace("Queue pressure check: (current size: {}, loaded target: {}, factor: {}) if (isPoolQueueLow() {} && dynamicExtraLoadFactor.isWarmUpPeriodOver() {} && moreWorkInQueuesAvailableThatHaveNotBeenPulled {})", getWorkerQueueSize(), getQueueTargetLoaded(), this.dynamicExtraLoadFactor.getCurrentFactor(), isPoolQueueLow(), this.dynamicExtraLoadFactor.isWarmUpPeriodOver(), moreWorkQueuedThanConcurrency);
        }

        boolean underPressure = isPoolQueueLow()
                && this.dynamicExtraLoadFactor.isWarmUpPeriodOver()
                && moreWorkQueuedThanConcurrency
                && this.lastWorkRequestWasFulfilled;
        if (!underPressure) {
            return;
        }

        if (this.dynamicExtraLoadFactor.maybeStepUp()) {
            log.debug("isPoolQueueLow(): Executor pool queue is not loaded with enough work (queue: {} vs target: {}), stepped up loading factor to {}", getWorkerQueueSize(), getPoolQueueTarget(), this.dynamicExtraLoadFactor.getCurrentFactor());
        } else if (this.dynamicExtraLoadFactor.isMaxReached()) {
            log.warn("isPoolQueueLow(): Max loading factor steps reached: {}/{}", this.dynamicExtraLoadFactor.getCurrentFactor(), this.dynamicExtraLoadFactor.getMaxFactor());
        }
    }

    /**
     * @return the unscaled pool queue target, which is the configured maximum concurrency
     */
    private int getPoolQueueTarget() {
        return options.getMaxConcurrency();
    }

    /**
     * @return {@code true} when the worker queue size is at or below the pool queue target and
     *         the work manager still has work in its mailboxes
     */
    private boolean isPoolQueueLow() {
        int queueSize = getWorkerQueueSize();
        int target = getPoolQueueTarget();
        boolean belowTarget = queueSize <= target;
        boolean workInMailboxes = this.wm.hasWorkInMailboxes();
        log.debug("workAmountBelowTarget {} {} vs {} && wm.hasWorkInMailboxes() {};", belowTarget, queueSize, target, workInMailboxes);
        return belowTarget && workInMailboxes;
    }

    /**
     * Tells the broker poll subsystem to drain, then moves on to closing unless
     * {@link #isRecordsAwaitingProcessing()} still reports pending work.
     */
    private void drain() {
        log.debug("Signaling to drain...");
        this.brokerPollSubsystem.drain();
        if (!isRecordsAwaitingProcessing()) {
            transitionToClosing();
            return;
        }
        log.debug("Records still waiting processing, won't transition to closing.");
    }

    /**
     * Moves to the closing state - or straight to closed when the processor was never started -
     * and interrupts the control thread so it notices the change.
     */
    private void transitionToClosing() {
        log.debug("Transitioning to closing...");
        boolean neverStarted = this.state == State.unused;
        this.state = neverStarted ? State.closed : State.closing;
        interruptControlThread();
    }

    /**
     * Collects completed work from the mailbox and hands each item to the work manager.
     *
     * <p>When the mailbox is empty and the processor is running, the first poll blocks for up to
     * the time remaining until the next scheduled commit; otherwise it returns immediately.
     * Everything else currently in the mailbox is then drained without blocking. An interruption
     * of the blocking poll is expected (it is how the control thread gets woken) and only logged.
     */
    private void processWorkCompleteMailBox() {
        log.trace("Processing mailbox (might block waiting for results)...");
        Set<WorkContainer<K, V>> results = new HashSet<>();
        Duration timeToNextCommit = getTimeToNextCommit();
        WorkContainer<K, V> firstResult = null;
        try {
            if (this.workMailBox.isEmpty() && this.state.equals(State.running)) {
                if (log.isDebugEnabled()) {
                    log.debug("Blocking poll on work until next scheduled offset commit attempt for {}. active threads: {}, queue: {}", timeToNextCommit, this.workerPool.getActiveCount(), getWorkerQueueSize());
                }
                this.currentlyPollingWorkCompleteMailBox.set(true);
                try {
                    log.trace("Blocking poll {}", timeToNextCommit);
                    firstResult = this.workMailBox.poll(timeToNextCommit.toMillis(), TimeUnit.MILLISECONDS);
                    log.trace("Blocking poll finish");
                } finally {
                    // Reset even when the poll is interrupted, so the flag never claims a
                    // blocking poll is in progress when it is not.
                    this.currentlyPollingWorkCompleteMailBox.set(false);
                }
            } else {
                firstResult = this.workMailBox.poll();
            }
        } catch (InterruptedException e) {
            log.debug("Interrupted waiting on work results");
        }

        if (firstResult == null) {
            log.debug("Mailbox results returned null, indicating timeout (which was set as {}) or interruption during a blocking wait for returned work results", timeToNextCommit);
        } else {
            results.add(firstResult);
        }

        int remaining = this.workMailBox.size();
        log.trace("Draining {} more, got {} already...", remaining, results.size());
        this.workMailBox.drainTo(results, remaining);

        log.trace("Processing drained work {}...", results.size());
        for (WorkContainer<K, V> completed : results) {
            MDC.put("offset", completed.toString());
            try {
                this.wm.handleFutureResult(completed);
            } finally {
                // Remove only our own key: clearing the whole MDC would also drop the instance
                // id that addInstanceMDC() placed on this thread.
                MDC.remove("offset");
            }
        }
    }

    /**
     * Commits ready offsets when enough time has passed since the last commit, when lingering
     * would bring no benefit, or when a commit has been commanded. Otherwise only logs why no
     * commit happened (when debug logging is enabled).
     */
    private void commitOffsetsMaybe() {
        Duration elapsed = getTimeSinceLastCommit();
        boolean commitFrequencyOk = BackportUtils.toSeconds(elapsed) >= BackportUtils.toSeconds(this.timeBetweenCommits);
        boolean lingerBeneficial = lingeringOnCommitWouldBeBeneficial();
        boolean commanded = isCommandedToCommit();

        boolean shouldCommit = commitFrequencyOk || !lingerBeneficial || commanded;
        if (shouldCommit) {
            log.debug("commitFrequencyOK {} || !lingerBeneficial {} || commitCommand {}", commitFrequencyOk, !lingerBeneficial, commanded);
            commitOffsetsThatAreReady();
            return;
        }

        if (!log.isDebugEnabled()) {
            return;
        }
        if (this.wm.hasCommittableOffsets()) {
            log.debug("Have offsets to commit, but not enough time elapsed ({}), waiting for at least {}...", elapsed, this.timeBetweenCommits);
        } else {
            log.trace("Could commit now, but no offsets committable");
        }
    }

    /**
     * @return the number of tasks currently waiting in the worker pool's queue
     */
    private int getWorkerQueueSize() {
        return workerPool.getQueue().size();
    }

    /**
     * Decides whether delaying the commit is worthwhile: true if work is waiting to be processed,
     * is in flight, or is waiting in the mailbox, or if nothing is waiting in the commit queues.
     * All four inputs are always evaluated and logged.
     *
     * @return {@code true} if lingering before committing would be beneficial
     */
    private boolean lingeringOnCommitWouldBeBeneficial() {
        boolean workWaiting = this.wm.workIsWaitingToBeProcessed();
        boolean workInFlight = this.wm.hasWorkInFlight();
        boolean mailboxHasWork = !this.workMailBox.isEmpty();
        boolean nothingToCommit = !this.wm.hasWorkInCommitQueues();
        log.trace("workIsWaitingToBeCompletedSuccessfully {} || workInFlight {} || workWaitingInMailbox {} || !workWaitingToCommit {};", workWaiting, workInFlight, mailboxHasWork, nothingToCommit);
        return workWaiting || workInFlight || mailboxHasWork || nothingToCommit;
    }

    /**
     * Computes how long remains until the next commit is due.
     * <p>
     * While the state is {@code running} or {@code draining} this is the configured time between commits
     * minus the time elapsed since the last commit (the result is negative when more time has elapsed
     * than the configured interval). In any other state no waiting is requested.
     *
     * @return the remaining time, or {@link Duration#ZERO} when the state is neither running nor draining
     */
    private Duration getTimeToNextCommit() {
        boolean activelyProcessing = this.state == State.running || this.state == State.draining;
        if (!activelyProcessing) {
            log.debug("System not {} (state: {}), so don't wait to commit, only a small thread yield time", State.running, this.state);
            return Duration.ZERO;
        }
        Duration commitInterval = getTimeBetweenCommits();
        Duration elapsed = getTimeSinceLastCommit();
        return commitInterval.minus(elapsed);
    }

    /**
     * Measures the time between the recorded last commit and the current time reported by the clock.
     *
     * @return the duration from {@code lastCommit} to {@code clock.getNow()}
     */
    private Duration getTimeSinceLastCommit() {
        return Duration.between(lastCommit, clock.getNow());
    }

    /**
     * Commits offsets through the committer unless the work manager reports that nothing has changed
     * since the last commit.
     * <p>
     * After a commit, {@code lastCommit} is stamped from the same {@code clock} that
     * {@code getTimeSinceLastCommit()} reads, so elapsed-time calculations use a single time source even
     * when a custom clock has been supplied through {@code setClock}.
     */
    private void commitOffsetsThatAreReady() {
        if (this.wm.isClean()) {
            log.debug("Nothing changed since last commit, skipping");
            return;
        }
        this.committer.retrieveOffsetsAndCommit();
        // Use the injectable clock rather than the system clock, matching getTimeSinceLastCommit().
        this.lastCommit = this.clock.getNow();
    }

    /**
     * Hands each work container in the given list to the worker pool.
     * <p>
     * For every container a task is submitted that first calls {@code addInstanceMDC()} and then runs
     * {@code userFunctionRunner} with the supplied function and callback; the future returned by the pool
     * is stored on the container. An empty list is a no-op.
     *
     * @param function the user function passed through to {@code userFunctionRunner}
     * @param consumer the per-result callback passed through to {@code userFunctionRunner}
     * @param list     the work containers to submit
     * @param <R>      the element type of the user function's result list
     */
    private <R> void submitWorkToPool(Function<ConsumerRecord<K, V>, List<R>> function, java.util.function.Consumer<R> consumer, List<WorkContainer<K, V>> list) {
        if (!list.isEmpty()) {
            log.debug("New work incoming: {}, Pool stats: {}", list.size(), this.workerPool);
            for (WorkContainer<K, V> work : list) {
                log.trace("Sending work ({}) to pool", work);
                // The lambda is kept inline so the submitted task's type is inferred from setFuture's parameter.
                work.setFuture(this.workerPool.submit(() -> {
                    addInstanceMDC();
                    return userFunctionRunner(function, consumer, work);
                }));
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Runs the user function against the record held by the given work container and feeds every result
     * to the callback.
     * <p>
     * Sequence on the happy path: the container's string form is stored in the MDC under {@code "offset"};
     * if the work manager reports the container's epoch as stale, the record is skipped; otherwise the user
     * function is applied, {@code onUserFunctionSuccess} is invoked, the callback is called once per
     * result, and finally {@code addToMailBoxOnUserFunctionSuccess} is invoked.
     * <p>
     * On any {@link Exception} the error is logged, the container is notified through
     * {@code onUserFunctionFailure()}, the container is added to the mailbox, and the exception is
     * rethrown.
     *
     * @param function      the user function to apply to the container's record
     * @param consumer      callback invoked for each element of the user function's result
     * @param workContainer the unit of work to process
     * @param <R>           the element type of the user function's result list
     * @return an empty list after successful processing (nothing is ever added to it here), or
     *         {@code null} when the work was skipped because its epoch is stale
     */
    protected <R> List<ParallelConsumer.Tuple<ConsumerRecord<K, V>, R>> userFunctionRunner(Function<ConsumerRecord<K, V>, List<R>> function, java.util.function.Consumer<R> consumer, WorkContainer<K, V> workContainer) {
        try {
            MDC.put("offset", workContainer.toString());
            if (this.wm.checkEpochIsStale(workContainer)) {
                log.debug("Pool found work from old generation of assigned work, skipping message as epoch doesn't match current {}", workContainer);
                return null;
            }
            log.trace("Pool received: {}", workContainer);
            List<R> userResults = function.apply(workContainer.getCr());
            onUserFunctionSuccess(workContainer, userResults);
            // Properly typed (instead of a raw ArrayList); it is returned empty, exactly as before.
            List<ParallelConsumer.Tuple<ConsumerRecord<K, V>, R>> intermediateResults = new ArrayList<>();
            for (R result : userResults) {
                log.trace("Running users call back...");
                consumer.accept(result);
            }
            log.trace("User function future registered");
            addToMailBoxOnUserFunctionSuccess(workContainer, userResults);
            return intermediateResults;
        } catch (Exception e) {
            log.error("Error processing record", e);
            workContainer.onUserFunctionFailure();
            // The container goes to the mailbox on failure too, before the exception propagates.
            addToMailbox(workContainer);
            throw e;
        }
    }

    /**
     * Called by {@code userFunctionRunner} after the user function and all callbacks have completed.
     * This implementation simply adds the work container to the mailbox; the result list is not used.
     *
     * @param workContainer the completed unit of work
     * @param list          the results produced by the user function (ignored here)
     */
    protected void addToMailBoxOnUserFunctionSuccess(WorkContainer<K, V> workContainer, List<?> list) {
        this.addToMailbox(workContainer);
    }

    /**
     * Called by {@code userFunctionRunner} right after the user function returns.
     * Logs at trace level and notifies the work container of the success; the result list is not used.
     *
     * @param workContainer the unit of work whose user function succeeded
     * @param list          the results produced by the user function (ignored here)
     */
    protected void onUserFunctionSuccess(WorkContainer<K, V> workContainer, List<?> list) {
        log.trace("User function success");
        // Delegates to the container's own success hook.
        workContainer.onUserFunctionSuccess();
    }

    /**
     * Adds the given work container to the work mailbox.
     *
     * @param workContainer the unit of work to enqueue
     */
    protected void addToMailbox(WorkContainer<K, V> workContainer) {
        log.trace("Adding {} to mailbox...", workContainer);
        workMailBox.add(workContainer);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Wakes the control thread when new work has been registered, if it is safe to do so.
     * <p>
     * Nothing is interrupted when the mailbox is not currently being polled, or when a producer manager
     * is present and reports a transaction in progress. Otherwise the blockable control thread is
     * interrupted.
     */
    public void notifyNewWorkRegistered() {
        if (this.currentlyPollingWorkCompleteMailBox.get()) {
            // An absent producer manager counts as "no transaction in progress".
            boolean txInProgress = ((Boolean) this.producerManager.map(pm -> pm.isTransactionInProgress()).orElse(false)).booleanValue();
            if (txInProgress) {
                log.trace("Would have interrupted control thread, but TX in progress");
            } else {
                log.trace("Interrupting control thread: Knock knock, wake up! You've got mail (tm)!");
                this.blockableControlThread.interrupt();
            }
        } else {
            log.trace("Work box not being polled currently, so thread not blocked, will come around to the bail box in the next looop.");
        }
    }

    /**
     * Reports the amount of outstanding work as counted by the work manager.
     *
     * @return the number of entries the work manager reports in its partition queues
     */
    @Override // io.confluent.parallelconsumer.DrainingCloseable
    public int workRemaining() {
        final int queuedEntries = wm.getNumberOfEntriesInPartitionQueues();
        return queuedEntries;
    }

    /**
     * Registers a hook by adding it to {@code controlLoopHooks}.
     * NOTE(review): judging by the method name the hooks run at the end of each control loop iteration;
     * the invocation site is outside this section, so confirm there.
     *
     * @param runnable the hook to register
     */
    void addLoopEndCallBack(Runnable runnable) {
        controlLoopHooks.add(runnable);
    }

    /**
     * Forwards the given timeout to the class-level {@code BrokerPollSystem.setLongPollTimeout}.
     * NOTE(review): because the target is invoked on the class rather than on an instance, the setting is
     * presumably shared by all processors in the JVM; verify against {@code BrokerPollSystem}.
     *
     * @param duration the long-poll timeout to apply
     */
    public void setLongPollTimeout(Duration duration) {
        final Duration longPollTimeout = duration;
        BrokerPollSystem.setLongPollTimeout(longPollTimeout);
    }

    /**
     * Raises the commit command flag so that the next check via {@code isCommandedToCommit()} reports
     * a pending request. The flag is set while holding the monitor of {@code commitCommand}, the same
     * monitor {@code isCommandedToCommit()} uses when it reads and clears the flag.
     */
    public void requestCommitAsap() {
        log.debug("Registering command to commit next chance");
        synchronized (commitCommand) {
            commitCommand.set(true);
        }
    }

    /**
     * Checks whether a commit has been requested and, if so, consumes the request.
     * <p>
     * The read and the reset happen under the monitor of {@code commitCommand}, which
     * {@code requestCommitAsap()} also holds while setting the flag.
     *
     * @return {@code true} if the flag was set (it is cleared before returning), {@code false} otherwise
     */
    private boolean isCommandedToCommit() {
        synchronized (this.commitCommand) {
            if (!this.commitCommand.get()) {
                return false;
            }
            log.debug("Command to commit asap received, clearing");
            this.commitCommand.set(false);
            return true;
        }
    }

    /**
     * Replaces the clock that {@code getTimeSinceLastCommit()} reads the current time from.
     *
     * @param wallClock the clock to use from now on
     */
    void setClock(WallClock wallClock) {
        this.clock = wallClock;
    }

    /**
     * Sets the interval used when working out how long to wait until the next commit.
     *
     * @param duration the new time between commits
     */
    public void setTimeBetweenCommits(Duration duration) {
        this.timeBetweenCommits = duration;
    }

    /**
     * Returns the interval used when working out how long to wait until the next commit.
     *
     * @return the configured time between commits
     */
    public Duration getTimeBetweenCommits() {
        return timeBetweenCommits;
    }

    /**
     * Exposes the work manager used by this processor.
     *
     * @return the work manager instance held in {@code wm}
     */
    WorkManager<K, V> getWm() {
        return wm;
    }

    /**
     * Returns the currently recorded number of assigned partitions.
     *
     * @return the value of {@code numberOfAssignedPartitions}
     */
    public int getNumberOfAssignedPartitions() {
        return numberOfAssignedPartitions;
    }

    /**
     * Sets the optional identifier of this instance.
     * NOTE(review): how the id is consumed is not visible in this section; confirm at its usage sites.
     *
     * @param optional the identifier, or an empty optional for none
     */
    public void setMyId(Optional<String> optional) {
        this.myId = optional;
    }
}
