/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.common.naming;

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.BoundType;
import com.google.common.collect.Range;
import com.google.common.hash.HashFunction;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.SortedSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.BundleData;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.resources.LocalPoliciesResources;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.metadata.api.CacheGetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.stats.CacheMetricsCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NamespaceBundleFactory {
    private static final Logger LOG = LoggerFactory.getLogger(NamespaceBundleFactory.class);
    // Hash function used by getLongHashCode to map a name onto the bundle key space.
    private final HashFunction hashFunc;
    // Per-namespace cache of bundle layouts; misses are resolved asynchronously by loadBundles.
    private final AsyncLoadingCache<NamespaceName, NamespaceBundles> bundlesCache;
    // Owning broker service; loadBundles treats a null value as "no metadata available".
    private final PulsarService pulsar;
    // Cache of global policies obtained from the configuration metadata store.
    // NOTE(review): assigned in the constructor but not read anywhere in this file.
    private final MetadataCache<Policies> policiesCache;
    // Total time budget for retrying a failed bundle load (see loadBundles / handleLoadBundlesRetry).
    private final Duration maxRetryDuration = Duration.ofSeconds(10L);

    /**
     * Creates a factory bound to the given broker service.
     *
     * <p>Builds the per-namespace bundle cache, registers it with the cache metrics
     * collector, and subscribes to local metadata store notifications so that cached
     * entries can be invalidated (see {@code handleMetadataStoreNotification}).
     *
     * @param pulsar   broker service used to reach the metadata stores and resources
     * @param hashFunc hash function used to map names to hash positions
     */
    public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) {
        this.hashFunc = hashFunc;
        // Statistics are recorded so the cache can be exported through the metrics collector below.
        this.bundlesCache = Caffeine.newBuilder().recordStats().buildAsync(this::loadBundles);
        CacheMetricsCollector.CAFFEINE.addCache("bundles", this.bundlesCache);
        // The listener only touches bundlesCache, which is already assigned at this point.
        pulsar.getLocalMetadataStore().registerListener(this::handleMetadataStoreNotification);
        this.pulsar = pulsar;
        this.policiesCache = pulsar.getConfigurationMetadataStore().getMetadataCache(Policies.class);
    }

    /**
     * Cache loader for {@code bundlesCache}: resolves the bundle layout of a namespace.
     *
     * <p>When no broker service is attached, a layout built without local policies is
     * returned immediately. Otherwise the load is delegated to {@code doLoadBundles},
     * which may retry until {@code maxRetryDuration} has elapsed.
     *
     * @param namespace namespace whose bundles are being loaded
     * @param executor  executor supplied by the cache; not used by this loader
     * @return future completed with the namespace's bundles, or exceptionally on failure
     */
    private CompletableFuture<NamespaceBundles> loadBundles(NamespaceName namespace, Executor executor) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Loading cache with bundles for {}", namespace);
        }
        if (pulsar != null) {
            CompletableFuture<NamespaceBundles> pending = new CompletableFuture<>();
            Backoff backoff = createBackoff();
            // Absolute deadline (nanoTime based) after which retries stop.
            long retryDeadline = System.nanoTime() + maxRetryDuration.toNanos();
            doLoadBundles(namespace, pending, backoff, retryDeadline);
            return pending;
        }
        return CompletableFuture.completedFuture(getBundles(namespace, Optional.empty()));
    }

    /**
     * Performs one attempt at loading the bundles of a namespace and completes {@code future}.
     *
     * <p>If local policies exist they are converted via {@code readBundles}; an
     * {@link IOException} from that step triggers a retry. If they do not exist, they are
     * created from the global policies via {@code copyToLocalPolicies}; a failure there also
     * triggers a retry. A failure of the initial local-policies lookup itself fails the
     * future without retrying.
     *
     * @param namespace     namespace being loaded
     * @param future        future to complete with the result of the load
     * @param backoff       backoff used to space out retries
     * @param retryDeadline {@link System#nanoTime()} value after which no retry is scheduled
     */
    private void doLoadBundles(NamespaceName namespace, CompletableFuture<NamespaceBundles> future, Backoff backoff, long retryDeadline) {
        pulsar.getPulsarResources().getLocalPolicies().getLocalPoliciesWithVersion(namespace)
                .thenAccept(result -> {
                    if (!result.isPresent()) {
                        // No local policies yet: derive them from the global policies.
                        copyToLocalPolicies(namespace)
                                .thenAccept(loaded -> future.complete(loaded))
                                .exceptionally(copyFailure -> {
                                    handleLoadBundlesRetry(namespace, future, backoff, retryDeadline, copyFailure);
                                    return null;
                                });
                        return;
                    }
                    try {
                        future.complete(readBundles(namespace, result.get().getValue(), result.get().getStat().getVersion()));
                    } catch (IOException readFailure) {
                        handleLoadBundlesRetry(namespace, future, backoff, retryDeadline, readFailure);
                    }
                })
                .exceptionally(lookupFailure -> {
                    future.completeExceptionally(lookupFailure);
                    return null;
                });
    }

    /**
     * Decides whether a failed bundle load is retried or reported.
     *
     * <p>The future is failed with {@code e} when {@code e} is an {@link Error} or the retry
     * deadline has passed. Otherwise another {@code doLoadBundles} attempt is scheduled on the
     * broker executor after the next backoff delay.
     *
     * @param namespace     namespace being loaded
     * @param future        future representing the overall load
     * @param backoff       source of the next retry delay, in milliseconds
     * @param retryDeadline {@link System#nanoTime()} value after which no retry is scheduled
     * @param e             failure of the latest attempt
     */
    private void handleLoadBundlesRetry(NamespaceName namespace, CompletableFuture<NamespaceBundles> future, Backoff backoff, long retryDeadline, Throwable e) {
        boolean giveUp = e instanceof Error || System.nanoTime() > retryDeadline;
        if (giveUp) {
            future.completeExceptionally(e);
            return;
        }
        LOG.warn("Error loading bundle for {}. Retrying exception", namespace, e);
        long delayMillis = backoff.next();
        pulsar.getExecutor().schedule(
                () -> doLoadBundles(namespace, future, backoff, retryDeadline),
                delayMillis,
                TimeUnit.MILLISECONDS);
    }

    /**
     * Creates the backoff used between bundle-load retries.
     *
     * <p>NOTE(review): the arguments look like (initial 100 ms, max 5 s, mandatory stop 0 ms);
     * confirm against the {@code Backoff} constructor.
     *
     * @return a fresh backoff instance
     */
    private static Backoff createBackoff() {
        Backoff retryBackoff = new Backoff(
                100L, TimeUnit.MILLISECONDS,
                5L, TimeUnit.SECONDS,
                0L, TimeUnit.MILLISECONDS);
        return retryBackoff;
    }

    /**
     * Builds the bundle layout of a namespace from its local policies.
     *
     * @param namespace     namespace the policies belong to
     * @param localPolicies local policies read from the metadata store
     * @param version       metadata version the policies were read at
     * @return bundles constructed from the policies and version
     * @throws IOException declared for callers; see {@code doLoadBundles}, which retries on it
     */
    private NamespaceBundles readBundles(NamespaceName namespace, LocalPolicies localPolicies, long version) throws IOException {
        NamespaceBundles loaded = getBundles(namespace, Optional.of(Pair.of(localPolicies, version)));
        if (LOG.isDebugEnabled()) {
            Object bundlesForLog = localPolicies.bundles.getBoundaries() != null ? localPolicies.bundles : "null";
            LOG.debug("[{}] Get bundles from getLocalZkCacheService: bundles: {}, version: {}", namespace, bundlesForLog, loaded.getVersion());
        }
        return loaded;
    }

    /**
     * Creates local policies for a namespace from its global policies and returns the
     * resulting bundle layout.
     *
     * <p>When the namespace has no global policies, nothing is written and a layout built
     * without local policies is returned. Otherwise local policies carrying the global
     * bundle data are created and the layout is built from them with version {@code 0}.
     *
     * @param namespace namespace to initialise
     * @return future completed with the namespace's bundles
     */
    private CompletableFuture<NamespaceBundles> copyToLocalPolicies(NamespaceName namespace) {
        return pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace)
                .thenCompose(optPolicies -> {
                    if (optPolicies.isPresent()) {
                        Policies globalPolicies = optPolicies.get();
                        LocalPolicies localPolicies = new LocalPolicies(globalPolicies.bundles, null, null);
                        return pulsar.getPulsarResources().getLocalPolicies()
                                .createLocalPoliciesAsync(namespace, localPolicies)
                                .thenApply(stat -> getBundles(namespace, Optional.of(Pair.of(localPolicies, 0L))));
                    }
                    return CompletableFuture.completedFuture(getBundles(namespace, Optional.empty()));
                });
    }

    /**
     * Metadata store listener: drops the cached bundles of a namespace when its local
     * policies change.
     *
     * <p>Only paths accepted by {@code LocalPoliciesResources.isLocalPoliciesPath} are
     * considered. The namespace is derived from the path; if it is not a valid namespace
     * name the notification is ignored. Any exception is logged and swallowed so that a
     * malformed path cannot break the notification thread.
     *
     * @param n notification received from the metadata store
     */
    private void handleMetadataStoreNotification(Notification n) {
        if (!LocalPoliciesResources.isLocalPoliciesPath(n.getPath())) {
            return;
        }
        try {
            Optional<NamespaceName> namespace = NamespaceName.getIfValid(getNamespaceFromPoliciesPath(n.getPath()));
            if (namespace.isPresent()) {
                // Log the namespace itself, not the Optional wrapper ("Optional[...]").
                LOG.info("Policy updated for namespace {}, refreshing the bundle cache.", namespace.get());
                bundlesCache.synchronous().invalidate(namespace.get());
            }
        } catch (Exception e) {
            LOG.error("Failed to update the policy change for path {}", n.getPath(), e);
        }
    }

    /**
     * Removes the cached bundles of the given namespace so the next lookup reloads them.
     *
     * @param namespace namespace whose cache entry is discarded
     */
    public void invalidateBundleCache(NamespaceName namespace) {
        bundlesCache.synchronous().invalidate(namespace);
    }

    /**
     * Returns the bundles of a namespace, loading them through the cache if necessary.
     *
     * @param nsname namespace to look up
     * @return future completed with the namespace's bundles
     */
    public CompletableFuture<NamespaceBundles> getBundlesAsync(NamespaceName nsname) {
        CompletableFuture<NamespaceBundles> cached = bundlesCache.get(nsname);
        return cached;
    }

    /**
     * Blocking variant of {@link #getBundleWithHighestTopicsAsync(NamespaceName)}; waits up
     * to 30 seconds for the result.
     *
     * @param nsname namespace to inspect
     * @return the bundle holding the most persistent topics, or {@code null} if the
     *         namespace has no topics
     * @throws IllegalStateException if the lookup fails, times out, or the waiting thread is
     *         interrupted; the cause is the underlying failure (unwrapped from
     *         {@link ExecutionException} where applicable)
     */
    public NamespaceBundle getBundleWithHighestTopics(NamespaceName nsname) {
        try {
            return getBundleWithHighestTopicsAsync(nsname).get(30L, TimeUnit.SECONDS);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt so callers further up the stack can observe it.
                Thread.currentThread().interrupt();
            }
            LOG.info("failed to derive bundle for {}", nsname, e);
            throw new IllegalStateException(e instanceof ExecutionException ? e.getCause() : e);
        }
    }

    /**
     * Finds the bundle of a namespace that contains the largest number of persistent topics.
     *
     * <p>Lists the namespace's persistent topics, maps each to its bundle, and returns the
     * first bundle to reach the highest count. If loading the bundles fails, the returned
     * future fails with that original error (previously it was masked by a
     * {@link NullPointerException} raised while dereferencing the missing result).
     *
     * @param nsname namespace to inspect
     * @return future completed with the fullest bundle, or with {@code null} when the
     *         namespace has no persistent topics
     */
    public CompletableFuture<NamespaceBundle> getBundleWithHighestTopicsAsync(NamespaceName nsname) {
        return pulsar.getPulsarResources().getTopicResources().listPersistentTopicsAsync(nsname)
                .thenCompose(topics -> bundlesCache.get(nsname).thenApply(bundles -> {
                    // thenApply (not handle) so a failed bundle load propagates unchanged.
                    HashMap<String, Integer> countMap = new HashMap<>();
                    NamespaceBundle resultBundle = null;
                    int maxCount = 0;
                    for (String topic : topics) {
                        NamespaceBundle bundle = bundles.findBundle(TopicName.get(topic));
                        int count = countMap.merge(bundle.getBundleRange(), 1, Integer::sum);
                        if (count > maxCount) {
                            maxCount = count;
                            resultBundle = bundle;
                        }
                    }
                    return resultBundle;
                }));
    }

    /**
     * Resolves the bundle that owns the given topic, loading the namespace's bundles
     * synchronously if they are not cached.
     *
     * @param topic topic to locate
     * @return the owning bundle, or {@code null} when no bundles are available for the
     *         topic's namespace
     */
    public NamespaceBundle getBundle(TopicName topic) {
        NamespaceBundles namespaceBundles = bundlesCache.synchronous().get(topic.getNamespaceObject());
        if (namespaceBundles == null) {
            return null;
        }
        return namespaceBundles.findBundle(topic);
    }

    /**
     * Picks the bundle of a namespace with the highest long-term message throughput.
     *
     * <p>Throughput data is only consulted when the broker's load manager is a
     * {@code ModularLoadManagerWrapper}; bundles reporting no topics are skipped. With any
     * other load manager this falls back to {@link #getBundleWithHighestTopics(NamespaceName)}.
     *
     * @param nsName namespace to inspect
     * @return the busiest bundle, or {@code null} when no bundle with topics was found
     */
    public NamespaceBundle getBundleWithHighestThroughput(NamespaceName nsName) {
        LoadManager loadManager = pulsar.getLoadManager().get();
        if (!(loadManager instanceof ModularLoadManagerWrapper)) {
            return getBundleWithHighestTopics(nsName);
        }
        NamespaceBundles namespaceBundles = getBundles(nsName);
        NamespaceBundle busiest = null;
        double highestThroughput = -1.0;
        for (NamespaceBundle candidate : namespaceBundles.getBundles()) {
            BundleData data = ((ModularLoadManagerWrapper) loadManager).getLoadManager().getBundleDataOrDefault(candidate.toString());
            if (data.getTopics() > 0 && data.getLongTermData().totalMsgThroughput() > highestThroughput) {
                highestThroughput = data.getLongTermData().totalMsgThroughput();
                busiest = candidate;
            }
        }
        return busiest;
    }

    /**
     * Returns the bundles of a namespace, blocking until they are loaded if not cached.
     *
     * @param nsname namespace to look up
     * @return the namespace's bundles
     */
    public NamespaceBundles getBundles(NamespaceName nsname) {
        NamespaceBundles loaded = bundlesCache.synchronous().get(nsname);
        return loaded;
    }

    /**
     * Returns the bundles of a namespace only if they are already cached; never triggers
     * a load.
     *
     * @param nsname namespace to look up
     * @return the cached bundles, or an empty optional when nothing is cached
     */
    public Optional<NamespaceBundles> getBundlesIfPresent(NamespaceName nsname) {
        NamespaceBundles cached = bundlesCache.synchronous().getIfPresent(nsname);
        return Optional.ofNullable(cached);
    }

    /**
     * Creates a bundle object for the given namespace and hash range, bound to this factory.
     *
     * @param nsname    namespace the bundle belongs to
     * @param hashRange hash range covered by the bundle
     * @return a new bundle instance
     */
    public NamespaceBundle getBundle(NamespaceName nsname, Range<Long> hashRange) {
        NamespaceBundle created = new NamespaceBundle(nsname, hashRange, this);
        return created;
    }

    /**
     * Creates a bundle from a namespace name and a textual range of the form
     * {@code <lower>_<upper>}, where both boundaries are parsed with {@link Long#decode}.
     *
     * <p>The lower bound is inclusive. The upper bound is exclusive unless it equals
     * {@code NamespaceBundles.FULL_UPPER_BOUND}, in which case it is inclusive.
     *
     * @param namespace   namespace name
     * @param bundleRange range string such as {@code 0x00000000_0xffffffff}
     * @return a new bundle covering the parsed range
     * @throws IllegalArgumentException if the range has no {@code _} separator, lacks either
     *         boundary (e.g. {@code "_"} or {@code "0x00_"}), or a boundary is not a
     *         decodable number
     */
    public NamespaceBundle getBundle(String namespace, String bundleRange) {
        Preconditions.checkArgument(bundleRange.contains("_"), "Invalid bundle range");
        String[] boundaries = bundleRange.split("_");
        // split() drops trailing empty parts, so "0x00_" or "_" yields fewer than two
        // elements; reject these explicitly instead of failing with an index error.
        Preconditions.checkArgument(boundaries.length >= 2, "Invalid bundle range: %s", bundleRange);
        Long lowerEndpoint = Long.decode(boundaries[0]);
        Long upperEndpoint = Long.decode(boundaries[1]);
        BoundType upperBoundType = upperEndpoint.equals(NamespaceBundles.FULL_UPPER_BOUND) ? BoundType.CLOSED : BoundType.OPEN;
        Range<Long> hashRange = Range.range(lowerEndpoint, BoundType.CLOSED, upperEndpoint, upperBoundType);
        return getBundle(NamespaceName.get(namespace), hashRange);
    }

    /**
     * Returns the bundle spanning the full hash range of a namespace, loading the
     * namespace's bundles synchronously if needed.
     *
     * @param fqnn namespace to look up
     * @return the full-range bundle
     * @throws Exception if the bundles cannot be loaded
     */
    public NamespaceBundle getFullBundle(NamespaceName fqnn) throws Exception {
        NamespaceBundles namespaceBundles = bundlesCache.synchronous().get(fqnn);
        return namespaceBundles.getFullBundle();
    }

    /**
     * Asynchronous variant of {@link #getFullBundle(NamespaceName)}.
     *
     * @param fqnn namespace to look up
     * @return future completed with the namespace's full-range bundle
     */
    public CompletableFuture<NamespaceBundle> getFullBundleAsync(NamespaceName fqnn) {
        return bundlesCache.get(fqnn).thenApply(namespaceBundles -> namespaceBundles.getFullBundle());
    }

    /**
     * Hashes a name (encoded as UTF-8) with the factory's hash function and widens the
     * result to a {@code long}.
     *
     * @param name value to hash
     * @return hash of {@code name} padded to a long
     */
    public long getLongHashCode(String name) {
        return hashFunc.hashString(name, Charsets.UTF_8).padToLong();
    }

    /**
     * Builds a bundle layout for a namespace from explicit bundle data, without any
     * local policies attached.
     *
     * @param nsname     namespace the bundles belong to
     * @param bundleData bundle boundaries to convert into partitions
     * @return bundles constructed from {@code bundleData}
     */
    public NamespaceBundles getBundles(NamespaceName nsname, BundlesData bundleData) {
        long[] partitions = NamespaceBundles.getPartitions(bundleData);
        return new NamespaceBundles(nsname, this, Optional.empty(), partitions);
    }

    /**
     * Builds a bundle layout for a namespace from optional local policies and their version.
     *
     * @param nsname        namespace the bundles belong to
     * @param localPolicies local policies paired with their metadata version, if any
     * @return bundles constructed from the given policies
     */
    private NamespaceBundles getBundles(NamespaceName nsname, Optional<Pair<LocalPolicies, Long>> localPolicies) {
        NamespaceBundles built = new NamespaceBundles(nsname, this, localPolicies);
        return built;
    }

    /**
     * Computes the bundle layout that results from splitting one bundle of a namespace.
     *
     * <p>If {@code splitBoundaries} is non-empty, the bundle is split at exactly those keys
     * (the list is sorted in place and must lie strictly inside the bundle's range) and
     * {@code argNumBundles} is ignored. Otherwise the bundle is divided into
     * {@code argNumBundles} segments of equal size. Nothing is persisted here; only the new
     * layout is returned.
     *
     * @param targetBundle    bundle to split; must be non-null and have a range larger than 1
     * @param argNumBundles   number of resulting bundles when no explicit boundaries are given
     * @param splitBoundaries explicit split keys, or {@code null}/empty for an even split
     * @return future completed with the new namespace layout and the sub-list of bundles
     *         that replace {@code targetBundle}; completed with {@code null} when the
     *         target's range does not match any current bundle of the namespace
     * @throws NullPointerException     if {@code targetBundle} or its namespace is null
     * @throws IllegalArgumentException if the bundle cannot be split further or a boundary
     *                                  lies outside the bundle's range
     */
    public CompletableFuture<Pair<NamespaceBundles, List<NamespaceBundle>>> splitBundles(NamespaceBundle targetBundle, int argNumBundles, List<Long> splitBoundaries) {
        // The null check must precede canSplitBundle, which dereferences the bundle.
        Preconditions.checkNotNull(targetBundle, "can't split null bundle");
        Preconditions.checkArgument(canSplitBundle(targetBundle), "%s bundle can't be split further since range not larger than 1", targetBundle);
        final boolean hasFixedBoundaries = splitBoundaries != null && !splitBoundaries.isEmpty();
        if (hasFixedBoundaries) {
            Collections.sort(splitBoundaries);
            Preconditions.checkArgument(
                    splitBoundaries.get(0) > targetBundle.getLowerEndpoint()
                            && splitBoundaries.get(splitBoundaries.size() - 1) < targetBundle.getUpperEndpoint(),
                    "The given fixed keys must between the key range of the %s bundle", targetBundle);
        }
        Preconditions.checkNotNull(targetBundle.getNamespaceObject(), "namespace must be present");
        final NamespaceName nsname = targetBundle.getNamespaceObject();
        // Explicit boundaries determine the bundle count: n keys produce n + 1 bundles.
        final int numBundles = hasFixedBoundaries ? splitBoundaries.size() + 1 : argNumBundles;
        return bundlesCache.get(nsname).thenApply(sourceBundle -> {
            final int lastIndex = sourceBundle.partitions.length - 1;
            // Splitting one bundle into numBundles adds (numBundles - 1) new boundaries.
            final long[] partitions = new long[sourceBundle.partitions.length + (numBundles - 1)];
            final Range<Long> range = targetBundle.getKeyRange();
            final long lower = range.lowerEndpoint();
            final long upper = range.upperEndpoint();
            int pos = 0;
            int splitPartition = -1;
            for (int i = 0; i < lastIndex; i++) {
                boolean isTarget = sourceBundle.partitions[i] == lower && upper == sourceBundle.partitions[i + 1];
                if (!isTarget) {
                    partitions[pos++] = sourceBundle.partitions[i];
                    continue;
                }
                splitPartition = i;
                long minVal = sourceBundle.partitions[i];
                partitions[pos++] = minVal;
                if (hasFixedBoundaries) {
                    for (long splitBoundary : splitBoundaries) {
                        partitions[pos++] = splitBoundary;
                    }
                } else {
                    long maxVal = sourceBundle.partitions[i + 1];
                    long segSize = (maxVal - minVal) / numBundles;
                    long curPartition = minVal + segSize;
                    for (int j = 0; j < numBundles - 1; j++) {
                        partitions[pos++] = curPartition;
                        curPartition += segSize;
                    }
                }
            }
            partitions[pos] = sourceBundle.partitions[lastIndex];
            if (splitPartition == -1) {
                // The target range is not part of the namespace's current layout.
                return null;
            }
            NamespaceBundles splitNsBundles = new NamespaceBundles(nsname, this, sourceBundle.getLocalPolicies(), partitions);
            List<NamespaceBundle> splitBundles = splitNsBundles.getBundles().subList(splitPartition, splitPartition + numBundles);
            return new ImmutablePair<>(splitNsBundles, splitBundles);
        });
    }

    /**
     * Tells whether a bundle's key range is wide enough to be split.
     *
     * @param bundle bundle to check
     * @return {@code true} when the distance between the range's endpoints exceeds 1
     */
    public boolean canSplitBundle(NamespaceBundle bundle) {
        Range<Long> keyRange = bundle.getKeyRange();
        long width = keyRange.upperEndpoint() - keyRange.lowerEndpoint();
        return width > 1L;
    }

    /**
     * Verifies that a sorted set of boundaries spans the full bundle range, i.e. starts at
     * {@code 0x00000000} and ends at {@code 0xffffffff}.
     *
     * @param partitions sorted boundary strings
     * @throws IllegalArgumentException if the first or last boundary does not match
     */
    public static void validateFullRange(SortedSet<String> partitions) {
        String lowest = partitions.first();
        String highest = partitions.last();
        boolean coversFullRange = lowest.equals("0x00000000") && highest.equals("0xffffffff");
        Preconditions.checkArgument(coversFullRange);
    }

    /**
     * Static factory method equivalent to calling the constructor.
     *
     * @param pulsar   broker service the factory is bound to
     * @param hashFunc hash function used to map names to hash positions
     * @return a new factory instance
     */
    public static NamespaceBundleFactory createFactory(PulsarService pulsar, HashFunction hashFunc) {
        NamespaceBundleFactory factory = new NamespaceBundleFactory(pulsar, hashFunc);
        return factory;
    }

    /**
     * Tells whether a range string denotes the full bundle range
     * {@code 0x00000000_0xffffffff}.
     *
     * @param bundleRange range string to test; must not be null
     * @return {@code true} when the string equals the full range exactly
     */
    public static boolean isFullBundle(String bundleRange) {
        String fullRange = String.join("_", "0x00000000", "0xffffffff");
        return bundleRange.equals(fullRange);
    }

    /**
     * Returns the range string of the bundle that covers the whole hash space.
     *
     * @return {@code 0x00000000_0xffffffff}
     */
    public static String getDefaultBundleRange() {
        final String lowerBoundary = "0x00000000";
        final String upperBoundary = "0xffffffff";
        return lowerBoundary + "_" + upperBoundary;
    }

    /**
     * Extracts the namespace portion from a policies path.
     *
     * <p>The path is split on {@code /} into at most six pieces, the first three pieces are
     * discarded, and the rest are re-joined with {@code /}. For example {@code /a/b/c/d}
     * yields {@code c/d}. An empty path is returned unchanged.
     *
     * @param path policies path
     * @return the remainder of the path after its first three pieces
     * @throws java.util.NoSuchElementException if the path has fewer than three pieces
     */
    public static String getNamespaceFromPoliciesPath(String path) {
        if (path.isEmpty()) {
            return path;
        }
        Iterator<String> pieces = Splitter.on("/").limit(6).split(path).iterator();
        // Skip the leading pieces that precede the namespace in the path.
        for (int skipped = 0; skipped < 3; skipped++) {
            pieces.next();
        }
        return Joiner.on("/").join(pieces);
    }
}

