package com.baidu.bigpipe.spring.annotation;

import com.baidu.bigpipe.driver.DefaultListenerSubscriber;
import com.baidu.bigpipe.driver.DefaultQueueListenerSubscriber;
import com.baidu.bigpipe.driver.ListenerSubscriber;
import com.baidu.bigpipe.driver.QueueListenerSubscriber;
import com.baidu.bigpipe.driver.SimpleNoneBlockingPubClient;
import com.baidu.bigpipe.driver.converter.sub.MessageBodyConverter;
import com.baidu.bigpipe.driver.converter.sub.StringMessageBodyConverter;
import com.baidu.bigpipe.position.store.SubcribePositionStore;
import com.baidu.bigpipe.transport.conf.BigPipeConf;
import com.baidu.bigpipe.transport.sub.BigpipeMessageListener;
import com.baidu.bigpipe.transport.sub.PipeletIdAwareBigpipeMessageListener;
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.PropertyValues;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.util.NumberUtils;
import org.springframework.util.StringUtils;

/* loaded from: input_file:com/baidu/bigpipe/spring/annotation/MessageListenerAnnotationResolver.class */
public class MessageListenerAnnotationResolver extends AbstractAnnotationParserCallback {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageListenerAnnotationResolver.class);
    private List<ListenerSubscriber> listenerSubscribers = new ArrayList();
    private List<QueueListenerSubscriber> queueListenerSubscribers = new ArrayList();
    private List<SimpleNoneBlockingPubClient> clients = new ArrayList();

    @Override // com.baidu.bigpipe.spring.annotation.AnnotationParserCallback
    public Object annotationAtType(Annotation annotation, Object obj, String str, ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {
        if (annotation instanceof MessageListener) {
            if (!(obj instanceof BigpipeMessageListener)) {
                throw new BeanInitializationException("Annotation 'MessageListener' for target '" + str + "' is not implments " + BigpipeMessageListener.class.getName());
            }
            LOGGER.info("Annotation 'MessageListener' for target '" + str + "' detected and add to message listener callback.");
            this.listenerSubscribers.add(doParseListenerSubscriber((MessageListener) annotation, (BigpipeMessageListener) obj, str, configurableListableBeanFactory));
        }
        if (annotation instanceof QueueMessageListener) {
            if (!(obj instanceof PipeletIdAwareBigpipeMessageListener)) {
                throw new BeanInitializationException("Annotation 'MessageListener' for target '" + str + "' is not implments " + BigpipeMessageListener.class.getName());
            }
            LOGGER.info("Annotation 'QueueMessageListener' for target '" + str + "' detected and add to queue message listener callback.");
            this.queueListenerSubscribers.add(doParseQueueListenerSubscriber((QueueMessageListener) annotation, (PipeletIdAwareBigpipeMessageListener) obj, str, configurableListableBeanFactory));
        }
        return obj;
    }

    private ListenerSubscriber doParseListenerSubscriber(MessageListener messageListener, BigpipeMessageListener bigpipeMessageListener, String str, ConfigurableListableBeanFactory configurableListableBeanFactory) {
        BigPipeConf m639clone = ((BigPipeConf) configurableListableBeanFactory.getBean(parsePlaceholder(messageListener.bigPipeConf()), BigPipeConf.class)).m639clone();
        if (!StringUtils.isEmpty(messageListener.pipeletName())) {
            m639clone.setPipeletName(parsePlaceholder(messageListener.pipeletName()));
        }
        if (!StringUtils.isEmpty(messageListener.cluster())) {
            m639clone.setCluster(parsePlaceholder(messageListener.cluster()));
        }
        if (!StringUtils.isEmpty(messageListener.pipeletId())) {
            m639clone.setPipeletId(((Integer) NumberUtils.parseNumber(parsePlaceholder(messageListener.pipeletId()), Integer.TYPE)).intValue());
        }
        DefaultListenerSubscriber defaultListenerSubscriber = new DefaultListenerSubscriber();
        defaultListenerSubscriber.setConf(m639clone);
        if (!StringUtils.isEmpty(messageListener.subcribePositionStore())) {
            defaultListenerSubscriber.setPositionStore((SubcribePositionStore) configurableListableBeanFactory.getBean(parsePlaceholder(messageListener.subcribePositionStore()), SubcribePositionStore.class));
        }
        defaultListenerSubscriber.setBodyConverter(!StringUtils.isEmpty(messageListener.bodyConverter()) ? (MessageBodyConverter) configurableListableBeanFactory.getBean(parsePlaceholder(messageListener.bodyConverter()), MessageBodyConverter.class) : new StringMessageBodyConverter());
        defaultListenerSubscriber.setMessageListener(bigpipeMessageListener);
        defaultListenerSubscriber.init();
        return defaultListenerSubscriber;
    }

    private QueueListenerSubscriber doParseQueueListenerSubscriber(QueueMessageListener queueMessageListener, PipeletIdAwareBigpipeMessageListener pipeletIdAwareBigpipeMessageListener, String str, ConfigurableListableBeanFactory configurableListableBeanFactory) {
        BigPipeConf m639clone = ((BigPipeConf) configurableListableBeanFactory.getBean(parsePlaceholder(queueMessageListener.bigPipeConf()), BigPipeConf.class)).m639clone();
        if (!StringUtils.isEmpty(queueMessageListener.pipeletName())) {
            m639clone.setPipeletName(parsePlaceholder(queueMessageListener.pipeletName()));
        }
        if (!StringUtils.isEmpty(queueMessageListener.cluster())) {
            m639clone.setCluster(parsePlaceholder(queueMessageListener.cluster()));
        }
        DefaultQueueListenerSubscriber defaultQueueListenerSubscriber = new DefaultQueueListenerSubscriber();
        defaultQueueListenerSubscriber.setConf(m639clone);
        defaultQueueListenerSubscriber.setBodyConverter(!StringUtils.isEmpty(queueMessageListener.bodyConverter()) ? (MessageBodyConverter) configurableListableBeanFactory.getBean(parsePlaceholder(queueMessageListener.bodyConverter()), MessageBodyConverter.class) : new StringMessageBodyConverter());
        defaultQueueListenerSubscriber.setMessageListener(pipeletIdAwareBigpipeMessageListener);
        defaultQueueListenerSubscriber.init();
        return defaultQueueListenerSubscriber;
    }

    @Override // com.baidu.bigpipe.spring.annotation.AnnotationParserCallback
    public void annotationAtTypeAfterStarted(Annotation annotation, Object obj, String str, ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {
    }

    @Override // com.baidu.bigpipe.spring.annotation.AnnotationParserCallback
    public Object annotationAtField(Annotation annotation, Object obj, String str, PropertyValues propertyValues, ConfigurableListableBeanFactory configurableListableBeanFactory, Field field) throws BeansException {
        if (!(annotation instanceof MessageSender)) {
            return obj;
        }
        SimpleNoneBlockingPubClient doParseMessageSenderAnnotation = doParseMessageSenderAnnotation((MessageSender) annotation, configurableListableBeanFactory);
        this.clients.add(doParseMessageSenderAnnotation);
        return doParseMessageSenderAnnotation;
    }

    private SimpleNoneBlockingPubClient doParseMessageSenderAnnotation(MessageSender messageSender, ConfigurableListableBeanFactory configurableListableBeanFactory) {
        BigPipeConf m639clone = ((BigPipeConf) configurableListableBeanFactory.getBean(parsePlaceholder(messageSender.bigPipeConf()), BigPipeConf.class)).m639clone();
        if (!StringUtils.isEmpty(messageSender.pipeletName())) {
            m639clone.setPipeletName(parsePlaceholder(messageSender.pipeletName()));
        }
        if (!StringUtils.isEmpty(messageSender.cluster())) {
            m639clone.setCluster(parsePlaceholder(messageSender.cluster()));
        }
        if (!StringUtils.isEmpty(messageSender.pipeletId())) {
            m639clone.setPipeletId(((Integer) NumberUtils.parseNumber(parsePlaceholder(messageSender.pipeletId()), Integer.TYPE)).intValue());
        }
        SimpleNoneBlockingPubClient simpleNoneBlockingPubClient = new SimpleNoneBlockingPubClient();
        simpleNoneBlockingPubClient.setConf(m639clone);
        simpleNoneBlockingPubClient.init();
        return simpleNoneBlockingPubClient;
    }

    @Override // com.baidu.bigpipe.spring.annotation.AnnotationParserCallback
    public Object annotationAtMethod(Annotation annotation, Object obj, String str, PropertyValues propertyValues, ConfigurableListableBeanFactory configurableListableBeanFactory, Method method) throws BeansException {
        return obj;
    }

    @Override // com.baidu.bigpipe.spring.annotation.AnnotationParserCallback
    public List<Class<? extends Annotation>> getTypeAnnotation() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(MessageListener.class);
        arrayList.add(QueueMessageListener.class);
        return arrayList;
    }

    @Override // com.baidu.bigpipe.spring.annotation.AnnotationParserCallback
    public List<Class<? extends Annotation>> getMethodFieldAnnotation() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(MessageSender.class);
        return arrayList;
    }

    @Override // com.baidu.bigpipe.spring.annotation.AnnotationParserCallback
    public void destroy() throws Exception {
        Iterator<ListenerSubscriber> it = this.listenerSubscribers.iterator();
        while (it.hasNext()) {
            try {
                it.next().shutDown();
            } catch (Exception e) {
                LOGGER.error("error on shutdown listener subscriber due to :" + e.getMessage(), e);
            }
        }
        Iterator<QueueListenerSubscriber> it2 = this.queueListenerSubscribers.iterator();
        while (it2.hasNext()) {
            try {
                it2.next().shutDown();
            } catch (Exception e2) {
                LOGGER.error("error on shutdown listener subscriber due to :" + e2.getMessage(), e2);
            }
        }
        Iterator<SimpleNoneBlockingPubClient> it3 = this.clients.iterator();
        while (it3.hasNext()) {
            try {
                it3.next().shutDown();
            } catch (Exception e3) {
                LOGGER.error("error on shutdown sender due to :" + e3.getMessage(), e3);
            }
        }
    }
}
