package org.redkalex.mq.kafka;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.redkale.mq.MessageAgent;
import org.redkale.mq.MessageCoder;
import org.redkale.mq.MessageConsumer;
import org.redkale.mq.MessageProcessor;
import org.redkale.mq.MessageRecord;
import org.redkale.util.Traces;

/* loaded from: input_file:org/redkalex/mq/kafka/KafkaMessageConsumer.class */
public class KafkaMessageConsumer extends MessageConsumer implements Runnable {
    protected Properties config;
    protected Thread thread;
    protected CompletableFuture<Void> startFuture;
    protected CompletableFuture<Void> closeFuture;
    protected KafkaConsumer<String, MessageRecord> consumer;
    protected boolean reconnecting;
    protected boolean autoCommit;
    protected final Object resumeLock;
    protected final boolean finest;
    protected final boolean finer;
    protected final boolean fine;

    /* loaded from: input_file:org/redkalex/mq/kafka/KafkaMessageConsumer$MessageRecordDeserializer.class */
    public static class MessageRecordDeserializer implements Deserializer<MessageRecord> {
        private final MessageCoder<MessageRecord> coder;

        public MessageRecordDeserializer(MessageCoder<MessageRecord> messageCoder) {
            this.coder = messageCoder;
        }

        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public MessageRecord m20deserialize(String str, byte[] bArr) {
            return (MessageRecord) this.coder.decode(bArr);
        }
    }

    public KafkaMessageConsumer(MessageAgent messageAgent, String[] strArr, String str, MessageProcessor messageProcessor, String str2, Properties properties) {
        super(messageAgent, strArr, str, messageProcessor);
        this.resumeLock = new Object();
        Properties properties2 = new Properties();
        properties2.put("group.id", this.consumerid);
        properties2.put("auto.offset.reset", "latest");
        properties2.put("reconnect.backoff.ms", "1000");
        properties2.put("auto.commit.interval.ms", "1000");
        properties2.put("enable.auto.commit", "true");
        properties2.putAll(properties);
        properties2.put("bootstrap.servers", str2);
        this.autoCommit = "true".equalsIgnoreCase(properties2.getOrDefault("enable.auto.commit", "true").toString());
        this.config = properties2;
        this.finest = this.logger.isLoggable(Level.FINEST);
        this.finer = this.logger.isLoggable(Level.FINER);
        this.fine = this.logger.isLoggable(Level.FINE);
    }

    public void retryConnect() {
        if (this.reconnecting) {
            KafkaConsumer<String, MessageRecord> kafkaConsumer = new KafkaConsumer<>(this.config);
            kafkaConsumer.subscribe(Arrays.asList(this.topics));
            this.consumer = kafkaConsumer;
            synchronized (this.resumeLock) {
                this.resumeLock.notifyAll();
            }
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        this.consumer = new KafkaConsumer<>(this.config, new StringDeserializer(), new MessageRecordDeserializer(this.messageAgent.getMessageCoder()));
        this.consumer.subscribe(Arrays.asList(this.topics));
        ConsumerRecords consumerRecords = null;
        try {
            consumerRecords = this.consumer.poll(Duration.ofMillis(1L));
        } catch (InvalidTopicException e) {
            this.messageAgent.createTopic(this.topics);
        }
        this.startFuture.complete(null);
        if (this.logger.isLoggable(Level.FINE)) {
            this.logger.log(Level.FINE, MessageConsumer.class.getSimpleName() + " [" + Arrays.toString(this.topics) + "] startuped");
        }
        if (consumerRecords != null) {
            try {
                if (consumerRecords.count() > 0) {
                    if (!this.autoCommit) {
                        long currentTimeMillis = System.currentTimeMillis();
                        this.consumer.commitAsync((map, exc) -> {
                            if (exc != null) {
                                this.logger.log(Level.SEVERE, Arrays.toString(this.topics) + " consumer error: " + map, (Throwable) exc);
                            }
                        });
                        long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
                        if (this.finest && currentTimeMillis2 > 100) {
                            this.logger.log(Level.FINEST, MessageProcessor.class.getSimpleName() + " processor async commit in " + currentTimeMillis2 + "ms");
                        }
                    }
                    long currentTimeMillis3 = System.currentTimeMillis();
                    MessageRecord messageRecord = null;
                    int count = consumerRecords.count();
                    try {
                        this.processor.begin(count, currentTimeMillis3);
                        Iterator it = consumerRecords.iterator();
                        while (it.hasNext()) {
                            messageRecord = (MessageRecord) ((ConsumerRecord) it.next()).value();
                            Traces.currTraceid(messageRecord.getTraceid());
                            this.processor.process(messageRecord, (Runnable) null);
                        }
                        this.processor.commit();
                        long currentTimeMillis4 = System.currentTimeMillis() - currentTimeMillis3;
                        if (this.finest && currentTimeMillis4 > 10) {
                            this.logger.log(Level.FINEST, MessageProcessor.class.getSimpleName() + Arrays.toString(this.topics) + " processor run " + count + " records" + ((count != 1 || messageRecord == null) ? "" : "(seqid=" + messageRecord.getSeqid() + ")") + " in " + currentTimeMillis4 + "ms");
                        }
                    } catch (Throwable th) {
                        this.logger.log(Level.SEVERE, MessageProcessor.class.getSimpleName() + " process " + messageRecord + " error", th);
                    }
                }
            } catch (Throwable th2) {
                if (this.closeFuture != null && !this.closeFuture.isDone()) {
                    this.closeFuture.complete(null);
                    if (this.logger.isLoggable(Level.FINE)) {
                        this.logger.log(Level.FINE, MessageConsumer.class.getSimpleName() + " [" + Arrays.toString(this.topics) + "] shutdowned");
                    }
                }
                this.logger.log(Level.SEVERE, MessageConsumer.class.getSimpleName() + "(" + Arrays.toString(this.topics) + ") occur error", th2);
                return;
            }
        }
        while (!this.closed) {
            try {
                ConsumerRecords poll = this.consumer.poll(Duration.ofMillis(10000L));
                if (this.reconnecting) {
                    this.reconnecting = false;
                }
                int count2 = poll.count();
                if (count2 != 0) {
                    if (!this.autoCommit) {
                        long currentTimeMillis5 = System.currentTimeMillis();
                        this.consumer.commitAsync((map2, exc2) -> {
                            if (exc2 != null) {
                                this.logger.log(Level.SEVERE, Arrays.toString(this.topics) + " consumer commitAsync error: " + map2, (Throwable) exc2);
                            }
                        });
                        long currentTimeMillis6 = System.currentTimeMillis() - currentTimeMillis5;
                        if (this.finest && currentTimeMillis6 > 100) {
                            this.logger.log(Level.FINEST, MessageProcessor.class.getSimpleName() + " processor async commit in " + currentTimeMillis6 + "ms");
                        }
                    }
                    long currentTimeMillis7 = System.currentTimeMillis();
                    MessageRecord messageRecord2 = null;
                    try {
                        this.processor.begin(count2, currentTimeMillis7);
                        Iterator it2 = poll.iterator();
                        while (it2.hasNext()) {
                            messageRecord2 = (MessageRecord) ((ConsumerRecord) it2.next()).value();
                            Traces.currTraceid(messageRecord2.getTraceid());
                            this.processor.process(messageRecord2, (Runnable) null);
                        }
                        this.processor.commit();
                        long currentTimeMillis8 = System.currentTimeMillis() - currentTimeMillis7;
                        if (this.finest && currentTimeMillis8 > 10) {
                            this.logger.log(Level.FINEST, MessageProcessor.class.getSimpleName() + Arrays.toString(this.topics) + " processor run " + count2 + " records" + ((count2 != 1 || messageRecord2 == null) ? "" : "(seqid=" + messageRecord2.getSeqid() + ")") + " in " + currentTimeMillis8 + "ms");
                        }
                    } catch (Throwable th3) {
                        this.logger.log(Level.SEVERE, MessageProcessor.class.getSimpleName() + " process " + messageRecord2 + " error", th3);
                    }
                    long currentTimeMillis9 = System.currentTimeMillis() - currentTimeMillis7;
                    if (currentTimeMillis9 > 1000 && this.fine) {
                        Logger logger = this.logger;
                        logger.log(Level.FINE, "Kafka." + this.processor.getClass().getSimpleName() + ".consumer (mqs.count = " + count2 + ", mqs.costs = " + currentTimeMillis9 + " ms)， msg=" + logger);
                    } else if (currentTimeMillis9 > 100 && this.finer) {
                        Logger logger2 = this.logger;
                        logger2.log(Level.FINER, "Kafka." + this.processor.getClass().getSimpleName() + ".consumer (mq.count = " + count2 + ", mq.costs = " + currentTimeMillis9 + " ms)， msg=" + logger2);
                    } else if (this.finest) {
                        this.logger.log(Level.FINEST, "Kafka." + this.processor.getClass().getSimpleName() + ".consumer (mq.count = " + count2 + ", mq.cost = " + currentTimeMillis9 + " ms)");
                    }
                }
            } catch (Exception e2) {
                this.logger.log(Level.WARNING, getClass().getSimpleName() + " poll error", (Throwable) e2);
                this.consumer.close();
                this.consumer = null;
                if (!this.closed) {
                    this.reconnecting = true;
                    ((KafkaMessageAgent) this.messageAgent).startReconnect();
                    synchronized (this.resumeLock) {
                        this.resumeLock.wait();
                    }
                }
            }
        }
        if (this.consumer != null) {
            this.consumer.close();
        }
        if (this.closeFuture != null) {
            this.closeFuture.complete(null);
            if (this.logger.isLoggable(Level.FINE)) {
                this.logger.log(Level.FINE, MessageConsumer.class.getSimpleName() + " [" + Arrays.toString(this.topics) + "] shutdowned");
            }
        }
    }

    public synchronized CompletableFuture<Void> startup() {
        if (this.startFuture != null) {
            return this.startFuture;
        }
        this.thread = new Thread(this);
        this.thread.setName("MQ-" + this.consumerid + "-Thread");
        this.startFuture = new CompletableFuture<>();
        if (this.logger.isLoggable(Level.FINE)) {
            this.logger.log(Level.FINE, MessageConsumer.class.getSimpleName() + " [" + Arrays.toString(this.topics) + "] startuping");
        }
        this.thread.start();
        return this.startFuture;
    }

    public synchronized CompletableFuture<Void> shutdown() {
        if (this.consumer == null) {
            return CompletableFuture.completedFuture(null);
        }
        if (this.closeFuture != null) {
            return this.closeFuture;
        }
        this.closeFuture = new CompletableFuture<>();
        if (this.logger.isLoggable(Level.FINE)) {
            this.logger.log(Level.FINE, MessageConsumer.class.getSimpleName() + " [" + Arrays.toString(this.topics) + "] shutdownling");
        }
        this.closed = true;
        synchronized (this.resumeLock) {
            this.resumeLock.notifyAll();
        }
        return this.closeFuture;
    }
}
