package org.apache.pulsar.broker.loadbalance.extensions;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.filter.AntiAffinityGroupPolicyFilter;
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerIsolationPoliciesFilter;
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerLoadManagerClassFilter;
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerMaxTopicCountFilter;
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerVersionFilter;
import org.apache.pulsar.broker.loadbalance.extensions.manager.SplitManager;
import org.apache.pulsar.broker.loadbalance.extensions.manager.UnloadManager;
import org.apache.pulsar.broker.loadbalance.extensions.models.AssignCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
import org.apache.pulsar.broker.loadbalance.extensions.policies.AntiAffinityGroupPolicyHelper;
import org.apache.pulsar.broker.loadbalance.extensions.policies.IsolationPoliciesHelper;
import org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter;
import org.apache.pulsar.broker.loadbalance.extensions.reporter.TopBundleLoadDataReporter;
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.LoadManagerScheduler;
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.SplitScheduler;
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.UnloadScheduler;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreException;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreFactory;
import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy;
import org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsageWithWeight;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.class */
public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
    private static final Logger log = LoggerFactory.getLogger(ExtensibleLoadManagerImpl.class);

    // Non-persistent topics in the system namespace; start() hands them to LoadDataStoreFactory
    // to back the broker load data store and the top-bundles load data store respectively.
    public static final String BROKER_LOAD_DATA_STORE_TOPIC = TopicName.get(TopicDomain.non_persistent.value(), NamespaceName.SYSTEM_NAMESPACE, "loadbalancer-broker-load-data").toString();
    public static final String TOP_BUNDLES_LOAD_DATA_STORE_TOPIC = TopicName.get(TopicDomain.non_persistent.value(), NamespaceName.SYSTEM_NAMESPACE, "loadbalancer-top-bundles-load-data").toString();

    // NOTE(review): not referenced by the code visible in this part of the file.
    private static final long MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS = 200;

    // Initial delay and period (2 minutes) of the monitor task scheduled in start().
    private static final long MONITOR_INTERVAL_IN_MILLIS = 120000;

    // Compaction threshold in bytes (5 * 1024 * 1024) that configureSystemTopics applies to
    // ServiceUnitStateChannelImpl.TOPIC.
    public static final long COMPACTION_THRESHOLD = 5242880;

    // Election path passed to the LeaderElectionService created in start().
    private static final String ELECTION_ROOT = "/loadbalance/extension/leader";

    // Set by initialize(PulsarService).
    private PulsarService pulsar;
    private ServiceConfiguration conf;

    // Components below are created in start(); they are null until start() has run.
    private BrokerRegistry brokerRegistry;
    private ServiceUnitStateChannel serviceUnitStateChannel;
    private AntiAffinityGroupPolicyFilter antiAffinityGroupPolicyFilter;
    private AntiAffinityGroupPolicyHelper antiAffinityGroupPolicyHelper;
    private IsolationPoliciesHelper isolationPoliciesHelper;
    private LoadDataStore<BrokerLoadData> brokerLoadDataStore;
    private LoadDataStore<TopBundlesLoadData> topBundlesLoadDataStore;
    private LoadManagerScheduler unloadScheduler;
    private LeaderElectionService leaderElectionService;
    private LoadManagerContext context;

    // Assigned once in the constructor.
    private final BrokerSelectionStrategy brokerSelectionStrategy;

    private BrokerLoadDataReporter brokerLoadDataReporter;
    private TopBundleLoadDataReporter topBundleLoadDataReporter;

    // Handles of the periodic tasks scheduled on the load manager executor in start().
    private ScheduledFuture brokerLoadDataReportTask;
    private ScheduledFuture topBundlesLoadDataReportTask;
    private ScheduledFuture monitorTask;

    private SplitScheduler splitScheduler;
    private UnloadManager unloadManager;
    private SplitManager splitManager;

    // NOTE(review): presumably maintained by playLeader()/playFollower(), which are outside
    // this part of the file - confirm.
    private volatile Role role;

    // Flipped to true as the last step of a successful start().
    private volatile boolean started = false;

    // NOTE(review): not written by the code visible here; presumably records the result of
    // configureSystemTopics - confirm.
    private boolean configuredSystemTopics = false;

    private final AssignCounter assignCounter = new AssignCounter();
    private final UnloadCounter unloadCounter = new UnloadCounter();
    private final SplitCounter splitCounter = new SplitCounter();
    private final AtomicLong ignoredSendMsgCounter = new AtomicLong();

    // Shared with UnloadScheduler / SplitScheduler through their constructors in start().
    private final AtomicReference<List<Metrics>> unloadMetrics = new AtomicReference<>();
    private final AtomicReference<List<Metrics>> splitMetrics = new AtomicReference<>();

    // In-flight assign lookups keyed by bundle name; see dedupeLookupRequest.
    private final ConcurrentHashMap<String, CompletableFuture<Optional<BrokerLookupData>>> lookupRequests = new ConcurrentHashMap<>();

    // Counted down by start() once every component has been created.
    private final CountDownLatch initWaiter = new CountDownLatch(1);

    // Broker filters applied by selectAsync; populated in the constructor and extended in start().
    private final List<BrokerFilter> brokerFilterPipeline = new ArrayList();

    /* loaded from: input_file:org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl$Role.class */
    /** The two roles a load manager instance can hold: {@code Leader} or {@code Follower}. */
    public enum Role { Leader, Follower }

    /**
     * Collects the bundles this broker currently owns.
     *
     * <p>The result contains every ownership entry of the service unit state channel whose state
     * is {@code Owned} and whose destination broker equals this broker's id, plus the full bundles
     * of this broker's heartbeat, heartbeat-V2 and SLA-monitor namespaces. A failure to resolve
     * one of those three bundles is logged and skipped; it does not fail the returned future.
     *
     * @return a future holding the owned bundles, or an already-completed future with an empty
     *         set when the load manager has not been started
     */
    public CompletableFuture<Set<NamespaceBundle>> getOwnedServiceUnitsAsync() {
        if (!this.started) {
            log.warn("Failed to get owned service units, load manager is not started.");
            return CompletableFuture.completedFuture(Collections.emptySet());
        }
        String brokerId = this.brokerRegistry.getBrokerId();
        Set<NamespaceBundle> ownedServiceUnits = this.serviceUnitStateChannel.getOwnershipEntrySet().stream()
                .filter(entry -> {
                    ServiceUnitStateData stateData = entry.getValue();
                    return stateData.state() == ServiceUnitState.Owned
                            && StringUtils.isNotBlank(stateData.dstBroker())
                            && stateData.dstBroker().equals(brokerId);
                })
                .map(entry -> LoadManagerShared.getNamespaceBundle(this.pulsar, entry.getKey()))
                .collect(Collectors.toSet());

        NamespaceName heartbeatNamespace = NamespaceService.getHeartbeatNamespace(brokerId, this.pulsar.getConfiguration());
        NamespaceName heartbeatNamespaceV2 = NamespaceService.getHeartbeatNamespaceV2(brokerId, this.pulsar.getConfiguration());
        NamespaceName slaMonitorNamespace = NamespaceService.getSLAMonitorNamespace(brokerId, this.pulsar.getConfiguration());

        // The three lookups run one after another; each exceptionally() stage resets the chain
        // so that a failed lookup does not prevent the following ones from running.
        return this.pulsar.getNamespaceService().getNamespaceBundleFactory().getFullBundleAsync(heartbeatNamespace)
                .thenAccept(ownedServiceUnits::add)
                .exceptionally(error -> {
                    log.warn("Failed to get heartbeat namespace bundle.", error);
                    return null;
                })
                .thenCompose(ignored -> this.pulsar.getNamespaceService().getNamespaceBundleFactory()
                        .getFullBundleAsync(heartbeatNamespaceV2))
                .thenAccept(ownedServiceUnits::add)
                .exceptionally(error -> {
                    log.warn("Failed to get heartbeat namespace V2 bundle.", error);
                    return null;
                })
                .thenCompose(ignored -> this.pulsar.getNamespaceService().getNamespaceBundleFactory()
                        .getFullBundleAsync(slaMonitorNamespace))
                .thenAccept(ownedServiceUnits::add)
                .exceptionally(error -> {
                    log.warn("Failed to get SLA Monitor namespace bundle.", error);
                    return null;
                })
                .thenApply(ignored -> ownedServiceUnits);
    }

    public ExtensibleLoadManagerImpl() {
        this.brokerFilterPipeline.add(new BrokerLoadManagerClassFilter());
        this.brokerFilterPipeline.add(new BrokerMaxTopicCountFilter());
        this.brokerFilterPipeline.add(new BrokerVersionFilter());
        this.brokerSelectionStrategy = new LeastResourceUsageWithWeight();
    }

    /**
     * Tells whether the broker's active load manager is an {@link ExtensibleLoadManagerWrapper}.
     *
     * @param pulsarService the broker service whose load manager is inspected
     * @return {@code true} when the current load manager is the extensible wrapper
     */
    public static boolean isLoadManagerExtensionEnabled(PulsarService pulsarService) {
        final LoadManager current = pulsarService.getLoadManager().get();
        return current instanceof ExtensibleLoadManagerWrapper;
    }

    /**
     * Unwraps the {@link ExtensibleLoadManagerImpl} held by an {@link ExtensibleLoadManagerWrapper}.
     *
     * @param loadManager the load manager to unwrap
     * @return the wrapped implementation
     * @throws IllegalArgumentException if {@code loadManager} is not an
     *         {@link ExtensibleLoadManagerWrapper}
     */
    public static ExtensibleLoadManagerImpl get(LoadManager loadManager) {
        if (!(loadManager instanceof ExtensibleLoadManagerWrapper)) {
            throw new IllegalArgumentException("The load manager should be 'ExtensibleLoadManagerWrapper'.");
        }
        ExtensibleLoadManagerWrapper wrapper = (ExtensibleLoadManagerWrapper) loadManager;
        return wrapper.get();
    }

    /**
     * Convenience overload that unwraps the load manager currently installed on the broker.
     *
     * @param pulsarService the broker service
     * @return the wrapped {@link ExtensibleLoadManagerImpl}
     * @throws IllegalArgumentException if the broker's load manager is not an
     *         {@link ExtensibleLoadManagerWrapper}
     */
    public static ExtensibleLoadManagerImpl get(PulsarService pulsarService) {
        final LoadManager current = pulsarService.getLoadManager().get();
        return get(current);
    }

    /**
     * Decides whether verbose load balancer logging should be produced.
     *
     * @param serviceConfiguration broker configuration carrying the load balancer debug flag
     * @param logger the logger whose debug level is consulted when the flag is off
     * @return {@code true} if the debug flag is set or the logger has debug enabled
     */
    public static boolean debug(ServiceConfiguration serviceConfiguration, Logger logger) {
        if (serviceConfiguration.isLoadBalancerDebugModeEnabled()) {
            return true;
        }
        return logger.isDebugEnabled();
    }

    /**
     * Creates a non-partitioned topic through the broker's admin client.
     *
     * <p>A conflict reported by the admin client (topic already present) is tolerated and only
     * logged when debug output is enabled.
     *
     * @param pulsarService the broker service providing the admin client
     * @param str the name of the topic to create
     * @throws PulsarServerException if the admin call fails for any reason other than a conflict
     */
    public static void createSystemTopic(PulsarService pulsarService, String str) throws PulsarServerException {
        try {
            pulsarService.getAdminClient().topics().createNonPartitionedTopic(str);
            log.info("Created topic {}.", str);
        } catch (PulsarAdminException.ConflictException alreadyExists) {
            // The topic is already there, which is the desired end state.
            boolean verbose = debug(pulsarService.getConfiguration(), log);
            if (verbose) {
                log.info("Topic {} already exists.", str);
            }
        } catch (PulsarAdminException adminFailure) {
            throw new PulsarServerException(adminFailure);
        }
    }

    /**
     * Creates the broker load data topic followed by the top-bundles load data topic.
     *
     * @param pulsarService the broker service providing the admin client
     * @throws PulsarServerException if either topic cannot be created
     */
    private static void createSystemTopics(PulsarService pulsarService) throws PulsarServerException {
        final String[] loadDataTopics = {BROKER_LOAD_DATA_STORE_TOPIC, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC};
        for (String topic : loadDataTopics) {
            createSystemTopic(pulsarService, topic);
        }
    }

    /**
     * Ensures the service unit state channel topic uses {@link #COMPACTION_THRESHOLD} as its
     * compaction threshold.
     *
     * <p>The topic policy is only touched when the extensible load manager is active and both
     * system topics and topic level policies are enabled; otherwise a warning is logged and the
     * call still counts as successful.
     *
     * @param pulsarService the broker service providing configuration and admin client
     * @return {@code true} when nothing had to be done or the threshold was read/updated,
     *         {@code false} when any exception occurred
     */
    private static boolean configureSystemTopics(PulsarService pulsarService) {
        try {
            boolean topicPoliciesUsable = isLoadManagerExtensionEnabled(pulsarService)
                    && pulsarService.getConfiguration().isSystemTopicEnabled()
                    && pulsarService.getConfiguration().isTopicLevelPoliciesEnabled();
            if (topicPoliciesUsable) {
                Long current = pulsarService.getAdminClient().topicPolicies().getCompactionThreshold(ServiceUnitStateChannelImpl.TOPIC);
                boolean upToDate = current != null && current.longValue() == COMPACTION_THRESHOLD;
                if (!upToDate) {
                    pulsarService.getAdminClient().topicPolicies().setCompactionThreshold(ServiceUnitStateChannelImpl.TOPIC, COMPACTION_THRESHOLD);
                    log.info("Set compaction threshold: {} bytes for system topic {}.", COMPACTION_THRESHOLD, ServiceUnitStateChannelImpl.TOPIC);
                }
            } else {
                log.warn("System topic or topic level policies is disabled. {} compaction threshold follows the broker or namespace policies.", ServiceUnitStateChannelImpl.TOPIC);
            }
            return true;
        } catch (Exception failure) {
            log.error("Failed to set compaction threshold for system topic:{}", ServiceUnitStateChannelImpl.TOPIC, failure);
            return false;
        }
    }

    /**
     * Looks up the broker that the service unit state channel reports as assigned for the bundle
     * containing the given topic.
     *
     * @param pulsarService the broker service
     * @param str the topic name
     * @return a future with the assigned broker's lookup data; empty when the extensible load
     *         manager is not enabled, when no broker is assigned, or when setting up the lookup
     *         throws synchronously
     */
    public static CompletableFuture<Optional<BrokerLookupData>> getAssignedBrokerLookupData(PulsarService pulsarService, String str) {
        if (!isLoadManagerExtensionEnabled(pulsarService)) {
            return CompletableFuture.completedFuture(Optional.empty());
        }
        try {
            NamespaceService namespaceService = pulsarService.getNamespaceService();
            return namespaceService.getBundleAsync(TopicName.get(str)).thenCompose(bundle -> {
                ExtensibleLoadManagerImpl loadManager = get(pulsarService);
                Optional<String> assignedBroker = loadManager.getServiceUnitStateChannel().getAssigned(bundle.toString());
                if (assignedBroker.isPresent()) {
                    return loadManager.getBrokerRegistry().lookupAsync(assignedBroker.get());
                }
                return CompletableFuture.completedFuture(Optional.empty());
            });
        } catch (Throwable failure) {
            log.error("Failed to lookup destination broker for topic:{}", str, failure);
            return CompletableFuture.completedFuture(Optional.empty());
        }
    }

    /**
     * Creates and starts every component of the load manager: broker registry, leader election,
     * service unit state channel, split/unload managers, broker filters, load data stores and
     * reporters, the periodic report/monitor tasks and the split scheduler.
     *
     * <p>Calling this method again after a successful start is a no-op.
     *
     * <p>If any step fails, the broker registry (when already created) is closed and the failure
     * is propagated to the caller instead of being only logged, so that the broker does not keep
     * running with a half-initialized load manager.
     *
     * @throws PulsarServerException if any component fails to initialize; the original failure is
     *         the exception itself or its cause
     */
    @Override // org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManager
    public void start() throws PulsarServerException {
        if (this.started) {
            return;
        }
        try {
            this.brokerRegistry = new BrokerRegistryImpl(this.pulsar);
            // Role changes are handled on the load manager executor, not on the election thread.
            this.leaderElectionService = new LeaderElectionService(this.pulsar.getCoordinationService(), this.pulsar.getBrokerId(), this.pulsar.getSafeWebServiceAddress(), ELECTION_ROOT, leaderElectionState -> {
                this.pulsar.getLoadManagerExecutor().execute(() -> {
                    if (leaderElectionState == LeaderElectionState.Leading) {
                        playLeader();
                    } else {
                        playFollower();
                    }
                });
            });
            this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(this.pulsar);
            this.brokerRegistry.start();
            this.splitManager = new SplitManager(this.splitCounter);
            this.unloadManager = new UnloadManager(this.unloadCounter);
            this.serviceUnitStateChannel.listen(this.unloadManager);
            this.serviceUnitStateChannel.listen(this.splitManager);
            this.leaderElectionService.start();
            this.serviceUnitStateChannel.start();

            // Filters that depend on the running channel / broker are appended to the pipeline here.
            this.antiAffinityGroupPolicyHelper = new AntiAffinityGroupPolicyHelper(this.pulsar, this.serviceUnitStateChannel);
            this.antiAffinityGroupPolicyHelper.listenFailureDomainUpdate();
            this.antiAffinityGroupPolicyFilter = new AntiAffinityGroupPolicyFilter(this.antiAffinityGroupPolicyHelper);
            this.brokerFilterPipeline.add(this.antiAffinityGroupPolicyFilter);
            this.isolationPoliciesHelper = new IsolationPoliciesHelper(new SimpleResourceAllocationPolicies(this.pulsar));
            this.brokerFilterPipeline.add(new BrokerIsolationPoliciesFilter(this.isolationPoliciesHelper));

            this.brokerLoadDataStore = LoadDataStoreFactory.create(this.pulsar, BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class);
            this.topBundlesLoadDataStore = LoadDataStoreFactory.create(this.pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class);
            this.context = LoadManagerContextImpl.builder().configuration(this.conf).brokerRegistry(this.brokerRegistry).brokerLoadDataStore(this.brokerLoadDataStore).topBundleLoadDataStore(this.topBundlesLoadDataStore).build();
            this.brokerLoadDataReporter = new BrokerLoadDataReporter(this.pulsar, this.brokerRegistry.getBrokerId(), this.brokerLoadDataStore);
            this.topBundleLoadDataReporter = new TopBundleLoadDataReporter(this.pulsar, this.brokerRegistry.getBrokerId(), this.topBundlesLoadDataStore);
            this.serviceUnitStateChannel.listen(this.brokerLoadDataReporter);
            this.serviceUnitStateChannel.listen(this.topBundleLoadDataReporter);

            int reportIntervalMillis = this.conf.getLoadBalancerReportUpdateMinIntervalMillis();
            // The periodic jobs catch Throwable themselves: an exception escaping a
            // scheduleAtFixedRate task would cancel all of its future runs.
            this.brokerLoadDataReportTask = this.pulsar.getLoadManagerExecutor().scheduleAtFixedRate(() -> {
                try {
                    this.brokerLoadDataReporter.reportAsync(false);
                } catch (Throwable th) {
                    log.error("Failed to run the broker load manager executor job.", th);
                }
            }, reportIntervalMillis, reportIntervalMillis, TimeUnit.MILLISECONDS);
            this.topBundlesLoadDataReportTask = this.pulsar.getLoadManagerExecutor().scheduleAtFixedRate(() -> {
                try {
                    this.topBundleLoadDataReporter.reportAsync(false);
                } catch (Throwable th) {
                    log.error("Failed to run the top bundles load manager executor job.", th);
                }
            }, reportIntervalMillis, reportIntervalMillis, TimeUnit.MILLISECONDS);
            this.monitorTask = this.pulsar.getLoadManagerExecutor().scheduleAtFixedRate(() -> {
                monitor();
            }, MONITOR_INTERVAL_IN_MILLIS, MONITOR_INTERVAL_IN_MILLIS, TimeUnit.MILLISECONDS);

            this.unloadScheduler = new UnloadScheduler(this.pulsar, this.pulsar.getLoadManagerExecutor(), this.unloadManager, this.context, this.serviceUnitStateChannel, this.unloadCounter, this.unloadMetrics);
            this.splitScheduler = new SplitScheduler(this.pulsar, this.serviceUnitStateChannel, this.splitManager, this.splitCounter, this.splitMetrics, this.context);
            this.splitScheduler.start();
            this.initWaiter.countDown();
            this.started = true;
            log.info("Started load manager.");
        } catch (Exception e) {
            log.error("Failed to start the extensible load balance and close broker registry {}.", this.brokerRegistry, e);
            if (this.brokerRegistry != null) {
                try {
                    this.brokerRegistry.close();
                } catch (Exception closeFailure) {
                    // Keep the start-up failure as the primary error; attach the cleanup failure to it.
                    e.addSuppressed(closeFailure);
                }
            }
            // Surface the failure to the caller rather than leaving the load manager silently unstarted.
            if (e instanceof PulsarServerException) {
                throw (PulsarServerException) e;
            }
            throw new PulsarServerException(e);
        }
    }

    /**
     * Stores the broker service and its configuration for later use by {@code start()} and the
     * other operations of this load manager.
     *
     * @param pulsarService the broker service this load manager belongs to
     */
    @Override // org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManager
    public void initialize(PulsarService pulsarService) {
        this.pulsar = pulsarService;
        final ServiceConfiguration configuration = pulsarService.getConfiguration();
        this.conf = configuration;
    }

    /**
     * Resolves the owner broker of a service unit, selecting and publishing a new owner when none
     * exists yet. Concurrent calls for the same service unit share one in-flight request.
     *
     * <p>The owner is taken from, in this order: the channel owner when the given topic is an
     * internal topic; the broker encoded in a heartbeat / SLA-monitor namespace; otherwise the
     * current or newly selected owner.
     *
     * @param optional the topic that triggered the assignment, if any
     * @param serviceUnitId the service unit (bundle) to assign
     * @return a future with the owner broker's lookup data; it fails when no owner can be
     *         determined or the owner is not registered
     */
    @Override // org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManager
    public CompletableFuture<Optional<BrokerLookupData>> assign(Optional<ServiceUnitId> optional, ServiceUnitId serviceUnitId) {
        final String bundle = serviceUnitId.toString();
        return dedupeLookupRequest(bundle, key -> {
            final CompletableFuture<Optional<String>> ownerFuture;
            if (optional.isPresent() && isInternalTopic(optional.get().toString())) {
                ownerFuture = this.serviceUnitStateChannel.getChannelOwnerAsync();
            } else {
                String dedicatedBrokerId = getHeartbeatOrSLAMonitorBrokerId(serviceUnitId);
                if (dedicatedBrokerId != null) {
                    ownerFuture = CompletableFuture.completedFuture(Optional.of(dedicatedBrokerId));
                } else {
                    ownerFuture = getOrSelectOwnerAsync(serviceUnitId, bundle).thenApply(Optional::ofNullable);
                }
            }
            return getBrokerLookupData(ownerFuture, bundle);
        });
    }

    /**
     * Extracts the broker id from a heartbeat, heartbeat-V2 or SLA-monitor service unit.
     *
     * <p>The three {@link NamespaceService} checks are tried in that order; the first non-null
     * result wins and everything up to and including its last {@code '/'} is stripped.
     *
     * @param serviceUnitId the service unit to inspect
     * @return the broker id, or {@code null} when none of the three checks matches
     */
    private String getHeartbeatOrSLAMonitorBrokerId(ServiceUnitId serviceUnitId) {
        String candidate = NamespaceService.checkHeartbeatNamespace(serviceUnitId);
        if (candidate == null) {
            candidate = NamespaceService.checkHeartbeatNamespaceV2(serviceUnitId);
        }
        if (candidate == null) {
            candidate = NamespaceService.getSLAMonitorBrokerName(serviceUnitId);
        }
        if (candidate == null) {
            return null;
        }
        int lastSlash = candidate.lastIndexOf('/');
        return candidate.substring(lastSlash + 1);
    }

    /**
     * Returns the current owner of a bundle, or selects a new owner and publishes the assignment
     * when the bundle has no owner yet.
     *
     * <p>Updates the assign counter: {@code skip} when an owner already exists, {@code success}
     * when a new owner was selected.
     *
     * @param serviceUnitId the service unit used for broker selection
     * @param str the bundle name used on the service unit state channel
     * @return a future with the owner broker id; it fails with {@link IllegalStateException} when
     *         no broker could be selected
     */
    private CompletableFuture<String> getOrSelectOwnerAsync(ServiceUnitId serviceUnitId, String str) {
        return this.serviceUnitStateChannel.getOwnerAsync(str).thenCompose(currentOwner -> {
            if (currentOwner.isPresent()) {
                // Already assigned: nothing to select or publish.
                this.assignCounter.incrementSkip();
                return CompletableFuture.completedFuture(currentOwner.get());
            }
            // Not assigned yet: pick a broker and publish the assign event to the channel.
            return selectAsync(serviceUnitId).thenCompose(selectedBroker -> {
                if (selectedBroker.isEmpty()) {
                    throw new IllegalStateException("Failed to select the new owner broker for bundle: " + str);
                }
                this.assignCounter.incrementSuccess();
                log.info("Selected new owner broker: {} for bundle: {}.", selectedBroker.get(), str);
                return this.serviceUnitStateChannel.publishAssignEventAsync(str, selectedBroker.get());
            });
        });
    }

    /**
     * Turns an owner broker id into that broker's lookup data from the broker registry.
     *
     * @param completableFuture a future with the owner broker id of the bundle
     * @param str the bundle name, used only in error messages
     * @return a future with the (always present) lookup data; it fails with
     *         {@link IllegalStateException} when the owner is absent or the broker is not
     *         registered
     */
    private CompletableFuture<Optional<BrokerLookupData>> getBrokerLookupData(CompletableFuture<Optional<String>> completableFuture, String str) {
        return completableFuture.thenApply(owner -> {
            if (owner.isEmpty()) {
                String message = String.format("Failed to get or assign the owner for bundle:%s", str);
                log.error(message);
                throw new IllegalStateException(message);
            }
            return owner.get();
        }).thenCompose(brokerId -> getBrokerRegistry().lookupAsync(brokerId).thenApply(lookupData -> {
            if (lookupData.isEmpty()) {
                String message = String.format("Failed to lookup broker:%s for bundle:%s, the broker has not been registered.", brokerId, str);
                log.error(message);
                throw new IllegalStateException(message);
            }
            return lookupData;
        }));
    }

    /**
     * Assigns the given bundle and converts the owner's lookup data into
     * {@link NamespaceEphemeralData}.
     *
     * @param namespaceBundle the bundle to acquire
     * @return a future with the owner's ephemeral data; it fails with
     *         {@link IllegalStateException} when the assignment yields no lookup data
     */
    public CompletableFuture<NamespaceEphemeralData> tryAcquiringOwnership(NamespaceBundle namespaceBundle) {
        log.info("Try acquiring ownership for bundle: {} - {}.", namespaceBundle, this.brokerRegistry.getBrokerId());
        final String bundleName = namespaceBundle.toString();
        return assign(Optional.empty(), namespaceBundle).thenApply(lookupData -> {
            if (lookupData.isPresent()) {
                return lookupData.get().toNamespaceEphemeralData();
            }
            String message = String.format("Failed to get the broker lookup data for bundle:%s", bundleName);
            log.error(message);
            throw new IllegalStateException(message);
        });
    }

    /**
     * Runs a lookup at most once per key at a time: while a request for {@code str} is in flight,
     * further callers receive the same future instead of starting a new one.
     *
     * <p>The caller that actually created the future also registers the completion hook that
     * removes the entry again and counts a failed assignment.
     *
     * @param str the key (bundle name) identifying the request
     * @param function creates the lookup future when no request for the key is in flight
     * @return the in-flight or newly created lookup future
     */
    private CompletableFuture<Optional<BrokerLookupData>> dedupeLookupRequest(String str, Function<String, CompletableFuture<Optional<BrokerLookupData>>> function) {
        final MutableObject<CompletableFuture<Optional<BrokerLookupData>>> created = new MutableObject<>();
        try {
            return this.lookupRequests.computeIfAbsent(str, key -> {
                CompletableFuture<Optional<BrokerLookupData>> future = function.apply(key);
                created.setValue(future);
                return future;
            });
        } finally {
            // Registered outside computeIfAbsent: the hook removes the map entry, which must not
            // happen while the mapping function for that same key is still running.
            CompletableFuture<Optional<BrokerLookupData>> newFuture = created.getValue();
            if (newFuture != null) {
                newFuture.whenComplete((result, error) -> {
                    if (error != null) {
                        this.assignCounter.incrementFailure();
                    }
                    this.lookupRequests.remove(str);
                });
            }
        }
    }

    /**
     * Selects an owner broker for the service unit without excluding any broker.
     *
     * @param serviceUnitId the service unit to place
     * @return a future with the selected broker id, or empty when no broker is eligible
     */
    public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId serviceUnitId) {
        final Set<String> noExclusions = Collections.emptySet();
        return selectAsync(serviceUnitId, noExclusions);
    }

    /**
     * Selects an owner broker for the service unit.
     *
     * <p>Starts from the available brokers of the registry, drops the excluded ones, lets every
     * filter of the pipeline prune the candidate map, and finally asks the selection strategy to
     * pick one of the remaining brokers. A failing filter is logged and the selection continues
     * with whatever candidates are left.
     *
     * @param serviceUnitId the service unit to place
     * @param set broker ids that must not be selected
     * @return a future with the selected broker id, or empty when no candidate remains; it fails
     *         when the selection strategy throws
     */
    public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId serviceUnitId, Set<String> set) {
        return getBrokerRegistry().getAvailableBrokerLookupDataAsync().thenComposeAsync(availableBrokers -> {
            LoadManagerContext context = getContext();
            // Work on a private copy: the filters remove entries from the map they are given.
            ConcurrentHashMap<String, BrokerLookupData> candidates = new ConcurrentHashMap<>(availableBrokers);
            for (String excludedBroker : set) {
                candidates.remove(excludedBroker);
            }

            List<BrokerFilter> filters = getBrokerFilterPipeline();
            List<CompletableFuture<?>> filterFutures = new ArrayList<>(filters.size());
            for (BrokerFilter filter : filters) {
                filterFutures.add(filter.filterAsync(candidates, serviceUnitId, context));
            }

            CompletableFuture<Optional<String>> result = new CompletableFuture<>();
            FutureUtil.waitForAll(filterFutures).whenComplete((ignored, filterError) -> {
                if (filterError != null) {
                    log.error("Failed to filter out brokers when select bundle: {}", serviceUnitId, filterError);
                }
                try {
                    if (candidates.isEmpty()) {
                        result.complete(Optional.empty());
                    } else {
                        result.complete(getBrokerSelectionStrategy().select(candidates.keySet(), serviceUnitId, context));
                    }
                } catch (Throwable selectionError) {
                    // An exception escaping this callback would be lost and leave 'result'
                    // incomplete forever, so hand it to the caller explicitly.
                    result.completeExceptionally(selectionError);
                }
            });
            return result;
        });
    }

    /**
     * Checks whether this broker is the current owner of the service unit.
     *
     * @param optional the topic the check is made for, if any
     * @param serviceUnitId the service unit (bundle) to check
     * @return a future that is {@code true} when the resolved owner id equals this broker's id,
     *         {@code false} otherwise (including when there is no owner)
     */
    @Override // org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManager
    public CompletableFuture<Boolean> checkOwnershipAsync(Optional<ServiceUnitId> optional, ServiceUnitId serviceUnitId) {
        return getOwnershipAsync(optional, serviceUnitId).thenApply(owner -> {
            String localBrokerId = this.brokerRegistry.getBrokerId();
            String ownerId = owner.orElse(null);
            return localBrokerId.equals(ownerId);
        });
    }

    /**
     * Resolves the owner broker id of a service unit without assigning one.
     *
     * <p>Internal topics resolve to the channel owner; heartbeat / SLA-monitor service units
     * resolve to the broker encoded in their namespace; everything else is looked up on the
     * service unit state channel.
     *
     * @param optional the topic the lookup is made for, if any
     * @param serviceUnitId the service unit (bundle) to resolve
     * @return a future with the owner broker id, or empty when the channel reports none
     */
    public CompletableFuture<Optional<String>> getOwnershipAsync(Optional<ServiceUnitId> optional, ServiceUnitId serviceUnitId) {
        final String bundle = serviceUnitId.toString();
        boolean forInternalTopic = optional.isPresent() && isInternalTopic(optional.get().toString());
        if (forInternalTopic) {
            return this.serviceUnitStateChannel.getChannelOwnerAsync();
        }
        String dedicatedBrokerId = getHeartbeatOrSLAMonitorBrokerId(serviceUnitId);
        if (dedicatedBrokerId == null) {
            return this.serviceUnitStateChannel.getOwnerAsync(bundle);
        }
        return CompletableFuture.completedFuture(Optional.of(dedicatedBrokerId));
    }

    /**
     * Resolves the owner of a service unit and returns that broker's lookup data.
     *
     * @param serviceUnitId the service unit (bundle) to resolve
     * @return a future with the owner's lookup data as reported by the broker registry, or empty
     *         when the service unit has no owner
     */
    public CompletableFuture<Optional<BrokerLookupData>> getOwnershipWithLookupDataAsync(ServiceUnitId serviceUnitId) {
        return getOwnershipAsync(Optional.empty(), serviceUnitId).thenCompose(owner -> {
            if (owner.isPresent()) {
                return getBrokerRegistry().lookupAsync(owner.get());
            }
            return CompletableFuture.completedFuture(Optional.empty());
        });
    }

    /**
     * Unloads a bundle from its current owner on behalf of an admin request, optionally
     * transferring it to a given destination broker.
     *
     * <p>Bundles of SLA-monitor or heartbeat namespaces are skipped.
     *
     * @param serviceUnitId the bundle to unload
     * @param optional the destination broker, if the bundle should be transferred
     * @return a future completed when the unload finished; it fails with
     *         {@link IllegalStateException} when the bundle has no owner and with
     *         {@link IllegalArgumentException} when the owner id ends with the destination
     */
    public CompletableFuture<Void> unloadNamespaceBundleAsync(ServiceUnitId serviceUnitId, Optional<String> optional) {
        if (NamespaceService.isSLAOrHeartbeatNamespace(serviceUnitId.getNamespaceObject().toString())) {
            log.info("Skip unloading namespace bundle: {}.", serviceUnitId);
            return CompletableFuture.completedFuture(null);
        }
        return getOwnershipAsync(Optional.empty(), serviceUnitId).thenCompose(owner -> {
            if (owner.isEmpty()) {
                String message = String.format("Namespace bundle: %s is not owned by any broker.", serviceUnitId);
                log.warn(message);
                throw new IllegalStateException(message);
            }
            String sourceBroker = owner.get();
            if (optional.isPresent() && sourceBroker.endsWith(optional.get())) {
                String message = String.format("Namespace bundle: %s own by %s, cannot be transfer to same broker.", serviceUnitId, sourceBroker);
                log.warn(message);
                throw new IllegalArgumentException(message);
            }
            Unload unload = new Unload(sourceBroker, serviceUnitId.toString(), optional, true);
            UnloadDecision decision = new UnloadDecision(unload, UnloadDecision.Label.Success, UnloadDecision.Reason.Admin);
            return unloadAsync(decision, this.conf.getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS);
        });
    }

    /**
     * Publishes the unload event of a decision and waits, through the unload manager, for the
     * unload to finish. On success the unload counter's broker count is updated by one.
     *
     * @param unloadDecision the decision carrying the unload to execute
     * @param j the timeout handed to the unload manager
     * @param timeUnit the unit of {@code j}
     * @return a future completed once the unload manager reports completion
     */
    private CompletableFuture<Void> unloadAsync(UnloadDecision unloadDecision, long j, TimeUnit timeUnit) {
        final Unload unload = unloadDecision.getUnload();
        CompletableFuture<Void> published = this.serviceUnitStateChannel.publishUnloadEventAsync(unload);
        CompletableFuture<Void> unloaded = this.unloadManager.waitAsync(published, unload.serviceUnit(), unloadDecision, j, timeUnit);
        return unloaded.thenRun(() -> this.unloadCounter.updateUnloadBrokerCount(1));
    }

    /**
     * Splits a bundle on behalf of an admin request.
     *
     * <p>Bundles of SLA-monitor or heartbeat namespaces are skipped. Otherwise the split boundary
     * is computed, the current owner is resolved, and a split decision listing every resulting
     * bundle range (without a preferred destination broker) is published.
     *
     * @param serviceUnitId the bundle to split
     * @param namespaceBundleSplitAlgorithm the algorithm used to compute the split boundary
     * @param list explicit boundaries passed through to the split boundary computation
     * @return a future completed when the split finished; it fails with
     *         {@link IllegalStateException} when no split boundary is found or the bundle has no
     *         owner
     */
    public CompletableFuture<Void> splitNamespaceBundleAsync(ServiceUnitId serviceUnitId, NamespaceBundleSplitAlgorithm namespaceBundleSplitAlgorithm, List<Long> list) {
        if (NamespaceService.isSLAOrHeartbeatNamespace(serviceUnitId.getNamespaceObject().toString())) {
            log.info("Skip split namespace bundle: {}.", serviceUnitId);
            return CompletableFuture.completedFuture(null);
        }
        NamespaceBundle bundle = this.pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(LoadManagerShared.getNamespaceNameFromBundleName(serviceUnitId.toString()), LoadManagerShared.getBundleRangeFromBundleName(serviceUnitId.toString()));
        return this.pulsar.getNamespaceService().getSplitBoundary(bundle, namespaceBundleSplitAlgorithm, list).thenCompose(splitBoundary -> {
            if (splitBoundary == null) {
                String message = String.format("Bundle %s not found under namespace", bundle);
                log.error(message);
                return FutureUtil.failedFuture(new IllegalStateException(message));
            }
            return getOwnershipAsync(Optional.empty(), serviceUnitId).thenCompose(owner -> {
                if (owner.isEmpty()) {
                    String message = String.format("Namespace bundle: %s is not owned by any broker.", serviceUnitId);
                    log.warn(message);
                    throw new IllegalStateException(message);
                }
                String sourceBroker = owner.get();
                List<NamespaceBundle> splitBundles = splitBoundary.getRight();
                // Every resulting range is listed without a destination broker.
                HashMap<String, Optional<String>> rangeToDestBroker = new HashMap<>();
                for (NamespaceBundle splitBundle : splitBundles) {
                    rangeToDestBroker.put(splitBundle.getBundleRange(), Optional.empty());
                }
                SplitDecision splitDecision = new SplitDecision();
                splitDecision.setSplit(new Split(serviceUnitId.toString(), sourceBroker, rangeToDestBroker));
                splitDecision.setLabel(SplitDecision.Label.Success);
                splitDecision.setReason(SplitDecision.Reason.Admin);
                return splitAsync(splitDecision, this.conf.getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS);
            });
        });
    }

    /**
     * Publishes the split event of the given decision and waits for it through the split manager.
     *
     * @param splitDecision the decision whose split is published on the service unit state channel
     * @param j the timeout amount forwarded to the split manager
     * @param timeUnit the unit of {@code j}
     * @return the future returned by the split manager's {@code waitAsync}
     */
    private CompletableFuture<Void> splitAsync(SplitDecision splitDecision, long j, TimeUnit timeUnit) {
        final Split requested = splitDecision.getSplit();
        return this.splitManager.waitAsync(
                this.serviceUnitStateChannel.publishSplitEventAsync(requested),
                requested.serviceUnit(),
                splitDecision,
                j,
                timeUnit);
    }

    /**
     * Shuts this load manager down and closes the components it holds.
     *
     * <p>Nothing happens when the manager is not started. Otherwise the report and monitor tasks are
     * cancelled (when present), then the two load data stores, the unload and split schedulers, the
     * broker registry, the service unit state channel, the unload manager and the leader election
     * service are closed, in that order. The steps are chained with {@code finally} blocks so that each
     * component is closed exactly once even when an earlier step throws, and {@code started} is always
     * reset to {@code false} at the end. When more than one step fails, the exception thrown by the last
     * failing step is the one that propagates.
     *
     * @throws PulsarServerException if closing the stores or schedulers fails with an
     *         {@link IOException}, if closing the leader election service fails, or if another close
     *         step throws it
     */
    @Override // org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManager, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws PulsarServerException {
        if (!this.started) {
            return;
        }
        try {
            if (this.brokerLoadDataReportTask != null) {
                this.brokerLoadDataReportTask.cancel(true);
            }
            if (this.topBundlesLoadDataReportTask != null) {
                this.topBundlesLoadDataReportTask.cancel(true);
            }
            if (this.monitorTask != null) {
                this.monitorTask.cancel(true);
            }
            this.brokerLoadDataStore.close();
            this.topBundlesLoadDataStore.close();
            this.unloadScheduler.close();
            this.splitScheduler.close();
        } catch (IOException e) {
            throw new PulsarServerException(e);
        } finally {
            // Each remaining component gets its own finally level so one failure cannot skip the rest.
            try {
                this.brokerRegistry.close();
            } finally {
                try {
                    this.serviceUnitStateChannel.close();
                } finally {
                    try {
                        this.unloadManager.close();
                    } finally {
                        try {
                            this.leaderElectionService.close();
                        } catch (Exception e) {
                            throw new PulsarServerException(e);
                        } finally {
                            // Mark the manager as stopped no matter how the close steps ended.
                            this.started = false;
                        }
                    }
                }
            }
        }
    }

    /**
     * Tells whether a topic name starts with one of the load manager's own topic names.
     *
     * @param str the topic name to test
     * @return {@code true} if {@code str} starts with the service unit state channel topic, the broker
     *         load data store topic or the top bundles load data store topic; {@code false} otherwise
     */
    public static boolean isInternalTopic(String str) {
        if (str.startsWith(ServiceUnitStateChannelImpl.TOPIC)) {
            return true;
        }
        if (str.startsWith(BROKER_LOAD_DATA_STORE_TOPIC)) {
            return true;
        }
        return str.startsWith(TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
    }

    /**
     * Switches this broker to the leader role.
     *
     * <p>While the current thread is not interrupted, the method waits on {@code initWaiter} and checks
     * the channel ownership. If this broker still owns the service unit state channel, it creates the
     * system topics, initializes both load data stores, starts the unload scheduler and schedules the
     * ownership monitor. Any failure is logged and retried after sleeping {@code attempt * 10}
     * milliseconds, capped at {@code MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS}.
     *
     * <p>If the channel ownership turns out to be lost, the method delegates to {@link #playFollower()}.
     * Otherwise the role is set to {@code Leader} and both load data reporters are asked to report.
     *
     * <p>NOTE(review): when the loop ends because the thread was interrupted, the role is still set to
     * {@code Leader} although the initialization steps did not complete.
     */
    @VisibleForTesting
    synchronized void playLeader() {
        log.info("This broker:{} is setting the role from {} to {}", this.pulsar.getBrokerId(), this.role, Role.Leader);
        int attempts = 0;
        boolean ownershipLost = false;
        while (!Thread.currentThread().isInterrupted()) {
            try {
                this.initWaiter.await();
                if (!this.serviceUnitStateChannel.isChannelOwner()) {
                    ownershipLost = true;
                    break;
                }
                createSystemTopics(this.pulsar);
                this.brokerLoadDataStore.init();
                this.topBundlesLoadDataStore.init();
                this.unloadScheduler.start();
                this.serviceUnitStateChannel.scheduleOwnershipMonitor();
                break;
            } catch (Throwable failure) {
                attempts++;
                log.warn("The broker:{} failed to set the role. Retrying {} th ...", this.pulsar.getBrokerId(), attempts, failure);
                try {
                    // Back off a little longer after every failed attempt, up to the configured cap.
                    Thread.sleep(Math.min(attempts * 10, MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS));
                } catch (InterruptedException interrupted) {
                    log.warn("Interrupted while sleeping.");
                    // Restore the interrupt flag so the loop condition ends the retries.
                    Thread.currentThread().interrupt();
                }
            }
        }
        if (ownershipLost) {
            log.warn("The broker:{} became follower while initializing leader role.", this.pulsar.getBrokerId());
            playFollower();
            return;
        }
        this.role = Role.Leader;
        log.info("This broker:{} plays the leader now.", this.pulsar.getBrokerId());
        this.brokerLoadDataReporter.reportAsync(true);
        this.topBundleLoadDataReporter.reportAsync(true);
    }

    /**
     * Switches this broker to the follower role.
     *
     * <p>While the current thread is not interrupted, the method waits on {@code initWaiter} and checks
     * the channel ownership. If this broker does not own the service unit state channel, it closes the
     * unload scheduler, cancels the ownership monitor, initializes the broker load data store, closes the
     * top bundles load data store and starts that store's producer. Any failure is logged and retried
     * after sleeping {@code attempt * 10} milliseconds, capped at
     * {@code MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS}.
     *
     * <p>If this broker turns out to own the channel, the method delegates to {@link #playLeader()}.
     * Otherwise the role is set to {@code Follower} and both load data reporters are asked to report.
     *
     * <p>NOTE(review): when the loop ends because the thread was interrupted, the role is still set to
     * {@code Follower} although the follower steps did not complete.
     */
    @VisibleForTesting
    synchronized void playFollower() {
        log.info("This broker:{} is setting the role from {} to {}", this.pulsar.getBrokerId(), this.role, Role.Follower);
        int attempts = 0;
        boolean ownershipGained = false;
        while (!Thread.currentThread().isInterrupted()) {
            try {
                this.initWaiter.await();
                if (this.serviceUnitStateChannel.isChannelOwner()) {
                    ownershipGained = true;
                    break;
                }
                this.unloadScheduler.close();
                this.serviceUnitStateChannel.cancelOwnershipMonitor();
                this.brokerLoadDataStore.init();
                this.topBundlesLoadDataStore.close();
                this.topBundlesLoadDataStore.startProducer();
                break;
            } catch (Throwable failure) {
                attempts++;
                log.warn("The broker:{} failed to set the role. Retrying {} th ...", this.pulsar.getBrokerId(), attempts, failure);
                try {
                    // Back off a little longer after every failed attempt, up to the configured cap.
                    Thread.sleep(Math.min(attempts * 10, MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS));
                } catch (InterruptedException interrupted) {
                    log.warn("Interrupted while sleeping.");
                    // Restore the interrupt flag so the loop condition ends the retries.
                    Thread.currentThread().interrupt();
                }
            }
        }
        if (ownershipGained) {
            log.warn("This broker:{} became leader while initializing follower role.", this.pulsar.getBrokerId());
            playLeader();
            return;
        }
        this.role = Role.Follower;
        log.info("This broker:{} plays a follower now.", this.pulsar.getBrokerId());
        this.brokerLoadDataReporter.reportAsync(true);
        this.topBundleLoadDataReporter.reportAsync(true);
    }

    /**
     * Collects the metrics exposed by this load manager into a new list.
     *
     * <p>The list contains, in order: the metrics of freshly generated broker load data (only when the
     * broker load data reporter is set), the current unload metrics and split metrics (each only when
     * its holder currently yields a non-null value), the assign counter metrics and the metrics of the
     * service unit state channel.
     *
     * @return a new mutable list with the collected metrics
     */
    public List<Metrics> getMetrics() {
        List<Metrics> collected = new ArrayList<>();
        if (this.brokerLoadDataReporter != null) {
            collected.addAll(this.brokerLoadDataReporter.generateLoadData().toMetrics(this.pulsar.getAdvertisedAddress()));
        }
        // Read each holder once: checking get() and then calling get() again could observe a value
        // that was replaced by null in between.
        Optional.ofNullable(this.unloadMetrics.get()).ifPresent(collected::addAll);
        Optional.ofNullable(this.splitMetrics.get()).ifPresent(collected::addAll);
        collected.addAll(this.assignCounter.toMetrics(this.pulsar.getAdvertisedAddress()));
        collected.addAll(this.serviceUnitStateChannel.getMetrics());
        return collected;
    }

    /**
     * Reconciles this broker's role with the ownership of the service unit state channel.
     *
     * <p>After waiting on {@code initWaiter}, the channel ownership is read once. A broker that does not
     * own the channel but is not in the {@code Follower} role is switched via {@link #playFollower()}.
     * A broker that owns the channel first configures the system topics if that has not succeeded yet,
     * and is switched via {@link #playLeader()} when it is not in the {@code Leader} role. Any
     * {@link Throwable} raised along the way is logged and not rethrown.
     */
    @VisibleForTesting
    protected void monitor() {
        try {
            this.initWaiter.await();
            final boolean ownsChannel = this.serviceUnitStateChannel.isChannelOwner();
            if (!ownsChannel) {
                if (this.role != Role.Follower) {
                    log.warn("Current role:{} does not match with the channel ownership:{}. Playing the follower role.", this.role, ownsChannel);
                    playFollower();
                }
                return;
            }
            if (!this.configuredSystemTopics) {
                // Remember the outcome so the configuration is retried on later runs until it succeeds.
                this.configuredSystemTopics = configureSystemTopics(this.pulsar);
            }
            if (this.role != Role.Leader) {
                log.warn("Current role:{} does not match with the channel ownership:{}. Playing the leader role.", this.role, ownsChannel);
                playLeader();
            }
        } catch (Throwable failure) {
            log.error("Failed to get the channel ownership.", failure);
        }
    }

    /**
     * Takes this broker out of load balancing.
     *
     * <p>In order: cleans the ownerships on the service unit state channel, closes the leader election
     * service and unregisters from the broker registry. A failure in one step prevents the following
     * steps from running.
     *
     * @throws Exception if any of the three steps fails
     */
    public void disableBroker() throws Exception {
        serviceUnitStateChannel.cleanOwnerships();
        leaderElectionService.close();
        brokerRegistry.unregister();
    }

    /**
     * Returns the broker registry held by this load manager.
     *
     * @return the {@code brokerRegistry} field
     */
    public BrokerRegistry getBrokerRegistry() {
        return brokerRegistry;
    }

    /**
     * Returns the service unit state channel held by this load manager.
     *
     * @return the {@code serviceUnitStateChannel} field
     */
    public ServiceUnitStateChannel getServiceUnitStateChannel() {
        return serviceUnitStateChannel;
    }

    /**
     * Returns the anti-affinity group policy helper held by this load manager.
     *
     * @return the {@code antiAffinityGroupPolicyHelper} field
     */
    public AntiAffinityGroupPolicyHelper getAntiAffinityGroupPolicyHelper() {
        return antiAffinityGroupPolicyHelper;
    }

    /**
     * Returns the isolation policies helper held by this load manager.
     *
     * @return the {@code isolationPoliciesHelper} field
     */
    public IsolationPoliciesHelper getIsolationPoliciesHelper() {
        return isolationPoliciesHelper;
    }

    /**
     * Returns the leader election service held by this load manager.
     *
     * @return the {@code leaderElectionService} field
     */
    public LeaderElectionService getLeaderElectionService() {
        return leaderElectionService;
    }

    /**
     * Returns the load manager context held by this load manager.
     *
     * @return the {@code context} field
     */
    public LoadManagerContext getContext() {
        return context;
    }

    /**
     * Returns the broker selection strategy held by this load manager.
     *
     * @return the {@code brokerSelectionStrategy} field
     */
    public BrokerSelectionStrategy getBrokerSelectionStrategy() {
        return brokerSelectionStrategy;
    }

    /**
     * Returns the broker filter pipeline held by this load manager.
     *
     * <p>The list is returned as stored, without copying.
     *
     * @return the {@code brokerFilterPipeline} field
     */
    public List<BrokerFilter> getBrokerFilterPipeline() {
        return brokerFilterPipeline;
    }

    /**
     * Returns the unload counter held by this load manager.
     *
     * @return the {@code unloadCounter} field
     */
    public UnloadCounter getUnloadCounter() {
        return unloadCounter;
    }

    /**
     * Returns the counter of ignored send messages held by this load manager.
     *
     * <p>The {@link AtomicLong} is returned as stored, without copying.
     *
     * @return the {@code ignoredSendMsgCounter} field
     */
    public AtomicLong getIgnoredSendMsgCounter() {
        return ignoredSendMsgCounter;
    }
}
