package io.sermant.mq.prohibition.dynamicconfig;

import io.sermant.core.common.LoggerFactory;
import io.sermant.core.config.ConfigManager;
import io.sermant.core.plugin.config.ServiceMeta;
import io.sermant.core.service.dynamicconfig.common.DynamicConfigEvent;
import io.sermant.core.service.dynamicconfig.common.DynamicConfigEventType;
import io.sermant.core.service.dynamicconfig.common.DynamicConfigListener;
import io.sermant.mq.prohibition.controller.config.ProhibitionConfig;
import io.sermant.mq.prohibition.controller.config.ProhibitionConfigManager;
import io.sermant.mq.prohibition.controller.kafka.KafkaConsumerController;
import io.sermant.mq.prohibition.controller.rocketmq.RocketMqPullConsumerController;
import io.sermant.mq.prohibition.controller.rocketmq.RocketMqPushConsumerController;
import io.sermant.mq.prohibition.controller.rocketmq.cache.RocketMqConsumerCache;
import io.sermant.mq.prohibition.controller.rocketmq.wrapper.DefaultLitePullConsumerWrapper;
import io.sermant.mq.prohibition.controller.rocketmq.wrapper.DefaultMqPushConsumerWrapper;
import java.util.Locale;
import java.util.logging.Logger;
import org.yaml.snakeyaml.DumperOptions;
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.error.YAMLException;
import org.yaml.snakeyaml.representer.Representer;

/* loaded from: input_file:io/sermant/mq/prohibition/dynamicconfig/MqConfigListener.class */
public class MqConfigListener implements DynamicConfigListener {
    public static final String GLOBAL_CONFIG_KEY = "sermant.mq.consume.globalConfig";
    public static final String LOCAL_CONFIG_KEY_PREFIX = "sermant.mq.consume.";
    private static final Logger LOGGER = LoggerFactory.getLogger();
    private final Yaml yaml;

    public MqConfigListener() {
        Representer representer = new Representer(new DumperOptions());
        representer.getPropertyUtils().setSkipMissingProperties(true);
        this.yaml = new Yaml(representer);
    }

    public void process(DynamicConfigEvent dynamicConfigEvent) {
        try {
            if (dynamicConfigEvent.getEventType() == DynamicConfigEventType.INIT) {
                processInitEvent(dynamicConfigEvent);
            } else if (dynamicConfigEvent.getEventType() == DynamicConfigEventType.DELETE) {
                processDeleteEvent(dynamicConfigEvent);
            } else {
                processCreateOrUpdateEvent(dynamicConfigEvent);
            }
        } catch (YAMLException e) {
            LOGGER.severe(String.format(Locale.ROOT, "Fail to convert dynamic mq-consume-prohibition config, %s", e.getMessage()));
        }
    }

    private void processCreateOrUpdateEvent(DynamicConfigEvent dynamicConfigEvent) {
        if (GLOBAL_CONFIG_KEY.equals(dynamicConfigEvent.getKey())) {
            ProhibitionConfigManager.updateGlobalConfig((ProhibitionConfig) this.yaml.loadAs(dynamicConfigEvent.getContent(), ProhibitionConfig.class));
            executeProhibition();
        }
        if ((LOCAL_CONFIG_KEY_PREFIX + ConfigManager.getConfig(ServiceMeta.class).getService()).equals(dynamicConfigEvent.getKey())) {
            ProhibitionConfigManager.updateLocalConfig((ProhibitionConfig) this.yaml.loadAs(dynamicConfigEvent.getContent(), ProhibitionConfig.class));
            executeProhibition();
        }
        LOGGER.info(String.format(Locale.ROOT, "Update mq-consume-prohibition config, current config: %s", ProhibitionConfigManager.printConfig()));
    }

    private void processDeleteEvent(DynamicConfigEvent dynamicConfigEvent) {
        if (GLOBAL_CONFIG_KEY.equals(dynamicConfigEvent.getKey())) {
            ProhibitionConfigManager.updateGlobalConfig(new ProhibitionConfig());
            executeProhibition();
        }
        if ((LOCAL_CONFIG_KEY_PREFIX + ConfigManager.getConfig(ServiceMeta.class).getService()).equals(dynamicConfigEvent.getKey())) {
            ProhibitionConfigManager.updateLocalConfig(new ProhibitionConfig());
            executeProhibition();
        }
        LOGGER.info(String.format(Locale.ROOT, "Delete mq-consume-prohibition config, current config: %s", ProhibitionConfigManager.printConfig()));
    }

    private void executeProhibition() {
        KafkaConsumerController.getKafkaConsumerCache().values().forEach(kafkaConsumerWrapper -> {
            kafkaConsumerWrapper.getIsConfigChanged().set(true);
        });
        RocketMqConsumerCache.PUSH_CONSUMERS_CACHE.entrySet().forEach(entry -> {
            RocketMqPushConsumerController.disablePushConsumption((DefaultMqPushConsumerWrapper) entry.getValue(), ProhibitionConfigManager.getRocketMqProhibitionTopics());
        });
        RocketMqConsumerCache.PULL_CONSUMERS_CACHE.entrySet().forEach(entry2 -> {
            RocketMqPullConsumerController.disablePullConsumption((DefaultLitePullConsumerWrapper) entry2.getValue(), ProhibitionConfigManager.getRocketMqProhibitionTopics());
        });
    }

    private void processInitEvent(DynamicConfigEvent dynamicConfigEvent) {
        if (GLOBAL_CONFIG_KEY.equals(dynamicConfigEvent.getKey())) {
            ProhibitionConfigManager.updateGlobalConfig((ProhibitionConfig) this.yaml.loadAs(dynamicConfigEvent.getContent(), ProhibitionConfig.class));
        }
        if ((LOCAL_CONFIG_KEY_PREFIX + ConfigManager.getConfig(ServiceMeta.class).getService()).equals(dynamicConfigEvent.getKey())) {
            ProhibitionConfigManager.updateLocalConfig((ProhibitionConfig) this.yaml.loadAs(dynamicConfigEvent.getContent(), ProhibitionConfig.class));
        }
        LOGGER.info(String.format(Locale.ROOT, "Init mq-consume-prohibition config, current config: %s", ProhibitionConfigManager.printConfig()));
    }
}
