package cn.geektool.kafka.consumer.process;

import cn.geektool.core.observer.AbstractObServer;
import cn.geektool.core.reflect.BaseReflectInvoke;
import cn.geektool.core.reflect.DefaultReflectInvoke;
import cn.geektool.core.util.StrUtil;
import cn.geektool.kafka.admin.event.KafkaEvent;
import cn.geektool.kafka.admin.external.IKafkaConsumer;
import cn.geektool.kafka.consumer.bean.ConsumerBean;
import cn.geektool.kafka.consumer.bean.KafkaCallbackBean;
import cn.geektool.kafka.global.exception.JkafkaException;
import cn.geektool.kafka.producer.bean.KafkaMsg;
import com.alibaba.fastjson.JSONObject;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/geektool/kafka/consumer/process/DefaultKafkaConsumer.class */
public class DefaultKafkaConsumer extends AbstractObServer<JSONObject> implements IKafkaConsumer<JSONObject> {
    private String topic;
    private List<TopicPartition> partition;
    private Map<String, Object> consumerMap;
    private ConsumerBean consumerBean;
    private KafkaConsumer<Object, Object> consumer;
    private BaseReflectInvoke invoke;
    private AtomicBoolean running = new AtomicBoolean(true);
    private Logger logger = LoggerFactory.getLogger(getClass());
    private final String ACTION = "action";
    private final String PARAM = "param";

    @Override // cn.geektool.kafka.admin.external.IKafkaConsumer
    public DefaultKafkaConsumer invoke(BaseReflectInvoke baseReflectInvoke) {
        this.invoke = baseReflectInvoke;
        return this;
    }

    private void start() {
        try {
            try {
                this.logger.info("start consumer properties:{}", this.consumerMap);
                this.consumer = new KafkaConsumer<>(this.consumerMap);
                this.consumer.assign(this.partition);
                if (this.invoke == null) {
                    this.invoke = new DefaultReflectInvoke();
                }
                Boolean valueOf = Boolean.valueOf(StrUtil.isNotEmpty(this.consumerBean.getInvokeCallback()));
                while (this.running.get()) {
                    ConsumerRecords poll = this.consumer.poll(Duration.ofSeconds(this.consumerBean.getPull().longValue()));
                    poll.forEach(consumerRecord -> {
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug("kafka反馈给系统的数据是:{}", consumerRecord);
                        }
                        try {
                            this.invoke.getMethodToValue(this.consumerBean.getInvokeBeanName(), this.consumerBean.getInvokeMethodName(), new KafkaMsg(consumerRecord.topic(), Long.valueOf(consumerRecord.offset()), consumerRecord.value()));
                        } catch (Exception e) {
                            this.logger.error("消费数据异常", e);
                        }
                    });
                    if (!valueOf.booleanValue() || poll.isEmpty()) {
                        this.consumer.commitAsync();
                    } else {
                        this.consumer.commitAsync((map, exc) -> {
                            ArrayList arrayList = new ArrayList(map.size());
                            map.forEach((topicPartition, offsetAndMetadata) -> {
                                arrayList.add(new KafkaCallbackBean(topicPartition.topic(), Long.valueOf(offsetAndMetadata.offset()), exc));
                            });
                            try {
                                this.invoke.getMethodToValue(this.consumerBean.getInvokeBeanName(), this.consumerBean.getInvokeCallback(), arrayList);
                            } catch (Exception e) {
                                this.logger.error("消费确认数据异常", e);
                            }
                        });
                    }
                }
            } catch (Exception e) {
                throw new JkafkaException(e.getMessage(), e);
            }
        } finally {
            if (this.consumer != null) {
                this.consumer.commitAsync();
                this.consumer.close();
            }
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("consumer {} close", this.topic);
            }
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        start();
    }

    private void destroy(Map<String, Object> map) {
        if (map.get(this.topic) != null) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("destory current topic:{} satisfied,params:{} ", this.topic, map);
            }
            this.running.compareAndSet(true, false);
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("destory current topis:{},groupId:{}", this.topic, this.consumerMap.get("group.id"));
            }
        }
    }

    @Override // cn.geektool.core.observer.AbstractObServer
    protected void execute() {
        if (KafkaEvent.CREATE == ((JSONObject) this.data).getOrDefault("action", KafkaEvent.CREATE)) {
            this.topic = ((JSONObject) this.data).getString("topic");
            this.consumerBean = (ConsumerBean) ((JSONObject) this.data).get("consumerBean");
            this.partition = (List) ((JSONObject) this.data).get("partition");
            this.consumerMap = ((JSONObject) this.data).getJSONObject("consumerMap");
        }
        if (KafkaEvent.DESTROY == ((JSONObject) this.data).get("action")) {
            destroy((Map) ((JSONObject) this.data).get("param"));
        }
        if (KafkaEvent.PAUSE == ((JSONObject) this.data).get("action")) {
            this.running.compareAndSet(true, false);
        }
        if (KafkaEvent.RESUME == ((JSONObject) this.data).get("action")) {
            this.running.compareAndSet(false, true);
            start();
        }
    }

    public void setTopic(String str) {
        this.topic = str;
    }

    public void setPartition(List<TopicPartition> list) {
        this.partition = list;
    }

    public void setConsumerMap(Map<String, Object> map) {
        this.consumerMap = map;
    }

    public void setConsumerBean(ConsumerBean consumerBean) {
        this.consumerBean = consumerBean;
    }

    public void setConsumer(KafkaConsumer<Object, Object> kafkaConsumer) {
        this.consumer = kafkaConsumer;
    }
}
