package org.apache.iotdb.db.subscription.agent;

import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMeta;
import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllTopicInfoResp;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaRespExceptionMessage;
import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaRespExceptionMessage;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/subscription/agent/SubscriptionAgentLauncher.class */
class SubscriptionAgentLauncher {
    private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionAgentLauncher.class);
    public static final int MAX_RETRY_TIMES = 5;

    private SubscriptionAgentLauncher() {
    }

    public static synchronized void launchSubscriptionTopicAgent() throws StartupException {
        int i = 0;
        while (i < 5) {
            try {
                ConfigNodeClient configNodeClient = (ConfigNodeClient) ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
                try {
                    TGetAllTopicInfoResp allTopicInfo = configNodeClient.getAllTopicInfo();
                    if (allTopicInfo.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                        String format = String.format("Failed to get all topic info in config node, status is %s", allTopicInfo.getStatus());
                        LOGGER.warn(format);
                        throw new SubscriptionException(format);
                    }
                    TPushTopicMetaRespExceptionMessage handleTopicMetaChanges = SubscriptionAgent.topic().handleTopicMetaChanges((List) allTopicInfo.getAllTopicInfo().stream().map(byteBuffer -> {
                        TopicMeta deserialize = TopicMeta.deserialize(byteBuffer);
                        LOGGER.info("Pulled topic meta from config node: {}, recovering ...", deserialize);
                        return deserialize;
                    }).collect(Collectors.toList()));
                    if (Objects.nonNull(handleTopicMetaChanges)) {
                        LOGGER.warn(handleTopicMetaChanges.getMessage());
                        throw new SubscriptionException(handleTopicMetaChanges.getMessage());
                    }
                    if (configNodeClient != null) {
                        configNodeClient.close();
                        return;
                    }
                    return;
                } finally {
                }
            } catch (SubscriptionException | ClientManagerException | TException e) {
                i++;
                LOGGER.warn("Failed to get topic meta from config node for {} times, will retry at most {} times.", new Object[]{Integer.valueOf(i), 5, e});
                try {
                    Thread.sleep(i * SubscriptionConfig.getInstance().getSubscriptionLaunchRetryIntervalMs());
                } catch (InterruptedException e2) {
                    LOGGER.info("Interrupted while sleeping, will retry to get topic meta from config node.", e2);
                    Thread.currentThread().interrupt();
                }
            }
        }
        throw new StartupException("Failed to get topic meta from config node.");
    }

    public static synchronized void launchSubscriptionConsumerAgent() throws StartupException {
        int i = 0;
        while (i < 5) {
            try {
                ConfigNodeClient configNodeClient = (ConfigNodeClient) ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
                try {
                    TGetAllSubscriptionInfoResp allSubscriptionInfo = configNodeClient.getAllSubscriptionInfo();
                    if (allSubscriptionInfo.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                        String format = String.format("Failed to get all subscription info in config node, status is %s", allSubscriptionInfo.getStatus());
                        LOGGER.warn(format);
                        throw new SubscriptionException(format);
                    }
                    TPushConsumerGroupMetaRespExceptionMessage handleConsumerGroupMetaChanges = SubscriptionAgent.consumer().handleConsumerGroupMetaChanges((List) allSubscriptionInfo.getAllSubscriptionInfo().stream().map(byteBuffer -> {
                        ConsumerGroupMeta deserialize = ConsumerGroupMeta.deserialize(byteBuffer);
                        LOGGER.info("Pulled consumer group meta from config node: {}, recovering ...", deserialize);
                        return deserialize;
                    }).collect(Collectors.toList()));
                    if (Objects.nonNull(handleConsumerGroupMetaChanges)) {
                        LOGGER.warn(handleConsumerGroupMetaChanges.getMessage());
                        throw new SubscriptionException(handleConsumerGroupMetaChanges.getMessage());
                    }
                    if (configNodeClient != null) {
                        configNodeClient.close();
                        return;
                    }
                    return;
                } finally {
                }
            } catch (SubscriptionException | ClientManagerException | TException e) {
                i++;
                LOGGER.warn("Failed to get consumer group meta from config node for {} times, will retry at most {} times.", new Object[]{Integer.valueOf(i), 5, e});
                try {
                    Thread.sleep(i * SubscriptionConfig.getInstance().getSubscriptionLaunchRetryIntervalMs());
                } catch (InterruptedException e2) {
                    LOGGER.info("Interrupted while sleeping, will retry to get consumer group meta from config node.", e2);
                    Thread.currentThread().interrupt();
                }
            }
        }
        throw new StartupException("Failed to get consumer group meta from config node.");
    }
}
