package org.redkalex.mq.kafka;

import java.time.Duration;
import java.util.Iterator;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.redkale.mq.spi.MessageClient;
import org.redkale.mq.spi.MessageClientConsumer;
import org.redkale.mq.spi.MessageCoder;
import org.redkale.mq.spi.MessageRecord;
import org.redkale.util.Traces;

/* loaded from: input_file:org/redkalex/mq/kafka/KafkaMessageClientConsumer.class */
public class KafkaMessageClientConsumer extends MessageClientConsumer implements Runnable {
    private Properties config;
    private Thread thread;
    private KafkaConsumer<String, MessageRecord> kafkaConsumer;
    private CompletableFuture<Void> startFuture;
    private CompletableFuture<Void> closeFuture;
    private boolean autoCommit;
    private boolean closed;
    private final ReentrantLock startCloseLock;

    /* loaded from: input_file:org/redkalex/mq/kafka/KafkaMessageClientConsumer$MessageRecordDeserializer.class */
    public static class MessageRecordDeserializer implements Deserializer<MessageRecord> {
        private final MessageCoder<MessageRecord> coder;

        public MessageRecordDeserializer(MessageCoder<MessageRecord> messageCoder) {
            this.coder = messageCoder;
        }

        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public MessageRecord m44deserialize(String str, byte[] bArr) {
            return (MessageRecord) this.coder.decode(bArr);
        }
    }

    public KafkaMessageClientConsumer(KafkaMessageAgent kafkaMessageAgent, MessageClient messageClient) {
        super(messageClient);
        this.startCloseLock = new ReentrantLock();
        Properties createConsumerProperties = kafkaMessageAgent.createConsumerProperties("redkale-message");
        this.autoCommit = "true".equalsIgnoreCase(createConsumerProperties.getOrDefault("enable.auto.commit", "true").toString());
        this.config = createConsumerProperties;
    }

    @Override // java.lang.Runnable
    public void run() {
        this.kafkaConsumer = new KafkaConsumer<>(this.config, new StringDeserializer(), new MessageRecordDeserializer(this.messageClient.getClientMessageCoder()));
        this.kafkaConsumer.subscribe(getTopics());
        this.startFuture.complete(null);
        try {
            try {
                MessageClient messageClient = this.messageClient;
                while (!this.closed) {
                    try {
                        ConsumerRecords poll = this.kafkaConsumer.poll(Duration.ofMillis(Long.MAX_VALUE));
                        int count = poll.count();
                        if (count != 0) {
                            boolean isLoggable = this.logger.isLoggable(Level.FINEST);
                            if (!this.autoCommit) {
                                try {
                                    long currentTimeMillis = System.currentTimeMillis();
                                    this.kafkaConsumer.commitSync(Duration.ofMillis(3000L));
                                    long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
                                    if (currentTimeMillis2 > 1000 && this.logger.isLoggable(Level.FINE)) {
                                        this.logger.log(Level.FINE, getClass().getSimpleName() + "(" + this.messageClient.getProtocol() + "-" + Objects.hashCode(this) + ") commitSync cost-slower = " + currentTimeMillis2 + " ms");
                                    } else if (currentTimeMillis2 > 100 && isLoggable) {
                                        this.logger.log(Level.FINEST, getClass().getSimpleName() + "(" + this.messageClient.getProtocol() + "-" + Objects.hashCode(this) + ") commitSync cost-slowly = " + currentTimeMillis2 + "ms");
                                    }
                                } catch (Throwable th) {
                                    this.logger.log(Level.SEVERE, getClass().getSimpleName() + "(" + this.messageClient.getProtocol() + "-" + Objects.hashCode(this) + ") commitSync error", th);
                                }
                            }
                            long currentTimeMillis3 = System.currentTimeMillis();
                            MessageRecord messageRecord = null;
                            Iterator it = poll.iterator();
                            while (it.hasNext()) {
                                try {
                                    messageRecord = (MessageRecord) ((ConsumerRecord) it.next()).value();
                                    Traces.computeIfAbsent(messageRecord.getTraceid());
                                    messageClient.process(messageRecord, currentTimeMillis3);
                                    Traces.removeTraceid();
                                } catch (Throwable th2) {
                                    this.logger.log(Level.SEVERE, getClass().getSimpleName() + "(" + this.messageClient.getProtocol() + "-" + Objects.hashCode(this) + ") consumer " + messageRecord + " error", th2);
                                }
                            }
                            long currentTimeMillis4 = System.currentTimeMillis() - currentTimeMillis3;
                            if (currentTimeMillis4 > 100 && isLoggable) {
                                this.logger.log(Level.FINEST, getClass().getSimpleName() + "(" + this.messageClient.getProtocol() + "-" + Objects.hashCode(this) + ") consumer run " + count + " records" + ((count != 1 || messageRecord == null) ? "" : "(seqid=" + messageRecord.getSeqid() + ")") + " in " + currentTimeMillis4 + "ms");
                            }
                            long currentTimeMillis5 = System.currentTimeMillis() - currentTimeMillis3;
                            if (currentTimeMillis5 > 1000 && this.logger.isLoggable(Level.FINE)) {
                                Logger logger = this.logger;
                                logger.log(Level.FINE, getClass().getSimpleName() + "(" + this.messageClient.getProtocol() + "-" + Objects.hashCode(this) + ").consumer (mq.count = " + count + ", mq.cost-slower = " + currentTimeMillis5 + " ms)， msg=" + logger);
                            } else if (currentTimeMillis5 > 100 && this.logger.isLoggable(Level.FINER)) {
                                Logger logger2 = this.logger;
                                logger2.log(Level.FINER, getClass().getSimpleName() + "(" + this.messageClient.getProtocol() + "-" + Objects.hashCode(this) + ").consumer (mq.count = " + count + ", mq.cost-slowly = " + currentTimeMillis5 + " ms)， msg=" + logger2);
                            } else if (currentTimeMillis5 > 10 && isLoggable) {
                                this.logger.log(Level.FINEST, getClass().getSimpleName() + "(" + this.messageClient.getProtocol() + "-" + Objects.hashCode(this) + ").consumer (mq.count = " + count + ", mq.cost-normal = " + currentTimeMillis5 + " ms)");
                            }
                        }
                    } catch (Throwable th3) {
                        if (!this.closed) {
                            this.logger.log(Level.WARNING, getClass().getSimpleName() + "(" + this.messageClient.getProtocol() + "-" + Objects.hashCode(this) + ") poll error", th3);
                        }
                    }
                }
                if (this.kafkaConsumer != null) {
                    this.kafkaConsumer.close();
                }
                if (this.closeFuture != null) {
                    this.closeFuture.complete(null);
                }
            } catch (Throwable th4) {
                if (!this.closed) {
                    this.logger.log(Level.SEVERE, getClass().getSimpleName() + "(" + this.messageClient.getProtocol() + "-" + Objects.hashCode(this) + ") occur error", th4);
                }
                if (this.closeFuture != null) {
                    this.closeFuture.complete(null);
                }
            }
        } catch (Throwable th5) {
            if (this.closeFuture != null) {
                this.closeFuture.complete(null);
            }
            throw th5;
        }
    }

    public void start() {
        this.startCloseLock.lock();
        try {
            if (this.messageClient.isEmpty()) {
                this.closed = true;
                return;
            }
            this.thread = new Thread(this);
            this.thread.setName(MessageClientConsumer.class.getSimpleName() + "-" + this.messageClient.getAppRespTopic() + "-Thread");
            this.logger.log(Level.INFO, getClass().getSimpleName() + "(" + this.messageClient.getProtocol() + "-" + Objects.hashCode(this) + ") starting");
            this.startFuture = new CompletableFuture<>();
            this.thread.start();
            this.startFuture.join();
            this.logger.log(Level.INFO, getClass().getSimpleName() + "(" + this.messageClient.getProtocol() + "-" + Objects.hashCode(this) + ") started");
        } finally {
            this.startCloseLock.unlock();
        }
    }

    public void stop() {
        this.startCloseLock.lock();
        try {
            if (this.closeFuture != null) {
                this.closeFuture.join();
                return;
            }
            if (this.kafkaConsumer == null || this.closed) {
                return;
            }
            this.logger.log(Level.INFO, getClass().getSimpleName() + "(" + this.messageClient.getProtocol() + "-" + Objects.hashCode(this) + ") stoping");
            this.closeFuture = new CompletableFuture<>();
            this.closed = true;
            this.thread.interrupt();
            this.closeFuture.join();
            this.logger.log(Level.INFO, getClass().getSimpleName() + "(" + this.messageClient.getProtocol() + "-" + Objects.hashCode(this) + ") stoped");
        } finally {
            this.startCloseLock.unlock();
        }
    }
}
