package net.kut3.messaging.kafka.client;

import ch.qos.logback.classic.Level;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import net.kut3.messaging.BatchMessageProcessor;
import net.kut3.messaging.Consumer;
import net.kut3.messaging.Message;
import net.kut3.messaging.MessageProcessor;
import net.kut3.messaging.ProcessResult;
import net.kut3.messaging.kafka.Component;
import net.kut3.messaging.kafka.OffsetResetMode;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/kut3/messaging/kafka/client/ConsumerImpl.class */
class ConsumerImpl implements Consumer, Component {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerImpl.class);
    private final String name;
    private final boolean isAutoAck;
    private final KafkaConsumer<String, String> consumer;
    private final Collection<String> topics;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ConsumerImpl(String str, Map<String, Object> map, Collection<String> collection) {
        this.name = str;
        if (null != map.get("enable.auto.commit")) {
            this.isAutoAck = ((Boolean) map.get("enable.auto.commit")).booleanValue();
        } else {
            this.isAutoAck = true;
        }
        LOGGER.info("Consumer '" + this.name + "' - enable.auto.commit=" + this.isAutoAck);
        this.consumer = new KafkaConsumer<>(map);
        this.topics = collection;
        LOGGER.info("Consumer '" + this.name + "' - topics=" + Arrays.toString(collection.toArray()));
    }

    public String name() {
        return this.name;
    }

    public boolean isAutoAck() {
        return this.isAutoAck;
    }

    public void start(MessageProcessor messageProcessor) {
        this.consumer.subscribe(this.topics);
        while (true) {
            ConsumerRecords poll = this.consumer.poll(Duration.ofMillis(10000L));
            if (poll.isEmpty()) {
                LOGGER.info("Consumer '" + this.name + "' - No records found");
            } else {
                Iterator it = poll.iterator();
                while (it.hasNext()) {
                    messageProcessor.process(new KafkaMessage((ConsumerRecord) it.next()));
                }
            }
        }
    }

    public void start(BatchMessageProcessor batchMessageProcessor) {
        this.consumer.subscribe(this.topics);
        while (true) {
            ConsumerRecords poll = this.consumer.poll(Duration.ofMillis(10000L));
            if (poll.isEmpty()) {
                LOGGER.info("Consumer '" + this.name + "' - No records found");
            } else {
                Iterator it = poll.iterator();
                while (it.hasNext()) {
                    batchMessageProcessor.addToBatch(new KafkaMessage((ConsumerRecord) it.next()));
                }
                batchMessageProcessor.processBatch();
            }
        }
    }

    public void close() {
        this.consumer.close();
    }

    public static void main(String[] strArr) throws InterruptedException {
        KafkaClientFactory kafkaClientFactory = new KafkaClientFactory();
        LoggerFactory.getLogger("org.apache.kafka").setLevel(Level.ERROR);
        LoggerFactory.getLogger("dev.Merchant").info("Begin");
        Consumer newConsumer = kafkaClientFactory.newConsumer(new SimpleConsumerProperties("kafka-client-0.3.0-01", "10.1.1.99:9092,10.1.1.99:9093,10.1.1.98:9094", "net.kut3.messaging.kafka.test-group", OffsetResetMode.EARLIEST, Arrays.asList("dev.Merchant")));
        newConsumer.start(new BatchMessageProcessor() { // from class: net.kut3.messaging.kafka.client.ConsumerImpl.1
            public void addToBatch(Message message) {
                System.out.println(message.toString());
            }

            public List<ProcessResult> processBatch() {
                System.out.println("Done batch");
                return new ArrayList();
            }
        });
        newConsumer.close();
    }
}
