package io.confluent.parallelconsumer.internal;

import io.confluent.csid.utils.StringUtils;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.metrics.PCMetrics;
import io.confluent.parallelconsumer.metrics.PCMetricsDef;
import io.confluent.parallelconsumer.state.WorkManager;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Tag;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/* loaded from: input_file:io/confluent/parallelconsumer/internal/BrokerPollSystem.class */
public class BrokerPollSystem<K, V> implements OffsetCommitter {
    /** Consumer wrapper this system polls, pauses, resumes, wakes up and (when it owns commits) closes. */
    private final ConsumerManager<K, V> consumerManager;

    /** Owning processor: supplies the instance id and metrics module, and receives polled records via {@code registerWork}. */
    private final AbstractParallelEoSStreamProcessor<K, V> pc;

    /** Consumer-based offset committer; only populated by the constructor for the periodic consumer commit modes. */
    private Optional<ConsumerOffsetCommitter<K, V>> committer;

    /** Work manager consulted for throttling decisions and for the partition state passed to polled record maps. */
    private final WorkManager<K, V> wm;

    /** Metrics facade used by {@code initMetrics()} to register this system's gauges. */
    private final PCMetrics pcMetrics;

    /** Gauge exposing the current {@link #runState} value; assigned in {@code initMetrics()}. */
    private Gauge statusGauge;

    /** Gauge exposing the consumer manager's paused partition count; assigned in {@code initMetrics()}. */
    private Gauge numPausedPartitionsGauge;

    private static final Logger log = LoggerFactory.getLogger(BrokerPollSystem.class);

    /** Poll timeout used while the system is RUNNING or DRAINING; defaults to 2000 ms and is settable via {@code setLongPollTimeout}. */
    private static Duration longPollTimeout = Duration.ofMillis(2000);

    /**
     * Life-cycle state of the poller.
     * <p>
     * Declared {@code volatile} because it is written by the public control methods ({@code drain()},
     * {@code closeAndWait()}, pause/resume) and read by the control loop, which {@code start(String)} runs on an
     * executor thread. Without it a state change made by a caller is not guaranteed to become visible to the loop.
     */
    private volatile State runState = State.RUNNING;

    /** Future of the submitted control loop; empty until {@code start(String)} has been called. */
    private Optional<Future<Boolean>> pollControlThreadFuture = Optional.empty();

    /** True while this system has paused the consumer's assignment (set in {@code doPause()}, cleared in {@code resumeIfPaused()}). */
    private volatile boolean pausedForThrottling = false;

    /**
     * Limits how often a throttling pause may be issued.
     * NOTE(review): the unit of the constructor argument is defined by {@code RateLimiter}, which is not visible here - confirm.
     */
    private final RateLimiter pauseLimiter = new RateLimiter(1);

    /* renamed from: io.confluent.parallelconsumer.internal.BrokerPollSystem$1, reason: invalid class name */
    /* loaded from: input_file:io/confluent/parallelconsumer/internal/BrokerPollSystem$1.class */
    /**
     * Compiler-generated lookup tables that translate enum ordinals into the dense case numbers used by the
     * {@code switch} statements of the enclosing class. An entry left at 0 means "no matching case".
     */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$confluent$parallelconsumer$ParallelConsumerOptions$CommitMode;
        static final /* synthetic */ int[] $SwitchMap$io$confluent$parallelconsumer$internal$State;

        static {
            // Run-state table: DRAINING -> case 1, CLOSING -> case 2.
            int[] stateCases = new int[State.values().length];
            $SwitchMap$io$confluent$parallelconsumer$internal$State = stateCases;
            try {
                stateCases[State.DRAINING.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // Constant missing from the enum at run time: leave the slot at 0.
            }
            try {
                stateCases[State.CLOSING.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // Constant missing from the enum at run time: leave the slot at 0.
            }

            // Commit-mode table: PERIODIC_CONSUMER_SYNC -> case 1, PERIODIC_CONSUMER_ASYNCHRONOUS -> case 2.
            int[] commitModeCases = new int[ParallelConsumerOptions.CommitMode.values().length];
            $SwitchMap$io$confluent$parallelconsumer$ParallelConsumerOptions$CommitMode = commitModeCases;
            try {
                commitModeCases[ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_SYNC.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // Constant missing from the enum at run time: leave the slot at 0.
            }
            try {
                commitModeCases[ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_ASYNCHRONOUS.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // Constant missing from the enum at run time: leave the slot at 0.
            }
        }
    }

    /**
     * Creates the poll system and registers its metrics.
     * <p>
     * A {@link ConsumerOffsetCommitter} is created only for the {@code PERIODIC_CONSUMER_SYNC} and
     * {@code PERIODIC_CONSUMER_ASYNCHRONOUS} commit modes; for every other mode {@code committer} stays empty.
     *
     * @param consumerManager                    consumer wrapper to poll and control
     * @param workManager                        work manager used for throttling decisions and partition state
     * @param abstractParallelEoSStreamProcessor owning processor; also the source of the metrics module
     * @param parallelConsumerOptions            options whose commit mode decides whether a committer is created
     */
    public BrokerPollSystem(ConsumerManager<K, V> consumerManager, WorkManager<K, V> workManager, AbstractParallelEoSStreamProcessor<K, V> abstractParallelEoSStreamProcessor, ParallelConsumerOptions<K, V> parallelConsumerOptions) {
        this.committer = Optional.empty();
        this.wm = workManager;
        this.pc = abstractParallelEoSStreamProcessor;
        this.consumerManager = consumerManager;
        // Switch on the enum itself rather than on an ordinal lookup table with numeric case labels.
        switch (parallelConsumerOptions.getCommitMode()) {
            case PERIODIC_CONSUMER_SYNC:
            case PERIODIC_CONSUMER_ASYNCHRONOUS:
                this.committer = Optional.of(new ConsumerOffsetCommitter<>(consumerManager, workManager, parallelConsumerOptions));
                break;
            default:
                // Any other commit mode: this system does not commit through the consumer.
                break;
        }
        this.pcMetrics = abstractParallelEoSStreamProcessor.getModule().pcMetrics();
        initMetrics();
    }

    /**
     * Registers the two gauges of this system: one reporting the numeric value of the current run state and one
     * reporting the consumer manager's paused partition count.
     */
    private void initMetrics() {
        statusGauge = pcMetrics.gaugeFromMetricDef(
                PCMetricsDef.PC_POLLER_STATUS,
                this,
                poller -> poller.runState.getValue(),
                new Tag[0]);
        numPausedPartitionsGauge = pcMetrics.gaugeFromMetricDef(
                PCMetricsDef.NUM_PAUSED_PARTITIONS,
                consumerManager,
                manager -> manager.getPausedPartitionSize(),
                new Tag[0]);
    }

    /**
     * Starts the broker poll control loop on a background executor.
     * <p>
     * The executor is first looked up via JNDI under the given name; if that lookup fails with a
     * {@link NamingException}, a private single-thread executor is created instead.
     *
     * @param str JNDI name of the {@link ExecutorService} to run the control loop on
     */
    public void start(String str) {
        ExecutorService executor;
        boolean ownsExecutor = false;
        try {
            executor = (ExecutorService) InitialContext.doLookup(str);
        } catch (NamingException e) {
            log.debug("Couldn't look up an execution service, falling back to Java SE Thread", e);
            executor = Executors.newSingleThreadExecutor();
            ownsExecutor = true;
        }
        this.pollControlThreadFuture = Optional.of(executor.submit(this::controlLoop));
        if (ownsExecutor) {
            // Orderly shutdown: the already-submitted control loop keeps running, and the worker thread is
            // released once it finishes. An executor obtained from JNDI is not ours to shut down.
            executor.shutdown();
        }
    }

    /**
     * Checks on the control loop: if its future has finished or been cancelled, the outcome is fetched so that any
     * failure is rethrown to the caller wrapped in an {@link InternalRuntimeException}. Does nothing when the loop
     * has not been started or is still running.
     */
    public void supervise() {
        if (!pollControlThreadFuture.isPresent()) {
            return;
        }
        Future<Boolean> controlFuture = pollControlThreadFuture.get();
        boolean finished = controlFuture.isCancelled() || controlFuture.isDone();
        if (!finished) {
            return;
        }
        try {
            controlFuture.get();
        } catch (Exception failure) {
            throw new InternalRuntimeException("Error in " + BrokerPollSystem.class.getSimpleName() + " system.", failure);
        }
    }

    /**
     * Body of the broker poll thread: names the thread, lets the committer (if any) claim it, then repeatedly polls,
     * commits and reacts to the run state until the state becomes {@code CLOSED}.
     *
     * @return {@code true} once the loop has ended because the state reached {@code CLOSED}
     * @throws TimeoutException     propagated from the commit step
     * @throws InterruptedException propagated from the commit step
     */
    private boolean controlLoop() throws TimeoutException, InterruptedException {
        Thread.currentThread().setName("pc-broker-poll");
        this.pc.getMyId().ifPresent(instanceId -> {
            Thread.currentThread().setName("pc-broker-poll-" + instanceId);
            MDC.put(AbstractParallelEoSStreamProcessor.MDC_INSTANCE_ID, instanceId);
        });
        log.trace("Broker poll control loop start");
        this.committer.ifPresent(offsetCommitter -> offsetCommitter.claim());
        while (this.runState != State.CLOSED) {
            try {
                handlePoll();
                maybeDoCommit();
                // Switch on the enum itself rather than on an ordinal lookup table with numeric case labels.
                switch (this.runState) {
                    case DRAINING:
                        doPause();
                        break;
                    case CLOSING:
                        doClose();
                        break;
                    default:
                        // Other states need no per-iteration action.
                        break;
                }
            } catch (Exception e) {
                // Log here as well: the exception otherwise only surfaces when someone inspects the future.
                log.error("Unknown error", e);
                throw e;
            }
        }
        log.debug("Broker poller thread finished normally, returning OK (true) to future...");
        return true;
    }

    /**
     * Performs one poll cycle. Only polls while the state is RUNNING or DRAINING, and hands the result to the
     * processor only when at least one record came back.
     */
    private void handlePoll() {
        log.trace("Loop: Broker poller: ({})", runState);
        if (runState != State.RUNNING && runState != State.DRAINING) {
            return;
        }
        EpochAndRecordsMap<K, V> polled = pollBrokerForRecords();
        int recordCount = polled.count();
        log.debug("Got {} records in poll result", recordCount);
        if (recordCount <= 0) {
            return;
        }
        log.trace("Loop: Register work");
        pc.registerWork(polled);
    }

    /**
     * Final shutdown step of the control loop: pauses the subscription, closes the consumer manager if this system
     * is responsible for commits, and then marks the poller as CLOSED (which ends the loop).
     */
    private void doClose() {
        log.debug("Doing close...");
        doPause();
        maybeCloseConsumerManager();
        runState = State.CLOSED;
    }

    /**
     * Closes the consumer manager with the processor's default timeout, but only when this system holds the
     * committer.
     */
    private void maybeCloseConsumerManager() {
        if (!isResponsibleForCommits()) {
            return;
        }
        String ownName = getClass().getSimpleName();
        log.debug("Closing {}, first closing consumer...", ownName);
        consumerManager.close(AbstractParallelEoSStreamProcessor.DEFAULT_TIMEOUT);
        log.debug("Consumer closed.");
    }

    /** @return {@code true} when a consumer-based committer was configured for this system */
    private boolean isResponsibleForCommits() {
        return committer.isPresent();
    }

    /**
     * Updates the pause/resume state of the subscription and then polls the consumer.
     * <p>
     * The long poll timeout is used while RUNNING or DRAINING; in any other state the poll waits 1 ms.
     *
     * @return the polled records wrapped together with the work manager's partition state
     */
    private EpochAndRecordsMap<K, V> pollBrokerForRecords() {
        checkStateForPausingSubscriptions();
        log.debug("Subscriptions are paused: {}", pausedForThrottling);

        final Duration pollTimeout;
        if (runState == State.RUNNING || runState == State.DRAINING) {
            pollTimeout = longPollTimeout;
        } else {
            pollTimeout = Duration.ofMillis(1L);
        }

        log.debug("Long polling broker with timeout {}, might appear to sleep here if subs are paused, or no data available on broker. Run state: {}", pollTimeout, runState);
        ConsumerRecords<K, V> records = consumerManager.poll(pollTimeout);
        log.debug("Poll completed");
        return new EpochAndRecordsMap<>(records, wm.getPm());
    }

    /**
     * Decides how to treat the subscription before a poll: while DRAINING it is always paused, otherwise pausing
     * and resuming follow the throttling decision.
     */
    private void checkStateForPausingSubscriptions() {
        if (runState != State.DRAINING) {
            managePauseOfSubscription();
            return;
        }
        doPause();
    }

    /**
     * Asks the poll system to drain: signals the consumer manager to stop and, unless already draining, switches
     * the state to DRAINING and wakes the consumer up.
     */
    public void drain() {
        consumerManager.signalStop();
        if (runState == State.DRAINING) {
            return;
        }
        log.debug("Signaling poll system to drain, waking up consumer...");
        runState = State.DRAINING;
        consumerManager.wakeup();
    }

    /**
     * Pauses the subscription for throttling unless it is already paused or the pause rate limiter currently
     * forbids it; in the latter case the skipped pause is only logged at debug level.
     */
    private void doPauseMaybe() {
        if (pausedForThrottling) {
            log.trace("Already paused");
            return;
        }
        if (pauseLimiter.couldPerform()) {
            pauseLimiter.performIfNotLimited(this::doPause);
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("Should pause but pause rate limit exceeded {} vs {}.", pauseLimiter.getElapsedDuration(), pauseLimiter.getRate());
        }
    }

    /**
     * Pauses every partition currently assigned to the consumer and records that fact in
     * {@code pausedForThrottling}. A repeated call while already paused only logs.
     */
    private void doPause() {
        if (!pausedForThrottling) {
            pausedForThrottling = true;
            log.debug("Pausing subs");
            consumerManager.pause(consumerManager.assignment());
        } else {
            log.debug("Already paused, skipping");
        }
    }

    /**
     * Moves the poller to CLOSING and, if the control loop was started, blocks until it has finished.
     * <p>
     * Each wait is bounded by the processor's default timeout. If the calling thread is interrupted while waiting,
     * the wait is retried and the interrupt status is restored before this method returns, so the interruption is
     * neither lost nor allowed to cut the shutdown short.
     *
     * @throws TimeoutException   if the control loop did not finish within the timeout
     * @throws ExecutionException if the control loop terminated with an exception
     */
    public void closeAndWait() throws TimeoutException, ExecutionException {
        log.debug("Requesting broker polling system to close...");
        transitionToClosing();
        if (this.pollControlThreadFuture.isPresent()) {
            log.debug("Wait for loop to finish ending...");
            Future<Boolean> future = this.pollControlThreadFuture.get();
            boolean interrupted = false;
            try {
                boolean waiting = true;
                while (waiting) {
                    try {
                        boolean closedCleanly = future.get(AbstractParallelEoSStreamProcessor.DEFAULT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
                        // Only stop retrying once get() has actually returned.
                        waiting = false;
                        if (!closedCleanly) {
                            log.warn("Broker poll control thread not closed cleanly.");
                        }
                    } catch (InterruptedException e) {
                        // Remember the interrupt and retry; re-asserting it here would make the next get() fail at once.
                        interrupted = true;
                        log.debug("Interrupted waiting for broker poller thread to finish", e);
                    } catch (ExecutionException | TimeoutException e2) {
                        log.error("Execution or timeout exception waiting for broker poller thread to finish", e2);
                        throw e2;
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        log.debug("Broker poll system finished closing");
    }

    /**
     * Signals the consumer manager to stop, sets the state to CLOSING and wakes the consumer up so that the
     * control loop notices the new state.
     */
    private void transitionToClosing() {
        consumerManager.signalStop();
        log.debug("Poller transitioning to closing, waking up consumer");
        runState = State.CLOSING;
        consumerManager.wakeup();
    }

    /**
     * Applies the current throttling decision: pause (rate limited) when throttling is needed, otherwise resume a
     * previously paused subscription.
     */
    private void managePauseOfSubscription() {
        boolean throttleNeeded = shouldThrottle();
        log.trace("Need to throttle: {}", throttleNeeded);
        if (!throttleNeeded) {
            resumeIfPaused();
            return;
        }
        doPauseMaybe();
    }

    /**
     * Undoes a throttling pause: resumes all partitions the consumer reports as paused, wakes the consumer up and
     * clears {@code pausedForThrottling}. Does nothing if not currently paused.
     */
    private void resumeIfPaused() {
        if (!pausedForThrottling) {
            return;
        }
        log.debug("Resuming consumer, waking up");
        consumerManager.resume(consumerManager.paused());
        consumerManager.wakeup();
        pausedForThrottling = false;
    }

    /** @return the work manager's current verdict on whether polling should be throttled */
    private boolean shouldThrottle() {
        return wm.shouldThrottle();
    }

    /**
     * Commits offsets through the configured consumer-based committer.
     *
     * @throws IllegalStateException if the poller is not RUNNING, DRAINING or CLOSING, or if no committer was
     *                               configured for this system
     */
    @Override // io.confluent.parallelconsumer.internal.OffsetCommitter
    public void retrieveOffsetsAndCommit() {
        if (this.runState != State.RUNNING && this.runState != State.DRAINING && this.runState != State.CLOSING) {
            // The message previously ended with an unbalanced "(state: {}".
            throw new IllegalStateException(StringUtils.msg("Can't commit - not running (state: {})", this.runState));
        }
        // The supplier hands the exception to orElseThrow instead of throwing from inside the lambda.
        this.committer.orElseThrow(() -> new IllegalStateException("No committer configured")).commit();
    }

    /**
     * Gives the committer, when one is configured, the chance to perform a commit.
     *
     * @throws TimeoutException     propagated from the committer
     * @throws InterruptedException propagated from the committer
     */
    private void maybeDoCommit() throws TimeoutException, InterruptedException {
        if (!committer.isPresent()) {
            return;
        }
        ConsumerOffsetCommitter<K, V> offsetCommitter = committer.get();
        offsetCommitter.maybeDoCommit();
    }

    /** Wakes the consumer up, but only while the subscription is paused for throttling. */
    public void wakeupIfPaused() {
        if (!pausedForThrottling) {
            return;
        }
        consumerManager.wakeup();
    }

    /**
     * Switches the poller from RUNNING to PAUSED. In any other state nothing changes and the skipped transition is
     * logged.
     */
    public void pausePollingAndWorkRegistrationIfRunning() {
        if (runState == State.RUNNING) {
            log.info("Transitioning broker poll system to state paused.");
            runState = State.PAUSED;
            return;
        }
        log.info("Skipping transition of broker poll system to state paused. Current state is {}.", runState);
    }

    /**
     * Switches the poller from PAUSED back to RUNNING. In any other state nothing changes and the skipped
     * transition is logged.
     */
    public void resumePollingAndWorkRegistrationIfPaused() {
        if (runState == State.PAUSED) {
            log.info("Transitioning broker poll system to state running.");
            runState = State.RUNNING;
            return;
        }
        log.info("Skipping transition of broker poll system to state running. Current state is {}.", runState);
    }

    /** @return the paused partition count as reported by the consumer manager */
    public int getPausedPartitionSize() {
        return consumerManager.getPausedPartitionSize();
    }

    /** @return {@code true} while this system has the subscription paused */
    public boolean isPausedForThrottling() {
        return pausedForThrottling;
    }

    /**
     * Replaces the poll timeout used while the poller is RUNNING or DRAINING. The value is static and therefore
     * shared by every instance of this class.
     *
     * @param duration the new long poll timeout
     */
    public static void setLongPollTimeout(Duration duration) {
        BrokerPollSystem.longPollTimeout = duration;
    }

    /** @return the poll timeout currently used while the poller is RUNNING or DRAINING */
    public static Duration getLongPollTimeout() {
        return BrokerPollSystem.longPollTimeout;
    }
}
