package io.openmessaging.chaos.driver.kafka;

import io.openmessaging.chaos.common.Message;
import io.openmessaging.chaos.driver.mq.ConsumerCallback;
import io.openmessaging.chaos.driver.mq.MQChaosPushConsumer;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/openmessaging/chaos/driver/kafka/KafkaChaosPushConsumer.class */
/**
 * Push-style consumer built on top of a {@link KafkaConsumer}.
 *
 * <p>A dedicated single-threaded executor repeatedly polls the supplied consumer and hands
 * every received record to the supplied {@link ConsumerCallback} as a {@link Message}.
 * The poll loop is started from the constructor and runs until {@link #close()} is called.
 * All interaction with the Kafka consumer (polling and closing) happens on that one
 * executor thread.
 */
public class KafkaChaosPushConsumer implements MQChaosPushConsumer {
    private static final Logger log = LoggerFactory.getLogger(KafkaChaosPushConsumer.class);

    /** Maximum time, in milliseconds, a single poll waits for records. */
    private static final long POLL_TIMEOUT_MS = 500L;

    private final KafkaConsumer<String, byte[]> consumer;
    private final Future<?> consumerTask;
    /** Set by {@link #close()}; read by the poll loop to know when to stop. */
    private volatile boolean closing = false;
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    /**
     * Creates the consumer and immediately starts the background poll loop.
     *
     * @param kafkaConsumer    the Kafka consumer to poll; it is closed by {@link #close()}
     * @param consumerCallback invoked on the polling thread once per received record
     */
    public KafkaChaosPushConsumer(KafkaConsumer<String, byte[]> kafkaConsumer, ConsumerCallback consumerCallback) {
        this.consumer = kafkaConsumer;
        this.consumerTask = this.executor.submit(() -> {
            while (!this.closing) {
                try {
                    for (ConsumerRecord<String, byte[]> record : kafkaConsumer.poll(POLL_TIMEOUT_MS)) {
                        consumerCallback.messageReceived(new Message(record.key(), record.value()));
                    }
                } catch (Exception e) {
                    // Keep the loop alive: failures from polling or from the callback are
                    // logged and the next iteration polls again.
                    log.error("exception occur while consuming message", e);
                }
            }
        });
    }

    /**
     * No-op: the poll loop is already started by the constructor.
     */
    public void start() {
    }

    /**
     * Stops the poll loop, closes the underlying Kafka consumer on the polling thread and
     * shuts the executor down.
     *
     * <p>The call blocks until the consumer has been closed. If the calling thread is
     * interrupted while waiting, the interrupt flag is restored and the method returns;
     * the already-queued close task still runs on the executor thread, so the consumer
     * is not leaked. Failures are logged rather than thrown.
     */
    public void close() {
        this.closing = true;
        try {
            // Queue the close right away. The executor has a single thread, so this task
            // can only run after the poll loop has observed 'closing' and returned, and it
            // runs on the same thread that did the polling.
            Future<?> closeTask = this.executor.submit(() -> {
                if (this.consumer != null) {
                    this.consumer.close();
                }
            });
            // Reject any further work; tasks that are already queued still execute.
            this.executor.shutdown();
            closeTask.get();
            // The poll task has finished by now; this only surfaces an abnormal termination.
            this.consumerTask.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("Close KafkaChaosPushConsumer failed", e);
        } catch (Exception e) {
            log.error("Close KafkaChaosPushConsumer failed", e);
        } finally {
            // Idempotent; guarantees the executor thread can terminate on every path.
            this.executor.shutdown();
        }
    }
}
