package cn.fscode.common.mqtt.config;

import cn.fscode.common.mq.api.enums.QosEnum;
import cn.fscode.common.mqtt.listener.IMqttListener;
import cn.fscode.common.mqtt.listener.MqttMessageListener;
import cn.fscode.common.tool.core.StringUtils;
import cn.fscode.common.tool.core.UUIDUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Resource;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

@Configuration
/* loaded from: input_file:cn/fscode/common/mqtt/config/MqttConsumerConfig.class */
public class MqttConsumerConfig {

    @Resource
    private MqttProperties mqttProperties;

    @Resource
    private ApplicationContext applicationContext;
    public MqttPahoMessageDrivenChannelAdapter adapter;
    public static final String CHANNEL_NAME_IN = "mqttInboundChannel";
    private final MqttConnectConfig mqttConnectConfig = new MqttConnectConfig();
    private static final Logger log = LoggerFactory.getLogger(MqttConsumerConfig.class);
    private static final Map<String, IMqttListener> listenerMap = new ConcurrentHashMap();

    private List<IMqttListener> collectMqttMessageListener() {
        ArrayList arrayList = new ArrayList();
        this.applicationContext.getBeansOfType(IMqttListener.class).forEach((str, iMqttListener) -> {
            arrayList.add(iMqttListener);
        });
        return arrayList;
    }

    private void doListenerConfig(MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter) {
        for (IMqttListener iMqttListener : collectMqttMessageListener()) {
            MqttMessageListener mqttMessageListener = (MqttMessageListener) iMqttListener.getClass().getAnnotation(MqttMessageListener.class);
            if (mqttMessageListener == null) {
                throw new RuntimeException(iMqttListener.getClass().getSimpleName() + " not MqttMessageListener annotation");
            }
            QosEnum[] qos = mqttMessageListener.qos();
            String[] strArr = mqttMessageListener.topic();
            for (String str : strArr) {
                listenerMap.put(str, iMqttListener);
            }
            mqttPahoMessageDrivenChannelAdapter.addTopics(strArr, getQos(qos));
            log.info("register mqtt listener {}", iMqttListener.getClass().getName());
        }
    }

    public int[] getQos(QosEnum[] qosEnumArr) {
        int[] iArr = new int[qosEnumArr.length];
        for (int i = 0; i < qosEnumArr.length; i++) {
            iArr[i] = qosEnumArr[i].getValue();
        }
        return iArr;
    }

    @Bean
    public MqttConnectOptions getConsumerMqttConnectOptions() {
        return this.mqttConnectConfig.getMqttConnectOptions(this.mqttProperties);
    }

    @Bean
    public MqttPahoClientFactory consumerMqttClientFactory() {
        DefaultMqttPahoClientFactory defaultMqttPahoClientFactory = new DefaultMqttPahoClientFactory();
        defaultMqttPahoClientFactory.setConnectionOptions(getConsumerMqttConnectOptions());
        return defaultMqttPahoClientFactory;
    }

    @Bean(name = {CHANNEL_NAME_IN})
    public MessageChannel mqttInboundChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        this.adapter = new MqttPahoMessageDrivenChannelAdapter(this.mqttProperties.getConsumer().getClientId() + "-" + UUIDUtils.randomUUID(), consumerMqttClientFactory(), StringUtils.split(this.mqttProperties.getConsumer().getDefaultTopic(), ","));
        this.adapter.setCompletionTimeout(5000L);
        this.adapter.setConverter(new DefaultPahoMessageConverter());
        this.adapter.setQos(new int[]{1});
        this.adapter.setOutputChannel(mqttInboundChannel());
        doListenerConfig(this.adapter);
        return this.adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = CHANNEL_NAME_IN)
    public MessageHandler handler() {
        return message -> {
            String obj = Objects.requireNonNull(message.getHeaders().get("mqtt_receivedTopic")).toString();
            String obj2 = message.getPayload().toString();
            IMqttListener iMqttListener = listenerMap.get(obj);
            if (iMqttListener == null) {
                log.warn("topic [{}] not find listener", obj);
            } else {
                iMqttListener.onMessage(obj, obj2);
            }
        };
    }
}
