/*
 * Decompiled with CFR 0.152.
 */
package org.dromara.dynamictp.adapter.rocketmq;

import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
import org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService;
import org.apache.rocketmq.client.impl.consumer.ConsumeMessageService;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.dromara.dynamictp.adapter.common.AbstractDtpAdapter;
import org.dromara.dynamictp.common.properties.DtpProperties;
import org.dromara.dynamictp.common.util.ReflectionUtil;
import org.dromara.dynamictp.core.support.ThreadPoolExecutorProxy;
import org.dromara.dynamictp.jvmti.JVMTI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocketMqDtpAdapter
extends AbstractDtpAdapter {
    private static final Logger log = LoggerFactory.getLogger(RocketMqDtpAdapter.class);
    private static final String TP_PREFIX = "rocketMqTp";
    private static final String CONSUME_EXECUTOR_FIELD = "consumeExecutor";

    public void refresh(DtpProperties dtpProperties) {
        this.refresh(dtpProperties.getRocketMqTp(), dtpProperties.getPlatforms());
    }

    protected String getTpPrefix() {
        return TP_PREFIX;
    }

    protected void initialize() {
        super.initialize();
        this.adaptConsumerExecutors();
        this.adaptProducerExecutors();
    }

    public void adaptConsumerExecutors() {
        List beans = JVMTI.getInstances(DefaultMQPushConsumer.class);
        if (CollectionUtils.isEmpty((Collection)beans)) {
            log.warn("Cannot find beans of type DefaultMQPushConsumer.");
            return;
        }
        for (DefaultMQPushConsumer consumer : beans) {
            ConsumeMessageService consumeMessageService;
            ThreadPoolExecutor executor;
            DefaultMQPushConsumerImpl pushConsumer = (DefaultMQPushConsumerImpl)ReflectionUtil.getFieldValue(DefaultMQPushConsumer.class, (String)"defaultMQPushConsumerImpl", (Object)consumer);
            if (Objects.isNull(pushConsumer) || !Objects.nonNull(executor = (ThreadPoolExecutor)ReflectionUtil.getFieldValue((String)CONSUME_EXECUTOR_FIELD, (Object)(consumeMessageService = pushConsumer.getConsumeMessageService())))) continue;
            String tpName = consumer.getConsumerGroup();
            if (consumeMessageService instanceof ConsumeMessageConcurrentlyService) {
                tpName = "rocketMqTp#consumer#concurrently#" + tpName;
            } else if (consumeMessageService instanceof ConsumeMessageOrderlyService) {
                tpName = "rocketMqTp#consumer#orderly#" + tpName;
            }
            this.enhanceOriginExecutor(tpName, executor, CONSUME_EXECUTOR_FIELD, consumeMessageService);
        }
    }

    public void adaptProducerExecutors() {
        List beans = JVMTI.getInstances(DefaultMQProducer.class);
        if (CollectionUtils.isEmpty((Collection)beans)) {
            log.warn("Cannot find beans of type DefaultMQProducer.");
            return;
        }
        for (DefaultMQProducer defaultMQProducer : beans) {
            ThreadPoolExecutor executor;
            DefaultMQProducerImpl producer = (DefaultMQProducerImpl)ReflectionUtil.getFieldValue(DefaultMQProducer.class, (String)"defaultMQProducerImpl", (Object)defaultMQProducer);
            if (Objects.isNull(producer) || !Objects.nonNull(executor = (ThreadPoolExecutor)producer.getAsyncSenderExecutor())) continue;
            ThreadPoolExecutorProxy proxy = new ThreadPoolExecutorProxy(executor);
            producer.setAsyncSenderExecutor((ExecutorService)proxy);
            String proKey = "rocketMqTp#producer#" + defaultMQProducer.getProducerGroup();
            this.putAndFinalize(proKey, executor, (Executor)proxy);
        }
    }
}

