package io.openmessaging.chaos.driver.kafka;

import io.openmessaging.chaos.common.Message;
import io.openmessaging.chaos.driver.mq.MQChaosPullConsumer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/openmessaging/chaos/driver/kafka/KafkaChaosPullConsumer.class */
/**
 * Pull-style chaos consumer backed by a Kafka consumer.
 *
 * <p>Every call that touches the wrapped consumer ({@code poll} and {@code close}) is
 * submitted to one single-threaded executor, so the consumer is only ever used from that
 * executor's worker thread regardless of which thread calls {@link #dequeue()} or
 * {@link #close()}. (Kafka documents {@code KafkaConsumer} as not safe for multi-threaded
 * access.)
 */
public class KafkaChaosPullConsumer implements MQChaosPullConsumer {
    private static final Logger log = LoggerFactory.getLogger(KafkaChaosPullConsumer.class);

    /** Maximum time, in milliseconds, a single poll is allowed to block. */
    private static final long POLL_TIMEOUT_MS = 500L;

    private final KafkaConsumer<?, ?> kafkaConsumer;
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    /**
     * Wraps the given consumer.
     *
     * @param kafkaConsumer consumer to poll; record keys are cast to {@code String} and
     *                      values to {@code byte[]} when messages are built, so the consumer
     *                      must be configured with matching deserializers
     */
    public KafkaChaosPullConsumer(KafkaConsumer<?, ?> kafkaConsumer) {
        this.kafkaConsumer = kafkaConsumer;
    }

    /**
     * Polls the consumer once on the executor thread and waits for the result.
     *
     * @return the messages received by the poll, or {@code null} when the poll returned no
     *         records or when polling failed (the failure is logged, not rethrown)
     */
    public List<Message> dequeue() {
        // Typed as Callable explicitly so submit(Callable) is chosen over submit(Runnable).
        Callable<List<Message>> pollTask = this::pollOnce;
        try {
            return this.executor.submit(pollTask).get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers up the stack can still observe it.
            Thread.currentThread().interrupt();
            log.error("dequeue error", e);
        } catch (Exception e) {
            log.error("dequeue error", e);
        }
        return null;
    }

    /**
     * Performs a single poll and converts the records to messages. Runs on the executor thread.
     *
     * @return the converted messages, or {@code null} when the poll returned no records
     */
    private List<Message> pollOnce() {
        ConsumerRecords<?, ?> records = this.kafkaConsumer.poll(POLL_TIMEOUT_MS);
        if (records.isEmpty()) {
            return null;
        }
        List<Message> messages = new ArrayList<>(records.count());
        for (ConsumerRecord<?, ?> record : records) {
            messages.add(new Message((String) record.key(), (byte[]) record.value()));
        }
        return messages;
    }

    /** No-op: this consumer needs no explicit start step. */
    public void start() {
    }

    /**
     * Closes the wrapped consumer on the executor thread, waits for that to finish, and then
     * shuts the executor down. Failures while closing are logged, not rethrown.
     */
    public void close() {
        try {
            this.executor.submit(() -> {
                if (this.kafkaConsumer != null) {
                    this.kafkaConsumer.close();
                }
            }).get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers up the stack can still observe it.
            Thread.currentThread().interrupt();
            log.error("Close KafkaChaosPullConsumer error", e);
        } catch (Exception e) {
            log.error("Close KafkaChaosPullConsumer error", e);
        } finally {
            // Always release the worker thread, even if closing the consumer failed.
            this.executor.shutdown();
        }
    }
}
