package io.confluent.parallelconsumer;

import io.confluent.csid.utils.BackportUtils;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniMaps;

/* loaded from: input_file:io/confluent/parallelconsumer/BrokerPollSystem.class */
public class BrokerPollSystem<K, V> {
    private final Consumer<K, V> consumer;
    private Optional<Future<Boolean>> pollControlThreadFuture;
    private final ParallelEoSStreamProcessor<K, V> pc;
    private final WorkManager<K, V> wm;
    private static final Logger log = LoggerFactory.getLogger(BrokerPollSystem.class);
    private static Duration longPollTimeout = Duration.ofMillis(2000);
    public ParallelEoSStreamProcessor.State state = ParallelEoSStreamProcessor.State.running;
    private volatile boolean paused = false;

    public BrokerPollSystem(Consumer<K, V> consumer, WorkManager<K, V> workManager, ParallelEoSStreamProcessor<K, V> parallelEoSStreamProcessor) {
        this.consumer = consumer;
        this.wm = workManager;
        this.pc = parallelEoSStreamProcessor;
    }

    public void start() {
        this.pollControlThreadFuture = Optional.of(Executors.newSingleThreadExecutor().submit(this::controlLoop));
    }

    private boolean controlLoop() {
        Thread.currentThread().setName("broker-poll");
        log.trace("Broker poll control loop start");
        while (this.state != ParallelEoSStreamProcessor.State.closed) {
            log.trace("Loop: Poll broker");
            ConsumerRecords<K, V> pollBrokerForRecords = pollBrokerForRecords();
            if (!pollBrokerForRecords.isEmpty()) {
                log.trace("Loop: Register work");
                this.wm.registerWork(pollBrokerForRecords);
                this.pc.notifyNewWorkRegistered();
            }
            switch (this.state) {
                case draining:
                    doPause();
                    this.state = ParallelEoSStreamProcessor.State.closing;
                    break;
                case closing:
                    if (!pollBrokerForRecords.isEmpty()) {
                        log.info("Subscriptions are paused, but records are still being drained (count: {})", Integer.valueOf(pollBrokerForRecords.count()));
                        break;
                    } else {
                        doClose();
                        break;
                    }
            }
        }
        log.trace("Broker poll thread returning true");
        return true;
    }

    private void doClose() {
        log.debug("Closing {}, first closing consumer...", getClass().getSimpleName());
        this.consumer.close(DrainingCloseable.DEFAULT_TIMEOUT);
        log.debug("Consumer closed.");
        this.state = ParallelEoSStreamProcessor.State.closed;
    }

    private ConsumerRecords<K, V> pollBrokerForRecords() {
        ConsumerRecords<K, V> consumerRecords;
        managePauseOfSubscription();
        Duration ofMillis = this.state == ParallelEoSStreamProcessor.State.running ? longPollTimeout : Duration.ofMillis(1L);
        log.debug("Long polling broker with timeout {} seconds, might appear to sleep here if no data available on broker.", Long.valueOf(BackportUtils.toSeconds(ofMillis)));
        try {
            consumerRecords = this.consumer.poll(ofMillis);
            log.debug("Poll completed normally and returned {}...", Integer.valueOf(consumerRecords.count()));
        } catch (WakeupException e) {
            log.warn("Awoken from poll. State? {}", this.state);
            consumerRecords = new ConsumerRecords<>(UniMaps.of());
        }
        return consumerRecords;
    }

    public void drain() {
        if (this.state != ParallelEoSStreamProcessor.State.draining) {
            log.debug("Poll system signaling to drain...");
            this.state = ParallelEoSStreamProcessor.State.draining;
            this.consumer.wakeup();
        }
    }

    private void doPause() {
        if (this.paused) {
            log.trace("Already paused");
            return;
        }
        this.paused = true;
        log.debug("Pausing subs");
        this.consumer.pause(this.consumer.assignment());
    }

    public void closeAndWait() throws TimeoutException, ExecutionException {
        log.debug("Requesting broker polling system to close...");
        transitionToClosing();
        if (this.pollControlThreadFuture.isPresent()) {
            log.debug("Wait for loop to finish ending...");
            Future<Boolean> future = this.pollControlThreadFuture.get();
            boolean z = true;
            while (z) {
                try {
                    z = false;
                    if (!future.get(DrainingCloseable.DEFAULT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS).booleanValue()) {
                        log.warn("Broker poll control thread not closed cleanly.");
                    }
                } catch (InterruptedException e) {
                    log.debug("Interrupted", e);
                } catch (ExecutionException | TimeoutException e2) {
                    log.error("Execution or timeout exception", e2);
                    throw e2;
                }
            }
        }
        log.debug("Broker poll system finished closing");
    }

    private void transitionToClosing() {
        this.state = ParallelEoSStreamProcessor.State.closing;
        this.consumer.wakeup();
    }

    private void managePauseOfSubscription() {
        if (shouldThrottle()) {
            doPause();
        } else {
            resumeIfPaused();
        }
    }

    private void resumeIfPaused() {
        if (this.paused) {
            log.debug("Resuming consumer");
            this.consumer.resume(this.consumer.paused());
            this.consumer.wakeup();
            this.paused = false;
        }
    }

    private boolean shouldThrottle() {
        return this.wm.shouldThrottle();
    }

    public static void setLongPollTimeout(Duration duration) {
        longPollTimeout = duration;
    }

    public static Duration getLongPollTimeout() {
        return longPollTimeout;
    }
}
