package org.apache.inlong.manager.service.resource.queue.pulsar;

import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.manager.common.conversion.ConversionHandle;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.pojo.queue.pulsar.PulsarMessageInfo;
import org.apache.inlong.manager.pojo.queue.pulsar.PulsarNamespacePolicies;
import org.apache.inlong.manager.pojo.queue.pulsar.PulsarPersistencePolicies;
import org.apache.inlong.manager.pojo.queue.pulsar.PulsarRetentionPolicies;
import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTenantInfo;
import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicInfo;
import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicMetadata;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.service.cluster.InlongClusterServiceImpl;
import org.apache.inlong.manager.service.message.DeserializeOperatorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

/**
 * Spring service that manages Pulsar tenants, namespaces, topics and subscriptions, and reads
 * recent messages, by delegating to {@link PulsarUtils} with an injected {@link RestTemplate}.
 */
@Service
public class PulsarOperator {

    private static final Logger LOGGER = LoggerFactory.getLogger(PulsarOperator.class);

    /** Requested partition counts outside {@code [0, MAX_PARTITION)} are replaced by the broker count. */
    private static final int MAX_PARTITION = 1000;
    /** Maximum number of attempts used by the retry loops in this class. */
    private static final int RETRY_TIMES = 3;
    /**
     * Pause between retries. NOTE(review): despite the name, the value is handed to
     * {@link Thread#sleep(long)} and is therefore interpreted as milliseconds.
     */
    private static final int DELAY_SECONDS = 5;
    /** Pause, in milliseconds, between lookups while waiting for all partitions to show up. */
    private static final long PARTITION_LOOKUP_INTERVAL_MS = 500L;

    private static final String QUEUE_MODULE_PARALLEL = "PARALLEL";
    private static final String QUEUE_MODULE_SERIAL = "SERIAL";
    private static final String PARTITION_SUFFIX = "-partition-";

    @Autowired
    public DeserializeOperatorFactory deserializeOperatorFactory;

    @Autowired
    private ConversionHandle conversionHandle;

    @Autowired
    private RestTemplate restTemplate;

    /**
     * Creates the given tenant unless it is already listed for the cluster. The new tenant is
     * allowed on every cluster returned by {@code PulsarUtils.getClusters} and has no admin roles.
     *
     * @param pulsarClusterInfo cluster to operate on
     * @param tenant tenant name, must not be blank
     * @throws Exception if the tenant name is blank or any underlying call fails (logged, then rethrown)
     */
    public void createTenant(PulsarClusterInfo pulsarClusterInfo, String tenant) throws Exception {
        LOGGER.info("begin to create pulsar tenant={}", tenant);
        Preconditions.expectNotBlank(tenant, ErrorCodeEnum.INVALID_PARAMETER, "Tenant cannot be empty");
        try {
            List<String> clusters = PulsarUtils.getClusters(this.restTemplate, pulsarClusterInfo);
            if (tenantIsExists(pulsarClusterInfo, tenant)) {
                LOGGER.warn("pulsar tenant={} already exists, skip to create", tenant);
                return;
            }
            PulsarTenantInfo tenantInfo = new PulsarTenantInfo();
            tenantInfo.setAllowedClusters(Sets.newHashSet(clusters));
            tenantInfo.setAdminRoles(Sets.newHashSet());
            PulsarUtils.createTenant(this.restTemplate, pulsarClusterInfo, tenant, tenantInfo);
            LOGGER.info("success to create pulsar tenant={}", tenant);
        } catch (Exception e) {
            LOGGER.error("failed to create pulsar tenant={}", tenant, e);
            throw e;
        }
    }

    /**
     * Creates the namespace {@code tenant/namespace} unless it already exists, applying the TTL,
     * retention and persistence settings carried by {@code inlongPulsarInfo}.
     *
     * <p>Positive TTL / retention values are converted with {@link ConversionHandle} using a key
     * built from the lower-cased unit ({@code <unit>_seconds}, {@code <unit>_minutes},
     * {@code <unit>_mb}); non-positive values are passed through unchanged.
     *
     * @param pulsarClusterInfo cluster to operate on
     * @param inlongPulsarInfo group-level Pulsar settings; its TTL, retention and persistence
     *        values are dereferenced without null checks
     * @param tenant tenant name, must not be blank
     * @param namespace namespace name (without tenant prefix), must not be blank
     * @throws Exception if validation or any underlying call fails (logged, then rethrown)
     */
    public void createNamespace(PulsarClusterInfo pulsarClusterInfo, InlongPulsarInfo inlongPulsarInfo,
            String tenant, String namespace) throws Exception {
        Preconditions.expectNotBlank(tenant, ErrorCodeEnum.INVALID_PARAMETER,
                "pulsar tenant cannot be empty during create namespace");
        Preconditions.expectNotBlank(namespace, ErrorCodeEnum.INVALID_PARAMETER,
                "pulsar namespace cannot be empty during create namespace");
        String tenantNamespace = tenant + "/" + namespace;
        LOGGER.info("begin to create namespace={}", tenantNamespace);
        try {
            // the existence check compares against the "tenant/namespace" form
            if (namespaceExists(pulsarClusterInfo, tenant, tenantNamespace)) {
                LOGGER.warn("namespace={} already exists, skip to create", tenantNamespace);
                return;
            }

            PulsarNamespacePolicies policies = new PulsarNamespacePolicies();

            Integer ttl = inlongPulsarInfo.getTtl();
            if (ttl > 0) {
                String ttlConversion = inlongPulsarInfo.getTtlUnit().toLowerCase() + "_seconds";
                policies.setMessageTtlInSeconds(
                        this.conversionHandle.handleConversion(ttl, ttlConversion).intValue());
            }

            Integer retentionTime = inlongPulsarInfo.getRetentionTime();
            if (retentionTime > 0) {
                String timeConversion = inlongPulsarInfo.getRetentionTimeUnit().toLowerCase() + "_minutes";
                retentionTime = this.conversionHandle.handleConversion(retentionTime, timeConversion);
            }

            Integer retentionSize = inlongPulsarInfo.getRetentionSize();
            if (retentionSize > 0) {
                String sizeConversion = inlongPulsarInfo.getRetentionSizeUnit().toLowerCase() + "_mb";
                retentionSize = this.conversionHandle.handleConversion(retentionSize, sizeConversion);
            }

            policies.setRetentionPolicies(
                    new PulsarRetentionPolicies(retentionTime.intValue(), retentionSize.intValue()));
            policies.setPersistence(new PulsarPersistencePolicies(
                    inlongPulsarInfo.getEnsemble().intValue(),
                    inlongPulsarInfo.getWriteQuorum().intValue(),
                    inlongPulsarInfo.getAckQuorum().intValue(),
                    inlongPulsarInfo.getMaxMarkDeleteRate().doubleValue()));

            PulsarUtils.createNamespace(this.restTemplate, pulsarClusterInfo, tenant, namespace, policies);
            LOGGER.info("success to create namespace={}", tenantNamespace);
        } catch (Exception e) {
            LOGGER.error("failed to create namespace={}", tenantNamespace, e);
            throw e;
        }
    }

    /**
     * Creates the topic described by {@code pulsarTopicInfo} unless {@link #topicExists} reports
     * that it is already there.
     *
     * <p>A {@code SERIAL} queue module yields a non-partitioned topic. Any other queue module
     * yields a partitioned topic; a requested partition count that is negative or not below
     * {@code MAX_PARTITION} is replaced by the number of brokers. After creation the partitions
     * are looked up (with a few short retries) and a mismatch in count is reported as a failure.
     *
     * @param pulsarClusterInfo cluster to operate on
     * @param pulsarTopicInfo tenant, namespace, topic name, queue module and partition count
     * @throws Exception if creation fails or the looked-up partition count does not match
     */
    public void createTopic(PulsarClusterInfo pulsarClusterInfo, PulsarTopicInfo pulsarTopicInfo)
            throws Exception {
        Preconditions.expectNotNull(pulsarTopicInfo, "pulsar topic info cannot be empty");
        String tenant = pulsarTopicInfo.getPulsarTenant();
        String namespace = pulsarTopicInfo.getNamespace();
        String topicName = pulsarTopicInfo.getTopicName();
        String fullTopicName = tenant + "/" + namespace + "/" + topicName;

        boolean isPartitioned = QUEUE_MODULE_PARALLEL.equals(pulsarTopicInfo.getQueueModule());
        if (topicExists(pulsarClusterInfo, tenant, namespace, topicName, isPartitioned)) {
            LOGGER.warn("pulsar topic={} already exists in {}", fullTopicName, pulsarClusterInfo.getAdminUrl());
            return;
        }

        try {
            if (QUEUE_MODULE_SERIAL.equals(pulsarTopicInfo.getQueueModule())) {
                PulsarUtils.createNonPartitionedTopic(this.restTemplate, pulsarClusterInfo, fullTopicName);
                String lookupResult = PulsarUtils.lookupTopic(this.restTemplate, pulsarClusterInfo, fullTopicName);
                LOGGER.info("success to create topic={}, lookup result is {}", fullTopicName, lookupResult);
                return;
            }

            // NOTE(review): the result is unused; the call is retained because it fails early
            // when the admin endpoint cannot be reached — confirm whether it can be dropped.
            PulsarUtils.getClusters(this.restTemplate, pulsarClusterInfo);

            Integer numPartitions = pulsarTopicInfo.getNumPartitions();
            if (numPartitions < 0 || numPartitions >= MAX_PARTITION) {
                numPartitions = PulsarUtils.getBrokers(this.restTemplate, pulsarClusterInfo).size();
            }
            PulsarUtils.createPartitionedTopic(this.restTemplate, pulsarClusterInfo, fullTopicName, numPartitions);

            Map<String, String> partitions =
                    PulsarUtils.lookupPartitionedTopic(this.restTemplate, pulsarClusterInfo, fullTopicName);
            for (int attempt = 0; attempt < RETRY_TIMES && partitions.size() != numPartitions; attempt++) {
                partitions = PulsarUtils.lookupPartitionedTopic(this.restTemplate, pulsarClusterInfo,
                        fullTopicName);
                try {
                    Thread.sleep(PARTITION_LOOKUP_INTERVAL_MS);
                } catch (InterruptedException e) {
                    LOGGER.error("Thread has been interrupted");
                    // keep the interrupt visible to callers and stop waiting
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            if (numPartitions != partitions.size()) {
                throw new Exception("The number of partitions not equal to lookupPartitionedTopic");
            }
            LOGGER.info("success to create topic={}", fullTopicName);
        } catch (Exception e) {
            LOGGER.error("failed to create topic={}", fullTopicName, e);
            throw e;
        }
    }

    /**
     * Force-deletes the topic described by {@code pulsarTopicInfo}.
     *
     * <p>Nothing is deleted when the topic name is blank or when {@link #topicExists} reports
     * that the topic is not present.
     *
     * @param pulsarClusterInfo cluster to operate on
     * @param pulsarTopicInfo tenant, namespace, topic name and queue module of the topic
     * @throws Exception if the delete call fails (logged, then rethrown)
     */
    public void forceDeleteTopic(PulsarClusterInfo pulsarClusterInfo, PulsarTopicInfo pulsarTopicInfo)
            throws Exception {
        Preconditions.expectNotNull(pulsarTopicInfo, "pulsar topic info cannot be empty");
        String tenant = pulsarTopicInfo.getPulsarTenant();
        String namespace = pulsarTopicInfo.getNamespace();
        String topicName = pulsarTopicInfo.getTopicName();
        String fullTopicName = tenant + "/" + namespace + "/" + topicName;
        boolean isPartitioned = QUEUE_MODULE_PARALLEL.equals(pulsarTopicInfo.getQueueModule());

        // topicExists() answers true for a blank name, so the blank case is handled explicitly
        if (StringUtils.isBlank(topicName)
                || !topicExists(pulsarClusterInfo, tenant, namespace, topicName, isPartitioned)) {
            LOGGER.warn("pulsar topic={} already delete", fullTopicName);
            return;
        }

        try {
            PulsarUtils.forceDeleteTopic(this.restTemplate, pulsarClusterInfo, fullTopicName, isPartitioned);
            LOGGER.info("success to delete topic={}", fullTopicName);
        } catch (Exception e) {
            LOGGER.error("failed to delete topic={}", fullTopicName, e);
            throw e;
        }
    }

    /**
     * Creates a subscription on a topic unless it already exists.
     *
     * @param pulsarClusterInfo cluster to operate on
     * @param fullTopicName topic the subscription belongs to
     * @param queueModule queue module of the topic; {@code PARALLEL} is treated as partitioned
     * @param subscription subscription name
     * @throws Exception if the existence check or the creation fails (logged, then rethrown)
     */
    public void createSubscription(PulsarClusterInfo pulsarClusterInfo, String fullTopicName,
            String queueModule, String subscription) throws Exception {
        LOGGER.info("begin to create pulsar subscription={} for topic={}", subscription, fullTopicName);
        try {
            boolean isPartitioned = QUEUE_MODULE_PARALLEL.equals(queueModule);
            if (subscriptionExists(pulsarClusterInfo, fullTopicName, subscription, isPartitioned)) {
                LOGGER.warn("pulsar subscription={} already exists, skip to create", subscription);
                return;
            }
            PulsarUtils.createSubscription(this.restTemplate, pulsarClusterInfo, fullTopicName, subscription);
            LOGGER.info("success to create subscription={}", subscription);
        } catch (Exception e) {
            LOGGER.error("failed to create pulsar subscription={}", subscription, e);
            throw e;
        }
    }

    /**
     * Creates the same subscription on each of the given topics.
     *
     * <p>Side effect: {@code pulsarTopicInfo}'s topic name is overwritten with each topic in turn.
     *
     * @param pulsarClusterInfo cluster to operate on
     * @param subscription subscription name
     * @param pulsarTopicInfo supplies tenant, namespace and queue module for every topic
     * @param topicList short topic names (without tenant / namespace)
     * @throws Exception if creating any subscription fails
     */
    public void createSubscriptions(PulsarClusterInfo pulsarClusterInfo, String subscription,
            PulsarTopicInfo pulsarTopicInfo, List<String> topicList) throws Exception {
        for (String topic : topicList) {
            pulsarTopicInfo.setTopicName(topic);
            String fullTopicName = pulsarTopicInfo.getPulsarTenant() + "/" + pulsarTopicInfo.getNamespace()
                    + "/" + topic;
            createSubscription(pulsarClusterInfo, fullTopicName, pulsarTopicInfo.getQueueModule(), subscription);
        }
        LOGGER.info("success to create subscription={} for multiple topics={}", subscription, topicList);
    }

    /** Returns whether {@code tenant} is among the tenants listed for the cluster. */
    private boolean tenantIsExists(PulsarClusterInfo pulsarClusterInfo, String tenant) throws Exception {
        return PulsarUtils.getTenants(this.restTemplate, pulsarClusterInfo).contains(tenant);
    }

    /**
     * Returns whether {@code tenantNamespace} (the {@code tenant/namespace} form, as passed by
     * {@link #createNamespace}) is among the namespaces listed for {@code tenant}.
     */
    private boolean namespaceExists(PulsarClusterInfo pulsarClusterInfo, String tenant, String tenantNamespace)
            throws Exception {
        return PulsarUtils.getNamespaces(this.restTemplate, pulsarClusterInfo, tenant).contains(tenantNamespace);
    }

    /**
     * Checks whether a topic is present under {@code tenant/namespace}.
     *
     * <p>A blank topic name is reported as existing. If listing the topics fails, the listing is
     * retried up to {@code RETRY_TIMES} times with a short pause; the first listing that succeeds
     * decides the result. If every attempt fails, {@code false} is returned.
     *
     * @param pulsarClusterInfo cluster to query
     * @param tenant tenant name
     * @param namespace namespace name
     * @param topicName short topic name (without tenant / namespace)
     * @param isPartitioned whether to look among partitioned topics
     * @return {@code true} if the topic was found or the name is blank, {@code false} otherwise
     */
    public boolean topicExists(PulsarClusterInfo pulsarClusterInfo, String tenant, String namespace,
            String topicName, boolean isPartitioned) {
        if (StringUtils.isBlank(topicName)) {
            return true;
        }

        try {
            return containsTopic(pulsarClusterInfo, tenant, namespace, topicName, isPartitioned);
        } catch (Exception e) {
            LOGGER.error("check if the pulsar topic={} exists error, begin retry", topicName, e);
        }

        for (int attempt = 1; attempt <= RETRY_TIMES; attempt++) {
            LOGGER.info("check whether the pulsar topic={} exists error, try count={}", topicName, attempt);
            try {
                Thread.sleep(DELAY_SECONDS);
                return containsTopic(pulsarClusterInfo, tenant, namespace, topicName, isPartitioned);
            } catch (InterruptedException interrupted) {
                // restore the flag so callers can observe the interruption, and stop retrying
                Thread.currentThread().interrupt();
                LOGGER.error("after retry, check if the pulsar topic={} exists still error", topicName,
                        interrupted);
                return false;
            } catch (Exception retryFailure) {
                LOGGER.error("after retry, check if the pulsar topic={} exists still error", topicName,
                        retryFailure);
            }
        }
        return false;
    }

    /**
     * Lists the topics of {@code tenant/namespace} once and reports whether {@code topicName} is
     * among them. Listed names are reduced to the part after the last {@code /}; for the
     * non-partitioned listing a trailing {@code -partition-N} suffix is also stripped.
     */
    private boolean containsTopic(PulsarClusterInfo pulsarClusterInfo, String tenant, String namespace,
            String topicName, boolean isPartitioned) throws Exception {
        Iterable<String> listedTopics = isPartitioned
                ? PulsarUtils.getPartitionedTopics(this.restTemplate, pulsarClusterInfo, tenant, namespace)
                : PulsarUtils.getTopics(this.restTemplate, pulsarClusterInfo, tenant, namespace);
        for (String listed : listedTopics) {
            String shortName = listed.substring(listed.lastIndexOf("/") + 1);
            if (!isPartitioned) {
                int suffixIndex = shortName.lastIndexOf(PARTITION_SUFFIX);
                if (suffixIndex > 0) {
                    shortName = shortName.substring(0, suffixIndex);
                }
            }
            if (topicName.equals(shortName)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Checks whether {@code subscription} exists on {@code topic}, trying up to
     * {@code RETRY_TIMES} times.
     *
     * <p>Each attempt first looks the topic up; an empty lookup result or a failing call moves on
     * to the next attempt. A failure on the last attempt is reported as a
     * {@link BusinessException}.
     *
     * @return {@code true} if the subscription is listed; {@code false} if it is not, or if every
     *         lookup came back empty
     * @throws BusinessException if the last attempt fails or the thread is interrupted
     */
    private boolean subscriptionExists(PulsarClusterInfo pulsarClusterInfo, String topic, String subscription,
            boolean isPartitioned) {
        for (int attempt = 1; attempt <= RETRY_TIMES; attempt++) {
            try {
                LOGGER.info("check whether the subscription exists for topic={}, try count={}", topic, attempt);
                Thread.sleep(DELAY_SECONDS);

                // look the topic up first, then query its subscriptions
                boolean lookupEmpty = isPartitioned
                        ? PulsarUtils.lookupPartitionedTopic(this.restTemplate, pulsarClusterInfo, topic).isEmpty()
                        : StringUtils.isBlank(PulsarUtils.lookupTopic(this.restTemplate, pulsarClusterInfo, topic));
                if (lookupEmpty) {
                    LOGGER.error("result of lookups topic={} is empty, continue retry", topic);
                    continue;
                }

                return PulsarUtils.getSubscriptions(this.restTemplate, pulsarClusterInfo, topic)
                        .contains(subscription);
            } catch (InterruptedException interrupted) {
                // restore the flag and give up: further sleeps would fail immediately anyway
                Thread.currentThread().interrupt();
                LOGGER.error("check if the subscription exists for topic={} error, continue retry", topic,
                        interrupted);
                throw new BusinessException("check if the subscription exists error: " + interrupted.getMessage());
            } catch (Exception e) {
                LOGGER.error("check if the subscription exists for topic={} error, continue retry", topic, e);
                if (attempt == RETRY_TIMES) {
                    LOGGER.error("after {} times retry, still check subscription exception for topic {}",
                            attempt, topic);
                    throw new BusinessException("check if the subscription exists error: " + e.getMessage());
                }
            }
        }
        return false;
    }

    /**
     * Reads up to {@code messageCount} recent messages of a topic, spreading the reads
     * round-robin over its partitions.
     *
     * <p>For message index {@code i} and {@code p} partitions, the read targets partition
     * {@code i % p} at position {@code i / p}. Reads that fail contribute no messages.
     *
     * @param pulsarClusterInfo cluster to query
     * @param topicFullName topic to read from
     * @param subName subscription name; only used for logging here
     * @param messageCount number of reads to perform, must not be {@code null}
     * @param inlongStreamInfo stream the messages belong to, used for decoding
     * @param serial when {@code true} the topic name is used as-is, otherwise a
     *        {@code -partition-N} suffix is appended for each read
     * @return the decoded messages, possibly empty
     * @throws BusinessException if the partition count cannot be determined
     */
    public List<BriefMQMessage> queryLatestMessage(PulsarClusterInfo pulsarClusterInfo, String topicFullName,
            String subName, Integer messageCount, InlongStreamInfo inlongStreamInfo, boolean serial) {
        LOGGER.info("begin to query message for topic {}, subName={}", topicFullName, subName);
        List<BriefMQMessage> messages = new ArrayList<>();
        int partitionCount = getPartitionCount(pulsarClusterInfo, topicFullName);
        for (int messageIndex = 0; messageIndex < messageCount; messageIndex++) {
            int partition = messageIndex % partitionCount;
            int messagePosition = messageIndex / partitionCount;
            String partitionTopic = buildTopicNameOfPartition(topicFullName, partition, serial);
            messages.addAll(queryMessageFromPulsar(partitionTopic, pulsarClusterInfo, messageIndex,
                    inlongStreamInfo, messagePosition));
        }
        LOGGER.info("success query message by subs={} for topic={}", subName, topicFullName);
        return messages;
    }

    /**
     * Returns the partition count from the topic metadata, or {@code 1} when the metadata reports
     * no partitions.
     *
     * @throws BusinessException if the metadata cannot be fetched
     */
    private int getPartitionCount(PulsarClusterInfo pulsarClusterInfo, String topicFullName) {
        try {
            PulsarTopicMetadata metadata =
                    PulsarUtils.getPartitionedTopicMetadata(this.restTemplate, pulsarClusterInfo, topicFullName);
            int partitions = metadata.getPartitions();
            return partitions > 0 ? partitions : 1;
        } catch (Exception e) {
            LOGGER.error("get pulsar partition error ", e);
            throw new BusinessException("get pulsar partition error " + e.getMessage());
        }
    }

    /**
     * Examines one message of {@code topicPartition} at {@code messagePosition} (counted from
     * "latest") and decodes it.
     *
     * <p>The wrap type comes from the stream unless the message carries a {@code msgEnType}
     * property, which then takes precedence. Any failure is logged as a warning and yields an
     * empty list.
     */
    private List<BriefMQMessage> queryMessageFromPulsar(String topicPartition, PulsarClusterInfo pulsarClusterInfo,
            int index, InlongStreamInfo inlongStreamInfo, int messagePosition) {
        List<BriefMQMessage> decoded = new ArrayList<>();
        try {
            PulsarMessageInfo messageInfo = PulsarUtils.getMessageFromHttpResponse(
                    PulsarUtils.examineMessage(this.restTemplate, pulsarClusterInfo, topicPartition, "latest",
                            messagePosition),
                    topicPartition);
            Map<String, String> properties = messageInfo.getProperties();
            if (properties == null) {
                properties = new HashMap<>();
            }

            MessageWrapType wrapType = MessageWrapType.forType(inlongStreamInfo.getWrapType());
            String messageWrapType = properties.get("msgEnType");
            if (messageWrapType != null) {
                wrapType = MessageWrapType.valueOf(Integer.parseInt(messageWrapType));
            }

            decoded.addAll(this.deserializeOperatorFactory.getInstance(wrapType)
                    .decodeMsg(inlongStreamInfo, messageInfo.getBody(), properties, index));
        } catch (Exception e) {
            // best effort: a failed read only means this slot contributes no messages
            LOGGER.warn("query message from pulsar error for groupId = {}, streamId = {}",
                    inlongStreamInfo.getInlongGroupId(), inlongStreamInfo.getInlongStreamId(), e);
        }
        return decoded;
    }

    /**
     * Delegates to {@code PulsarUtils.resetCursor} and converts any failure into a
     * {@link BusinessException}.
     *
     * @param pulsarClusterInfo cluster to operate on
     * @param topicFullName presumably the topic whose cursor is reset — confirm against PulsarUtils
     * @param subscription presumably the subscription whose cursor is reset — confirm against PulsarUtils
     * @param resetTime presumably the timestamp to reset to — confirm against PulsarUtils
     * @throws BusinessException if the reset fails
     */
    public void resetCursor(PulsarClusterInfo pulsarClusterInfo, String topicFullName, String subscription,
            Long resetTime) {
        try {
            PulsarUtils.resetCursor(this.restTemplate, pulsarClusterInfo, topicFullName, subscription, resetTime);
        } catch (Exception e) {
            LOGGER.error("failed reset cursor consumer:", e);
            throw new BusinessException("failed reset cursor consumer:" + e.getMessage());
        }
    }

    /** Returns {@code topicFullName} unchanged when {@code serial}, else with {@code -partition-<n>} appended. */
    private String buildTopicNameOfPartition(String topicFullName, int partition, boolean serial) {
        if (serial) {
            return topicFullName;
        }
        return topicFullName + PARTITION_SUFFIX + partition;
    }
}
