package org.apache.inlong.manager.service.resource.queue.pulsar;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.util.HttpUtils;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.pojo.queue.pulsar.PulsarBrokerEntryMetadata;
import org.apache.inlong.manager.pojo.queue.pulsar.PulsarLookupTopicInfo;
import org.apache.inlong.manager.pojo.queue.pulsar.PulsarMessageInfo;
import org.apache.inlong.manager.pojo.queue.pulsar.PulsarMessageMetadata;
import org.apache.inlong.manager.pojo.queue.pulsar.PulsarNamespacePolicies;
import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTenantInfo;
import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;

/* loaded from: input_file:org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.class */
/**
 * Static helpers that talk to a Pulsar cluster over HTTP (the {@code /admin/v2} and {@code /lookup/v2}
 * paths) through a Spring {@link RestTemplate}, plus the decoding needed to turn an "examine message"
 * HTTP response into {@link PulsarMessageInfo} objects.
 *
 * <p>The class holds no mutable state; every method is static.
 */
public class PulsarUtils {

    public static final String QUERY_CLUSTERS_PATH = "/admin/v2/clusters";
    public static final String QUERY_BROKERS_PATH = "/admin/v2/brokers";
    public static final String QUERY_TENANTS_PATH = "/admin/v2/tenants";
    public static final String QUERY_NAMESPACE_PATH = "/admin/v2/namespaces";
    public static final String QUERY_PERSISTENT_PATH = "/admin/v2/persistent";
    public static final String LOOKUP_TOPIC_PATH = "/lookup/v2/topic";

    private static final Logger log = LoggerFactory.getLogger(PulsarUtils.class);
    private static final DateTimeFormatter DATE_FORMAT =
            DateTimeFormatter.ISO_OFFSET_DATE_TIME.withZone(ZoneId.systemDefault());
    private static final Gson GSON = new GsonBuilder().create();

    /** Prefix of the response headers that carry user-defined message properties. */
    private static final String PROPERTY_HEADER_PREFIX = "X-Pulsar-PROPERTY-";
    private static final String BATCH_SIZE_HEADER = "X-Pulsar-batch-size";
    private static final String NUM_BATCH_MESSAGE_HEADER = "X-Pulsar-num-batch-message";

    private PulsarUtils() {
    }

    /**
     * Builds the request headers, adding a bearer {@code Authorization} header when a token is present.
     *
     * @param token authentication token; {@code null} or empty means no authorization header
     * @return a new, mutable header set
     */
    private static HttpHeaders getHttpHeaders(String token) {
        HttpHeaders headers = new HttpHeaders();
        if (StringUtils.isNotEmpty(token)) {
            headers.add("Authorization", "Bearer " + token);
        }
        return headers;
    }

    /**
     * Same as {@link #getHttpHeaders(String)} but additionally declares a JSON request body and asks
     * for a JSON response.
     */
    private static HttpHeaders getJsonHttpHeaders(String token) {
        HttpHeaders headers = getHttpHeaders(token);
        headers.setContentType(MediaType.parseMediaType("application/json; charset=UTF-8"));
        headers.add("Accept", MediaType.APPLICATION_JSON.toString());
        return headers;
    }

    /**
     * Issues a GET against {@code path} on the cluster's admin URLs and returns the response
     * deserialized as a list.
     *
     * @param path request path, appended to the admin URLs by {@code PulsarClusterInfo#getAdminUrls}
     */
    @SuppressWarnings("unchecked")
    private static List<String> requestStringList(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
            String path) throws Exception {
        // The response is deserialized into a raw ArrayList; the endpoints used here return string arrays.
        return (List<String>) HttpUtils.request(restTemplate, clusterInfo.getAdminUrls(path), HttpMethod.GET,
                (String) null, getHttpHeaders(clusterInfo.getToken()), ArrayList.class);
    }

    /**
     * Lists the clusters ({@code GET /admin/v2/clusters}).
     *
     * @throws Exception if the request fails
     */
    public static List<String> getClusters(RestTemplate restTemplate, PulsarClusterInfo clusterInfo)
            throws Exception {
        return requestStringList(restTemplate, clusterInfo, QUERY_CLUSTERS_PATH);
    }

    /**
     * Lists the brokers of every cluster returned by {@link #getClusters}, concatenated in cluster order
     * ({@code GET /admin/v2/brokers/{cluster}} per cluster).
     *
     * @throws Exception if any request fails
     */
    public static List<String> getBrokers(RestTemplate restTemplate, PulsarClusterInfo clusterInfo)
            throws Exception {
        List<String> brokers = new ArrayList<>();
        for (String cluster : getClusters(restTemplate, clusterInfo)) {
            brokers.addAll(requestStringList(restTemplate, clusterInfo, QUERY_BROKERS_PATH + "/" + cluster));
        }
        return brokers;
    }

    /**
     * Lists the tenants ({@code GET /admin/v2/tenants}).
     *
     * @throws Exception if the request fails
     */
    public static List<String> getTenants(RestTemplate restTemplate, PulsarClusterInfo clusterInfo)
            throws Exception {
        return requestStringList(restTemplate, clusterInfo, QUERY_TENANTS_PATH);
    }

    /**
     * Lists the namespaces of a tenant ({@code GET /admin/v2/namespaces/{tenant}}).
     *
     * @param tenant tenant name, used verbatim as a path segment
     * @throws Exception if the request fails
     */
    public static List<String> getNamespaces(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
            String tenant) throws Exception {
        return requestStringList(restTemplate, clusterInfo, QUERY_NAMESPACE_PATH + "/" + tenant);
    }

    /**
     * Creates a tenant ({@code PUT /admin/v2/tenants/{tenant}}) with {@code tenantInfo} serialized as the
     * JSON body.
     *
     * @throws Exception if the request fails
     */
    public static void createTenant(RestTemplate restTemplate, PulsarClusterInfo clusterInfo, String tenant,
            PulsarTenantInfo tenantInfo) throws Exception {
        HttpUtils.request(restTemplate, clusterInfo.getAdminUrls(QUERY_TENANTS_PATH + "/" + tenant),
                HttpMethod.PUT, GSON.toJson(tenantInfo), getJsonHttpHeaders(clusterInfo.getToken()));
    }

    /**
     * Creates a namespace ({@code PUT /admin/v2/namespaces/{tenant}/{namespace}}) with the given policies
     * as the JSON body.
     *
     * @throws Exception if the request fails
     */
    public static void createNamespace(RestTemplate restTemplate, PulsarClusterInfo clusterInfo, String tenant,
            String namespaceName, PulsarNamespacePolicies policies) throws Exception {
        // The request body uses snake_case for these two keys while the POJO serializes them in camelCase.
        String body = GSON.toJson(policies)
                .replace("messageTtlInSeconds", "message_ttl_in_seconds")
                .replace("retentionPolicies", "retention_policies");
        HttpUtils.request(restTemplate,
                clusterInfo.getAdminUrls(QUERY_NAMESPACE_PATH + "/" + tenant + "/" + namespaceName),
                HttpMethod.PUT, body, getJsonHttpHeaders(clusterInfo.getToken()));
    }

    /**
     * Lists the topics of a namespace ({@code GET /admin/v2/persistent/{tenant}/{namespace}}).
     *
     * @throws Exception if the request fails
     */
    public static List<String> getTopics(RestTemplate restTemplate, PulsarClusterInfo clusterInfo, String tenant,
            String namespace) throws Exception {
        return requestStringList(restTemplate, clusterInfo, QUERY_PERSISTENT_PATH + "/" + tenant + "/" + namespace);
    }

    /**
     * Lists the partitioned topics of a namespace
     * ({@code GET /admin/v2/persistent/{tenant}/{namespace}/partitioned}).
     *
     * @throws Exception if the request fails
     */
    public static List<String> getPartitionedTopics(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
            String tenant, String namespace) throws Exception {
        return requestStringList(restTemplate, clusterInfo,
                QUERY_PERSISTENT_PATH + "/" + tenant + "/" + namespace + "/partitioned");
    }

    /**
     * Creates a non-partitioned topic ({@code PUT /admin/v2/persistent/{topicPath}}) with no request body.
     *
     * @param topicPath path appended after {@code /admin/v2/persistent/}
     * @throws Exception if the request fails
     */
    public static void createNonPartitionedTopic(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
            String topicPath) throws Exception {
        HttpUtils.request(restTemplate, clusterInfo.getAdminUrls(QUERY_PERSISTENT_PATH + "/" + topicPath),
                HttpMethod.PUT, (Object) null, getHttpHeaders(clusterInfo.getToken()));
    }

    /**
     * Creates a partitioned topic ({@code PUT /admin/v2/persistent/{topicPath}/partitions}); the request
     * body is the partition count as a decimal string.
     *
     * @param numPartitions number of partitions; must not be {@code null}
     * @throws Exception if the request fails
     */
    public static void createPartitionedTopic(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
            String topicPath, Integer numPartitions) throws Exception {
        HttpUtils.request(restTemplate,
                clusterInfo.getAdminUrls(QUERY_PERSISTENT_PATH + "/" + topicPath + "/partitions"),
                HttpMethod.PUT, numPartitions.toString(), getHttpHeaders(clusterInfo.getToken()));
    }

    /**
     * Fetches the internal stats of a partitioned topic
     * ({@code GET /admin/v2/persistent/{topicPath}/partitioned-internalStats}).
     *
     * @throws Exception if the request fails
     */
    public static JsonObject getInternalStatsPartitionedTopics(RestTemplate restTemplate,
            PulsarClusterInfo clusterInfo, String topicPath) throws Exception {
        return (JsonObject) HttpUtils.request(restTemplate,
                clusterInfo.getAdminUrls(QUERY_PERSISTENT_PATH + "/" + topicPath + "/partitioned-internalStats"),
                HttpMethod.GET, (String) null, getHttpHeaders(clusterInfo.getToken()), JsonObject.class);
    }

    /**
     * Fetches the partition metadata of a topic ({@code GET /admin/v2/persistent/{topicPath}/partitions}).
     *
     * @throws Exception if the request fails
     */
    public static PulsarTopicMetadata getPartitionedTopicMetadata(RestTemplate restTemplate,
            PulsarClusterInfo clusterInfo, String topicPath) throws Exception {
        return (PulsarTopicMetadata) HttpUtils.request(restTemplate,
                clusterInfo.getAdminUrls(QUERY_PERSISTENT_PATH + "/" + topicPath + "/partitions"),
                HttpMethod.GET, (String) null, getHttpHeaders(clusterInfo.getToken()), PulsarTopicMetadata.class);
    }

    /**
     * Deletes a non-partitioned topic ({@code DELETE /admin/v2/persistent/{topicPath}}).
     *
     * @throws Exception if the request fails
     */
    public static void deleteNonPartitionedTopic(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
            String topicPath) throws Exception {
        HttpUtils.request(restTemplate, clusterInfo.getAdminUrls(QUERY_PERSISTENT_PATH + "/" + topicPath),
                HttpMethod.DELETE, (Object) null, getHttpHeaders(clusterInfo.getToken()));
    }

    /**
     * Deletes a non-partitioned topic, passing {@code force=true} as the request parameter object.
     *
     * @throws Exception if the request fails
     */
    public static void forceDeleteNonPartitionedTopic(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
            String topicPath) throws Exception {
        Map<String, Boolean> param = new HashMap<>();
        param.put("force", true);
        HttpUtils.request(restTemplate, clusterInfo.getAdminUrls(QUERY_PERSISTENT_PATH + "/" + topicPath),
                HttpMethod.DELETE, param, getHttpHeaders(clusterInfo.getToken()));
    }

    /**
     * Deletes a partitioned topic ({@code DELETE /admin/v2/persistent/{topicPath}/partitions}).
     *
     * @throws Exception if the request fails
     */
    public static void deletePartitionedTopic(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
            String topicPath) throws Exception {
        HttpUtils.request(restTemplate,
                clusterInfo.getAdminUrls(QUERY_PERSISTENT_PATH + "/" + topicPath + "/partitions"),
                HttpMethod.DELETE, (Object) null, getHttpHeaders(clusterInfo.getToken()));
    }

    /**
     * Deletes a partitioned topic, passing {@code force=true} as the request parameter object.
     *
     * @throws Exception if the request fails
     */
    public static void forceDeletePartitionedTopic(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
            String topicPath) throws Exception {
        Map<String, Boolean> param = new HashMap<>();
        param.put("force", true);
        HttpUtils.request(restTemplate,
                clusterInfo.getAdminUrls(QUERY_PERSISTENT_PATH + "/" + topicPath + "/partitions"),
                HttpMethod.DELETE, param, getHttpHeaders(clusterInfo.getToken()));
    }

    /**
     * Deletes a topic, choosing the partitioned or non-partitioned endpoint.
     *
     * @param isPartitioned {@code true} to delete via the partitioned-topic endpoint
     * @throws Exception if the request fails
     */
    public static void deleteTopic(RestTemplate restTemplate, PulsarClusterInfo clusterInfo, String topicPath,
            boolean isPartitioned) throws Exception {
        if (isPartitioned) {
            deletePartitionedTopic(restTemplate, clusterInfo, topicPath);
        } else {
            deleteNonPartitionedTopic(restTemplate, clusterInfo, topicPath);
        }
    }

    /**
     * Force-deletes a topic, choosing the partitioned or non-partitioned endpoint.
     *
     * @param isPartitioned {@code true} to delete via the partitioned-topic endpoint
     * @throws Exception if the request fails
     */
    public static void forceDeleteTopic(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
            String topicPath, boolean isPartitioned) throws Exception {
        if (isPartitioned) {
            forceDeletePartitionedTopic(restTemplate, clusterInfo, topicPath);
        } else {
            forceDeleteNonPartitionedTopic(restTemplate, clusterInfo, topicPath);
        }
    }

    /**
     * Looks up the broker URL serving a topic ({@code GET /lookup/v2/topic/persistent/{topicPath}}).
     *
     * @return the {@code brokerUrl} field of the lookup response
     * @throws Exception if the request fails
     */
    public static String lookupTopic(RestTemplate restTemplate, PulsarClusterInfo clusterInfo, String topicPath)
            throws Exception {
        PulsarLookupTopicInfo lookupInfo = (PulsarLookupTopicInfo) HttpUtils.request(restTemplate,
                clusterInfo.getAdminUrls(LOOKUP_TOPIC_PATH + "/persistent/" + topicPath),
                HttpMethod.GET, (String) null, getHttpHeaders(clusterInfo.getToken()),
                PulsarLookupTopicInfo.class);
        return lookupInfo.getBrokerUrl();
    }

    /**
     * Looks up the broker URL of every partition of a partitioned topic.
     *
     * @return map from {@code <topicPath>-partition-<i>} to broker URL, in partition order
     * @throws Exception if the metadata request or any lookup fails
     */
    public static Map<String, String> lookupPartitionedTopic(RestTemplate restTemplate,
            PulsarClusterInfo clusterInfo, String topicPath) throws Exception {
        PulsarTopicMetadata metadata = getPartitionedTopicMetadata(restTemplate, clusterInfo, topicPath);
        Map<String, String> brokerUrlByPartition = new LinkedHashMap<>();
        for (int i = 0; i < metadata.getPartitions(); i++) {
            String partitionPath = topicPath + "-partition-" + i;
            brokerUrlByPartition.put(partitionPath, lookupTopic(restTemplate, clusterInfo, partitionPath));
        }
        return brokerUrlByPartition;
    }

    /**
     * Lists the subscriptions of a topic ({@code GET /admin/v2/persistent/{topicPath}/subscriptions}).
     *
     * @throws Exception if the request fails
     */
    public static List<String> getSubscriptions(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
            String topicPath) throws Exception {
        return requestStringList(restTemplate, clusterInfo,
                QUERY_PERSISTENT_PATH + "/" + topicPath + "/subscriptions");
    }

    /**
     * Creates a subscription
     * ({@code PUT /admin/v2/persistent/{topicPath}/subscription/{subscription}}).
     *
     * @throws Exception if the request fails
     */
    public static void createSubscription(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
            String topicPath, String subscription) throws Exception {
        // NOTE(review): max ledger/entry id with partition -1 presumably denotes the "latest" position;
        // confirm against the Pulsar admin API before changing these values.
        JsonObject position = new JsonObject();
        position.addProperty("entryId", Long.MAX_VALUE);
        position.addProperty("ledgerId", Long.MAX_VALUE);
        position.addProperty("partitionIndex", -1);
        HttpUtils.request(restTemplate,
                clusterInfo.getAdminUrls(QUERY_PERSISTENT_PATH + "/" + topicPath + "/subscription/" + subscription),
                HttpMethod.PUT, position.toString(), getJsonHttpHeaders(clusterInfo.getToken()));
    }

    /**
     * Calls the "examine message" endpoint
     * ({@code GET /admin/v2/persistent/{topicPartition}/examinemessage}), trying each comma-separated
     * address of {@code clusterInfo.getAdminUrl()} in turn until one request completes.
     *
     * <p>A non-2xx response is logged and still returned to the caller. An exception from one address is
     * logged and the next address is tried; the exception of the last address is rethrown.
     *
     * @param topicPartition path appended after {@code /admin/v2/persistent/}
     * @param messageType value of the {@code initialPosition} query parameter
     * @param messagePosition value of the {@code messagePosition} query parameter
     * @return the raw HTTP response of the first address that answered
     * @throws Exception the failure of the last address, or a generic exception when no address exists
     */
    public static ResponseEntity<byte[]> examineMessage(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
            String topicPartition, String messageType, int messagePosition) throws Exception {
        String[] adminAddresses = clusterInfo.getAdminUrl().split(",");
        for (int i = 0; i < adminAddresses.length; i++) {
            try {
                String url = toBaseUrl(adminAddresses[i]) + QUERY_PERSISTENT_PATH + "/" + topicPartition
                        + "/examinemessage" + "?initialPosition=" + messageType
                        + "&messagePosition=" + messagePosition;
                // Explicit <Void> pins the headers-only HttpEntity constructor.
                ResponseEntity<byte[]> response = restTemplate.exchange(url, HttpMethod.GET,
                        new HttpEntity<Void>(getHttpHeaders(clusterInfo.getToken())), byte[].class);
                if (!response.getStatusCode().is2xxSuccessful()) {
                    log.error("request error for {}, status code {}, body {}", url, response.getStatusCode(),
                            response.getBody());
                }
                return response;
            } catch (Exception e) {
                log.error("examine message for topic partition={} error, begin retry", topicPartition, e);
                if (i >= adminAddresses.length - 1) {
                    log.error("after retry, examine message for topic partition={} still error",
                            topicPartition, e);
                    throw e;
                }
            }
        }
        // Only reachable when the admin URL splits into zero addresses.
        throw new Exception(String.format("examine message failed for topic partition=%s", topicPartition));
    }

    /**
     * Returns {@code address} unchanged when it already carries a scheme, otherwise prefixes
     * {@code http://}. Keeping an existing scheme means {@code https://} addresses are no longer mangled
     * into {@code http://https://...}.
     */
    private static String toBaseUrl(String address) {
        return address.contains("://") ? address : "http://" + address;
    }

    /**
     * Decodes an "examine message" response and returns its first message.
     *
     * @return the first decoded message, or {@code null} when the response contains none
     * @throws Exception if decoding fails
     */
    public static PulsarMessageInfo getMessageFromHttpResponse(ResponseEntity<byte[]> response, String topic)
            throws Exception {
        List<PulsarMessageInfo> messages = getMessagesFromHttpResponse(response, topic);
        return messages.isEmpty() ? null : messages.get(0);
    }

    /**
     * Decodes an "examine message" response into message objects.
     *
     * <p>Message metadata and properties are read from the {@code X-Pulsar-*} response headers. When the
     * response is not flagged as encrypted and carries the {@code X-Pulsar-num-batch-message} header, the
     * body is treated as a batch and split into individual messages; otherwise a single message with the
     * whole body is returned.
     *
     * @param response HTTP response as returned by {@link #examineMessage}
     * @param topic topic name stored on every returned message
     * @return the decoded messages; a singleton list for a non-batch response
     * @throws Exception if a header value or the batch payload cannot be parsed
     */
    public static List<PulsarMessageInfo> getMessagesFromHttpResponse(ResponseEntity<byte[]> response,
            String topic) throws Exception {
        HttpHeaders headers = response.getHeaders();
        String messageId = headers.getFirst("X-Pulsar-Message-ID");
        PulsarBrokerEntryMetadata brokerEntryMetadata = parseBrokerEntryMetadata(headers);
        PulsarMessageMetadata messageMetadata = parseMessageMetadata(headers);
        Map<String, String> properties = parseProperties(headers);

        String encryptedFlag = headers.getFirst("X-Pulsar-Is-Encrypted");
        boolean isEncrypted = encryptedFlag != null && Boolean.parseBoolean(encryptedFlag);
        if (!isEncrypted && headers.get(NUM_BATCH_MESSAGE_HEADER) != null) {
            return getIndividualMsgsFromBatch(topic, messageId, response.getBody(), properties, messageMetadata,
                    brokerEntryMetadata);
        }

        // Attach the header-derived properties; previously they were collected but never stored, so a
        // non-batch message lost all of its X-Pulsar-PROPERTY-* values.
        messageMetadata.setProperties(properties);

        PulsarMessageInfo messageInfo = new PulsarMessageInfo();
        messageInfo.setTopic(topic);
        messageInfo.setMessageId(messageId);
        messageInfo.setProperties(messageMetadata.getProperties());
        messageInfo.setBody(response.getBody());
        messageInfo.setPulsarMessageMetadata(messageMetadata);
        if (brokerEntryMetadata != null) {
            messageInfo.setPulsarBrokerEntryMetadata(brokerEntryMetadata);
        }
        return Collections.singletonList(messageInfo);
    }

    /**
     * Reads the broker entry metadata headers.
     *
     * @return the metadata, or {@code null} when neither the timestamp nor the index header is present
     */
    private static PulsarBrokerEntryMetadata parseBrokerEntryMetadata(HttpHeaders headers) {
        String brokerTimestamp = headers.getFirst("X-Pulsar-Broker-Entry-METADATA-timestamp");
        String index = headers.getFirst("X-Pulsar-Broker-Entry-METADATA-index");
        if (brokerTimestamp == null && index == null) {
            return null;
        }
        PulsarBrokerEntryMetadata brokerEntryMetadata = new PulsarBrokerEntryMetadata();
        if (brokerTimestamp != null) {
            brokerEntryMetadata.setBrokerTimestamp(parse(brokerTimestamp));
        }
        if (index != null) {
            brokerEntryMetadata.setIndex(Long.parseLong(index));
        }
        return brokerEntryMetadata;
    }

    /** Builds the message metadata from the {@code X-Pulsar-*} headers; absent headers are skipped. */
    private static PulsarMessageMetadata parseMessageMetadata(HttpHeaders headers) {
        PulsarMessageMetadata metadata = new PulsarMessageMetadata();

        String value = headers.getFirst("X-Pulsar-publish-time");
        if (value != null) {
            metadata.setPublishTime(parse(value));
        }
        value = headers.getFirst("X-Pulsar-event-time");
        if (value != null) {
            metadata.setEventTime(parse(value));
        }
        value = headers.getFirst("X-Pulsar-deliver-at-time");
        if (value != null) {
            metadata.setDeliverAtTime(parse(value));
        }
        value = headers.getFirst("X-Pulsar-null-value");
        if (value != null) {
            metadata.setNullValue(Boolean.parseBoolean(value));
        }
        value = headers.getFirst("X-Pulsar-producer-name");
        if (value != null) {
            metadata.setProducerName(value);
        }
        value = headers.getFirst("X-Pulsar-sequence-id");
        if (value != null) {
            metadata.setSequenceId(Long.parseLong(value));
        }
        value = headers.getFirst("X-Pulsar-replicated-from");
        if (value != null) {
            metadata.setReplicatedFrom(value);
        }
        value = headers.getFirst("X-Pulsar-partition-key");
        if (value != null) {
            metadata.setPartitionKey(value);
        }
        value = headers.getFirst("X-Pulsar-compression");
        if (value != null) {
            metadata.setCompression(value);
        }
        value = headers.getFirst("X-Pulsar-uncompressed-size");
        if (value != null) {
            metadata.setUncompressedSize(Integer.parseInt(value));
        }
        value = headers.getFirst("X-Pulsar-encryption-algo");
        if (value != null) {
            metadata.setEncryptionAlgo(value);
        }
        value = headers.getFirst("X-Pulsar-partition-key-b64-encoded");
        if (value != null) {
            metadata.setPartitionKeyB64Encoded(Boolean.parseBoolean(value));
        }
        value = headers.getFirst("X-Pulsar-marker-type");
        if (value != null) {
            metadata.setMarkerType(Integer.parseInt(value));
        }
        value = headers.getFirst("X-Pulsar-txnid-least-bits");
        if (value != null) {
            metadata.setTxnidLeastBits(Long.parseLong(value));
        }
        value = headers.getFirst("X-Pulsar-txnid-most-bits");
        if (value != null) {
            metadata.setTxnidMostBits(Long.parseLong(value));
        }
        value = headers.getFirst("X-Pulsar-highest-sequence-id");
        if (value != null) {
            metadata.setHighestSequenceId(Long.parseLong(value));
        }
        value = headers.getFirst("X-Pulsar-uuid");
        if (value != null) {
            metadata.setUuid(value);
        }
        value = headers.getFirst("X-Pulsar-num-chunks-from-msg");
        if (value != null) {
            metadata.setNumChunksFromMsg(Integer.parseInt(value));
        }
        value = headers.getFirst("X-Pulsar-total-chunk-msg-size");
        if (value != null) {
            metadata.setTotalChunkMsgSize(Integer.parseInt(value));
        }
        value = headers.getFirst("X-Pulsar-chunk-id");
        if (value != null) {
            metadata.setChunkId(Integer.parseInt(value));
        }
        value = headers.getFirst("X-Pulsar-null-partition-key");
        if (value != null) {
            metadata.setNullPartitionKey(Boolean.parseBoolean(value));
        }
        value = headers.getFirst("X-Pulsar-Base64-encryption-param");
        if (value != null) {
            metadata.setEncryptionParam(Base64.getDecoder().decode(value));
        }
        value = headers.getFirst("X-Pulsar-Base64-ordering-key");
        if (value != null) {
            metadata.setOrderingKey(Base64.getDecoder().decode(value));
        }
        value = headers.getFirst("X-Pulsar-Base64-schema-version-b64encoded");
        if (value != null) {
            metadata.setSchemaVersion(Base64.getDecoder().decode(value));
        }

        List<String> replicatedTo = headers.get("X-Pulsar-replicated-to");
        if (ObjectUtils.isNotEmpty(replicatedTo)) {
            if (ObjectUtils.isEmpty(metadata.getReplicateTos())) {
                metadata.setReplicateTos(Lists.newArrayList(replicatedTo));
            } else {
                metadata.getReplicateTos().addAll(replicatedTo);
            }
        }
        return metadata;
    }

    /**
     * Collects message properties from the headers: the batch-size header, every header whose name starts
     * with {@code X-Pulsar-PROPERTY-} (prefix stripped, first value used), and the num-batch-message header.
     *
     * @return a new sorted map; never {@code null}
     */
    private static Map<String, String> parseProperties(HttpHeaders headers) {
        Map<String, String> properties = Maps.newTreeMap();
        String batchSize = headers.getFirst(BATCH_SIZE_HEADER);
        if (batchSize != null) {
            properties.put(BATCH_SIZE_HEADER, batchSize);
        }
        for (Map.Entry<String, List<String>> header : headers.entrySet()) {
            String name = header.getKey();
            List<String> values = header.getValue();
            // Header names are case-insensitive, so match the prefix ignoring case and only at the start;
            // a mid-string match would make the substring below produce a garbage key.
            boolean isProperty = name.regionMatches(true, 0, PROPERTY_HEADER_PREFIX, 0,
                    PROPERTY_HEADER_PREFIX.length());
            if (isProperty && values != null && !values.isEmpty()) {
                properties.put(name.substring(PROPERTY_HEADER_PREFIX.length()), values.get(0));
            }
        }
        String numBatchMessage = headers.getFirst(NUM_BATCH_MESSAGE_HEADER);
        if (numBatchMessage != null) {
            properties.put(NUM_BATCH_MESSAGE_HEADER, numBatchMessage);
        }
        return properties;
    }

    /**
     * Parses an ISO offset date-time header value into epoch milliseconds.
     *
     * @throws DateTimeParseException if {@code text} is not a valid ISO offset date-time
     */
    private static long parse(String text) throws DateTimeParseException {
        return Instant.from(DATE_FORMAT.parse(text)).toEpochMilli();
    }

    /**
     * Splits a batch payload into individual messages.
     *
     * <p>The number of messages is taken from the {@code X-Pulsar-num-batch-message} entry of
     * {@code properties}. Each message gets the id {@code <msgId>:<index>}, the shared header-level
     * {@code messageMetadata}, and its own properties map (header properties plus the properties decoded
     * from that message's own metadata).
     *
     * @param data batch payload: per message a 4-byte metadata length, the metadata, then the payload
     */
    private static List<PulsarMessageInfo> getIndividualMsgsFromBatch(String topic, String msgId, byte[] data,
            Map<String, String> properties, PulsarMessageMetadata messageMetadata,
            PulsarBrokerEntryMetadata brokerEntryMetadata) {
        List<PulsarMessageInfo> messages = new ArrayList<>();
        int batchSize = Integer.parseInt(properties.get(NUM_BATCH_MESSAGE_HEADER));
        ByteBuffer buffer = ByteBuffer.wrap(data);
        for (int i = 0; i < batchSize; i++) {
            PulsarMessageMetadata singleMetadata = new PulsarMessageMetadata();
            // Copy per message: decoding adds entries to this map, and sharing one instance made every
            // message in the batch expose the properties of all the others.
            singleMetadata.setProperties(new TreeMap<>(properties));
            byte[] payload = deSerializeSingleMessageInBatch(buffer, singleMetadata);

            PulsarMessageInfo messageInfo = new PulsarMessageInfo();
            messageInfo.setTopic(topic);
            messageInfo.setMessageId(msgId + ":" + i);
            messageInfo.setProperties(singleMetadata.getProperties());
            messageInfo.setPulsarMessageMetadata(messageMetadata);
            messageInfo.setBody(payload);
            if (brokerEntryMetadata != null) {
                messageInfo.setPulsarBrokerEntryMetadata(brokerEntryMetadata);
            }
            messages.add(messageInfo);
        }
        return messages;
    }

    /**
     * Resets a subscription cursor
     * ({@code POST /admin/v2/persistent/{topicPath}/subscription/{subscription}/resetcursor/{resetTime}}).
     *
     * @param resetTime value appended as the last path segment (presumably a timestamp in epoch
     *        milliseconds - confirm against the Pulsar admin API)
     * @throws Exception if the request fails
     */
    public static void resetCursor(RestTemplate restTemplate, PulsarClusterInfo clusterInfo, String topicPath,
            String subscription, Long resetTime) throws Exception {
        HttpUtils.request(restTemplate,
                clusterInfo.getAdminUrls(QUERY_PERSISTENT_PATH + "/" + topicPath + "/subscription/"
                        + subscription + "/resetcursor/" + resetTime),
                HttpMethod.POST, (Object) null, getHttpHeaders(clusterInfo.getToken()));
    }

    /**
     * Reads one message from a batch buffer: a 4-byte metadata length, the metadata (decoded into
     * {@code singleMetadata}), then {@code payloadSize} payload bytes. The buffer is left positioned at
     * the start of the next message.
     *
     * @return the payload bytes of this message
     */
    private static byte[] deSerializeSingleMessageInBatch(ByteBuffer buffer, PulsarMessageMetadata singleMetadata) {
        int metadataSize = buffer.getInt();
        metaDataParseFrom(singleMetadata, buffer, metadataSize);
        byte[] payload = new byte[singleMetadata.getPayloadSize()];
        buffer.get(payload);
        return payload;
    }

    /**
     * Decodes {@code size} bytes of protobuf-encoded per-message metadata from {@code buffer} into
     * {@code metadata}. Each case label is a protobuf tag, i.e. {@code (fieldNumber << 3) | wireType};
     * unrecognized tags are skipped according to their wire type.
     *
     * <p>NOTE(review): the field layout appears to mirror Pulsar's {@code SingleMessageMetadata} message;
     * verify against {@code PulsarApi.proto} when adding fields.
     */
    private static void metaDataParseFrom(PulsarMessageMetadata metadata, ByteBuffer buffer, int size) {
        int end = size + buffer.position();
        while (buffer.position() < end) {
            int tag = readVarInt(buffer);
            switch (tag) {
                case 10: // field 1, length-delimited: one key/value property
                    parseFrom(metadata, buffer, readVarInt(buffer));
                    break;
                case 18: // field 2, length-delimited: partition key
                    metadata.setPartitionKey(readString(buffer));
                    break;
                case 24: // field 3, varint: payload size
                    metadata.setPayloadSize(readVarInt(buffer));
                    break;
                case 32: // field 4, varint: compacted-out flag
                    metadata.setCompactedOut(readVarInt(buffer) == 1);
                    break;
                case 40: // field 5, varint: event time
                    metadata.setEventTime(readVarInt64(buffer));
                    break;
                case 48: // field 6, varint: partition key is base64-encoded
                    metadata.setPartitionKeyB64Encoded(readVarInt(buffer) == 1);
                    break;
                case 58: // field 7, length-delimited: ordering key
                    // The bytes must actually be consumed; only allocating the array left the buffer
                    // position inside the key and corrupted every field that followed.
                    metadata.setOrderingKey(readBytes(buffer));
                    break;
                case 64: // field 8, varint: sequence id
                    metadata.setSequenceId(readVarInt64(buffer));
                    break;
                case 72: // field 9, varint: null-value flag
                    metadata.setNullValue(readVarInt(buffer) == 1);
                    break;
                case 80: // field 10, varint: null-partition-key flag
                    metadata.setNullPartitionKey(readVarInt(buffer) == 1);
                    break;
                default:
                    skipUnknownField(tag, buffer);
                    break;
            }
        }
    }

    /** Reads a varint length followed by that many bytes. */
    private static byte[] readBytes(ByteBuffer buffer) {
        byte[] bytes = new byte[readVarInt(buffer)];
        buffer.get(bytes);
        return bytes;
    }

    /** Reads a length-delimited field and decodes it as UTF-8 (the protobuf string encoding). */
    private static String readString(ByteBuffer buffer) {
        return new String(readBytes(buffer), java.nio.charset.StandardCharsets.UTF_8);
    }

    /**
     * Reads a protobuf varint of up to 32 bits. If the fifth byte still has its continuation bit set, up
     * to five further bytes are consumed and discarded (the upper bits of a 64-bit varint).
     *
     * @throws IllegalArgumentException if no terminating byte is found within ten bytes
     */
    private static int readVarInt(ByteBuffer buffer) {
        byte tmp = buffer.get();
        if (tmp >= 0) {
            return tmp;
        }
        int result = tmp & 0x7f;
        if ((tmp = buffer.get()) >= 0) {
            result |= tmp << 7;
        } else {
            result |= (tmp & 0x7f) << 7;
            if ((tmp = buffer.get()) >= 0) {
                result |= tmp << 14;
            } else {
                result |= (tmp & 0x7f) << 14;
                if ((tmp = buffer.get()) >= 0) {
                    result |= tmp << 21;
                } else {
                    result |= (tmp & 0x7f) << 21;
                    result |= (tmp = buffer.get()) << 28;
                    if (tmp < 0) {
                        // Discard the upper 32 bits.
                        for (int i = 0; i < 5; i++) {
                            if (buffer.get() >= 0) {
                                return result;
                            }
                        }
                        throw new IllegalArgumentException("Encountered a malformed varint.");
                    }
                }
            }
        }
        return result;
    }

    /**
     * Reads a protobuf varint of up to 64 bits.
     *
     * @throws IllegalArgumentException if no terminating byte is found within ten bytes
     */
    private static long readVarInt64(ByteBuffer buffer) {
        long result = 0;
        for (int shift = 0; shift < 64; shift += 7) {
            byte b = buffer.get();
            // Widen before shifting: an int shift would wrap for shift >= 32.
            result |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
        }
        throw new IllegalArgumentException("Encountered a malformed varint.");
    }

    /** Extracts the wire type (lowest three bits) from a protobuf tag. */
    private static int getTagType(int tag) {
        return tag & 7;
    }

    /**
     * Skips the value of a field whose tag is not handled, based on its wire type: 0 = varint,
     * 1 = 8 fixed bytes, 2 = length-delimited, 5 = 4 fixed bytes.
     *
     * @throws IllegalArgumentException for any other wire type
     */
    private static void skipUnknownField(int tag, ByteBuffer buffer) {
        int tagType = getTagType(tag);
        switch (tagType) {
            case 0:
                readVarInt(buffer);
                break;
            case 1:
                buffer.get(new byte[8]);
                break;
            case 2:
                buffer.get(new byte[readVarInt(buffer)]);
                break;
            case 5:
                buffer.get(new byte[4]);
                break;
            default:
                throw new IllegalArgumentException("Invalid unknonwn tag type: " + tagType);
        }
    }

    /**
     * Decodes one key/value property occupying the next {@code size} bytes of {@code buffer} and stores
     * it in the metadata's properties map (created if absent or empty). Tag 10 carries the key and tag 18
     * the value; a pair is stored only when both are non-empty.
     */
    private static void parseFrom(PulsarMessageMetadata metadata, ByteBuffer buffer, int size) {
        if (ObjectUtils.isEmpty(metadata.getProperties())) {
            metadata.setProperties(new HashMap<>());
        }
        Map<String, String> properties = metadata.getProperties();
        int end = buffer.position() + size;
        String key = null;
        String value = null;
        while (buffer.position() < end) {
            int tag = readVarInt(buffer);
            switch (tag) {
                case 10:
                    key = readString(buffer);
                    break;
                case 18:
                    value = readString(buffer);
                    break;
                default:
                    skipUnknownField(tag, buffer);
                    break;
            }
            // Flush as soon as a complete pair is available, then start collecting the next one.
            if (StringUtils.isNotEmpty(key) && StringUtils.isNotEmpty(value)) {
                properties.put(key, value);
                key = null;
                value = null;
            }
        }
    }
}
