package org.redkalex.mq.kafka;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.redkale.mq.MessageConsumer;
import org.redkale.mq.MessageEvent;
import org.redkale.mq.spi.MessageAgent;
import org.redkale.util.Utility;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/redkalex/mq/kafka/KafkaMessageConsumer.class */
/**
 * Runs one {@link KafkaConsumer} on a dedicated thread and forwards every polled batch to the
 * registered {@code MessageAgent.MessageConsumerWrapper}(s).
 *
 * <p>The instance works in one of two modes, chosen by the constructor:
 * <ul>
 *   <li><b>regex mode</b> - a single wrapper subscribed with a topic pattern; the whole batch is
 *       handed to that wrapper in one call;</li>
 *   <li><b>topic mode</b> - a map of topic name to wrapper; the batch is grouped by topic and each
 *       group is handed to the wrapper registered for that topic.</li>
 * </ul>
 *
 * <p>{@link #start()} and {@link #stop()} are serialized by a lock and may be called from any
 * thread. The {@code KafkaConsumer} itself is only used by the consumer thread, except for
 * {@code wakeup()}, which Kafka documents as thread-safe.
 */
public class KafkaMessageConsumer implements Runnable {
    private final KafkaMessageAgent messageAgent;

    /** Created by {@link #start()} before the thread is started; completed by {@link #run()}. */
    private CompletableFuture<Void> startFuture;

    /**
     * Completed by the consumer thread when {@link #run()} is about to return, whatever the reason.
     * Created eagerly so that {@link #stop()} can never wait on a future nobody will complete.
     */
    private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();

    /**
     * When false, offsets are committed explicitly after each non-empty poll. Nothing in this class
     * assigns the field, so it keeps its default ({@code false}) unless set from outside the
     * visible code.
     */
    private boolean autoCommit;

    private final String group;
    private final Map<String, MessageAgent.MessageConsumerWrapper> topicConsumerMap;
    private final List<String> topics;
    private final MessageAgent.MessageConsumerWrapper regexConsumerWrapper;
    private final String regexTopic;

    /** Published by the consumer thread once subscribed; read by {@link #stop()}. */
    private volatile KafkaConsumer<String, byte[]> consumer;

    private Thread thread;

    /** Stop request flag: written by {@link #stop()}, read by the poll loop, hence volatile. */
    private volatile boolean closed;

    private final ReentrantLock startCloseLock = new ReentrantLock();
    private final Logger logger = Logger.getLogger(getClass().getSimpleName());

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a consumer in regex mode when {@code messageConsumerWrapper} is non-null, otherwise
     * in topic mode.
     *
     * @param kafkaMessageAgent agent that supplies the consumer properties
     * @param str consumer group name
     * @param messageConsumerWrapper wrapper for regex mode, or {@code null} for topic mode
     * @param map topic name to wrapper; only read (and required non-null) in topic mode
     */
    public KafkaMessageConsumer(KafkaMessageAgent kafkaMessageAgent, String str, MessageAgent.MessageConsumerWrapper messageConsumerWrapper, Map<String, MessageAgent.MessageConsumerWrapper> map) {
        this.messageAgent = kafkaMessageAgent;
        this.group = str;
        this.regexConsumerWrapper = messageConsumerWrapper;
        if (messageConsumerWrapper != null) {
            this.regexTopic = messageConsumerWrapper.getRegexTopic();
            this.topicConsumerMap = null;
            this.topics = List.of(this.regexTopic);
        } else {
            this.regexTopic = null;
            this.topicConsumerMap = map;
            this.topics = new ArrayList<>(map.keySet());
        }
    }

    /**
     * Thread body: opens and subscribes the Kafka consumer, signals {@link #start()}, polls until
     * {@link #stop()} is requested, then closes the consumer and completes the close future.
     */
    @Override // java.lang.Runnable
    public void run() {
        boolean regexMode = Utility.isNotEmpty(this.regexTopic);
        KafkaConsumer<String, byte[]> kafkaConsumer;
        try {
            kafkaConsumer = openConsumer(regexMode);
        } catch (Throwable startError) {
            this.logger.log(Level.SEVERE, groupTag() + " occur error", startError);
            this.closeFuture.complete(null);
            // Release the thread blocked in start(); otherwise it would wait forever.
            signalStarted(startError);
            return;
        }
        signalStarted(null);
        try {
            pollLoop(kafkaConsumer, regexMode);
        } catch (Throwable fatal) {
            this.logger.log(Level.SEVERE, groupTag() + " occur error", fatal);
        } finally {
            try {
                kafkaConsumer.close();
            } catch (Throwable closeError) {
                this.logger.log(Level.SEVERE, groupTag() + " occur error", closeError);
            }
            // Always completed so that stop() returns even if the loop died on its own.
            this.closeFuture.complete(null);
        }
    }

    /**
     * Starts the consumer thread on the first call and blocks until it has subscribed. Later calls
     * only wait for (or re-observe) the outcome of the first one.
     *
     * @throws java.util.concurrent.CompletionException if the Kafka consumer could not be created
     *     or subscribed
     */
    public void start() {
        this.startCloseLock.lock();
        try {
            if (this.startFuture != null) {
                this.startFuture.join();
                return;
            }
            this.thread = new Thread(this);
            this.startFuture = new CompletableFuture<>();
            this.thread.setName(MessageConsumer.class.getSimpleName() + "-[" + this.group + "]-Thread");
            this.logger.log(Level.INFO, groupTag() + " starting");
            this.thread.start();
            this.startFuture.join();
            this.logger.log(Level.INFO, groupTag() + " started");
        } finally {
            this.startCloseLock.unlock();
        }
    }

    /**
     * Requests the poll loop to end and blocks until the consumer thread has closed the Kafka
     * consumer. Does nothing if the consumer was never successfully started; repeated calls only
     * wait for the shutdown to finish.
     */
    public void stop() {
        this.startCloseLock.lock();
        try {
            KafkaConsumer<String, byte[]> kafkaConsumer = this.consumer;
            if (kafkaConsumer == null) {
                return;
            }
            if (this.closed) {
                this.closeFuture.join();
                return;
            }
            this.logger.log(Level.INFO, groupTag() + " stoping");
            this.closed = true;
            if (!this.closeFuture.isDone()) {
                // wakeup() is the thread-safe way to abort a blocking poll. Interrupting the
                // thread instead leaves its interrupt flag set, which makes close() fail.
                kafkaConsumer.wakeup();
            }
            this.closeFuture.join();
            this.logger.log(Level.INFO, groupTag() + " stoped");
        } finally {
            this.startCloseLock.unlock();
        }
    }

    /** Creates the Kafka consumer, subscribes it and publishes it in {@link #consumer}. */
    private KafkaConsumer<String, byte[]> openConsumer(boolean regexMode) {
        KafkaConsumer<String, byte[]> kafkaConsumer = new KafkaConsumer<>(this.messageAgent.createConsumerProperties(this.group), new StringDeserializer(), new ByteArrayDeserializer());
        try {
            if (regexMode) {
                kafkaConsumer.subscribe(Pattern.compile(this.regexTopic));
            } else {
                kafkaConsumer.subscribe(this.topics);
            }
        } catch (RuntimeException | Error subscribeError) {
            try {
                kafkaConsumer.close();
            } catch (Throwable ignored) {
                // Best effort only: the subscribe failure is the error worth reporting.
            }
            throw subscribeError;
        }
        // Published only after a successful subscribe so stop() ignores a failed start.
        this.consumer = kafkaConsumer;
        return kafkaConsumer;
    }

    /** Completes the start future, if any, normally or with the given startup error. */
    private void signalStarted(Throwable error) {
        CompletableFuture<Void> future = this.startFuture;
        if (future == null) {
            return;
        }
        if (error == null) {
            future.complete(null);
        } else {
            future.completeExceptionally(error);
        }
    }

    /** Polls, commits and dispatches batches until {@link #closed} is set. */
    private void pollLoop(KafkaConsumer<String, byte[]> kafkaConsumer, boolean regexMode) {
        while (!this.closed) {
            try {
                ConsumerRecords<String, byte[]> records = kafkaConsumer.poll(Duration.ofMillis(Long.MAX_VALUE));
                int count = records.count();
                if (count == 0) {
                    continue;
                }
                // Offsets are committed before the batch is dispatched, as in the original flow.
                if (!this.autoCommit) {
                    commitOffsets(kafkaConsumer);
                }
                long begin = System.currentTimeMillis();
                Object dispatched = regexMode ? dispatchToRegexConsumer(records) : dispatchByTopic(records);
                logCost(count, System.currentTimeMillis() - begin, dispatched);
            } catch (Exception e) {
                // After stop() the exception is the expected wakeup; stay silent in that case.
                if (!this.closed) {
                    this.logger.log(Level.WARNING, topicsTag() + " poll error", (Throwable) e);
                }
            }
        }
    }

    /** Commits the polled offsets synchronously and logs commits slower than 100 ms. */
    private void commitOffsets(KafkaConsumer<String, byte[]> kafkaConsumer) {
        long begin = System.currentTimeMillis();
        kafkaConsumer.commitSync();
        long cost = System.currentTimeMillis() - begin;
        if (cost > 100 && this.logger.isLoggable(Level.FINE)) {
            this.logger.log(Level.FINE, topicsTag() + " processor async commit in " + cost + "ms");
        }
    }

    /**
     * Hands the whole batch to the regex wrapper. Failures are logged, not propagated.
     *
     * @return the events built from the batch, for logging
     */
    // Raw types: the generic signatures of MessageEvent and onMessage are not visible in this file.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private List dispatchToRegexConsumer(ConsumerRecords<String, byte[]> records) {
        List events = new ArrayList(records.count());
        try {
            for (ConsumerRecord<String, byte[]> record : records) {
                events.add(toEvent(record));
            }
            this.regexConsumerWrapper.onMessage(events);
        } catch (Throwable t) {
            this.logger.log(Level.SEVERE, topicsTag() + " process " + events + " error", t);
        }
        return events;
    }

    /**
     * Groups the batch by topic (in first-seen order) and hands each group to the wrapper
     * registered for that topic; topics without a wrapper are skipped. A failure in one wrapper is
     * logged and does not prevent the remaining topics from being delivered.
     *
     * @return the events grouped by topic, for logging
     */
    // Raw types: the generic signatures of MessageEvent and onMessage are not visible in this file.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private Map<String, List> dispatchByTopic(ConsumerRecords<String, byte[]> records) {
        Map<String, List> eventsByTopic = new LinkedHashMap<>();
        for (ConsumerRecord<String, byte[]> record : records) {
            eventsByTopic.computeIfAbsent(record.topic(), topic -> new ArrayList()).add(toEvent(record));
        }
        for (Map.Entry<String, List> entry : eventsByTopic.entrySet()) {
            MessageAgent.MessageConsumerWrapper wrapper = this.topicConsumerMap.get(entry.getKey());
            if (wrapper == null) {
                continue;
            }
            try {
                wrapper.onMessage(entry.getValue());
            } catch (Throwable t) {
                this.logger.log(Level.SEVERE, topicsTag() + " process " + entry.getValue() + " error", t);
            }
        }
        return eventsByTopic;
    }

    /** Converts one Kafka record into a {@link MessageEvent} (topic, partition, key, value). */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static MessageEvent toEvent(ConsumerRecord<String, byte[]> record) {
        return new MessageEvent(record.topic(), Integer.valueOf(record.partition()), record.key(), record.value());
    }

    /** Logs the dispatch time of a batch at FINE / FINER / FINEST depending on how slow it was. */
    private void logCost(int count, long cost, Object msgs) {
        if (cost > 1000 && this.logger.isLoggable(Level.FINE)) {
            this.logger.log(Level.FINE, topicsTag() + ".consumer (mq.count = " + count + ", mq.cost-slower = " + cost + " ms)， msgs=" + msgs);
        } else if (cost > 100 && this.logger.isLoggable(Level.FINER)) {
            this.logger.log(Level.FINER, topicsTag() + ".consumer (mq.count = " + count + ", mq.cost-slowly = " + cost + " ms)， msgs=" + msgs);
        } else if (cost > 10 && this.logger.isLoggable(Level.FINEST)) {
            this.logger.log(Level.FINEST, topicsTag() + ".consumer (mq.count = " + count + ", mq.cost-normal = " + cost + " ms)");
        }
    }

    /** Log prefix of the form {@code SimpleName(topics=[...])}. */
    private String topicsTag() {
        return getClass().getSimpleName() + "(topics=" + this.topics + ")";
    }

    /** Log prefix of the form {@code SimpleName(topics=[...], group=...)}. */
    private String groupTag() {
        return getClass().getSimpleName() + "(topics=" + this.topics + ", group=" + this.group + ")";
    }
}
