package io.shulie.takin.sdk.kafka.impl;

import io.shulie.takin.sdk.kafka.MessageReceiveCallBack;
import io.shulie.takin.sdk.kafka.MessageReceiveService;
import io.shulie.takin.sdk.kafka.util.MessageSwitchUtil;
import io.shulie.takin.sdk.kafka.util.PropertiesReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/shulie/takin/sdk/kafka/impl/MessageReceiveServiceImpl.class */
/**
 * Kafka-backed {@link MessageReceiveService}.
 *
 * <p>Typical life cycle: construct, call {@link #init()} once, run
 * {@link #receive(List, MessageReceiveCallBack)} on a dedicated thread, and call
 * {@link #stop()} from any thread to end the poll loop and release the consumer.
 *
 * <p>{@code KafkaConsumer} is not safe for multi-threaded access; the only method that
 * may be called from another thread is {@code wakeup()}. For that reason {@link #stop()}
 * never closes a consumer that is currently being polled: it raises a stop flag, wakes
 * the consumer up and lets the polling thread close it.
 */
public class MessageReceiveServiceImpl implements MessageReceiveService {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageReceiveServiceImpl.class.getName());

    /** Consumer group used when no group id was supplied to the constructor. */
    private static final String DEFAULT_GROUP_ID = "kafka-sdk-consumer";

    /** Fixed timeout, in milliseconds, passed to every {@code poll} call. */
    private static final long POLL_TIMEOUT_MS = 300L;

    /** Guards the transitions of {@link #receiving} and {@link #stopRequested}. */
    private final Object lifecycleLock = new Object();

    /** Topics for which the deserializer is called with its boolean flag set to {@code true}. */
    private final List<String> stringValueTopic = new ArrayList<>();

    private KafkaConsumer<String, byte[]> kafkaConsumer;
    private MessageDeserializer deserializer;
    private String groupId;

    /** Set by {@link #stop()}; read by the poll loop without holding the lock. */
    private volatile boolean stopRequested;

    /** {@code true} while a thread is inside the poll loop; guarded by {@link #lifecycleLock}. */
    private boolean receiving;

    /** Creates a service that joins the default consumer group. */
    public MessageReceiveServiceImpl() {
    }

    /**
     * Creates a service that joins the given consumer group.
     *
     * @param str consumer group id; when blank the default group id is used
     */
    public MessageReceiveServiceImpl(String str) {
        this.groupId = str;
    }

    /**
     * Reads the SDK configuration and creates the Kafka consumer and the deserializer.
     *
     * <p>Does nothing (the service stays uninitialised, so {@code receive} and {@code stop}
     * are no-ops) when the SDK switch is off or no bootstrap address is configured.
     * A successful call clears any earlier stop request.
     */
    public void init() {
        if (!MessageSwitchUtil.KAFKA_SDK_SWITCH) {
            LOGGER.warn("kafka开关处理关闭状态，不进行发送初始化");
            return;
        }

        String bootstrapServers = null;
        // NOTE: kafka.poll.timeout is only read and logged; polling uses POLL_TIMEOUT_MS.
        String pollTimeout = null;
        try {
            bootstrapServers = PropertiesReader.getInstance().getProperty("kafka.sdk.bootstrap", "");
            pollTimeout = PropertiesReader.getInstance().getProperty("kafka.poll.timeout", "2000");
        } catch (Exception e) {
            LOGGER.error("读取配置文件失败", e);
        }
        if (StringUtils.isBlank(bootstrapServers)) {
            LOGGER.info("kafka配置serverConfig未找到，不进行kafka发送初始化");
            return;
        }
        LOGGER.info("消息接收获取到地址为:{},超时时间为:{}", bootstrapServers, pollTimeout);

        String authFlag = PropertiesReader.getInstance().getProperty("kafka.auth.flag", "false");
        LOGGER.info("是否使用权限认证:{}", authFlag);

        Properties properties = new Properties();
        properties.put("max.poll.records", 100);
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("group.id", StringUtils.isNotBlank(this.groupId) ? this.groupId : DEFAULT_GROUP_ID);
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", ByteArrayDeserializer.class);
        if ("true".equals(authFlag)) {
            properties.put("security.protocol", PropertiesReader.getInstance().getProperty("security.protocol", ""));
            properties.put("sasl.mechanism", PropertiesReader.getInstance().getProperty("sasl.mechanism", ""));
            properties.put("sasl.jaas.config", PropertiesReader.getInstance().getProperty("sasl.jaas.config", ""));
        }

        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(properties);
        synchronized (this.lifecycleLock) {
            this.kafkaConsumer = consumer;
            this.stopRequested = false;
        }

        try {
            this.deserializer = new MessageDeserializer();
        } catch (Exception e2) {
            LOGGER.error("初始化反序列化工具失败", e2);
        }

        registerStringValueTopic("stress-test-engine-pressure-data");
        registerStringValueTopic("stress-test-agent-monitor");
    }

    /**
     * Stops message consumption and releases the consumer.
     *
     * <p>No-op when {@link #init()} never created a consumer. When a poll loop is running,
     * the loop is woken up and closes the consumer itself before {@code receive} returns;
     * otherwise the consumer is closed directly on the calling thread.
     */
    public void stop() {
        KafkaConsumer<String, byte[]> consumer;
        synchronized (this.lifecycleLock) {
            consumer = this.kafkaConsumer;
            if (consumer == null) {
                return;
            }
            this.stopRequested = true;
            if (this.receiving) {
                // The polling thread owns the consumer; wakeup() is the only call that is
                // safe from another thread. The loop closes the consumer on its way out.
                consumer.wakeup();
                return;
            }
        }
        consumer.close();
    }

    /**
     * Subscribes to the given topics and dispatches every received record to the callback
     * until {@link #stop()} is called. Blocks the calling thread for the whole time.
     *
     * <p>Returns immediately when the service is not initialised or has already been stopped.
     * Failures while polling or while handling a single record are reported through
     * {@code messageReceiveCallBack.fail} and do not end the loop.
     *
     * @param list                   topics to subscribe to
     * @param messageReceiveCallBack receives each deserialized message or a failure description
     * @throws java.util.ConcurrentModificationException if another thread is already inside this method
     */
    public void receive(List<String> list, MessageReceiveCallBack messageReceiveCallBack) {
        final KafkaConsumer<String, byte[]> consumer;
        synchronized (this.lifecycleLock) {
            consumer = this.kafkaConsumer;
            if (consumer == null || this.stopRequested) {
                return;
            }
            if (this.receiving) {
                throw new java.util.ConcurrentModificationException(
                        "KafkaConsumer is not safe for multi-threaded access");
            }
            this.receiving = true;
        }

        try {
            consumer.subscribe(list);
            while (!this.stopRequested) {
                try {
                    consumer.poll(POLL_TIMEOUT_MS).forEach(
                            message -> dispatch(message.topic(), message.value(), messageReceiveCallBack));
                } catch (Exception e) {
                    // An exception raised after stop() is the expected wake-up, not a failure.
                    if (!this.stopRequested) {
                        messageReceiveCallBack.fail(describe(e));
                    }
                }
            }
        } finally {
            boolean closeConsumer;
            synchronized (this.lifecycleLock) {
                this.receiving = false;
                closeConsumer = this.stopRequested;
            }
            // Only close on a requested stop, so a failed subscribe() leaves the consumer usable.
            if (closeConsumer) {
                closeQuietly(consumer);
            }
        }
    }

    /** Deserializes one record payload and hands the result, or the failure, to the callback. */
    private void dispatch(String topic, byte[] payload, MessageReceiveCallBack callBack) {
        try {
            if (payload == null) {
                callBack.fail("接收到消息为空");
                return;
            }
            boolean stringValue = this.stringValueTopic.contains(topic);
            callBack.success(this.deserializer.deserializeJSON(payload, stringValue));
        } catch (Exception e) {
            callBack.fail(describe(e));
        }
    }

    /** Adds a topic to the string-value list unless it is already present (keeps init() repeatable). */
    private void registerStringValueTopic(String topic) {
        if (!this.stringValueTopic.contains(topic)) {
            this.stringValueTopic.add(topic);
        }
    }

    /** Returns the exception message, or its {@code toString()} when it carries no message. */
    private static String describe(Exception e) {
        String message = e.getMessage();
        return message != null ? message : e.toString();
    }

    /** Closes the consumer on the polling thread, logging instead of propagating a failure. */
    private static void closeQuietly(KafkaConsumer<String, byte[]> consumer) {
        try {
            consumer.close();
        } catch (Exception e) {
            LOGGER.warn("关闭kafka消费者失败", e);
        }
    }
}
