package com.github.ideahut.kafka;

import com.github.ideahut.queue.QueueHeader;
import com.github.ideahut.queue.QueueMessage;
import com.github.ideahut.util.BeanUtil;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/ideahut/kafka/KafkaSubscriber.class */
public class KafkaSubscriber<K, V> {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSubscriber.class);
    private final List<KafkaSubscriber<K, V>.MessageConsumer> messageConsumers = new ArrayList();
    private final Deserializer<K> keyDeserializer;
    private final Deserializer<V> valueDeserializer;
    private final KafkaClient kafkaClient;
    private final Map<String, Object> consumerConfigs;
    private final KafkaGroup<K, V> kafkaGroup;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/github/ideahut/kafka/KafkaSubscriber$MessageConsumer.class */
    public class MessageConsumer {
        private final Integer index;
        private final KafkaConsumer<K, V> kafkaConsumer;
        private Thread consumerThread;

        public MessageConsumer(Integer num) {
            this.index = num;
            this.kafkaConsumer = new KafkaConsumer<>(KafkaSubscriber.this.consumerConfigs, KafkaSubscriber.this.keyDeserializer, KafkaSubscriber.this.valueDeserializer);
            this.kafkaConsumer.subscribe(Arrays.asList(KafkaSubscriber.this.kafkaGroup.getTopicName()));
        }

        /* JADX INFO: Access modifiers changed from: private */
        public KafkaSubscriber<K, V>.MessageConsumer start() {
            if (this.consumerThread == null || this.consumerThread.isInterrupted()) {
                this.consumerThread = new Thread(new Runnable() { // from class: com.github.ideahut.kafka.KafkaSubscriber.MessageConsumer.1
                    /* JADX WARN: Multi-variable type inference failed */
                    @Override // java.lang.Runnable
                    public void run() {
                        while (true) {
                            ConsumerRecords poll = MessageConsumer.this.kafkaConsumer.poll(Duration.ofMillis(100L));
                            if (KafkaSubscriber.this.kafkaGroup.getReceiver() != null) {
                                Iterator it = poll.iterator();
                                while (it.hasNext()) {
                                    ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                                    KafkaSubscriber.LOGGER.debug("{}-{}-Receiver-{}: {}", new Object[]{KafkaSubscriber.this.kafkaGroup.getTopicName(), KafkaSubscriber.this.kafkaGroup.getGroupName(), MessageConsumer.this.index, consumerRecord.value() + ""});
                                    QueueMessage queueMessage = new QueueMessage();
                                    queueMessage.setValue(consumerRecord.value());
                                    queueMessage.setKey(consumerRecord.key());
                                    queueMessage.setTimestamp(consumerRecord.timestamp());
                                    QueueHeader queueHeader = new QueueHeader();
                                    queueHeader.setTopicName(consumerRecord.topic());
                                    queueHeader.setGroupName(KafkaSubscriber.this.kafkaGroup.getGroupName());
                                    queueHeader.setConsumerIndex(MessageConsumer.this.index);
                                    Headers headers = consumerRecord.headers();
                                    if (headers != null) {
                                        for (Header header : headers.toArray()) {
                                            if (header.value() != null) {
                                                queueHeader.put(header.key(), new String(header.value()));
                                            }
                                        }
                                    }
                                    queueMessage.setHeader(queueHeader);
                                    try {
                                        KafkaSubscriber.this.kafkaGroup.getReceiver().onMessageReceive(queueMessage);
                                    } catch (Exception e) {
                                        KafkaSubscriber.LOGGER.error("", e);
                                    }
                                    HashMap hashMap = new HashMap();
                                    hashMap.put(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), new OffsetAndMetadata(consumerRecord.offset() + 1));
                                    MessageConsumer.this.kafkaConsumer.commitSync(hashMap);
                                    KafkaSubscriber.LOGGER.debug("Offset committed to Kafka.");
                                }
                            }
                        }
                    }
                });
                this.consumerThread.start();
            }
            return this;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void stop() {
            if (this.consumerThread != null) {
                if (!this.consumerThread.isInterrupted()) {
                    this.consumerThread.interrupt();
                }
                BeanUtil.stopThread(this.consumerThread);
                this.consumerThread = null;
            }
        }
    }

    public KafkaSubscriber(Class<K> cls, Class<V> cls2, KafkaClient kafkaClient, Map<String, Object> map, KafkaGroup<K, V> kafkaGroup) throws Exception {
        this.keyDeserializer = KafkaHelper.getDefaultDeserializer(cls);
        if (this.keyDeserializer == null) {
            throw new Exception("KeyDeserializer is null");
        }
        this.valueDeserializer = KafkaHelper.getDefaultDeserializer(cls2);
        if (this.valueDeserializer == null) {
            throw new Exception("ValueDeserializer is null");
        }
        Map hashMap = map != null ? new HashMap(map) : new HashMap();
        hashMap.put("key.deserializer", this.keyDeserializer.getClass().getName());
        hashMap.put("value.deserializer", this.valueDeserializer.getClass().getName());
        hashMap.put("group.id", kafkaGroup.getGroupName());
        this.consumerConfigs = Collections.unmodifiableMap(hashMap);
        this.kafkaClient = kafkaClient;
        this.kafkaGroup = kafkaGroup;
    }

    public Deserializer<K> getKeyDeserializer() {
        return this.keyDeserializer;
    }

    public Deserializer<V> getValueDeserializer() {
        return this.valueDeserializer;
    }

    public KafkaClient getKafkaClient() {
        return this.kafkaClient;
    }

    public Map<String, Object> getConsumerConfigs() {
        return this.consumerConfigs;
    }

    public KafkaGroup<K, V> getKafkaGroup() {
        return this.kafkaGroup;
    }

    public void subscribe() {
        unsubscribe();
        Integer consumers = this.kafkaGroup.getConsumers();
        if (consumers == null || consumers.intValue() < 1) {
            consumers = 1;
        }
        for (int i = 0; i < consumers.intValue(); i++) {
            this.messageConsumers.add(new MessageConsumer(Integer.valueOf(i)).start());
        }
    }

    public void unsubscribe() {
        while (!this.messageConsumers.isEmpty()) {
            this.messageConsumers.remove(0).stop();
        }
    }
}
