package com.noleme.flow.connect.kafka.generator;

import com.noleme.flow.actor.generator.Generator;
import java.time.Duration;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/noleme/flow/connect/kafka/generator/KafkaConsumerGenerator.class */
public class KafkaConsumerGenerator<K, V> implements Generator<V> {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerGenerator.class);
    private final Consumer<K, V> consumer;
    private final Duration pollTimeout;
    private final Collection<String> topics;
    private final Queue<ConsumerRecord<K, V>> queue;
    private boolean isActive;

    public KafkaConsumerGenerator(Consumer<K, V> consumer, String... strArr) {
        this(consumer, Duration.ofMillis(500L), strArr);
    }

    public KafkaConsumerGenerator(Consumer<K, V> consumer, Duration duration, String... strArr) {
        this(consumer, duration, List.of((Object[]) strArr));
    }

    public KafkaConsumerGenerator(Consumer<K, V> consumer, Duration duration, Collection<String> collection) {
        this.topics = collection;
        this.consumer = consumer;
        this.pollTimeout = duration;
        this.queue = new LinkedList();
        this.isActive = true;
        this.consumer.subscribe(collection);
    }

    public boolean hasNext() {
        return this.isActive;
    }

    public V generate() {
        if (!this.queue.isEmpty()) {
            logger.debug("Polling message from in-memory queue (queue size: {})", Integer.valueOf(this.queue.size()));
            return poll();
        }
        while (this.isActive && this.queue.isEmpty()) {
            Iterator it = this.consumer.poll(this.pollTimeout).iterator();
            while (it.hasNext()) {
                this.queue.add((ConsumerRecord) it.next());
            }
        }
        return poll();
    }

    private V poll() {
        logger.info("Receiving message from kafka topics: {}", this.topics);
        ConsumerRecord<K, V> poll = this.queue.poll();
        if (poll != null) {
            return (V) poll.value();
        }
        return null;
    }

    public synchronized void deactivate() {
        this.isActive = false;
    }
}
