package org.dromara.dynamictp.adapter.rocketmq;

import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter;
import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageHandler;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.collections4.MapUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
import org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService;
import org.apache.rocketmq.client.impl.consumer.ConsumeMessageService;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.dromara.dynamictp.adapter.common.AbstractDtpAdapter;
import org.dromara.dynamictp.common.ApplicationContextHolder;
import org.dromara.dynamictp.common.properties.DtpProperties;
import org.dromara.dynamictp.common.util.ReflectionUtil;
import org.dromara.dynamictp.core.support.ExecutorWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binding.BindingService;

/* loaded from: input_file:org/dromara/dynamictp/adapter/rocketmq/StreamRocketMqDtpAdapter.class */
public class StreamRocketMqDtpAdapter extends AbstractDtpAdapter {
    private static final Logger log = LoggerFactory.getLogger(StreamRocketMqDtpAdapter.class);
    private static final String NAME = "rocketMqTp";
    private static final String CONSUME_EXECUTOR_FIELD_NAME = "consumeExecutor";

    public void refresh(DtpProperties dtpProperties) {
        refresh(NAME, dtpProperties.getRocketMqTp(), dtpProperties.getPlatforms());
    }

    protected void initialize() {
        super.initialize();
        Map beansOfType = ApplicationContextHolder.getBeansOfType(BindingService.class);
        if (MapUtils.isEmpty(beansOfType)) {
            log.warn("Cannot find beans of type BindingService.");
        } else {
            beansOfType.forEach((str, bindingService) -> {
                adaptConsumerExecutors(bindingService);
                adaptProducerExecutors(bindingService);
            });
            log.info("DynamicTp adapter, rocketMq consumer and producer executors init end, executors: {}", this.executors);
        }
    }

    private void adaptConsumerExecutors(BindingService bindingService) {
        Map map = (Map) ReflectionUtil.getFieldValue(BindingService.class, "consumerBindings", bindingService);
        if (MapUtils.isEmpty(map)) {
            return;
        }
        map.forEach((str, list) -> {
            Binding binding = (Binding) list.get(0);
            Class<?> cls = binding.getClass();
            DefaultMQPushConsumerImpl defaultMQPushConsumerImpl = (DefaultMQPushConsumerImpl) ReflectionUtil.getFieldValue(DefaultMQPushConsumer.class, "defaultMQPushConsumerImpl", (DefaultMQPushConsumer) ReflectionUtil.getFieldValue(RocketMQInboundChannelAdapter.class, "pushConsumer", (RocketMQInboundChannelAdapter) ReflectionUtil.getFieldValue(cls, "lifecycle", binding)));
            if (defaultMQPushConsumerImpl == null || defaultMQPushConsumerImpl.getConsumeMessageService() == null) {
                return;
            }
            ConsumeMessageService consumeMessageService = defaultMQPushConsumerImpl.getConsumeMessageService();
            ThreadPoolExecutor threadPoolExecutor = null;
            if (consumeMessageService instanceof ConsumeMessageConcurrentlyService) {
                threadPoolExecutor = (ThreadPoolExecutor) ReflectionUtil.getFieldValue(ConsumeMessageConcurrentlyService.class, CONSUME_EXECUTOR_FIELD_NAME, consumeMessageService);
            } else if (consumeMessageService instanceof ConsumeMessageOrderlyService) {
                threadPoolExecutor = (ThreadPoolExecutor) ReflectionUtil.getFieldValue(ConsumeMessageOrderlyService.class, CONSUME_EXECUTOR_FIELD_NAME, consumeMessageService);
            }
            if (Objects.nonNull(threadPoolExecutor)) {
                String str = ((String) ReflectionUtil.getFieldValue(cls, "group", binding)) + "#" + ((String) ReflectionUtil.getFieldValue(cls, "name", binding));
                ExecutorWrapper executorWrapper = new ExecutorWrapper(str, threadPoolExecutor);
                initNotifyItems(str, executorWrapper);
                this.executors.put(str, executorWrapper);
            }
        });
    }

    private void adaptProducerExecutors(BindingService bindingService) {
        Map map = (Map) ReflectionUtil.getFieldValue(BindingService.class, "producerBindings", bindingService);
        if (MapUtils.isEmpty(map)) {
            return;
        }
        map.forEach((str, binding) -> {
            Class<?> cls = binding.getClass();
            RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) ReflectionUtil.getFieldValue(RocketMQMessageHandler.class, "rocketMQTemplate", (RocketMQMessageHandler) ReflectionUtil.getFieldValue(cls, "lifecycle", binding));
            if (Objects.isNull(rocketMQTemplate)) {
                return;
            }
            DefaultMQProducer producer = rocketMQTemplate.getProducer();
            DefaultMQProducerImpl defaultMQProducerImpl = (DefaultMQProducerImpl) ReflectionUtil.getFieldValue(DefaultMQProducer.class, "defaultMQProducerImpl", producer);
            if (Objects.isNull(defaultMQProducerImpl)) {
                return;
            }
            ExecutorService asyncSenderExecutor = defaultMQProducerImpl.getAsyncSenderExecutor();
            if (Objects.nonNull(asyncSenderExecutor)) {
                String str = producer.getProducerGroup() + "#" + ((String) ReflectionUtil.getFieldValue(cls, "name", binding));
                ExecutorWrapper executorWrapper = new ExecutorWrapper(str, asyncSenderExecutor);
                initNotifyItems(str, executorWrapper);
                this.executors.put(str, executorWrapper);
            }
        });
    }
}
