package org.apache.pulsar.common.naming;

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.BoundType;
import com.google.common.collect.Range;
import com.google.common.hash.HashFunction;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.SortedSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.BundleData;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.resources.LocalPoliciesResources;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.metadata.api.CacheGetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.stats.CacheMetricsCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/common/naming/NamespaceBundleFactory.class */
/**
 * Creates {@link NamespaceBundle} / {@link NamespaceBundles} instances and caches the bundles of
 * each namespace.
 *
 * <p>Bundles are held in a Caffeine {@link AsyncLoadingCache} keyed by {@link NamespaceName}. A
 * cache miss loads the namespace's local policies through the broker's resources; when no local
 * policies exist they are created from the namespace policies. Entries are invalidated when the
 * local metadata store reports a change on a local-policies path.
 */
public class NamespaceBundleFactory {
    private static final Logger LOG = LoggerFactory.getLogger(NamespaceBundleFactory.class);

    /** Lowest boundary string accepted by {@link #validateFullRange(SortedSet)}. */
    private static final String FIRST_BOUNDARY = "0x00000000";

    /** Highest boundary string accepted by {@link #validateFullRange(SortedSet)}. */
    private static final String LAST_BOUNDARY = "0xffffffff";

    /** Range string of a bundle spanning the whole key space: {@code 0x00000000_0xffffffff}. */
    private static final String FULL_BUNDLE_RANGE = FIRST_BOUNDARY + "_" + LAST_BOUNDARY;

    private final HashFunction hashFunc;
    private final PulsarService pulsar;
    // NOTE(review): never read inside this class; kept because obtaining it calls into the
    // configuration metadata store, which may have side effects -- confirm before removing.
    private final MetadataCache<Policies> policiesCache;
    /** Upper bound on the total time spent retrying one bundle load. */
    private final Duration maxRetryDuration = Duration.ofSeconds(10);
    private final AsyncLoadingCache<NamespaceName, NamespaceBundles> bundlesCache =
            Caffeine.newBuilder().recordStats().buildAsync(this::loadBundles);

    /**
     * Builds a factory bound to the given broker service.
     *
     * <p>The final fields are assigned before {@code this} is handed to the metadata store as a
     * listener, so a notification can never observe a partially constructed factory.
     *
     * @param pulsarService broker service used to reach metadata stores, resources and executors
     * @param hashFunction hash function used by {@link #getLongHashCode(String)}
     */
    public NamespaceBundleFactory(PulsarService pulsarService, HashFunction hashFunction) {
        this.hashFunc = hashFunction;
        this.pulsar = pulsarService;
        this.policiesCache = pulsarService.getConfigurationMetadataStore().getMetadataCache(Policies.class);
        CacheMetricsCollector.CAFFEINE.addCache("bundles", this.bundlesCache);
        pulsarService.getLocalMetadataStore().registerListener(this::handleMetadataStoreNotification);
    }

    /**
     * Cache loader: starts the asynchronous load of a namespace's bundles.
     *
     * @param namespaceName namespace whose bundles are being loaded
     * @param executor executor supplied by the cache; not used by this loader
     * @return future completed with the loaded bundles, or exceptionally once loading gives up
     */
    private CompletableFuture<NamespaceBundles> loadBundles(NamespaceName namespaceName, Executor executor) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Loading cache with bundles for {}", namespaceName);
        }
        // NOTE(review): the constructor dereferences the service, so this looks unreachable in
        // normal use; kept in case callers rely on it -- confirm.
        if (this.pulsar == null) {
            return CompletableFuture.completedFuture(getBundles(namespaceName, Optional.empty()));
        }
        CompletableFuture<NamespaceBundles> result = new CompletableFuture<>();
        long retryDeadlineNanos = System.nanoTime() + this.maxRetryDuration.toNanos();
        doLoadBundles(namespaceName, result, createBackoff(), retryDeadlineNanos);
        return result;
    }

    /**
     * Performs one load attempt and completes {@code result} with its outcome.
     *
     * <p>When local policies exist the bundles are read from them; otherwise local policies are
     * created from the namespace policies first. An {@link IOException} while reading, or a
     * failure while creating local policies, is handed to the retry logic. A failure of the
     * local-policies lookup itself fails {@code result} immediately.
     *
     * @param namespaceName namespace being loaded
     * @param result future to complete
     * @param backoff supplies the delay before each retry
     * @param retryDeadlineNanos {@link System#nanoTime()} value after which no retry is scheduled
     */
    private void doLoadBundles(NamespaceName namespaceName, CompletableFuture<NamespaceBundles> result,
                               Backoff backoff, long retryDeadlineNanos) {
        this.pulsar.getPulsarResources().getLocalPolicies().getLocalPoliciesWithVersion(namespaceName)
                .thenAccept(cachedPolicies -> {
                    if (cachedPolicies.isPresent()) {
                        CacheGetResult<LocalPolicies> cached = cachedPolicies.get();
                        try {
                            result.complete(
                                    readBundles(namespaceName, cached.getValue(), cached.getStat().getVersion()));
                        } catch (IOException e) {
                            handleLoadBundlesRetry(namespaceName, result, backoff, retryDeadlineNanos, e);
                        }
                        return;
                    }
                    // No local policies yet: derive them from the namespace policies.
                    copyToLocalPolicies(namespaceName)
                            .thenAccept(result::complete)
                            .exceptionally(failure -> {
                                handleLoadBundlesRetry(namespaceName, result, backoff, retryDeadlineNanos, failure);
                                return null;
                            });
                })
                .exceptionally(failure -> {
                    result.completeExceptionally(failure);
                    return null;
                });
    }

    /**
     * Schedules another load attempt, or fails {@code result} when retrying is no longer allowed.
     *
     * <p>No retry happens for an {@link Error} or once the deadline has passed.
     *
     * @param namespaceName namespace being loaded
     * @param result future to complete
     * @param backoff supplies the delay before the next attempt
     * @param retryDeadlineNanos {@link System#nanoTime()} value after which no retry is scheduled
     * @param failure the failure of the previous attempt
     */
    private void handleLoadBundlesRetry(NamespaceName namespaceName, CompletableFuture<NamespaceBundles> result,
                                        Backoff backoff, long retryDeadlineNanos, Throwable failure) {
        // nanoTime values must be compared by subtraction to stay correct across numeric overflow.
        boolean deadlinePassed = System.nanoTime() - retryDeadlineNanos > 0;
        if ((failure instanceof Error) || deadlinePassed) {
            result.completeExceptionally(failure);
            return;
        }
        LOG.warn("Error loading bundle for {}. Retrying exception", namespaceName, failure);
        this.pulsar.getExecutor().schedule(
                () -> doLoadBundles(namespaceName, result, backoff, retryDeadlineNanos),
                backoff.next(), TimeUnit.MILLISECONDS);
    }

    /**
     * Creates the backoff used between load retries.
     *
     * @return a new backoff; presumably a 100 ms initial delay, a 5 s maximum and no mandatory
     *         stop -- TODO confirm against the {@code Backoff} constructor
     */
    private static Backoff createBackoff() {
        return new Backoff(100L, TimeUnit.MILLISECONDS, 5L, TimeUnit.SECONDS, 0L, TimeUnit.MILLISECONDS);
    }

    /**
     * Builds the bundles of a namespace from its local policies.
     *
     * @param namespaceName namespace the policies belong to
     * @param localPolicies local policies read from the metadata store
     * @param version version of the stored local policies
     * @return bundles carrying the policies and their version
     * @throws IOException declared for callers; handled by the retry logic in the loader
     */
    private NamespaceBundles readBundles(NamespaceName namespaceName, LocalPolicies localPolicies, long version)
            throws IOException {
        NamespaceBundles bundles = getBundles(namespaceName, Optional.of(Pair.of(localPolicies, version)));
        if (LOG.isDebugEnabled()) {
            Object loggedBundles = localPolicies.bundles.getBoundaries() != null ? localPolicies.bundles : "null";
            LOG.debug("[{}] Get bundles from getLocalZkCacheService: bundles: {}, version: {}",
                    namespaceName, loggedBundles, bundles.getVersion());
        }
        return bundles;
    }

    /**
     * Creates local policies for a namespace from the bundles in its namespace policies.
     *
     * @param namespaceName namespace to create local policies for
     * @return future with bundles built from the new local policies at version 0, or bundles
     *         without local policies when the namespace has no policies
     */
    private CompletableFuture<NamespaceBundles> copyToLocalPolicies(NamespaceName namespaceName) {
        return this.pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespaceName)
                .thenCompose(policies -> {
                    if (!policies.isPresent()) {
                        return CompletableFuture.completedFuture(getBundles(namespaceName, Optional.empty()));
                    }
                    LocalPolicies localPolicies =
                            new LocalPolicies(policies.get().bundles, (BookieAffinityGroupData) null, (String) null);
                    return this.pulsar.getPulsarResources().getLocalPolicies()
                            .createLocalPoliciesAsync(namespaceName, localPolicies)
                            .thenApply(ignored -> getBundles(namespaceName, Optional.of(Pair.of(localPolicies, 0L))));
                });
    }

    /**
     * Metadata-store listener: drops the cached bundles of a namespace whose local policies
     * changed. Notifications for other paths are ignored, and failures are logged, not rethrown.
     *
     * @param notification change notification from the local metadata store
     */
    private void handleMetadataStoreNotification(Notification notification) {
        String path = notification.getPath();
        if (!LocalPoliciesResources.isLocalPoliciesPath(path)) {
            return;
        }
        try {
            Optional<NamespaceName> namespace = NamespaceName.getIfValid(getNamespaceFromPoliciesPath(path));
            if (namespace.isPresent()) {
                // Log the namespace itself rather than its Optional wrapper.
                LOG.info("Policy updated for namespace {}, refreshing the bundle cache.", namespace.get());
                this.bundlesCache.synchronous().invalidate(namespace.get());
            }
        } catch (Exception e) {
            LOG.error("Failed to update the policy change for path {}", path, e);
        }
    }

    /**
     * Removes the cached bundles of a namespace so that the next lookup reloads them.
     *
     * @param namespaceName namespace to invalidate
     */
    public void invalidateBundleCache(NamespaceName namespaceName) {
        this.bundlesCache.synchronous().invalidate(namespaceName);
    }

    /**
     * Returns the bundles of a namespace, loading them when they are not cached.
     *
     * @param namespaceName namespace to look up
     * @return future with the namespace's bundles
     */
    public CompletableFuture<NamespaceBundles> getBundlesAsync(NamespaceName namespaceName) {
        return this.bundlesCache.get(namespaceName);
    }

    /**
     * Blocking variant of {@link #getBundleWithHighestTopicsAsync(NamespaceName)} that waits at
     * most 30 seconds.
     *
     * @param namespaceName namespace to inspect
     * @return the bundle holding the most persistent topics, or {@code null} when there are none
     * @throws IllegalStateException when the lookup fails, times out or is interrupted; the cause
     *         is the underlying failure
     */
    public NamespaceBundle getBundleWithHighestTopics(NamespaceName namespaceName) {
        try {
            return getBundleWithHighestTopicsAsync(namespaceName).get(30L, TimeUnit.SECONDS);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt for callers further up the stack.
                Thread.currentThread().interrupt();
            }
            LOG.info("failed to derive bundle for {}", namespaceName, e);
            throw new IllegalStateException(e instanceof ExecutionException ? e.getCause() : e);
        }
    }

    /**
     * Finds the bundle of a namespace that contains the largest number of persistent topics.
     *
     * <p>On a tie the bundle that reached the highest count first, in topic-listing order, wins.
     * A failure to list topics or to load the bundles fails the returned future.
     *
     * @param namespaceName namespace to inspect
     * @return future with the busiest bundle, or with {@code null} when the namespace has no
     *         persistent topics
     */
    public CompletableFuture<NamespaceBundle> getBundleWithHighestTopicsAsync(NamespaceName namespaceName) {
        return this.pulsar.getPulsarResources().getTopicResources().listPersistentTopicsAsync(namespaceName)
                .thenCompose(topics -> this.bundlesCache.get(namespaceName).thenApply(bundles -> {
                    Map<String, Integer> topicsPerBundle = new HashMap<>();
                    NamespaceBundle busiest = null;
                    int highestCount = 0;
                    for (String topic : topics) {
                        NamespaceBundle bundle = bundles.findBundle(TopicName.get(topic));
                        int count = topicsPerBundle.merge(bundle.getBundleRange(), 1, Integer::sum);
                        if (count > highestCount) {
                            highestCount = count;
                            busiest = bundle;
                        }
                    }
                    return busiest;
                }));
    }

    /**
     * Returns the bundle that the given topic falls into, loading the namespace's bundles
     * synchronously when needed.
     *
     * @param topicName topic to locate
     * @return the owning bundle, or {@code null} when no bundles are available for the namespace
     */
    public NamespaceBundle getBundle(TopicName topicName) {
        NamespaceBundles bundles = this.bundlesCache.synchronous().get(topicName.getNamespaceObject());
        return bundles == null ? null : bundles.findBundle(topicName);
    }

    /**
     * Finds the bundle of a namespace with the highest long-term message throughput among the
     * bundles that have at least one topic.
     *
     * <p>Falls back to {@link #getBundleWithHighestTopics(NamespaceName)} when the active load
     * manager is not a {@link ModularLoadManagerWrapper}.
     *
     * @param namespaceName namespace to inspect
     * @return the selected bundle, or {@code null} when no bundle qualifies
     */
    public NamespaceBundle getBundleWithHighestThroughput(NamespaceName namespaceName) {
        LoadManager loadManager = this.pulsar.getLoadManager().get();
        if (!(loadManager instanceof ModularLoadManagerWrapper)) {
            return getBundleWithHighestTopics(namespaceName);
        }
        ModularLoadManagerWrapper wrapper = (ModularLoadManagerWrapper) loadManager;
        double highestThroughput = -1.0d;
        NamespaceBundle selected = null;
        for (NamespaceBundle candidate : getBundles(namespaceName).getBundles()) {
            BundleData data = wrapper.getLoadManager().getBundleDataOrDefault(candidate.toString());
            if (data.getTopics() <= 0) {
                continue;
            }
            double throughput = data.getLongTermData().totalMsgThroughput();
            if (throughput > highestThroughput) {
                highestThroughput = throughput;
                selected = candidate;
            }
        }
        return selected;
    }

    /**
     * Returns the bundles of a namespace, loading them synchronously when they are not cached.
     *
     * @param namespaceName namespace to look up
     * @return the namespace's bundles
     */
    public NamespaceBundles getBundles(NamespaceName namespaceName) {
        return this.bundlesCache.synchronous().get(namespaceName);
    }

    /**
     * Returns the bundles of a namespace only when they are already cached; never triggers a load.
     *
     * @param namespaceName namespace to look up
     * @return the cached bundles, or empty when none are cached
     */
    public Optional<NamespaceBundles> getBundlesIfPresent(NamespaceName namespaceName) {
        return Optional.ofNullable(this.bundlesCache.synchronous().getIfPresent(namespaceName));
    }

    /**
     * Creates a bundle for the given namespace and key range.
     *
     * @param namespaceName namespace the bundle belongs to
     * @param range hash-key range covered by the bundle
     * @return a new bundle bound to this factory
     */
    public NamespaceBundle getBundle(NamespaceName namespaceName, Range<Long> range) {
        return new NamespaceBundle(namespaceName, range, this);
    }

    /**
     * Creates a bundle from a namespace name and a textual bundle range.
     *
     * <p>The range consists of two boundaries joined by {@link ClusterReplicationMetrics#SEPARATOR},
     * each parsed with {@link Long#decode(String)}. The lower bound is closed; the upper bound is
     * closed only when it equals {@link NamespaceBundles#FULL_UPPER_BOUND}, otherwise open.
     *
     * @param str namespace name
     * @param str2 bundle range string
     * @return the bundle described by the arguments
     * @throws IllegalArgumentException when the range does not contain two boundaries
     * @throws NumberFormatException when a boundary is not a decodable number
     */
    public NamespaceBundle getBundle(String str, String str2) {
        Preconditions.checkArgument(str2.contains(ClusterReplicationMetrics.SEPARATOR), "Invalid bundle range");
        String[] boundaries = str2.split(ClusterReplicationMetrics.SEPARATOR);
        // A range such as "0x0_" contains the separator but has no upper boundary.
        Preconditions.checkArgument(boundaries.length >= 2, "Invalid bundle range");
        Long lower = Long.decode(boundaries[0]);
        Long upper = Long.decode(boundaries[1]);
        BoundType upperType = upper.equals(NamespaceBundles.FULL_UPPER_BOUND) ? BoundType.CLOSED : BoundType.OPEN;
        return getBundle(NamespaceName.get(str), Range.range(lower, BoundType.CLOSED, upper, upperType));
    }

    /**
     * Returns the full bundle of a namespace as reported by its {@link NamespaceBundles}, loading
     * the bundles synchronously when needed.
     *
     * @param namespaceName namespace to look up
     * @return the namespace's full bundle
     * @throws Exception declared for callers; load failures surface from the cache
     */
    public NamespaceBundle getFullBundle(NamespaceName namespaceName) throws Exception {
        return this.bundlesCache.synchronous().get(namespaceName).getFullBundle();
    }

    /**
     * Asynchronous variant of {@link #getFullBundle(NamespaceName)}.
     *
     * @param namespaceName namespace to look up
     * @return future with the namespace's full bundle
     */
    public CompletableFuture<NamespaceBundle> getFullBundleAsync(NamespaceName namespaceName) {
        return this.bundlesCache.get(namespaceName).thenApply(NamespaceBundles::getFullBundle);
    }

    /**
     * Hashes a string with this factory's hash function.
     *
     * @param str text to hash, encoded as UTF-8
     * @return the hash padded to a {@code long}
     */
    public long getLongHashCode(String str) {
        return this.hashFunc.hashString(str, Charsets.UTF_8).padToLong();
    }

    /**
     * Builds bundles for a namespace directly from bundle data, without local policies and
     * without touching the cache.
     *
     * @param namespaceName namespace the bundles belong to
     * @param bundlesData bundle boundaries to build from
     * @return the resulting bundles
     */
    public NamespaceBundles getBundles(NamespaceName namespaceName, BundlesData bundlesData) {
        return new NamespaceBundles(namespaceName, this, Optional.<Pair<LocalPolicies, Long>>empty(),
                NamespaceBundles.getPartitions(bundlesData));
    }

    /**
     * Builds bundles for a namespace from optional local policies and their version.
     *
     * @param namespaceName namespace the bundles belong to
     * @param optional local policies with their version, when available
     * @return the resulting bundles
     */
    private NamespaceBundles getBundles(NamespaceName namespaceName, Optional<Pair<LocalPolicies, Long>> optional) {
        return new NamespaceBundles(namespaceName, this, optional);
    }

    /**
     * Computes the result of splitting one bundle of a namespace into several bundles.
     *
     * <p>Without fixed keys the bundle's range is divided into {@code i} parts of equal width
     * (integer division). With fixed keys the bundle is cut at those keys, yielding
     * {@code list.size() + 1} bundles, and {@code i} is ignored. The keys are sorted on a private
     * copy, so the caller's list is left untouched.
     *
     * @param namespaceBundle bundle to split
     * @param i number of bundles to create when no fixed keys are given
     * @param list optional split keys; each must lie strictly inside the bundle's range
     * @return future with the new bundles of the namespace and the bundles created by the split,
     *         or with {@code null} when the target bundle is not among the namespace's bundles
     * @throws NullPointerException when the bundle or its namespace is {@code null}
     * @throws IllegalArgumentException when the bundle cannot be split, the fixed keys fall
     *         outside it, or the number of bundles is not positive
     */
    public CompletableFuture<Pair<NamespaceBundles, List<NamespaceBundle>>> splitBundles(
            NamespaceBundle namespaceBundle, int i, List<Long> list) {
        // Null checks come first: every check below dereferences the bundle.
        Preconditions.checkNotNull(namespaceBundle, "can't split null bundle");
        Preconditions.checkNotNull(namespaceBundle.getNamespaceObject(), "namespace must be present");
        Preconditions.checkArgument(canSplitBundle(namespaceBundle),
                "%s bundle can't be split further since range not larger than 1", namespaceBundle);

        final List<Long> fixedKeys;
        final int numBundles;
        if (list != null && !list.isEmpty()) {
            fixedKeys = new ArrayList<>(list);
            Collections.sort(fixedKeys);
            long smallestKey = fixedKeys.get(0).longValue();
            long largestKey = fixedKeys.get(fixedKeys.size() - 1).longValue();
            Preconditions.checkArgument(
                    smallestKey > namespaceBundle.getLowerEndpoint().longValue()
                            && largestKey < namespaceBundle.getUpperEndpoint().longValue(),
                    "The given fixed keys must between the key range of the %s bundle", namespaceBundle);
            numBundles = fixedKeys.size() + 1;
        } else {
            fixedKeys = Collections.emptyList();
            numBundles = i;
        }
        // A non-positive count would otherwise fail later with a division by zero or a bad array size.
        Preconditions.checkArgument(numBundles >= 1, "number of bundles must be at least 1, got %s", numBundles);

        NamespaceName namespace = namespaceBundle.getNamespaceObject();
        return this.bundlesCache.get(namespace).thenApply(sourceBundles -> {
            long[] oldPartitions = sourceBundles.partitions;
            int lastIndex = oldPartitions.length - 1;
            long[] newPartitions = new long[oldPartitions.length + (numBundles - 1)];
            Range<Long> targetRange = namespaceBundle.getKeyRange();
            long targetLower = targetRange.lowerEndpoint().longValue();
            long targetUpper = targetRange.upperEndpoint().longValue();

            int writePos = 0;
            int splitIndex = -1;
            for (int idx = 0; idx < lastIndex; idx++) {
                long lower = oldPartitions[idx];
                long upper = oldPartitions[idx + 1];
                // Every existing lower boundary is carried over unchanged.
                newPartitions[writePos++] = lower;
                if (lower != targetLower || upper != targetUpper) {
                    continue;
                }
                // This is the bundle being split: insert the new inner boundaries after it.
                splitIndex = idx;
                if (fixedKeys.isEmpty()) {
                    long width = (upper - lower) / numBundles;
                    long boundary = lower + width;
                    for (int part = 0; part < numBundles - 1; part++) {
                        newPartitions[writePos++] = boundary;
                        boundary += width;
                    }
                } else {
                    for (Long key : fixedKeys) {
                        newPartitions[writePos++] = key.longValue();
                    }
                }
            }
            newPartitions[writePos] = oldPartitions[lastIndex];

            if (splitIndex == -1) {
                return null;
            }
            // The source bundles' local policies are passed on to the new bundles.
            NamespaceBundles splitResult =
                    new NamespaceBundles(namespace, this, sourceBundles.getLocalPolicies(), newPartitions);
            List<NamespaceBundle> createdBundles =
                    splitResult.getBundles().subList(splitIndex, splitIndex + numBundles);
            Pair<NamespaceBundles, List<NamespaceBundle>> outcome = new ImmutablePair<>(splitResult, createdBundles);
            return outcome;
        });
    }

    /**
     * Tells whether a bundle's key range is wide enough to be split.
     *
     * @param namespaceBundle bundle to check
     * @return {@code true} when the upper endpoint exceeds the lower endpoint by more than one
     */
    public boolean canSplitBundle(NamespaceBundle namespaceBundle) {
        Range<Long> keyRange = namespaceBundle.getKeyRange();
        long width = keyRange.upperEndpoint().longValue() - keyRange.lowerEndpoint().longValue();
        return width > 1;
    }

    /**
     * Validates that a set of boundaries starts at {@code 0x00000000} and ends at
     * {@code 0xffffffff}.
     *
     * @param sortedSet sorted boundary strings
     * @throws IllegalArgumentException when the set is empty or does not span the full range
     */
    public static void validateFullRange(SortedSet<String> sortedSet) {
        Preconditions.checkArgument(!sortedSet.isEmpty()
                && sortedSet.first().equals(FIRST_BOUNDARY)
                && sortedSet.last().equals(LAST_BOUNDARY));
    }

    /**
     * Static factory equivalent to the public constructor.
     *
     * @param pulsarService broker service the factory is bound to
     * @param hashFunction hash function used for key hashing
     * @return a new factory
     */
    public static NamespaceBundleFactory createFactory(PulsarService pulsarService, HashFunction hashFunction) {
        return new NamespaceBundleFactory(pulsarService, hashFunction);
    }

    /**
     * Tells whether a bundle range string denotes the full key space.
     *
     * @param str bundle range string; {@code null} yields {@code false}
     * @return {@code true} when {@code str} equals {@code 0x00000000_0xffffffff}
     */
    public static boolean isFullBundle(String str) {
        return FULL_BUNDLE_RANGE.equals(str);
    }

    /**
     * Returns the range string of a bundle spanning the full key space.
     *
     * @return {@code 0x00000000_0xffffffff}
     */
    public static String getDefaultBundleRange() {
        return FULL_BUNDLE_RANGE;
    }

    /**
     * Extracts the namespace part of a policies path.
     *
     * <p>The path is split on {@code /} into at most six segments and the first three are
     * dropped; for a path with a leading slash the first of them is the empty string before that
     * slash. The remaining segments are joined with {@code /}.
     *
     * @param str policies path
     * @return the namespace portion; the input itself when it is empty, and an empty string when
     *         the path has three segments or fewer
     */
    public static String getNamespaceFromPoliciesPath(String str) {
        if (str.isEmpty()) {
            return str;
        }
        Iterator<String> segments = Splitter.on("/").limit(6).split(str).iterator();
        // Guarded skip: a short path must not fail with NoSuchElementException.
        for (int skipped = 0; skipped < 3 && segments.hasNext(); skipped++) {
            segments.next();
        }
        return Joiner.on("/").join(segments);
    }
}
