package io.simplesource.saga.shared.kafka;

import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.Serde;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/simplesource/saga/shared/kafka/ConsumerRunner.class */
public final class ConsumerRunner<K, V> implements Runnable {
    private final Properties consumerConfig;
    private final Consumer<Boolean> onClose;
    private final BiConsumer<K, V> processor;
    private final Serde<K> keySerde;
    private final Serde<V> valueSerde;
    private final String topicName;
    private Optional<KafkaConsumer<K, V>> consumer = Optional.empty();
    private final Logger logger = LoggerFactory.getLogger(ConsumerRunner.class);
    private final AtomicBoolean closed = new AtomicBoolean(false);

    public ConsumerRunner(Properties properties, BiConsumer<K, V> biConsumer, Serde<K> serde, Serde<V> serde2, String str, Consumer<Boolean> consumer) {
        this.consumerConfig = properties;
        this.onClose = consumer;
        this.processor = biConsumer;
        this.keySerde = serde;
        this.valueSerde = serde2;
        this.topicName = str;
    }

    @Override // java.lang.Runnable
    public void run() {
        KafkaConsumer kafkaConsumer = new KafkaConsumer(this.consumerConfig, this.keySerde.deserializer(), this.valueSerde.deserializer());
        kafkaConsumer.subscribe(Collections.singletonList(this.topicName));
        this.consumer = Optional.of(kafkaConsumer);
        while (!this.closed.get()) {
            try {
                try {
                    Iterator it = kafkaConsumer.poll(Duration.ofMillis(100L)).iterator();
                    while (it.hasNext()) {
                        ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                        this.processor.accept(consumerRecord.key(), consumerRecord.value());
                    }
                } catch (WakeupException e) {
                    if (!this.closed.get()) {
                        throw e;
                    }
                    this.logger.info("Closing consumer and producer");
                    kafkaConsumer.commitSync();
                    kafkaConsumer.close();
                    this.onClose.accept(true);
                    return;
                }
            } catch (Throwable th) {
                this.logger.info("Closing consumer and producer");
                kafkaConsumer.commitSync();
                kafkaConsumer.close();
                this.onClose.accept(true);
                throw th;
            }
        }
        this.logger.info("Closing consumer and producer");
        kafkaConsumer.commitSync();
        kafkaConsumer.close();
        this.onClose.accept(true);
    }

    public void close() {
        this.closed.set(true);
        this.consumer.ifPresent((v0) -> {
            v0.wakeup();
        });
    }
}
