package xin.manong.weapon.base.kafka;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:xin/manong/weapon/base/kafka/KafkaConsumeGroup.class */
/**
 * Starts and stops a set of {@link KafkaConsumer} instances that all share one
 * {@link KafkaConsumeConfig} and one {@link KafkaRecordProcessor}.
 *
 * <p>{@link #start()} creates {@code config.consumeThreadNum} consumers named
 * {@code <config.name>_<index>}; {@link #stop()} stops every consumer that the
 * group currently holds.
 *
 * <p>This class performs no synchronization of its own.
 */
public class KafkaConsumeGroup {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumeGroup.class);
    private final KafkaConsumeConfig config;
    private KafkaConsumer[] consumers;
    private KafkaRecordProcessor processor;

    /**
     * Creates a group without a processor; one must be supplied through
     * {@link #setProcessor(KafkaRecordProcessor)} before {@link #start()} can succeed.
     *
     * @param kafkaConsumeConfig consume configuration; may be {@code null}, in which
     *                           case {@link #start()} fails
     */
    public KafkaConsumeGroup(KafkaConsumeConfig kafkaConsumeConfig) {
        this.config = kafkaConsumeConfig;
    }

    /**
     * Creates a group with both its configuration and its record processor.
     *
     * @param kafkaConsumeConfig   consume configuration
     * @param kafkaRecordProcessor processor handed to every consumer of the group
     */
    public KafkaConsumeGroup(KafkaConsumeConfig kafkaConsumeConfig, KafkaRecordProcessor kafkaRecordProcessor) {
        this(kafkaConsumeConfig);
        this.processor = kafkaRecordProcessor;
    }

    /**
     * Validates the configuration and starts all consumers of the group.
     *
     * <p>If any consumer fails to start, the consumers that were already started by
     * this call are stopped again and the group is left holding no consumers, so a
     * failed start does not leave consumers running and a later {@link #stop()} is
     * a safe no-op.
     *
     * @return {@code true} if every consumer started; {@code false} if the
     *         configuration is missing/invalid, the processor is missing, or any
     *         consumer failed to start
     */
    public boolean start() {
        logger.info("kafka consume group[{}] is starting ...", groupName());
        if (this.config == null || !this.config.check()) {
            logger.error("kafka consume config is invalid");
            return false;
        }
        if (this.processor == null) {
            logger.error("kafka record processor is null");
            return false;
        }
        int threadNum = this.config.consumeThreadNum.intValue();
        KafkaConsumer[] started = new KafkaConsumer[threadNum];
        for (int i = 0; i < threadNum; i++) {
            String consumerName = String.format("%s_%d", this.config.name, i);
            KafkaConsumer consumer = new KafkaConsumer(consumerName, this.config, this.processor);
            if (!consumer.start()) {
                logger.error("start kafka consumer[{}] failed", consumerName);
                // Roll back: only the consumers that reported a successful start are
                // stopped. NOTE(review): assumes KafkaConsumer.start() releases its own
                // resources when it returns false - confirm.
                stopAll(started);
                this.consumers = null;
                return false;
            }
            started[i] = consumer;
        }
        // Publish the array only once it is fully populated, so stop() never sees
        // a partially filled group.
        this.consumers = started;
        logger.info("kafka consume group[{}] has been started", groupName());
        return true;
    }

    /**
     * Stops every consumer currently held by the group. Safe to call when the
     * group was never started, failed to start, or has a {@code null} config.
     */
    public void stop() {
        logger.info("kafka consume group[{}] is stopping ...", groupName());
        stopAll(this.consumers);
        logger.info("kafka consume group[{}] has been stopped", groupName());
    }

    /**
     * Sets the processor used by consumers created in subsequent {@link #start()} calls.
     *
     * @param kafkaRecordProcessor processor handed to every consumer of the group
     */
    public void setProcessor(KafkaRecordProcessor kafkaRecordProcessor) {
        this.processor = kafkaRecordProcessor;
    }

    /** Returns the configured group name for logging, or {@code null} when no config is set. */
    private String groupName() {
        return this.config == null ? null : this.config.name;
    }

    /** Stops each non-null consumer in the given array; a {@code null} array is ignored. */
    private static void stopAll(KafkaConsumer[] group) {
        if (group == null) {
            return;
        }
        for (KafkaConsumer consumer : group) {
            if (consumer != null) {
                consumer.stop();
            }
        }
    }
}
