package io.sermant.mq.prohibition.kafka.interceptor;

import io.sermant.core.common.LoggerFactory;
import io.sermant.core.plugin.agent.entity.ExecuteContext;
import io.sermant.core.plugin.agent.interceptor.AbstractInterceptor;
import io.sermant.mq.prohibition.controller.config.ProhibitionConfigManager;
import io.sermant.mq.prohibition.controller.kafka.KafkaConsumerController;
import io.sermant.mq.prohibition.controller.kafka.cache.KafkaConsumerWrapper;
import io.sermant.mq.prohibition.controller.kafka.extension.KafkaConsumerHandler;
import java.util.logging.Logger;
import org.apache.kafka.clients.consumer.ConsumerRecords;

/* loaded from: input_file:io/sermant/mq/prohibition/kafka/interceptor/KafkaConsumerPollInterceptor.class */
public class KafkaConsumerPollInterceptor extends AbstractInterceptor {
    private static final Logger LOGGER = LoggerFactory.getLogger();
    private static final String ERROR_MESSAGE = "Consumer is not subscribed to any topics or assigned any partitions";
    private KafkaConsumerHandler handler;

    public KafkaConsumerPollInterceptor(KafkaConsumerHandler kafkaConsumerHandler) {
        this.handler = kafkaConsumerHandler;
    }

    public KafkaConsumerPollInterceptor() {
    }

    public ExecuteContext before(ExecuteContext executeContext) {
        KafkaConsumerWrapper kafkaConsumerWrapper = KafkaConsumerController.getKafkaConsumerCache().get(Integer.valueOf(executeContext.getObject().hashCode()));
        if (kafkaConsumerWrapper == null) {
            return executeContext;
        }
        if (this.handler != null) {
            this.handler.doBefore(executeContext);
        } else if (kafkaConsumerWrapper.getIsConfigChanged().get()) {
            KafkaConsumerController.disableConsumption(kafkaConsumerWrapper, ProhibitionConfigManager.getKafkaProhibitionTopics());
            kafkaConsumerWrapper.getIsConfigChanged().set(false);
        }
        return executeContext;
    }

    public ExecuteContext after(ExecuteContext executeContext) {
        if (this.handler != null) {
            this.handler.doAfter(executeContext);
        }
        return executeContext;
    }

    public ExecuteContext onThrow(ExecuteContext executeContext) {
        if ((executeContext.getThrowable() instanceof IllegalStateException) && ERROR_MESSAGE.equals(executeContext.getThrowable().getMessage())) {
            executeContext.changeThrowable((Throwable) null);
            executeContext.changeResult(ConsumerRecords.empty());
            LOGGER.fine("No consuming topic at this moment, catch exception and return empty result");
        }
        if (this.handler != null) {
            this.handler.doOnThrow(executeContext);
        }
        return executeContext;
    }
}
