/*
 * Decompiled with CFR 0.152.
 */
package reactor.kafka.receiver.internals;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ProtoUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Cancellation;
import reactor.core.publisher.BlockingSink;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.Receiver;
import reactor.kafka.receiver.ReceiverOffset;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverPartition;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.kafka.receiver.internals.CommittableBatch;
import reactor.kafka.receiver.internals.CommittableRecord;
import reactor.kafka.receiver.internals.ConsumerFactory;
import reactor.kafka.receiver.internals.SeekablePartition;

public class KafkaReceiver<K, V>
implements Receiver<K, V>,
ConsumerRebalanceListener {
    private static final Logger log = LoggerFactory.getLogger((String)KafkaReceiver.class.getName());
    private static final Set<String> DELEGATE_METHODS = new HashSet<String>(Arrays.asList("assignment", "subscription", "seek", "seekToBeginning", "seekToEnd", "position", "committed", "metrics", "partitionsFor", "listTopics", "paused", "pause", "resume", "offsetsForTimes", "beginningOffsets", "endOffsets"));
    private final ConsumerFactory consumerFactory;
    private final ReceiverOptions<K, V> receiverOptions;
    private final List<Flux<? extends Event<?>>> fluxList = new ArrayList();
    private final List<Cancellation> cancellations = new ArrayList<Cancellation>();
    private final AtomicLong requestsPending = new AtomicLong();
    private final AtomicBoolean needsHeartbeat = new AtomicBoolean();
    private final AtomicInteger consecutiveCommitFailures = new AtomicInteger();
    private final Scheduler eventScheduler;
    private final AtomicBoolean isActive = new AtomicBoolean();
    private final AtomicBoolean isClosed = new AtomicBoolean();
    private AckMode ackMode;
    private AtmostOnceOffsets atmostOnceOffsets;
    private EmitterProcessor<Event<?>> eventEmitter;
    private BlockingSink<Event<?>> eventSubmission;
    private EmitterProcessor<ConsumerRecords<K, V>> recordEmitter;
    private BlockingSink<ConsumerRecords<K, V>> recordSubmission;
    private InitEvent initEvent;
    private PollEvent pollEvent;
    private HeartbeatEvent heartbeatEvent;
    private CommitEvent commitEvent;
    private Flux<Event<?>> eventFlux;
    private Flux<ConsumerRecords<K, V>> consumerFlux;
    private Consumer<K, V> consumer;
    private Consumer<K, V> consumerProxy;

    public KafkaReceiver(ConsumerFactory consumerFactory, ReceiverOptions<K, V> receiverOptions) {
        this.consumerFactory = consumerFactory;
        this.receiverOptions = receiverOptions.toImmutable();
        this.eventScheduler = Schedulers.newSingle((String)("reactive-kafka-" + receiverOptions.groupId()));
    }

    @Override
    public Flux<ReceiverRecord<K, V>> receive() {
        this.ackMode = AckMode.MANUAL_ACK;
        Flux flux = this.createConsumerFlux().concatMap(consumerRecords -> Flux.fromIterable((Iterable)consumerRecords));
        return this.withDoOnRequest(flux).map(r -> {
            TopicPartition topicPartition = new TopicPartition(r.topic(), r.partition());
            CommittableOffset committableOffset = new CommittableOffset(topicPartition, r.offset());
            return new CommittableRecord(r, committableOffset);
        });
    }

    @Override
    public Flux<Flux<ConsumerRecord<K, V>>> receiveAutoAck() {
        this.ackMode = AckMode.AUTO_ACK;
        Flux<ConsumerRecords<K, V>> flux = this.withDoOnRequest(this.createConsumerFlux());
        return flux.map(consumerRecords -> Flux.fromIterable((Iterable)consumerRecords).doAfterTerminate(() -> {
            for (ConsumerRecord r : consumerRecords) {
                new CommittableOffset(r).acknowledge();
            }
        }));
    }

    @Override
    public Flux<ConsumerRecord<K, V>> receiveAtmostOnce() {
        this.ackMode = AckMode.ATMOST_ONCE;
        this.atmostOnceOffsets = new AtmostOnceOffsets();
        Flux flux = this.createConsumerFlux().concatMap(consumerRecords -> Flux.fromIterable((Iterable)consumerRecords));
        return this.withDoOnRequest(flux).doOnNext(r -> {
            long offset = r.offset();
            TopicPartition partition = new TopicPartition(r.topic(), r.partition());
            long committedOffset = this.atmostOnceOffsets.committedOffset(partition);
            this.atmostOnceOffsets.onDispatch(partition, offset);
            long commitAheadSize = this.receiverOptions.atmostOnceCommitAheadSize();
            CommittableOffset committable = new CommittableOffset(partition, offset + commitAheadSize);
            if (offset >= committedOffset) {
                committable.commit().block();
            } else if (committedOffset - offset >= commitAheadSize / 2L) {
                committable.commit().subscribe();
            }
        });
    }

    @Override
    public <T> Mono<T> doOnConsumer(Function<Consumer<K, V>, ? extends T> function) {
        return Mono.create((T monoSink) -> {
            CustomEvent event = new CustomEvent(function, monoSink);
            this.emit(event);
        });
    }

    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        log.debug("onPartitionsAssigned {}", partitions);
        if (!partitions.isEmpty()) {
            for (java.util.function.Consumer<Collection<ReceiverPartition>> onAssign : this.receiverOptions.assignListeners()) {
                onAssign.accept(this.toSeekable(partitions));
            }
        }
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        log.debug("onPartitionsRevoked {}", partitions);
        if (!partitions.isEmpty()) {
            if (this.ackMode != AckMode.ATMOST_ONCE) {
                this.commitEvent.runIfRequired(true);
            }
            for (java.util.function.Consumer<Collection<ReceiverPartition>> onRevoke : this.receiverOptions.revokeListeners()) {
                onRevoke.accept(this.toSeekable(partitions));
            }
        }
    }

    private Flux<ConsumerRecords<K, V>> createConsumerFlux() {
        if (this.consumerFlux != null) {
            throw new IllegalStateException("Multiple subscribers are not supported for KafkaFlux");
        }
        java.util.function.Consumer<Flux<?>> kafkaSubscribeOrAssign = flux -> this.receiverOptions.subscriber(this).accept(this.consumer);
        this.initEvent = new InitEvent(kafkaSubscribeOrAssign);
        this.pollEvent = new PollEvent();
        this.commitEvent = new CommitEvent();
        if (!this.autoHeartbeatEnabledInConsumer()) {
            this.heartbeatEvent = new HeartbeatEvent();
        }
        this.recordEmitter = EmitterProcessor.create();
        this.recordSubmission = this.recordEmitter.connectSink();
        this.consumerFlux = this.recordEmitter.publishOn(Schedulers.parallel()).doOnSubscribe(s -> {
            try {
                this.start();
            }
            catch (Exception e) {
                log.error("Subscription to event flux failed", (Throwable)e);
                throw e;
            }
        }).doOnCancel(() -> this.cancel(true));
        return this.consumerFlux;
    }

    private <T> Flux<T> withDoOnRequest(Flux<T> consumerFlux) {
        return consumerFlux.doOnRequest(r -> {
            if (this.requestsPending.addAndGet(r) > 0L) {
                this.pollEvent.scheduleIfRequired();
            }
        });
    }

    Consumer<K, V> kafkaConsumer() {
        return this.consumer;
    }

    CommittableBatch committableBatch() {
        return this.commitEvent.commitBatch;
    }

    void close() {
        this.cancel(true);
    }

    protected boolean autoHeartbeatEnabledInConsumer() {
        return ProtoUtils.latestVersion((int)ApiKeys.JOIN_GROUP.id) != 0;
    }

    private Collection<ReceiverPartition> toSeekable(Collection<TopicPartition> partitions) {
        ArrayList<ReceiverPartition> seekableList = new ArrayList<ReceiverPartition>(partitions.size());
        for (TopicPartition partition : partitions) {
            seekableList.add(new SeekablePartition(this.consumer, partition));
        }
        return seekableList;
    }

    private void start() {
        log.debug("start");
        if (!this.isActive.compareAndSet(false, true)) {
            throw new IllegalStateException("Multiple subscribers are not supported for KafkaFlux");
        }
        this.fluxList.clear();
        this.requestsPending.set(0L);
        this.consecutiveCommitFailures.set(0);
        this.eventEmitter = EmitterProcessor.create();
        this.eventSubmission = this.eventEmitter.connectSink();
        this.eventScheduler.start();
        Flux initFlux = Flux.just((Object)this.initEvent);
        this.fluxList.add((Flux<Event<?>>)this.eventEmitter);
        this.fluxList.add(initFlux);
        if (this.heartbeatEvent != null) {
            Flux heartbeatFlux = Flux.interval((Duration)this.receiverOptions.heartbeatInterval()).doOnSubscribe(i -> this.needsHeartbeat.set(true)).map(i -> this.heartbeatEvent);
            this.fluxList.add(heartbeatFlux);
        }
        Duration commitInterval = this.receiverOptions.commitInterval();
        if (!(this.ackMode != AckMode.AUTO_ACK && this.ackMode != AckMode.MANUAL_ACK || commitInterval.isZero())) {
            Flux periodicCommitFlux = Flux.interval((Duration)this.receiverOptions.commitInterval()).map(i -> this.commitEvent);
            this.fluxList.add(periodicCommitFlux);
        }
        this.eventFlux = Flux.merge(this.fluxList).publishOn(this.eventScheduler);
        this.cancellations.add(this.eventFlux.subscribe(event -> this.doEvent((Event<?>)event)));
    }

    private void fail(Throwable e, boolean async) {
        log.error("Consumer flux exception", e);
        this.recordSubmission.error(e);
        this.cancel(async);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void cancel(boolean async) {
        log.debug("cancel {}", (Object)this.isActive);
        if (this.isActive.compareAndSet(true, false)) {
            boolean isConsumerClosed = this.consumer == null;
            try {
                if (!isConsumerClosed) {
                    this.consumer.wakeup();
                    long closeStartNanos = System.nanoTime();
                    long closeEndNanos = closeStartNanos + this.receiverOptions.closeTimeout().toNanos();
                    CloseEvent closeEvent = new CloseEvent(closeEndNanos);
                    if (async) {
                        this.emit(closeEvent);
                        isConsumerClosed = closeEvent.await();
                    } else {
                        closeEvent.run();
                    }
                }
            }
            catch (Exception e) {
                log.warn("Cancel exception: " + e);
            }
            finally {
                this.fluxList.clear();
                this.eventScheduler.shutdown();
                try {
                    for (Cancellation cancellation : this.cancellations) {
                        cancellation.dispose();
                    }
                }
                finally {
                    int maxRetries = 10;
                    for (int i = 0; i < maxRetries && !isConsumerClosed; ++i) {
                        try {
                            if (this.consumer != null) {
                                this.consumer.close();
                            }
                            isConsumerClosed = true;
                            continue;
                        }
                        catch (Exception e) {
                            if (i != maxRetries - 1) continue;
                            log.warn("Consumer could not be closed", (Throwable)e);
                        }
                    }
                    this.consumerFlux = null;
                    this.consumerProxy = null;
                    this.atmostOnceOffsets = null;
                    this.isClosed.set(true);
                }
            }
        }
    }

    private void doEvent(Event<?> event) {
        log.trace("doEvent {}", (Object)event.eventType);
        try {
            event.run();
        }
        catch (Exception e) {
            this.fail(e, false);
        }
    }

    private void emit(Event<?> event) {
        BlockingSink.Emission emission = this.eventSubmission.emit(event);
        if (emission != BlockingSink.Emission.OK) {
            log.error("Event emission failed: {} {}", (Object)event.eventType, (Object)emission);
        }
    }

    private static class AtmostOnceOffsets {
        private final Map<TopicPartition, Long> committedOffsets = new ConcurrentHashMap<TopicPartition, Long>();
        private final Map<TopicPartition, Long> dispatchedOffsets = new ConcurrentHashMap<TopicPartition, Long>();

        AtmostOnceOffsets() {
        }

        void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
                this.committedOffsets.put(entry.getKey(), entry.getValue().offset());
            }
        }

        void onDispatch(TopicPartition topicPartition, long offset) {
            this.dispatchedOffsets.put(topicPartition, offset);
        }

        long committedOffset(TopicPartition topicPartition) {
            Long offset = this.committedOffsets.get(topicPartition);
            return offset == null ? -1L : offset;
        }

        boolean undoCommitAhead(CommittableBatch committableBatch) {
            boolean undoRequired = false;
            for (Map.Entry<TopicPartition, Long> entry : this.committedOffsets.entrySet()) {
                TopicPartition topicPartition = entry.getKey();
                long offsetToCommit = this.dispatchedOffsets.get(entry.getKey()) + 1L;
                if (entry.getValue() <= offsetToCommit) continue;
                committableBatch.updateOffset(topicPartition, offsetToCommit);
                undoRequired = true;
            }
            return undoRequired;
        }
    }

    class CommittableOffset
    implements ReceiverOffset {
        private final TopicPartition topicPartition;
        private final long commitOffset;
        private final AtomicBoolean acknowledged;

        public CommittableOffset(ConsumerRecord<K, V> record) {
            this(new TopicPartition(record.topic(), record.partition()), record.offset());
        }

        public CommittableOffset(TopicPartition topicPartition, long nextOffset) {
            this.topicPartition = topicPartition;
            this.commitOffset = nextOffset;
            this.acknowledged = new AtomicBoolean(false);
        }

        @Override
        public Mono<Void> commit() {
            if (this.maybeUpdateOffset() > 0) {
                return this.scheduleCommit();
            }
            return Mono.empty();
        }

        @Override
        public void acknowledge() {
            int commitBatchSize = KafkaReceiver.this.receiverOptions.commitBatchSize();
            long uncommittedCount = this.maybeUpdateOffset();
            if (commitBatchSize > 0 && uncommittedCount >= (long)commitBatchSize) {
                KafkaReceiver.this.commitEvent.scheduleIfRequired();
            }
        }

        @Override
        public TopicPartition topicPartition() {
            return this.topicPartition;
        }

        @Override
        public long offset() {
            return this.commitOffset;
        }

        private int maybeUpdateOffset() {
            if (this.acknowledged.compareAndSet(false, true)) {
                return KafkaReceiver.this.commitEvent.commitBatch.updateOffset(this.topicPartition, this.commitOffset);
            }
            return KafkaReceiver.this.commitEvent.commitBatch.batchSize();
        }

        private Mono<Void> scheduleCommit() {
            return Mono.create(emitter -> {
                KafkaReceiver.this.commitEvent.commitBatch.addCallbackEmitter((MonoSink<Void>)emitter);
                KafkaReceiver.this.commitEvent.scheduleIfRequired();
            });
        }

        public String toString() {
            return this.topicPartition + "@" + this.commitOffset;
        }
    }

    private class CloseEvent
    extends Event<ConsumerRecords<K, V>> {
        private final long closeEndTimeNanos;
        private Semaphore semaphore;

        CloseEvent(long closeEndTimeNanos) {
            super(EventType.CLOSE);
            this.semaphore = new Semaphore(0);
            this.closeEndTimeNanos = closeEndTimeNanos;
        }

        @Override
        public void run() {
            try {
                if (KafkaReceiver.this.consumer != null) {
                    Collection<TopicPartition> manualAssignment = KafkaReceiver.this.receiverOptions.assignment();
                    if (manualAssignment != null && !manualAssignment.isEmpty()) {
                        KafkaReceiver.this.onPartitionsRevoked(manualAssignment);
                    }
                    int attempts = 3;
                    for (int i = 0; i < attempts; ++i) {
                        try {
                            boolean forceCommit = true;
                            if (KafkaReceiver.this.ackMode == AckMode.ATMOST_ONCE) {
                                forceCommit = KafkaReceiver.this.atmostOnceOffsets.undoCommitAhead(KafkaReceiver.this.committableBatch());
                            }
                            KafkaReceiver.this.commitEvent.runIfRequired(forceCommit);
                            KafkaReceiver.this.commitEvent.waitFor(this.closeEndTimeNanos);
                            KafkaReceiver.this.consumer.close();
                            break;
                        }
                        catch (WakeupException e) {
                            if (i != attempts - 1) continue;
                            throw e;
                        }
                    }
                }
                this.semaphore.release();
            }
            catch (Exception e) {
                log.error("Unexpected exception during close", (Throwable)e);
                KafkaReceiver.this.fail(e, false);
            }
        }

        boolean await(long timeoutNanos) throws InterruptedException {
            return this.semaphore.tryAcquire(timeoutNanos, TimeUnit.NANOSECONDS);
        }

        boolean await() {
            long remainingNanos;
            boolean closed = false;
            while (!closed && (remainingNanos = this.closeEndTimeNanos - System.nanoTime()) > 0L) {
                try {
                    closed = this.await(remainingNanos);
                }
                catch (InterruptedException interruptedException) {}
            }
            return closed;
        }
    }

    private class CustomEvent<T>
    extends Event<Void> {
        private final Function<Consumer<K, V>, ? extends T> function;
        private MonoSink<T> monoSink;

        CustomEvent(Function<Consumer<K, V>, ? extends T> function, MonoSink<T> monoSink) {
            super(EventType.CUSTOM);
            this.function = function;
            this.monoSink = monoSink;
        }

        @Override
        public void run() {
            if (KafkaReceiver.this.isActive.get()) {
                try {
                    T ret = this.function.apply(this.consumerProxy());
                    this.monoSink.success(ret);
                }
                catch (Throwable e) {
                    this.monoSink.error(e);
                }
            }
        }

        private Consumer<K, V> consumerProxy() {
            if (KafkaReceiver.this.consumerProxy == null) {
                Class[] interfaces = new Class[]{Consumer.class};
                InvocationHandler handler = (proxy, method, args) -> {
                    if (DELEGATE_METHODS.contains(method.getName())) {
                        try {
                            return method.invoke((Object)KafkaReceiver.this.consumer, args);
                        }
                        catch (InvocationTargetException e) {
                            throw e.getCause();
                        }
                    }
                    throw new UnsupportedOperationException("Method is not supported: " + method);
                };
                KafkaReceiver.this.consumerProxy = (Consumer)Proxy.newProxyInstance(Consumer.class.getClassLoader(), interfaces, handler);
            }
            return KafkaReceiver.this.consumerProxy;
        }
    }

    private class HeartbeatEvent
    extends Event<Void> {
        HeartbeatEvent() {
            super(EventType.HEARTBEAT);
        }

        @Override
        public void run() {
            if (KafkaReceiver.this.isActive.get() && KafkaReceiver.this.needsHeartbeat.getAndSet(true)) {
                KafkaReceiver.this.consumer.pause((Collection)KafkaReceiver.this.consumer.assignment());
                KafkaReceiver.this.consumer.poll(0L);
                KafkaReceiver.this.consumer.resume((Collection)KafkaReceiver.this.consumer.assignment());
            }
        }
    }

    class CommitEvent
    extends Event<Map<TopicPartition, OffsetAndMetadata>> {
        private final CommittableBatch commitBatch;
        private final AtomicBoolean isPending;
        private final AtomicInteger inProgress;

        CommitEvent() {
            super(EventType.COMMIT);
            this.isPending = new AtomicBoolean();
            this.inProgress = new AtomicInteger();
            this.commitBatch = new CommittableBatch();
        }

        @Override
        public void run() {
            block6: {
                this.isPending.set(false);
                CommittableBatch.CommitArgs commitArgs = this.commitBatch.getAndClearOffsets();
                try {
                    if (commitArgs == null) break block6;
                    if (!commitArgs.offsets().isEmpty()) {
                        this.inProgress.incrementAndGet();
                        if (KafkaReceiver.this.ackMode != AckMode.ATMOST_ONCE) {
                            KafkaReceiver.this.consumer.commitAsync(commitArgs.offsets(), (offsets, exception) -> {
                                this.inProgress.decrementAndGet();
                                if (exception == null) {
                                    this.handleSuccess(commitArgs, offsets);
                                } else {
                                    this.handleFailure(commitArgs, exception);
                                }
                            });
                            break block6;
                        }
                        try {
                            KafkaReceiver.this.consumer.commitSync(commitArgs.offsets());
                            this.handleSuccess(commitArgs, commitArgs.offsets());
                            KafkaReceiver.this.atmostOnceOffsets.onCommit(commitArgs.offsets());
                        }
                        catch (Exception e) {
                            this.handleFailure(commitArgs, e);
                        }
                        this.inProgress.decrementAndGet();
                        break block6;
                    }
                    this.handleSuccess(commitArgs, commitArgs.offsets());
                }
                catch (Exception e) {
                    log.error("Unexpected exception", (Throwable)e);
                    this.inProgress.decrementAndGet();
                    this.handleFailure(commitArgs, e);
                }
            }
        }

        void runIfRequired(boolean force) {
            if (this.isPending.compareAndSet(true, false) || force) {
                this.run();
            }
        }

        private void handleSuccess(CommittableBatch.CommitArgs commitArgs, Map<TopicPartition, OffsetAndMetadata> offsets) {
            if (!offsets.isEmpty()) {
                KafkaReceiver.this.consecutiveCommitFailures.set(0);
            }
            if (commitArgs.callbackEmitters() != null) {
                for (MonoSink<Void> emitter : commitArgs.callbackEmitters()) {
                    emitter.success();
                }
            }
        }

        private void handleFailure(CommittableBatch.CommitArgs commitArgs, Exception exception) {
            boolean mayRetry;
            log.warn("Commit failed", (Throwable)exception);
            boolean bl = mayRetry = this.isRetriableException(exception) && !KafkaReceiver.this.isClosed.get() && KafkaReceiver.this.consecutiveCommitFailures.incrementAndGet() < KafkaReceiver.this.receiverOptions.maxCommitAttempts();
            if (!mayRetry) {
                List<MonoSink<Void>> callbackEmitters = commitArgs.callbackEmitters();
                if (callbackEmitters != null && !callbackEmitters.isEmpty()) {
                    this.isPending.set(false);
                    this.commitBatch.restoreOffsets(commitArgs, false);
                    for (MonoSink<Void> emitter : callbackEmitters) {
                        emitter.error((Throwable)exception);
                    }
                } else {
                    KafkaReceiver.this.fail(exception, false);
                }
            } else {
                this.commitBatch.restoreOffsets(commitArgs, true);
                log.warn("Commit failed with exception" + exception + ", retries remaining " + (KafkaReceiver.this.receiverOptions.maxCommitAttempts() - KafkaReceiver.this.consecutiveCommitFailures.get()));
                this.isPending.set(true);
            }
        }

        private void scheduleIfRequired() {
            if (this.isPending.compareAndSet(false, true)) {
                KafkaReceiver.this.emit(this);
            }
        }

        private void waitFor(long endTimeNanos) {
            while (this.inProgress.get() > 0 && endTimeNanos - System.nanoTime() > 0L) {
                KafkaReceiver.this.consumer.poll(1L);
            }
        }

        protected boolean isRetriableException(Exception exception) {
            return exception instanceof RetriableCommitFailedException;
        }
    }

    private class PollEvent
    extends Event<ConsumerRecords<K, V>> {
        private final AtomicBoolean isPending;
        private final long pollTimeoutMs;

        PollEvent() {
            super(EventType.POLL);
            this.isPending = new AtomicBoolean();
            this.pollTimeoutMs = KafkaReceiver.this.receiverOptions.pollTimeout().toMillis();
        }

        @Override
        public void run() {
            block6: {
                KafkaReceiver.this.needsHeartbeat.set(false);
                try {
                    if (KafkaReceiver.this.isActive.get()) {
                        BlockingSink.Emission emission;
                        KafkaReceiver.this.commitEvent.runIfRequired(false);
                        ConsumerRecords records = KafkaReceiver.this.consumer.poll(this.pollTimeoutMs);
                        if (records.count() > 0 && (emission = KafkaReceiver.this.recordSubmission.emit((Object)records)) != BlockingSink.Emission.OK) {
                            log.error("Emission of consumer records failed with error " + emission);
                        }
                        if (KafkaReceiver.this.isActive.get()) {
                            int count;
                            this.isPending.compareAndSet(true, false);
                            int n = count = KafkaReceiver.this.ackMode == AckMode.AUTO_ACK && records.count() > 0 ? 1 : records.count();
                            if (KafkaReceiver.this.requestsPending.addAndGet(0 - count) > 0L) {
                                this.scheduleIfRequired();
                            }
                        }
                    }
                }
                catch (Exception e) {
                    if (!KafkaReceiver.this.isActive.get()) break block6;
                    log.error("Unexpected exception", (Throwable)e);
                    KafkaReceiver.this.fail(e, false);
                }
            }
        }

        void scheduleIfRequired() {
            if (this.isPending.compareAndSet(false, true)) {
                KafkaReceiver.this.emit(this);
            }
        }
    }

    private class InitEvent
    extends Event<ConsumerRecords<K, V>> {
        private final java.util.function.Consumer<Flux<?>> kafkaSubscribeOrAssign;

        InitEvent(java.util.function.Consumer<Flux<?>> kafkaSubscribeOrAssign) {
            super(EventType.INIT);
            this.kafkaSubscribeOrAssign = kafkaSubscribeOrAssign;
        }

        @Override
        public void run() {
            block2: {
                try {
                    KafkaReceiver.this.isActive.set(true);
                    KafkaReceiver.this.isClosed.set(false);
                    KafkaReceiver.this.consumer = KafkaReceiver.this.consumerFactory.createConsumer(KafkaReceiver.this.receiverOptions);
                    this.kafkaSubscribeOrAssign.accept(KafkaReceiver.this.consumerFlux);
                }
                catch (Exception e) {
                    if (!KafkaReceiver.this.isActive.get()) break block2;
                    log.error("Unexpected exception", (Throwable)e);
                    KafkaReceiver.this.fail(e, false);
                }
            }
        }
    }

    abstract class Event<R>
    implements Runnable {
        protected EventType eventType;

        Event(EventType eventType) {
            this.eventType = eventType;
        }

        public EventType eventType() {
            return this.eventType;
        }
    }

    static enum AckMode {
        AUTO_ACK,
        MANUAL_ACK,
        ATMOST_ONCE;

    }

    static enum EventType {
        INIT,
        POLL,
        HEARTBEAT,
        COMMIT,
        CUSTOM,
        CLOSE;

    }
}

