package org.cg.eventbus.consumer;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.serializer.Decoder;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.log4j.Logger;
import org.cg.eventbus.IEventListener;

/* loaded from: input_file:org/cg/eventbus/consumer/SymmetricDispatcher.class */
public class SymmetricDispatcher<K, V> implements IConsumer<K, V> {
    private static final Logger logger = Logger.getLogger(SymmetricDispatcher.class);
    private String groupID;
    private Properties config;
    private ConsumerConnector consumer;
    private Decoder<K> keyDecoder;
    private Decoder<V> valueDecoder;
    private List<String> topics;
    private Map<String, Integer> topic_streamNumMap;
    private Map<String, List<IEventListener>> topic_listenersMap;
    private ExecutorService executor;

    /* loaded from: input_file:org/cg/eventbus/consumer/SymmetricDispatcher$EventPuller.class */
    private class EventPuller implements Runnable {
        private KafkaStream stream;
        private String name;
        private List<IEventListener> listeners;

        public EventPuller(KafkaStream kafkaStream, String str, List<IEventListener> list) {
            this.stream = kafkaStream;
            this.name = str;
            this.listeners = list;
            SymmetricDispatcher.logger.info("consumer [" + SymmetricDispatcher.this.groupID + "] is launching thread..");
        }

        /* JADX WARN: Multi-variable type inference failed */
        @Override // java.lang.Runnable
        public void run() {
            ConsumerIterator it = this.stream.iterator();
            SymmetricDispatcher.logger.info("In runnable thread[" + this.name + "]...");
            while (it.hasNext()) {
                MessageAndMetadata next = it.next();
                try {
                    Object key = next.key();
                    Object message = next.message();
                    for (IEventListener iEventListener : this.listeners) {
                        try {
                            if (iEventListener.beforeProcess(key, message)) {
                                try {
                                    if (iEventListener.process(key, message)) {
                                        try {
                                            if (!iEventListener.postProcess(key, message)) {
                                                SymmetricDispatcher.logger.error("Job stopped at postHandle");
                                            }
                                        } catch (Throwable th) {
                                            SymmetricDispatcher.logger.error("Listener fail to post handle event", th);
                                        }
                                    } else {
                                        SymmetricDispatcher.logger.error("Job stopped at handleEvent");
                                    }
                                } catch (Throwable th2) {
                                    SymmetricDispatcher.logger.error("Listener fail to handle event", th2);
                                }
                            } else {
                                SymmetricDispatcher.logger.error("Job stopped at preHandle");
                            }
                        } catch (Throwable th3) {
                            SymmetricDispatcher.logger.error("Listener fail to pre handle event", th3);
                        }
                    }
                } catch (Throwable th4) {
                    SymmetricDispatcher.logger.error("Fail to get message from stream: ", th4);
                }
            }
        }
    }

    public SymmetricDispatcher(String str) throws Exception {
        this((Configuration) new PropertiesConfiguration(str));
    }

    public SymmetricDispatcher(Configuration configuration) throws Exception {
        this(ConfigurationConverter.getProperties(configuration));
    }

    public SymmetricDispatcher(Properties properties) throws Exception {
        this.config = properties;
        init();
        setup();
    }

    private void init() throws Exception {
        ConsumerConfigrator.validate(this.config);
        this.groupID = this.config.getProperty(IConsumer.GROUP_ID) + "_" + this.config.getProperty("topics");
        logger.info("consumer [" + this.groupID + "] has following properties: " + this.config);
        this.consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(this.config));
        logger.info("consumer [" + this.groupID + "] has following properties: " + this.config);
        logger.info("Start consumer - " + Thread.currentThread().getName() + ", group - " + this.groupID);
    }

    private void setup() throws Exception {
        this.keyDecoder = ConsumerConfigrator.configKeyDecoder(this.config);
        this.valueDecoder = ConsumerConfigrator.configValueDecoder(this.config);
        this.topic_streamNumMap = ConsumerConfigrator.configTopicStreamsMap(this.config);
        logger.info("consumer [" + this.groupID + "] has thread Map: " + this.topic_streamNumMap);
        this.topic_listenersMap = ConsumerConfigrator.configTopicListenersMap(this.config);
        logger.info("consumer [" + this.groupID + "] has listener Map: " + this.topic_listenersMap);
        this.topics = new ArrayList(this.topic_streamNumMap.keySet());
    }

    public String getGroupId() {
        return this.groupID;
    }

    public void launch() {
        Map createMessageStreams = this.consumer.createMessageStreams(this.topic_streamNumMap, this.keyDecoder, this.valueDecoder);
        int parseInt = Integer.parseInt(this.config.getProperty(IConsumer.CONSUMER_THREAD_POOL_SIZE, IConsumer.DEFAULT_THREAD_POOL_SIZE));
        this.executor = Executors.newFixedThreadPool(parseInt);
        logger.info("consumer [" + this.groupID + "] has thread pool with size " + parseInt);
        for (String str : this.topics) {
            for (KafkaStream kafkaStream : (List) createMessageStreams.get(str)) {
                List<IEventListener> list = this.topic_listenersMap.get(str);
                String str2 = "EventBus dispatcher group - " + this.groupID + " topic -" + str;
                logger.info(str2 + " is launching, listener# = " + list.size());
                this.executor.submit(new EventPuller(kafkaStream, str2, this.topic_listenersMap.get(str)));
            }
        }
    }

    public void shutdown() {
        logger.info("consumer [" + this.groupID + "] Shutting down....");
        this.consumer.commitOffsets();
        this.consumer.shutdown();
        this.executor.shutdown();
    }
}
