package com.github.ideahut.qms.shared.kafka;

import com.github.ideahut.qms.shared.queue.QueueSender;
import com.github.ideahut.qms.shared.task.TaskHandler;
import io.vertx.core.Vertx;
import io.vertx.kafka.admin.KafkaAdminClient;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

/* loaded from: input_file:com/github/ideahut/qms/shared/kafka/KafkaClient.class */
/**
 * Facade for creating Kafka senders and managing Kafka subscribers that share a single
 * {@link Vertx} instance, a single {@link KafkaAdminClient} and a common base configuration.
 *
 * <p>Every subscriber started through {@link #subscribe} is recorded per topic name and, within a
 * topic, per consumer group id, so that it can later be stopped by topic, by topic and group, or
 * all at once.
 *
 * <p>The registry is a plain {@link HashMap}; this class does no synchronization of its own.
 */
public class KafkaClient {
    /**
     * Keys stripped from the caller's configuration before it is split into producer and consumer
     * settings and handed to the admin client.
     */
    private static final String[] EXCLUDED_KEYS = {
        "key.serializer",
        "value.serializer",
        "key.deserializer",
        "value.deserializer",
        "group.id"
    };

    /** Active subscribers, keyed by topic name. */
    private final Map<String, SubscriberTopic> subscribers;
    private final Vertx vertx;
    private final KafkaAdminClient admin;
    /** Subset of the base configuration whose keys are declared by {@link ProducerConfig}. */
    private final Map<String, String> cfgProducer;
    /** Subset of the base configuration whose keys are declared by {@link ConsumerConfig}. */
    private final Map<String, String> cfgConsumer;
    /** Passed through to every sender created by {@link #createSender}; may be {@code null}. */
    private final TaskHandler taskHandler;

    /** Subscribers of one topic that share the same consumer group id. */
    public static class SubscriberGroup {
        private final String groupId;
        private final List<KafkaSubscriber> kafkaSubscribers = new ArrayList<>();

        private SubscriberGroup(String groupId) {
            this.groupId = groupId;
        }
    }

    /** All subscriber groups registered for one topic, keyed by group id. */
    public static class SubscriberTopic {
        private final String topic;
        private final Map<String, SubscriberGroup> groups = new HashMap<>();

        private SubscriberTopic(String topic) {
            this.topic = topic;
        }
    }

    /**
     * Creates a client.
     *
     * <p>The given configuration is copied; serializer, deserializer and {@code group.id} entries
     * are removed from the copy. The remainder is used to create the admin client, and is filtered
     * into producer-only and consumer-only maps.
     *
     * @param vertx the Vert.x instance to use; when {@code null} a new one is created with
     *     {@link Vertx#vertx()}
     * @param map base Kafka configuration; {@code null} is treated as empty
     * @param taskHandler handler passed to senders created by this client; may be {@code null}
     * @throws Exception if the reflective lookup of the configuration keys fails
     */
    public KafkaClient(Vertx vertx, Map<String, String> map, TaskHandler taskHandler) throws Exception {
        this.subscribers = new HashMap<>();
        this.vertx = vertx != null ? vertx : Vertx.vertx();
        Map<String, String> config = new HashMap<>();
        if (map != null) {
            config.putAll(map);
        }
        for (String key : EXCLUDED_KEYS) {
            config.remove(key);
        }
        this.cfgProducer = getConfig(ProducerConfig.class, config);
        this.cfgConsumer = getConfig(ConsumerConfig.class, config);
        // Use the resolved instance: the parameter is null for the convenience constructors.
        this.admin = KafkaAdminClient.create(this.vertx, config);
        this.taskHandler = taskHandler;
    }

    /**
     * Creates a client with a newly created Vert.x instance.
     *
     * @param map base Kafka configuration; {@code null} is treated as empty
     * @param taskHandler handler passed to senders created by this client; may be {@code null}
     * @throws Exception if the reflective lookup of the configuration keys fails
     */
    public KafkaClient(Map<String, String> map, TaskHandler taskHandler) throws Exception {
        this(null, map, taskHandler);
    }

    /**
     * Creates a client with a newly created Vert.x instance and no task handler.
     *
     * @param map base Kafka configuration; {@code null} is treated as empty
     * @throws Exception if the reflective lookup of the configuration keys fails
     */
    public KafkaClient(Map<String, String> map) throws Exception {
        this(null, map, null);
    }

    /**
     * Starts one subscriber per given properties object and records it under its topic name and
     * group id.
     *
     * <p>A subscriber is recorded only after its {@code subscribe()} call has returned.
     *
     * @param cls type handed to each {@link KafkaSubscriber}
     * @param kafkaSubscriberPropertiesArr one entry per subscriber to start; must not be empty
     * @param <V> the type represented by {@code cls}
     * @throws IllegalArgumentException if no properties are given
     */
    public <V> void subscribe(Class<V> cls, KafkaSubscriberProperties... kafkaSubscriberPropertiesArr) {
        if (kafkaSubscriberPropertiesArr == null || kafkaSubscriberPropertiesArr.length == 0) {
            throw new IllegalArgumentException("Cannot subscribe an empty properties");
        }
        for (KafkaSubscriberProperties properties : kafkaSubscriberPropertiesArr) {
            KafkaSubscriber subscriber = new KafkaSubscriber(cls, this.admin, this.vertx, this.cfgConsumer, properties);
            subscriber.subscribe();

            String topicName = properties.getTopicProperties().getName();
            SubscriberTopic subscriberTopic = this.subscribers.get(topicName);
            if (subscriberTopic == null) {
                subscriberTopic = new SubscriberTopic(topicName);
                this.subscribers.put(subscriberTopic.topic, subscriberTopic);
            }

            SubscriberGroup subscriberGroup = subscriberTopic.groups.get(properties.getGroupId());
            if (subscriberGroup == null) {
                subscriberGroup = new SubscriberGroup(properties.getGroupId());
                subscriberTopic.groups.put(subscriberGroup.groupId, subscriberGroup);
            }
            subscriberGroup.kafkaSubscribers.add(subscriber);
        }
    }

    /**
     * Stops recorded subscribers and removes them from the registry.
     *
     * <ul>
     *   <li>{@code str == null}: every subscriber of every topic is stopped.</li>
     *   <li>{@code str != null}, {@code str2 == null}: every subscriber of that topic is stopped.</li>
     *   <li>both non-null: only the subscribers of that group within that topic are stopped.</li>
     * </ul>
     *
     * <p>An unknown topic or group is ignored.
     *
     * @param str topic name, or {@code null} for all topics
     * @param str2 group id, or {@code null} for all groups of the topic
     */
    public void unsubscribe(String str, String str2) {
        if (str == null) {
            for (SubscriberTopic subscriberTopic : this.subscribers.values()) {
                for (SubscriberGroup subscriberGroup : subscriberTopic.groups.values()) {
                    unsubscribeAll(subscriberGroup);
                }
            }
            this.subscribers.clear();
            return;
        }

        SubscriberTopic subscriberTopic = this.subscribers.get(str);
        if (subscriberTopic == null) {
            return;
        }

        if (str2 != null) {
            SubscriberGroup subscriberGroup = subscriberTopic.groups.get(str2);
            if (subscriberGroup == null) {
                // Nothing was ever subscribed for this group on this topic.
                return;
            }
            unsubscribeAll(subscriberGroup);
            subscriberTopic.groups.remove(str2);
        } else {
            for (SubscriberGroup subscriberGroup : subscriberTopic.groups.values()) {
                unsubscribeAll(subscriberGroup);
            }
            subscriberTopic.groups.clear();
        }

        // Do not keep empty topic entries around in the registry.
        if (subscriberTopic.groups.isEmpty()) {
            this.subscribers.remove(str);
        }
    }

    /**
     * Stops every recorded subscriber of the given topic.
     *
     * @param str topic name, or {@code null} for all topics
     */
    public void unsubscribe(String str) {
        unsubscribe(str, null);
    }

    /** Stops every recorded subscriber of every topic. */
    public void unsubscribe() {
        unsubscribe(null);
    }

    /**
     * Creates a sender backed by this client's admin client, Vert.x instance, producer
     * configuration and task handler.
     *
     * @param kafkaSenderProperties sender settings; its {@code getType()} is handed to the sender
     * @param <V> type expected by the caller for the returned sender
     * @return a new {@link KafkaSender}
     */
    public <V> QueueSender<V> createSender(KafkaSenderProperties kafkaSenderProperties) {
        return new KafkaSender(kafkaSenderProperties.getType(), this.admin, this.vertx, this.cfgProducer, this.taskHandler, kafkaSenderProperties);
    }

    /** Calls {@code unsubscribe()} on every subscriber held by the group. */
    private static void unsubscribeAll(SubscriberGroup subscriberGroup) {
        for (KafkaSubscriber subscriber : subscriberGroup.kafkaSubscribers) {
            subscriber.unsubscribe();
        }
    }

    /**
     * Selects the entries of {@code map} whose key equals the value of a public static final
     * {@code String} constant of {@code cls} whose name ends with {@code _CONFIG}.
     *
     * @param cls class declaring the configuration key constants
     * @param map configuration to filter; not modified
     * @return a new map holding only the matching entries
     * @throws IllegalAccessException if a constant cannot be read reflectively
     */
    private static Map<String, String> getConfig(Class<?> cls, Map<String, String> map) throws IllegalAccessException {
        Map<String, String> selected = new HashMap<>();
        for (Field field : cls.getFields()) {
            int modifiers = field.getModifiers();
            boolean isKeyConstant = Modifier.isStatic(modifiers)
                    && Modifier.isFinal(modifiers)
                    && field.getType() == String.class
                    && field.getName().endsWith("_CONFIG");
            if (!isKeyConstant) {
                continue;
            }
            String key = (String) field.get(null);
            if (map.containsKey(key)) {
                selected.put(key, map.get(key));
            }
        }
        return selected;
    }
}
