package io.sermant.mq.prohibition.controller.rocketmq;

import io.sermant.core.common.LoggerFactory;
import io.sermant.core.utils.ReflectUtils;
import io.sermant.mq.prohibition.controller.rocketmq.cache.RocketMqConsumerCache;
import io.sermant.mq.prohibition.controller.rocketmq.constant.SubscriptionType;
import io.sermant.mq.prohibition.controller.rocketmq.wrapper.DefaultLitePullConsumerWrapper;
import io.sermant.mq.prohibition.controller.utils.RocketMqWrapperUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Stream;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.impl.consumer.AssignedMessageQueue;
import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl;
import org.apache.rocketmq.client.impl.consumer.RebalanceImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;

/* loaded from: input_file:io/sermant/mq/prohibition/controller/rocketmq/RocketMqPullConsumerController.class */
/**
 * Static helpers that prohibit (suspend) and re-open (resume) message consumption for
 * RocketMQ lite pull consumers, and that maintain the cache of wrapped pull consumers.
 *
 * <p>Consumers are handled according to the subscription type reported by their wrapper:
 * <ul>
 *   <li>{@code SUBSCRIBE}: the consumer is unregistered from its client factory and its
 *   process queues are cleared; a background task then waits until the client id is no
 *   longer returned for the consumer group.</li>
 *   <li>{@code ASSIGN}: the assigned message queues are replaced with an empty collection,
 *   and restored from the wrapper's message queues on resume.</li>
 * </ul>
 */
public class RocketMqPullConsumerController {
    /** Delay in milliseconds before each check of the consumer id list. */
    private static final long DELAY_TIME = 1000L;

    /** Maximum number of checks that may still find this client in the consumer group. */
    private static final int MAXIMUM_RETRY = 5;

    /** Core and maximum size of the background thread pool. */
    private static final int THREAD_SIZE = 1;

    /** Keep-alive time, in seconds, of idle pool threads. */
    private static final long THREAD_KEEP_ALIVE_TIME = 60L;

    /** Capacity of the pool's task queue. */
    private static final int THREAD_QUEUE_CAPACITY = 20;

    private static final Logger LOGGER = LoggerFactory.getLogger();

    /** Lazily created by {@link #initExecutor()}; volatile for the double-checked initialization. */
    private static volatile ThreadPoolExecutor executor;

    private RocketMqPullConsumerController() {
    }

    /**
     * Suspends the consumer if it subscribes to any of the given topics, otherwise resumes it.
     *
     * @param defaultLitePullConsumerWrapper wrapper of the pull consumer to control
     * @param set topics for which consumption is prohibited; must not be {@code null}
     */
    public static void disablePullConsumption(DefaultLitePullConsumerWrapper defaultLitePullConsumerWrapper,
            Set<String> set) {
        // The bound method reference also rejects a null topic set up front.
        if (defaultLitePullConsumerWrapper.getSubscribedTopics().stream().anyMatch(set::contains)) {
            suspendPullConsumer(defaultLitePullConsumerWrapper);
        } else {
            resumePullConsumer(defaultLitePullConsumerWrapper);
        }
    }

    /**
     * Dispatches suspension according to the wrapper's subscription type; other types are ignored.
     *
     * @param wrapper wrapper of the pull consumer to suspend
     */
    private static void suspendPullConsumer(DefaultLitePullConsumerWrapper wrapper) {
        switch (wrapper.getSubscriptionType()) {
            case SUBSCRIBE:
                suspendSubscriptiveConsumer(wrapper);
                break;
            case ASSIGN:
                suspendAssignedConsumer(wrapper);
                break;
            default:
                break;
        }
    }

    /**
     * Suspends a subscribe-mode consumer: persists its offsets, unregisters it from the client
     * factory, triggers the rebalance handling and marks the wrapper as prohibited. Does nothing
     * except logging when the wrapper is already marked as prohibited.
     *
     * @param wrapper wrapper of the pull consumer to suspend
     */
    private static void suspendSubscriptiveConsumer(DefaultLitePullConsumerWrapper wrapper) {
        if (wrapper.isProhibition()) {
            LOGGER.log(Level.INFO, "Consumer has prohibited consumption, consumer instance name : {0}, consumer group : {1}, topic : {2}",
                    new Object[]{wrapper.getInstanceName(), wrapper.getConsumerGroup(), wrapper.getSubscribedTopics()});
            return;
        }
        wrapper.getPullConsumerImpl().persistConsumerOffset();
        wrapper.getClientFactory().unregisterConsumer(wrapper.getConsumerGroup());
        doRebalance(wrapper);
        wrapper.setProhibition(true);
    }

    /**
     * Suspends an assign-mode consumer by replacing its assigned message queues with an empty
     * collection and, if the consumer is running, updating its assign pull tasks accordingly.
     *
     * @param wrapper wrapper of the pull consumer to suspend
     */
    private static void suspendAssignedConsumer(DefaultLitePullConsumerWrapper wrapper) {
        DefaultLitePullConsumerImpl pullConsumerImpl = wrapper.getPullConsumerImpl();
        if (wrapper.getAssignedMessageQueue() == null) {
            RocketMqWrapperUtils.getAssignedMessageQueue(pullConsumerImpl)
                    .ifPresent(wrapper::setAssignedMessageQueue);
        }
        AssignedMessageQueue assignedMessageQueue = wrapper.getAssignedMessageQueue();
        if (assignedMessageQueue == null) {
            // The queue holder could not be resolved; report it instead of failing on a null dereference.
            LOGGER.log(Level.SEVERE, "Fail to prohibit consumption, assigned message queue is unavailable, consumer instance name : {0}, consumer group : {1}",
                    new Object[]{wrapper.getInstanceName(), wrapper.getConsumerGroup()});
            return;
        }
        assignedMessageQueue.updateAssignedMessageQueue(new ArrayList<>());
        if (wrapper.getPullConsumer().isRunning()) {
            ReflectUtils.invokeMethod(pullConsumerImpl, "updateAssignPullTask",
                    new Class<?>[]{Collection.class}, new Object[]{new ArrayList<>()});
        }
        LOGGER.log(Level.INFO, "Success to prohibit consumption, consumer instance name : {0}, consumer group : {1}, message queue : {2}",
                new Object[]{wrapper.getInstanceName(), wrapper.getConsumerGroup(), wrapper.getMessageQueues()});
    }

    /**
     * Clears the process queues of every subscribed topic immediately, then submits
     * {@link #doDelayRebalance(DefaultLitePullConsumerWrapper)} to the background executor.
     *
     * @param wrapper wrapper of the pull consumer being suspended
     */
    private static void doRebalance(DefaultLitePullConsumerWrapper wrapper) {
        initExecutor();
        messageQueueChanged(wrapper);
        executor.submit(() -> doDelayRebalance(wrapper));
    }

    /**
     * Waits for the consumer to leave its consumer group, re-checking after each
     * {@code DELAY_TIME} pause. A check pass clears the process queues of every topic whose
     * consumer id list no longer contains this client; the pass stops at the first topic that
     * still lists the client, which counts as one retry. The wait ends when a pass cleared at
     * least one topic or when {@code MAXIMUM_RETRY} retries were used. On failure the
     * prohibition mark is reset and the consumer is registered again.
     *
     * <p>An interruption during the pause does not abort the wait; the interrupt status is
     * restored before this method returns.
     *
     * @param defaultLitePullConsumerWrapper wrapper of the pull consumer being suspended
     */
    public static void doDelayRebalance(DefaultLitePullConsumerWrapper defaultLitePullConsumerWrapper) {
        RebalanceImpl rebalanceImpl = defaultLitePullConsumerWrapper.getRebalanceImpl();
        String instanceName = defaultLitePullConsumerWrapper.getInstanceName();
        String consumerGroup = defaultLitePullConsumerWrapper.getConsumerGroup();
        Set<String> subscribedTopics = defaultLitePullConsumerWrapper.getSubscribedTopics();
        MQClientInstance clientFactory = defaultLitePullConsumerWrapper.getClientFactory();
        String clientId = clientFactory.getClientId();

        if (subscribedTopics.isEmpty()) {
            // Without topics neither the retry counter nor the finished flag could ever change,
            // so the loop below would never terminate. There is nothing left to wait for.
            LOGGER.log(Level.INFO, "Success to prohibit consumption, consumer instance name : {0}, consumer group : {1}, topic : {2}",
                    new Object[]{instanceName, consumerGroup, subscribedTopics});
            return;
        }

        int retryCount = 0;
        boolean finished = false;
        boolean interrupted = false;
        while (retryCount < MAXIMUM_RETRY && !finished) {
            try {
                Thread.sleep(DELAY_TIME);
            } catch (InterruptedException e) {
                // Remember the interruption so it can be restored once the wait is over.
                interrupted = true;
                LOGGER.log(Level.SEVERE, "An InterruptedException occurs on the thread, details: {0}", e.getMessage());
            }
            for (String topic : subscribedTopics) {
                List<String> consumerIds = clientFactory.findConsumerIdList(topic, consumerGroup);
                if (consumerIds != null && consumerIds.contains(clientId)) {
                    // This client is still listed for the group: count a retry and check again later.
                    retryCount++;
                    break;
                }
                messageQueueChanged(rebalanceImpl, topic);
                finished = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }

        if (finished) {
            LOGGER.log(Level.INFO, "Success to prohibit consumption, consumer instance name : {0}, consumer group : {1}, topic : {2}",
                    new Object[]{instanceName, consumerGroup, subscribedTopics});
            return;
        }
        LOGGER.log(Level.SEVERE, "Consumer exiting the {1} consumer group timeout may cause a failure to reallocate the message queue, consumer instance name : {0}, consumer group : {1}, topic : {2}. Please deliver the configuration again.",
                new Object[]{instanceName, consumerGroup, subscribedTopics});
        // Roll back: the consumer could not leave the group, so make it a regular member again.
        defaultLitePullConsumerWrapper.setProhibition(false);
        defaultLitePullConsumerWrapper.getClientFactory().registerConsumer(consumerGroup,
                defaultLitePullConsumerWrapper.getPullConsumerImpl());
    }

    /**
     * Clears the process queues of every topic found in the rebalance implementation's
     * subscription table, if that table is present.
     *
     * @param wrapper wrapper of the pull consumer being suspended
     */
    private static void messageQueueChanged(DefaultLitePullConsumerWrapper wrapper) {
        RebalanceImpl rebalanceImpl = wrapper.getRebalanceImpl();
        ConcurrentMap<String, ?> subscriptionInner = rebalanceImpl.getSubscriptionInner();
        if (subscriptionInner == null) {
            return;
        }
        for (String topic : subscriptionInner.keySet()) {
            messageQueueChanged(rebalanceImpl, topic);
        }
    }

    /**
     * Reflectively invokes {@code updateProcessQueueTableInRebalance} with an empty queue set
     * for the topic, then notifies the rebalance implementation that the topic's divided
     * queues are now empty.
     *
     * @param rebalanceImpl rebalance implementation of the consumer
     * @param topic topic whose queues are released
     */
    private static void messageQueueChanged(RebalanceImpl rebalanceImpl, String topic) {
        ReflectUtils.invokeMethod(rebalanceImpl, "updateProcessQueueTableInRebalance",
                new Class<?>[]{String.class, Set.class, boolean.class},
                new Object[]{topic, new HashSet<>(), false});
        rebalanceImpl.messageQueueChanged(topic, rebalanceImpl.getTopicSubscribeInfoTable().get(topic),
                new HashSet<>());
    }

    /**
     * Dispatches resumption according to the wrapper's subscription type; other types are ignored.
     *
     * @param wrapper wrapper of the pull consumer to resume
     */
    private static void resumePullConsumer(DefaultLitePullConsumerWrapper wrapper) {
        switch (wrapper.getSubscriptionType()) {
            case SUBSCRIBE:
                resumeSubscriptiveConsumer(wrapper);
                break;
            case ASSIGN:
                resumeAssignedConsumer(wrapper);
                break;
            default:
                break;
        }
    }

    /**
     * Resumes a subscribe-mode consumer: registers it with the client factory again, triggers a
     * rebalance and clears the prohibition mark. Does nothing except logging when the wrapper is
     * not marked as prohibited.
     *
     * @param wrapper wrapper of the pull consumer to resume
     */
    private static void resumeSubscriptiveConsumer(DefaultLitePullConsumerWrapper wrapper) {
        if (!wrapper.isProhibition()) {
            LOGGER.log(Level.INFO, "Consumer has opened consumption, consumer instance name : {0}, consumer group : {1}, topic : {2}",
                    new Object[]{wrapper.getInstanceName(), wrapper.getConsumerGroup(), wrapper.getSubscribedTopics()});
            return;
        }
        String consumerGroup = wrapper.getConsumerGroup();
        DefaultLitePullConsumerImpl pullConsumerImpl = wrapper.getPullConsumerImpl();
        wrapper.getClientFactory().registerConsumer(consumerGroup, pullConsumerImpl);
        pullConsumerImpl.doRebalance();
        wrapper.setProhibition(false);
        LOGGER.log(Level.INFO, "Success to open consumption, consumer instance name : {0}, consumer group : {1}, topic : {2}",
                new Object[]{wrapper.getInstanceName(), wrapper.getConsumerGroup(), wrapper.getSubscribedTopics()});
    }

    /**
     * Resumes an assign-mode consumer by assigning the wrapper's message queues to it again.
     *
     * @param wrapper wrapper of the pull consumer to resume
     */
    private static void resumeAssignedConsumer(DefaultLitePullConsumerWrapper wrapper) {
        wrapper.getPullConsumer().assign(wrapper.getMessageQueues());
        LOGGER.log(Level.INFO, "Success to open consumption, consumer instance name : {0}, consumer group : {1}, message queue : {2}",
                new Object[]{wrapper.getInstanceName(), wrapper.getConsumerGroup(), wrapper.getMessageQueues()});
    }

    /**
     * Wraps the consumer and stores the wrapper in the pull consumer cache, keyed by the
     * consumer's hash code. Logs an error and caches nothing when wrapping fails.
     *
     * @param defaultLitePullConsumer pull consumer to cache
     */
    public static void cachePullConsumer(DefaultLitePullConsumer defaultLitePullConsumer) {
        Optional<DefaultLitePullConsumerWrapper> wrapPullConsumer =
                RocketMqWrapperUtils.wrapPullConsumer(defaultLitePullConsumer);
        if (!wrapPullConsumer.isPresent()) {
            LOGGER.log(Level.SEVERE, "Fail to cache consumer, consumer instance name : {0}, consumer group : {1}",
                    new Object[]{defaultLitePullConsumer.getInstanceName(), defaultLitePullConsumer.getConsumerGroup()});
            return;
        }
        DefaultLitePullConsumerWrapper wrapper = wrapPullConsumer.get();
        RocketMqConsumerCache.PULL_CONSUMERS_CACHE.put(defaultLitePullConsumer.hashCode(), wrapper);
        LOGGER.log(Level.INFO, "Success to cache consumer, consumer instance name : {0}, consumer group : {1}, topic : {2}",
                new Object[]{defaultLitePullConsumer.getInstanceName(), defaultLitePullConsumer.getConsumerGroup(),
                        wrapper.getSubscribedTopics()});
    }

    /**
     * Removes the consumer's wrapper from the pull consumer cache, if one is cached.
     *
     * @param defaultLitePullConsumer pull consumer whose wrapper is removed
     */
    public static void removePullConsumer(DefaultLitePullConsumer defaultLitePullConsumer) {
        // A single remove both looks up and deletes the entry.
        DefaultLitePullConsumerWrapper removed =
                RocketMqConsumerCache.PULL_CONSUMERS_CACHE.remove(defaultLitePullConsumer.hashCode());
        if (removed != null) {
            LOGGER.log(Level.INFO, "Success to remove consumer, consumer instance name : {0}, consumer group : {1}, topic : {2}",
                    new Object[]{removed.getInstanceName(), removed.getConsumerGroup(), removed.getSubscribedTopics()});
        }
    }

    /**
     * Looks up the cached wrapper of a pull consumer.
     *
     * @param defaultLitePullConsumer pull consumer to look up
     * @return the cached wrapper, or {@code null} if the consumer is not cached
     */
    public static DefaultLitePullConsumerWrapper getPullConsumerWrapper(
            DefaultLitePullConsumer defaultLitePullConsumer) {
        return RocketMqConsumerCache.PULL_CONSUMERS_CACHE.get(defaultLitePullConsumer.hashCode());
    }

    /**
     * Returns the pull consumer cache itself (not a copy), keyed by consumer hash code.
     *
     * @return the pull consumer cache
     */
    public static Map<Integer, DefaultLitePullConsumerWrapper> getPullConsumerCache() {
        return RocketMqConsumerCache.PULL_CONSUMERS_CACHE;
    }

    /**
     * Creates the background executor on first use. The executor is published only after it is
     * fully configured, and the lock is this class's own monitor.
     */
    private static void initExecutor() {
        if (executor == null) {
            synchronized (RocketMqPullConsumerController.class) {
                if (executor == null) {
                    ThreadPoolExecutor pool = new ThreadPoolExecutor(THREAD_SIZE, THREAD_SIZE,
                            THREAD_KEEP_ALIVE_TIME, TimeUnit.SECONDS,
                            new ArrayBlockingQueue<>(THREAD_QUEUE_CAPACITY));
                    pool.allowCoreThreadTimeOut(true);
                    executor = pool;
                }
            }
        }
    }
}
