package org.apache.pulsar.broker.service;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.RateLimiter;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.util.concurrent.ScheduledFuture;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.function.Predicate;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.bookie.rackawareness.IsolatedBookieEnsemblePlacementPolicy;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.cache.BundlesQuotas;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerLoader;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.DynamicConfigurationResources;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.PulsarChannelInitializer;
import org.apache.pulsar.broker.service.TopicEventsListener;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.SystemTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilterProvider;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.prometheus.metrics.ObserverGauge;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.broker.validator.BindAddressValidator;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.internal.PropertiesUtils;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.configuration.BindAddress;
import org.apache.pulsar.common.configuration.FieldContext;
import org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor;
import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor;
import org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils;
import org.apache.pulsar.common.intercept.ManagedLedgerPayloadProcessor;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FieldParser;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.common.util.netty.ChannelFutures;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.common.util.netty.NettyFutureUtil;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/BrokerService.class */
public class BrokerService implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(BrokerService.class);
    private static final Duration FUTURE_DEADLINE_TIMEOUT_DURATION = Duration.ofSeconds(60);
    private static final TimeoutException FUTURE_DEADLINE_TIMEOUT_EXCEPTION = FutureUtil.createTimeoutException("Future didn't finish within deadline", BrokerService.class, "futureWithDeadline(...)");
    private static final TimeoutException FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION = FutureUtil.createTimeoutException("Failed to load topic within timeout", BrokerService.class, "futureWithDeadline(...)");
    private static final long GRACEFUL_SHUTDOWN_QUIET_PERIOD_MAX_MS = 5000;
    private static final double GRACEFUL_SHUTDOWN_QUIET_PERIOD_RATIO_OF_TOTAL_TIMEOUT = 0.25d;
    private static final double GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT = 0.5d;
    private final PulsarService pulsar;
    private final ManagedLedgerFactory managedLedgerFactory;
    private final EventLoopGroup acceptorGroup;
    private final EventLoopGroup workerGroup;
    private final OrderedExecutor topicOrderedExecutor;
    private AuthorizationService authorizationService;
    private final ScheduledExecutorService backlogQuotaChecker;
    protected final AtomicReference<Semaphore> lookupRequestSemaphore;
    protected final AtomicReference<Semaphore> topicLoadRequestSemaphore;
    private final ObserverGauge pendingLookupRequests;
    private final ObserverGauge pendingTopicLoadRequests;
    private final ScheduledExecutorService inactivityMonitor;
    private final ScheduledExecutorService messageExpiryMonitor;
    private final ScheduledExecutorService compactionMonitor;
    private final ScheduledExecutorService consumedLedgersMonitor;
    private ScheduledExecutorService deduplicationSnapshotMonitor;
    protected final PublishRateLimiter brokerPublishRateLimiter;
    private DistributedIdGenerator producerNameGenerator;
    public static final String PRODUCER_NAME_GENERATOR_PATH = "/counters/producer-name";
    private final BacklogQuotaManager backlogQuotaManager;
    private final int keepAliveIntervalSeconds;
    private final PulsarStats pulsarStats;
    private final AuthenticationService authenticationService;
    public static final String MANAGED_LEDGER_PATH_ZNODE = "/managed-ledgers";
    private final int maxUnackedMessages;
    public final int maxUnackedMsgsPerDispatcher;
    private final ConcurrentOpenHashSet<PersistentDispatcherMultipleConsumers> blockedDispatchers;

    @VisibleForTesting
    private final DelayedDeliveryTrackerFactory delayedDeliveryTrackerFactory;
    private final ServerBootstrap defaultServerBootstrap;
    private final BundlesQuotas bundlesQuotas;
    private Channel listenChannel;
    private Channel listenChannelTls;
    private boolean preciseTopicPublishRateLimitingEnable;
    private BrokerInterceptor interceptor;
    private final EntryFilterProvider entryFilterProvider;
    private TopicFactory topicFactory;
    private Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors;
    private Set<ManagedLedgerPayloadProcessor> brokerEntryPayloadProcessors;
    private int numberOfNamespaceBundles = 0;
    protected volatile DispatchRateLimiter brokerDispatchRateLimiter = null;
    private final LongAdder totalUnackedMessages = new LongAdder();
    private final AtomicBoolean blockedDispatcherOnHighUnackedMsgs = new AtomicBoolean(false);
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final List<EventLoopGroup> protocolHandlersWorkerGroups = new ArrayList();
    private PulsarChannelInitializer.Factory pulsarChannelInitFactory = PulsarChannelInitializer.DEFAULT_FACTORY;
    private final List<Channel> listenChannels = new ArrayList(2);
    private final LongAdder pausedConnections = new LongAdder();
    private final TopicEventsDispatcher topicEventsDispatcher = new TopicEventsDispatcher();
    private volatile boolean unloaded = false;
    private final ConcurrentOpenHashMap<String, ConfigField> dynamicConfigurationMap = prepareDynamicConfigurationMap();
    private final ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics = ConcurrentOpenHashMap.newBuilder().build();
    private final ConcurrentOpenHashMap<String, PulsarClient> replicationClients = ConcurrentOpenHashMap.newBuilder().build();
    private final ConcurrentOpenHashMap<String, PulsarAdmin> clusterAdmins = ConcurrentOpenHashMap.newBuilder().build();
    private final ConcurrentOpenHashMap<String, java.util.function.Consumer<?>> configRegisteredListeners = ConcurrentOpenHashMap.newBuilder().build();
    private final ConcurrentLinkedQueue<TopicLoadingContext> pendingTopicLoadingQueue = Queues.newConcurrentLinkedQueue();
    private final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>>> multiLayerTopicsMap = ConcurrentOpenHashMap.newBuilder().build();
    private final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<Integer>> owningTopics = ConcurrentOpenHashMap.newBuilder().build();
    private final ConcurrentOpenHashMap<TopicName, PersistentOfflineTopicStats> offlineTopicStatCache = ConcurrentOpenHashMap.newBuilder().build();
    private final ScheduledExecutorService statsUpdater = OrderedScheduler.newSchedulerBuilder().name("pulsar-stats-updater").numThreads(1).build();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/broker/service/BrokerService$ConfigField.class */
    public static class ConfigField {
        final Field field;
        volatile String lastDynamicValue;
        final Object defaultValue;
        Predicate<String> validator;

        public ConfigField(Field field, Object obj) {
            this.field = field;
            this.defaultValue = obj;
        }

        public static ConfigField newCustomConfigField(String str) {
            ConfigField configField = new ConfigField(null, null);
            configField.lastDynamicValue = str;
            return configField;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/broker/service/BrokerService$TopicLoadingContext.class */
    public static class TopicLoadingContext {
        private final String topic;
        private final boolean createIfMissing;
        private final CompletableFuture<Optional<Topic>> topicFuture;
        private final Map<String, String> properties;
        private final TopicPolicies topicPolicies;

        public TopicLoadingContext(String str, boolean z, CompletableFuture<Optional<Topic>> completableFuture, Map<String, String> map, TopicPolicies topicPolicies) {
            this.topic = str;
            this.createIfMissing = z;
            this.topicFuture = completableFuture;
            this.properties = map;
            this.topicPolicies = topicPolicies;
        }

        public String getTopic() {
            return this.topic;
        }

        public boolean isCreateIfMissing() {
            return this.createIfMissing;
        }

        public CompletableFuture<Optional<Topic>> getTopicFuture() {
            return this.topicFuture;
        }

        public Map<String, String> getProperties() {
            return this.properties;
        }

        public TopicPolicies getTopicPolicies() {
            return this.topicPolicies;
        }
    }

    public BrokerService(PulsarService pulsarService, EventLoopGroup eventLoopGroup) throws Exception {
        this.pulsar = pulsarService;
        this.brokerPublishRateLimiter = new PublishRateLimiterImpl(pulsarService.getMonotonicSnapshotClock());
        this.preciseTopicPublishRateLimitingEnable = pulsarService.getConfiguration().isPreciseTopicPublishRateLimiterEnable();
        this.managedLedgerFactory = pulsarService.getManagedLedgerFactory();
        this.keepAliveIntervalSeconds = pulsarService.getConfiguration().getKeepAliveIntervalSeconds();
        this.pulsarStats = new PulsarStats(pulsarService);
        this.topicOrderedExecutor = OrderedExecutor.newBuilder().numThreads(pulsarService.getConfiguration().getTopicOrderedExecutorThreadNum()).name("broker-topic-workers").build();
        this.acceptorGroup = EventLoopUtil.newEventLoopGroup(pulsarService.getConfiguration().getNumAcceptorThreads(), false, new ExecutorProvider.ExtendedThreadFactory("pulsar-acceptor"));
        this.workerGroup = eventLoopGroup;
        this.authorizationService = new AuthorizationService(pulsarService.getConfiguration(), pulsar().getPulsarResources());
        this.entryFilterProvider = new EntryFilterProvider(pulsarService.getConfiguration());
        pulsarService.getLocalMetadataStore().registerListener(this::handleMetadataChanges);
        if (pulsarService.getConfigurationMetadataStore() != pulsarService.getLocalMetadataStore()) {
            pulsarService.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges);
        }
        this.inactivityMonitor = OrderedScheduler.newSchedulerBuilder().name("pulsar-inactivity-monitor").numThreads(1).build();
        this.messageExpiryMonitor = OrderedScheduler.newSchedulerBuilder().name("pulsar-msg-expiry-monitor").numThreads(1).build();
        this.compactionMonitor = OrderedScheduler.newSchedulerBuilder().name("pulsar-compaction-monitor").numThreads(1).build();
        this.consumedLedgersMonitor = OrderedScheduler.newSchedulerBuilder().name("pulsar-consumed-ledgers-monitor").numThreads(1).build();
        this.backlogQuotaManager = new BacklogQuotaManager(pulsarService);
        this.backlogQuotaChecker = OrderedScheduler.newSchedulerBuilder().name("pulsar-backlog-quota-checker").numThreads(1).build();
        this.authenticationService = new AuthenticationService(pulsarService.getConfiguration());
        this.blockedDispatchers = ConcurrentOpenHashSet.newBuilder().build();
        this.topicFactory = createPersistentTopicFactory();
        updateConfigurationAndRegisterListeners();
        this.lookupRequestSemaphore = new AtomicReference<>(new Semaphore(pulsarService.getConfiguration().getMaxConcurrentLookupRequest(), false));
        this.topicLoadRequestSemaphore = new AtomicReference<>(new Semaphore(pulsarService.getConfiguration().getMaxConcurrentTopicLoadRequest(), false));
        if (pulsarService.getConfiguration().getMaxUnackedMessagesPerBroker() <= 0 || pulsarService.getConfiguration().getMaxUnackedMessagesPerSubscriptionOnBrokerBlocked() <= 0.0d) {
            this.maxUnackedMessages = 0;
            this.maxUnackedMsgsPerDispatcher = 0;
            log.info("Disabling per broker unack-msg blocking due invalid unAckMsgSubscriptionPercentageLimitOnBrokerBlocked {} ", Double.valueOf(pulsarService.getConfiguration().getMaxUnackedMessagesPerSubscriptionOnBrokerBlocked()));
        } else {
            this.maxUnackedMessages = pulsarService.getConfiguration().getMaxUnackedMessagesPerBroker();
            this.maxUnackedMsgsPerDispatcher = (int) (this.maxUnackedMessages * pulsarService.getConfiguration().getMaxUnackedMessagesPerSubscriptionOnBrokerBlocked());
            log.info("Enabling per-broker unack-message limit {} and dispatcher-limit {} on blocked-broker", Integer.valueOf(this.maxUnackedMessages), Integer.valueOf(this.maxUnackedMsgsPerDispatcher));
            pulsarService.getExecutor().scheduleAtFixedRate(this::checkUnAckMessageDispatching, 600L, 30L, TimeUnit.SECONDS);
        }
        this.delayedDeliveryTrackerFactory = DelayedDeliveryTrackerLoader.loadDelayedDeliveryTrackerFactory(pulsarService);
        this.defaultServerBootstrap = defaultServerBootstrap();
        this.pendingLookupRequests = ObserverGauge.build("pulsar_broker_lookup_pending_requests", "-").supplier(() -> {
            return Integer.valueOf(pulsarService.getConfig().getMaxConcurrentLookupRequest() - this.lookupRequestSemaphore.get().availablePermits());
        }).m676register();
        this.pendingTopicLoadRequests = ObserverGauge.build("pulsar_broker_topic_load_pending_requests", "-").supplier(() -> {
            return Integer.valueOf(pulsarService.getConfig().getMaxConcurrentTopicLoadRequest() - this.topicLoadRequestSemaphore.get().availablePermits());
        }).m676register();
        this.brokerEntryMetadataInterceptors = BrokerEntryMetadataUtils.loadBrokerEntryMetadataInterceptors(pulsarService.getConfiguration().getBrokerEntryMetadataInterceptors(), BrokerService.class.getClassLoader());
        this.brokerEntryPayloadProcessors = BrokerEntryMetadataUtils.loadInterceptors(pulsarService.getConfiguration().getBrokerEntryPayloadProcessors(), BrokerService.class.getClassLoader());
        this.bundlesQuotas = new BundlesQuotas(pulsarService);
    }

    public void addTopicEventListener(TopicEventsListener... topicEventsListenerArr) {
        this.topicEventsDispatcher.addTopicEventListener(topicEventsListenerArr);
        getTopics().keys().forEach(str -> {
            TopicEventsDispatcher.notify(topicEventsListenerArr, str, TopicEventsListener.TopicEvent.LOAD, TopicEventsListener.EventStage.SUCCESS, (Throwable) null);
        });
    }

    public void removeTopicEventListener(TopicEventsListener... topicEventsListenerArr) {
        this.topicEventsDispatcher.removeTopicEventListener(topicEventsListenerArr);
    }

    public void startProtocolHandlers(Map<String, Map<InetSocketAddress, ChannelInitializer<SocketChannel>>> map) {
        map.forEach((str, map2) -> {
            map2.forEach((inetSocketAddress, channelInitializer) -> {
                try {
                    startProtocolHandler(str, inetSocketAddress, channelInitializer);
                } catch (IOException e) {
                    log.error("{}", e.getMessage(), e.getCause());
                    throw new RuntimeException(e.getMessage(), e.getCause());
                }
            });
        });
    }

    private void startProtocolHandler(String str, SocketAddress socketAddress, ChannelInitializer<SocketChannel> channelInitializer) throws IOException {
        ServerBootstrap clone;
        ServiceConfiguration configuration = this.pulsar.getConfiguration();
        if (configuration.isUseSeparateThreadPoolForProtocolHandlers()) {
            clone = new ServerBootstrap();
            clone.option(ChannelOption.SO_REUSEADDR, true);
            clone.childOption(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
            clone.childOption(ChannelOption.TCP_NODELAY, true);
            clone.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(1024, 16384, LoadManagerShared.MIBI));
            EventLoopUtil.enableTriggeredMode(clone);
            EventLoopGroup newEventLoopGroup = EventLoopUtil.newEventLoopGroup(configuration.getNumIOThreads(), false, new ExecutorProvider.ExtendedThreadFactory("pulsar-ph-" + str));
            clone.channel(EventLoopUtil.getServerSocketChannelClass(newEventLoopGroup));
            this.protocolHandlersWorkerGroups.add(newEventLoopGroup);
            clone.group(this.acceptorGroup, newEventLoopGroup);
        } else {
            clone = this.defaultServerBootstrap.clone();
        }
        clone.childHandler(channelInitializer);
        try {
            clone.bind(socketAddress).sync();
            log.info("Successfully bind protocol `{}` on {}", str, socketAddress);
        } catch (Exception e) {
            throw new IOException("Failed to bind protocol `" + str + "` on " + socketAddress, e);
        }
    }

    private ServerBootstrap defaultServerBootstrap() {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.option(ChannelOption.SO_REUSEADDR, true);
        serverBootstrap.childOption(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
        serverBootstrap.group(this.acceptorGroup, this.workerGroup);
        serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);
        serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(1024, 16384, LoadManagerShared.MIBI));
        serverBootstrap.channel(EventLoopUtil.getServerSocketChannelClass(this.workerGroup));
        EventLoopUtil.enableTriggeredMode(serverBootstrap);
        return serverBootstrap;
    }

    public Map<String, TopicStatsImpl> getTopicStats(NamespaceBundle namespaceBundle) {
        ConcurrentOpenHashMap concurrentOpenHashMap = (ConcurrentOpenHashMap) ((ConcurrentOpenHashMap) getMultiLayerTopicMap().computeIfAbsent(namespaceBundle.getNamespaceObject().toString(), str -> {
            return ConcurrentOpenHashMap.newBuilder().build();
        })).computeIfAbsent(namespaceBundle.toString(), str2 -> {
            return ConcurrentOpenHashMap.newBuilder().build();
        });
        HashMap hashMap = new HashMap();
        concurrentOpenHashMap.forEach((str3, topic) -> {
            hashMap.put(str3, topic.mo308getStats(false, false, false));
        });
        return hashMap;
    }

    public void start() throws Exception {
        this.producerNameGenerator = new DistributedIdGenerator(this.pulsar.getCoordinationService(), PRODUCER_NAME_GENERATOR_PATH, this.pulsar.getConfiguration().getClusterName());
        ServiceConfiguration configuration = this.pulsar.getConfiguration();
        List<BindAddress> validateBindAddresses = BindAddressValidator.validateBindAddresses(configuration, Arrays.asList("pulsar", "pulsar+ssl"));
        String internalListenerName = configuration.getInternalListenerName();
        if (validateBindAddresses.size() == 0) {
            throw new IllegalArgumentException("At least one broker bind address must be configured");
        }
        for (BindAddress bindAddress : validateBindAddresses) {
            InetSocketAddress inetSocketAddress = new InetSocketAddress(bindAddress.getAddress().getHost(), bindAddress.getAddress().getPort());
            boolean equals = "pulsar+ssl".equals(bindAddress.getAddress().getScheme());
            PulsarChannelInitializer.PulsarChannelOptions build = PulsarChannelInitializer.PulsarChannelOptions.builder().enableTLS(equals).listenerName(bindAddress.getListenerName()).build();
            ServerBootstrap clone = this.defaultServerBootstrap.clone();
            clone.childHandler(this.pulsarChannelInitFactory.newPulsarChannelInitializer(this.pulsar, build));
            try {
                Channel channel = clone.bind(inetSocketAddress).sync().channel();
                this.listenChannels.add(channel);
                if (StringUtils.isBlank(bindAddress.getListenerName()) || StringUtils.equalsIgnoreCase(bindAddress.getListenerName(), internalListenerName)) {
                    if (this.listenChannel == null && !equals) {
                        this.listenChannel = channel;
                    }
                    if (this.listenChannelTls == null && equals) {
                        this.listenChannelTls = channel;
                    }
                }
                Logger logger = log;
                Object[] objArr = new Object[3];
                objArr[0] = channel.localAddress();
                objArr[1] = equals ? SslContext.defaultServerProvider().toString() : "(none)";
                objArr[2] = StringUtils.defaultString(bindAddress.getListenerName(), "(none)");
                logger.info("Started Pulsar Broker service on {}, TLS: {}, listener: {}", objArr);
            } catch (Exception e) {
                throw new IOException("Failed to bind Pulsar broker on " + inetSocketAddress, e);
            }
        }
        startStatsUpdater(configuration.getStatsUpdateInitialDelayInSecs(), configuration.getStatsUpdateFrequencyInSecs());
        startInactivityMonitor();
        startMessageExpiryMonitor();
        startCompactionMonitor();
        startConsumedLedgersMonitor();
        startBacklogQuotaChecker();
        updateBrokerPublisherThrottlingMaxRate();
        updateBrokerDispatchThrottlingMaxRate();
        startCheckReplicationPolicies();
        startDeduplicationSnapshotMonitor();
    }

    protected void startStatsUpdater(int i, int i2) {
        this.statsUpdater.scheduleAtFixedRate(this::updateRates, i, i2, TimeUnit.SECONDS);
        updateRates();
    }

    protected void startDeduplicationSnapshotMonitor() {
        int brokerDeduplicationSnapshotFrequencyInSeconds = pulsar().getConfiguration().getBrokerDeduplicationSnapshotFrequencyInSeconds();
        if (brokerDeduplicationSnapshotFrequencyInSeconds > 0) {
            this.deduplicationSnapshotMonitor = OrderedScheduler.newSchedulerBuilder().name("deduplication-snapshot-monitor").numThreads(1).build();
            this.deduplicationSnapshotMonitor.scheduleAtFixedRate(() -> {
                forEachTopic((v0) -> {
                    v0.checkDeduplicationSnapshot();
                });
            }, brokerDeduplicationSnapshotFrequencyInSeconds, brokerDeduplicationSnapshotFrequencyInSeconds, TimeUnit.SECONDS);
        }
    }

    protected void startInactivityMonitor() {
        if (pulsar().getConfiguration().isBrokerDeleteInactiveTopicsEnabled()) {
            int brokerDeleteInactiveTopicsFrequencySeconds = pulsar().getConfiguration().getBrokerDeleteInactiveTopicsFrequencySeconds();
            this.inactivityMonitor.scheduleAtFixedRate(() -> {
                checkGC();
            }, brokerDeleteInactiveTopicsFrequencySeconds, brokerDeleteInactiveTopicsFrequencySeconds, TimeUnit.SECONDS);
        }
        long seconds = TimeUnit.MINUTES.toSeconds(pulsar().getConfiguration().getBrokerDeduplicationProducerInactivityTimeoutMinutes()) / 3;
        this.inactivityMonitor.scheduleAtFixedRate(this::checkMessageDeduplicationInfo, seconds, seconds, TimeUnit.SECONDS);
        if (pulsar().getConfiguration().getSubscriptionExpiryCheckIntervalInMinutes() > 0) {
            long seconds2 = TimeUnit.MINUTES.toSeconds(pulsar().getConfiguration().getSubscriptionExpiryCheckIntervalInMinutes());
            this.inactivityMonitor.scheduleAtFixedRate(this::checkInactiveSubscriptions, seconds2, seconds2, TimeUnit.SECONDS);
        }
        int clusterMigrationCheckDurationSeconds = pulsar().getConfiguration().getClusterMigrationCheckDurationSeconds();
        if (clusterMigrationCheckDurationSeconds > 0) {
            this.inactivityMonitor.scheduleAtFixedRate(() -> {
                checkClusterMigration();
            }, clusterMigrationCheckDurationSeconds, clusterMigrationCheckDurationSeconds, TimeUnit.SECONDS);
        }
    }

    protected void startMessageExpiryMonitor() {
        int messageExpiryCheckIntervalInMinutes = pulsar().getConfiguration().getMessageExpiryCheckIntervalInMinutes();
        this.messageExpiryMonitor.scheduleAtFixedRate(this::checkMessageExpiry, messageExpiryCheckIntervalInMinutes, messageExpiryCheckIntervalInMinutes, TimeUnit.MINUTES);
    }

    protected void startCheckReplicationPolicies() {
        int replicationPolicyCheckDurationSeconds = this.pulsar.getConfig().getReplicationPolicyCheckDurationSeconds();
        if (replicationPolicyCheckDurationSeconds > 0) {
            this.messageExpiryMonitor.scheduleAtFixedRate(this::checkReplicationPolicies, replicationPolicyCheckDurationSeconds, replicationPolicyCheckDurationSeconds, TimeUnit.SECONDS);
        }
    }

    protected void startCompactionMonitor() {
        int brokerServiceCompactionMonitorIntervalInSeconds = pulsar().getConfiguration().getBrokerServiceCompactionMonitorIntervalInSeconds();
        if (brokerServiceCompactionMonitorIntervalInSeconds > 0) {
            this.compactionMonitor.scheduleAtFixedRate(this::checkCompaction, brokerServiceCompactionMonitorIntervalInSeconds, brokerServiceCompactionMonitorIntervalInSeconds, TimeUnit.SECONDS);
        }
    }

    protected void startConsumedLedgersMonitor() {
        int retentionCheckIntervalInSeconds = pulsar().getConfiguration().getRetentionCheckIntervalInSeconds();
        if (retentionCheckIntervalInSeconds > 0) {
            this.consumedLedgersMonitor.scheduleAtFixedRate(this::checkConsumedLedgers, retentionCheckIntervalInSeconds, retentionCheckIntervalInSeconds, TimeUnit.SECONDS);
        }
    }

    protected void startBacklogQuotaChecker() {
        if (!pulsar().getConfiguration().isBacklogQuotaCheckEnabled()) {
            log.info("Backlog quota check monitoring is disabled");
            return;
        }
        int backlogQuotaCheckIntervalInSeconds = pulsar().getConfiguration().getBacklogQuotaCheckIntervalInSeconds();
        log.info("Scheduling a thread to check backlog quota after [{}] seconds in background", Integer.valueOf(backlogQuotaCheckIntervalInSeconds));
        this.backlogQuotaChecker.scheduleAtFixedRate(this::monitorBacklogQuota, backlogQuotaCheckIntervalInSeconds, backlogQuotaCheckIntervalInSeconds, TimeUnit.SECONDS);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            closeAsync().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e2) {
            if (!(e2.getCause() instanceof IOException)) {
                throw new PulsarServerException(e2.getCause());
            }
            throw ((IOException) e2.getCause());
        }
    }

    public CompletableFuture<Void> closeAndRemoveReplicationClient(String str) {
        ArrayList arrayList = new ArrayList((int) this.topics.size());
        this.topics.forEach((str2, completableFuture) -> {
            CompletableFuture completableFuture = new CompletableFuture();
            arrayList.add(completableFuture);
            completableFuture.whenComplete((optional, th) -> {
                Replicator replicator;
                if (optional.isPresent() && (replicator = (Replicator) ((Topic) optional.get()).getReplicators().get(str)) != null && replicator.isConnected()) {
                    replicator.terminate().whenComplete((r4, th) -> {
                        completableFuture.complete(null);
                    });
                } else {
                    completableFuture.complete(null);
                }
            });
        });
        return FutureUtil.waitForAll(arrayList).thenCompose(r5 -> {
            PulsarClient pulsarClient = (PulsarClient) this.replicationClients.remove(str);
            return pulsarClient == null ? CompletableFuture.completedFuture(null) : pulsarClient.closeAsync();
        });
    }

    public CompletableFuture<Void> closeAsync() {
        try {
            log.info("Shutting down Pulsar Broker service");
            unloadNamespaceBundlesGracefully();
            this.replicationClients.forEach((str, pulsarClient) -> {
                try {
                    pulsarClient.shutdown();
                } catch (Exception e) {
                    log.warn("Error shutting down repl client for cluster {}", str, e);
                }
            });
            this.clusterAdmins.forEach((str2, pulsarAdmin) -> {
                try {
                    pulsarAdmin.close();
                } catch (Exception e) {
                    log.warn("Error shutting down repl admin for cluster {}", str2, e);
                }
            });
            if (this.entryFilterProvider != null) {
                this.entryFilterProvider.close();
            }
            CompletableFuture completableFuture = new CompletableFuture();
            log.info("Event loops shutting down gracefully...");
            ArrayList arrayList = new ArrayList();
            arrayList.add(shutdownEventLoopGracefully(this.acceptorGroup));
            arrayList.add(shutdownEventLoopGracefully(this.workerGroup));
            Iterator<EventLoopGroup> it = this.protocolHandlersWorkerGroups.iterator();
            while (it.hasNext()) {
                arrayList.add(shutdownEventLoopGracefully(it.next()));
            }
            CompletableFuture<Void> thenComposeAsync = CompletableFuture.allOf((CompletableFuture[]) arrayList.toArray(new CompletableFuture[0])).handle((r4, th) -> {
                if (th != null) {
                    log.warn("Error shutting down event loops gracefully", th);
                    return null;
                }
                log.info("Event loops shutdown completed.");
                return null;
            }).thenComposeAsync((Function<? super U, ? extends CompletionStage<U>>) obj -> {
                log.info("Continuing to second phase in shutdown.");
                ArrayList arrayList2 = new ArrayList();
                this.listenChannels.forEach(channel -> {
                    if (channel.isOpen()) {
                        arrayList2.add(closeChannel(channel));
                    }
                });
                if (this.interceptor != null) {
                    this.interceptor.close();
                    this.interceptor = null;
                }
                try {
                    this.authenticationService.close();
                } catch (IOException e) {
                    log.warn("Error in closing authenticationService", e);
                }
                this.pulsarStats.close();
                try {
                    this.delayedDeliveryTrackerFactory.close();
                } catch (Exception e2) {
                    log.warn("Error in closing delayedDeliveryTrackerFactory", e2);
                }
                arrayList2.add(GracefulExecutorServicesShutdown.initiate().timeout(Duration.ofMillis((long) (GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT * this.pulsar.getConfiguration().getBrokerShutdownTimeoutMs()))).shutdown(new ExecutorService[]{this.statsUpdater, this.inactivityMonitor, this.messageExpiryMonitor, this.compactionMonitor, this.consumedLedgersMonitor, this.backlogQuotaChecker, this.topicOrderedExecutor, this.deduplicationSnapshotMonitor}).handle());
                CompletableFuture waitForAllAndSupportCancel = FutureUtil.waitForAllAndSupportCancel(arrayList2);
                completableFuture.complete(waitForAllAndSupportCancel);
                waitForAllAndSupportCancel.handle((r42, th2) -> {
                    if (th2 == null) {
                        log.info("Broker service completely shut down");
                        return null;
                    }
                    if (th2 instanceof CancellationException) {
                        log.warn("Broker service didn't complete gracefully. Terminating Broker service.");
                        return null;
                    }
                    log.warn("Broker service shut down completed with exception", th2);
                    return null;
                });
                return waitForAllAndSupportCancel;
            }, runnable -> {
                Thread thread = new Thread(runnable);
                thread.setName("BrokerService-shutdown-phase2");
                thread.setDaemon(false);
                thread.start();
            });
            FutureUtil.whenCancelledOrTimedOut(thenComposeAsync, () -> {
                completableFuture.thenAccept(completableFuture2 -> {
                    completableFuture2.cancel(false);
                });
            });
            return thenComposeAsync;
        } catch (Exception e) {
            return FutureUtil.failedFuture(e);
        }
    }

    CompletableFuture<Void> shutdownEventLoopGracefully(EventLoopGroup eventLoopGroup) {
        long brokerShutdownTimeoutMs = this.pulsar.getConfiguration().getBrokerShutdownTimeoutMs();
        return NettyFutureUtil.toCompletableFutureVoid(eventLoopGroup.shutdownGracefully(Math.min((long) (GRACEFUL_SHUTDOWN_QUIET_PERIOD_RATIO_OF_TOTAL_TIMEOUT * brokerShutdownTimeoutMs), GRACEFUL_SHUTDOWN_QUIET_PERIOD_MAX_MS), (long) (GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT * brokerShutdownTimeoutMs), TimeUnit.MILLISECONDS));
    }

    private CompletableFuture<Void> closeChannel(Channel channel) {
        return ChannelFutures.toCompletableFuture(channel.close()).handle((channel2, th) -> {
            if (th == null || (th instanceof RejectedExecutionException)) {
                return null;
            }
            log.warn("Cannot close channel {}", channel, th);
            return null;
        });
    }

    public void unloadNamespaceBundlesGracefully() {
        unloadNamespaceBundlesGracefully(0, true);
    }

    public void unloadNamespaceBundlesGracefully(int i, boolean z) {
        if (this.unloaded) {
            return;
        }
        try {
            try {
                log.info("Unloading namespace-bundles...");
                long nanoTime = System.nanoTime();
                if (this.pulsar.getLoadManager() != null && this.pulsar.getLoadManager().get() != null) {
                    try {
                        this.pulsar.getLoadManager().get().disableBroker();
                    } catch (PulsarServerException.NotFoundException e) {
                        log.warn("Broker load-manager znode doesn't exist ", e);
                    }
                }
                log.info("Disable broker in load manager completed in {} seconds", Double.valueOf(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime) / 1000.0d));
                long nanoTime2 = System.nanoTime();
                Set<NamespaceBundle> ownedServiceUnits = this.pulsar.getNamespaceService() != null ? this.pulsar.getNamespaceService().getOwnedServiceUnits() : null;
                if (ownedServiceUnits != null) {
                    RateLimiter create = i > 0 ? RateLimiter.create(i) : null;
                    ownedServiceUnits.forEach(namespaceBundle -> {
                        if (namespaceBundle != null) {
                            if (create != null) {
                                try {
                                    create.acquire(1);
                                } catch (Exception e2) {
                                    log.warn("Failed to unload namespace bundle {}", namespaceBundle, e2);
                                    return;
                                }
                            }
                            long namespaceBundleUnloadingTimeoutMs = this.pulsar.getConfiguration().getNamespaceBundleUnloadingTimeoutMs();
                            this.pulsar.getNamespaceService().unloadNamespaceBundle(namespaceBundle, namespaceBundleUnloadingTimeoutMs, TimeUnit.MILLISECONDS, z).get(namespaceBundleUnloadingTimeoutMs, TimeUnit.MILLISECONDS);
                        }
                    });
                    log.info("Unloading {} namespace-bundles completed in {} seconds", Integer.valueOf(ownedServiceUnits.size()), Double.valueOf(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime2) / 1000.0d));
                }
            } catch (Exception e2) {
                log.error("Failed to disable broker from loadbalancer list {}", e2.getMessage(), e2);
                this.unloaded = true;
            }
        } finally {
            this.unloaded = true;
        }
    }

    public CompletableFuture<Optional<Topic>> getTopicIfExists(String str) {
        return getTopic(str, false);
    }

    public CompletableFuture<Topic> getOrCreateTopic(String str) {
        return isAllowAutoTopicCreationAsync(str).thenCompose(bool -> {
            return getTopic(str, bool.booleanValue());
        }).thenApply((Function<? super U, ? extends U>) (v0) -> {
            return v0.get();
        });
    }

    public CompletableFuture<Optional<Topic>> getTopic(String str, boolean z) {
        return getTopic(str, z, (Map<String, String>) null);
    }

    public CompletableFuture<Optional<Topic>> getTopic(String str, boolean z, Map<String, String> map) {
        return getTopic(TopicName.get(str), z, map);
    }

    public CompletableFuture<Optional<Topic>> getTopic(TopicName topicName, boolean z, Map<String, String> map) {
        try {
            CompletableFuture<Optional<Topic>> completableFuture = (CompletableFuture) this.topics.get(topicName.toString());
            if (completableFuture != null) {
                if (!completableFuture.isCompletedExceptionally() && (!completableFuture.isDone() || completableFuture.getNow(Optional.empty()).isPresent())) {
                    return z ? (completableFuture.isDone() && completableFuture.getNow(Optional.empty()).isPresent()) ? completableFuture : completableFuture.thenCompose(optional -> {
                        return !optional.isPresent() ? getTopic(topicName, z, (Map<String, String>) map) : CompletableFuture.completedFuture(optional);
                    }) : completableFuture;
                }
                this.topics.remove(topicName.toString(), completableFuture);
            }
            return topicName.getDomain().equals(TopicDomain.persistent) ? this.pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topicName).thenCompose(bool -> {
                return (bool.booleanValue() || z) ? getTopicPoliciesBypassSystemTopic(topicName).exceptionally(th -> {
                    Throwable unwrapCompletionException = FutureUtil.unwrapCompletionException(th);
                    String format = String.format("Topic creation encountered an exception by initialize topic policies service. topic_name=%s error_message=%s", topicName, unwrapCompletionException.getMessage());
                    log.error(format, unwrapCompletionException);
                    throw FutureUtil.wrapToCompletionException(new BrokerServiceException.ServiceUnitNotReadyException(format));
                }).thenCompose(optional2 -> {
                    TopicPolicies topicPolicies = (TopicPolicies) optional2.orElse(null);
                    return ((CompletableFuture) this.topics.computeIfAbsent(topicName.toString(), str -> {
                        return topicName.isPartitioned() ? fetchPartitionedTopicMetadataAsync(TopicName.get(topicName.getPartitionedTopicName())).thenCompose(partitionedTopicMetadata -> {
                            if (partitionedTopicMetadata.partitions == 0 || topicName.getPartitionIndex() < partitionedTopicMetadata.partitions) {
                                return loadOrCreatePersistentTopic(str, z, map, topicPolicies);
                            }
                            String format = String.format("Illegal topic partition name %s with max allowed %d partitions", topicName, Integer.valueOf(partitionedTopicMetadata.partitions));
                            log.warn(format);
                            return FutureUtil.failedFuture(new BrokerServiceException.NotAllowedException(format));
                        }) : loadOrCreatePersistentTopic(str, z, map, topicPolicies);
                    })).thenCompose(optional2 -> {
                        if (optional2.isPresent() || !z) {
                            return CompletableFuture.completedFuture(optional2);
                        }
                        log.warn("[{}] Try to recreate the topic with createIfMissing=true but the returned topic is empty", topicName);
                        return getTopic(topicName, z, (Map<String, String>) map);
                    });
                }) : CompletableFuture.completedFuture(Optional.empty());
            }) : (CompletableFuture) this.topics.computeIfAbsent(topicName.toString(), str -> {
                this.topicEventsDispatcher.notify(topicName.toString(), TopicEventsListener.TopicEvent.LOAD, TopicEventsListener.EventStage.BEFORE);
                if (topicName.isPartitioned()) {
                    return fetchPartitionedTopicMetadataAsync(TopicName.get(topicName.getPartitionedTopicName())).thenCompose(partitionedTopicMetadata -> {
                        if (topicName.getPartitionIndex() >= partitionedTopicMetadata.partitions) {
                            this.topicEventsDispatcher.notify(topicName.toString(), TopicEventsListener.TopicEvent.LOAD, TopicEventsListener.EventStage.FAILURE);
                            return CompletableFuture.completedFuture(Optional.empty());
                        }
                        this.topicEventsDispatcher.notify(topicName.toString(), TopicEventsListener.TopicEvent.CREATE, TopicEventsListener.EventStage.BEFORE);
                        CompletableFuture<Optional<Topic>> createNonPersistentTopic = createNonPersistentTopic(str);
                        this.topicEventsDispatcher.notifyOnCompletion(this.topicEventsDispatcher.notifyOnCompletion(createNonPersistentTopic, topicName.toString(), TopicEventsListener.TopicEvent.CREATE), topicName.toString(), TopicEventsListener.TopicEvent.LOAD);
                        return createNonPersistentTopic;
                    });
                }
                if (!z) {
                    this.topicEventsDispatcher.notify(topicName.toString(), TopicEventsListener.TopicEvent.LOAD, TopicEventsListener.EventStage.FAILURE);
                    return CompletableFuture.completedFuture(Optional.empty());
                }
                this.topicEventsDispatcher.notify(topicName.toString(), TopicEventsListener.TopicEvent.CREATE, TopicEventsListener.EventStage.BEFORE);
                CompletableFuture<Optional<Topic>> createNonPersistentTopic = createNonPersistentTopic(str);
                this.topicEventsDispatcher.notifyOnCompletion(this.topicEventsDispatcher.notifyOnCompletion(createNonPersistentTopic, topicName.toString(), TopicEventsListener.TopicEvent.CREATE), topicName.toString(), TopicEventsListener.TopicEvent.LOAD);
                return createNonPersistentTopic;
            });
        } catch (IllegalArgumentException e) {
            log.warn("[{}] Illegalargument exception when loading topic", topicName, e);
            return FutureUtil.failedFuture(e);
        } catch (RuntimeException e2) {
            Throwable cause = e2.getCause();
            if (cause instanceof BrokerServiceException.ServiceUnitNotReadyException) {
                log.warn("[{}] Service unit is not ready when loading the topic", topicName);
            } else {
                log.warn("[{}] Unexpected exception when loading topic: {}", new Object[]{topicName, e2.getMessage(), e2});
            }
            return FutureUtil.failedFuture(cause);
        }
    }

    private CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesBypassSystemTopic(@Nonnull TopicName topicName) {
        Objects.requireNonNull(topicName);
        ServiceConfiguration configuration = this.pulsar.getConfiguration();
        return (!configuration.isSystemTopicEnabled() || !configuration.isTopicLevelPoliciesEnabled() || NamespaceService.isSystemServiceNamespace(topicName.getNamespace()) || SystemTopicNames.isTopicPoliciesSystemTopic(topicName.toString())) ? CompletableFuture.completedFuture(Optional.empty()) : this.pulsar.getTopicPoliciesService().getTopicPoliciesAsync(topicName);
    }

    public CompletableFuture<Void> deleteTopic(String str, boolean z) {
        this.topicEventsDispatcher.notify(str, TopicEventsListener.TopicEvent.DELETE, TopicEventsListener.EventStage.BEFORE);
        CompletableFuture<Void> deleteTopicInternal = deleteTopicInternal(str, z);
        this.topicEventsDispatcher.notifyOnCompletion(deleteTopicInternal, str, TopicEventsListener.TopicEvent.DELETE);
        return deleteTopicInternal;
    }

    private CompletableFuture<Void> deleteTopicInternal(String str, boolean z) {
        TopicName topicName = TopicName.get(str);
        Optional<Topic> topicReference = getTopicReference(str);
        if (!topicReference.isPresent()) {
            log.info("Topic {} is not loaded, try to delete from metadata", str);
            TopicName topicName2 = TopicName.get(str);
            if (!topicName2.isPersistent()) {
                return CompletableFuture.completedFuture(null);
            }
            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
            CompletableFuture<Void> completableFuture2 = new CompletableFuture<>();
            deleteTopicAuthenticationWithRetry(str, completableFuture2, 5);
            completableFuture2.thenCompose(r5 -> {
                return deleteSchema(topicName2);
            }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) schemaVersion -> {
                return (SystemTopicNames.isTopicPoliciesSystemTopic(str) || !getPulsar().getConfiguration().isSystemTopicEnabled()) ? CompletableFuture.completedFuture(null) : deleteTopicPolicies(topicName2);
            }).whenComplete((r12, th) -> {
                if (th != null) {
                    completableFuture.completeExceptionally(th);
                } else {
                    this.managedLedgerFactory.asyncDelete(topicName2.getPersistenceNamingEncoding(), getManagedLedgerConfig(topicName), new AsyncCallbacks.DeleteLedgerCallback() { // from class: org.apache.pulsar.broker.service.BrokerService.1
                        public void deleteLedgerComplete(Object obj) {
                            completableFuture.complete(null);
                        }

                        public void deleteLedgerFailed(ManagedLedgerException managedLedgerException, Object obj) {
                            completableFuture.completeExceptionally(managedLedgerException);
                        }
                    }, (Object) null);
                }
            });
            return completableFuture;
        }
        Topic topic = topicReference.get();
        if (z) {
            return topic.deleteForcefully();
        }
        if (topic.isReplicated()) {
            List keys = topic.getReplicators().keys();
            log.error("Delete forbidden topic {} is replicated on clusters {}", str, keys);
            return FutureUtil.failedFuture(new IllegalStateException("Delete forbidden topic is replicated on clusters " + keys));
        }
        if (!topic.isShadowReplicated()) {
            return topic.delete();
        }
        log.error("Delete forbidden. Topic {} is replicated to shadow topics: {}", str, topic.getShadowReplicators().keys());
        return FutureUtil.failedFuture(new IllegalStateException("Delete forbidden. Topic " + str + " is replicated to shadow topics."));
    }

    public void deleteTopicAuthenticationWithRetry(String str, CompletableFuture<Void> completableFuture, int i) {
        if (i != 0) {
            this.authorizationService.removePermissionsAsync(TopicName.get(str)).thenAccept(r6 -> {
                log.info("Successfully delete authentication policies for topic {}", str);
                completableFuture.complete(null);
            }).exceptionally(th -> {
                if (th.getCause() instanceof MetadataStoreException.BadVersionException) {
                    log.warn("Failed to delete authentication policies because of bad version. Retry to delete authentication policies for topic {}", str);
                    deleteTopicAuthenticationWithRetry(str, completableFuture, i - 1);
                    return null;
                }
                log.error("Failed to delete authentication policies for topic {}", str, th);
                completableFuture.completeExceptionally(th);
                return null;
            });
        } else {
            log.error("The number of retries has exhausted for topic {}", str);
            completableFuture.completeExceptionally(new MetadataStoreException("The number of retries has exhausted"));
        }
    }

    private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String str) {
        CompletableFuture<Optional<Topic>> completableFuture = new CompletableFuture<>();
        completableFuture.exceptionally(th -> {
            this.pulsarStats.recordTopicLoadFailed();
            return null;
        });
        if (!this.pulsar.getConfiguration().isEnableNonPersistentTopics()) {
            if (log.isDebugEnabled()) {
                log.debug("Broker is unable to load non-persistent topic {}", str);
            }
            return FutureUtil.failedFuture(new BrokerServiceException.NotAllowedException("Broker is not unable to load non-persistent topic"));
        }
        long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
        try {
            NonPersistentTopic nonPersistentTopic = (NonPersistentTopic) newTopic(str, null, this, NonPersistentTopic.class);
            checkTopicNsOwnership(str).thenRun(() -> {
                nonPersistentTopic.initialize().thenCompose(r3 -> {
                    return nonPersistentTopic.checkReplication();
                }).thenRun(() -> {
                    log.info("Created topic {}", nonPersistentTopic);
                    this.pulsarStats.recordTopicLoadTimeValue(str, TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - millis);
                    addTopicToStatsMaps(TopicName.get(str), nonPersistentTopic);
                    completableFuture.complete(Optional.of(nonPersistentTopic));
                }).exceptionally(th2 -> {
                    log.warn("Replication check failed. Removing topic from topics list {}, {}", str, th2.getCause());
                    nonPersistentTopic.stopReplProducers().whenComplete((r9, th2) -> {
                        this.pulsar.getExecutor().execute(() -> {
                            this.topics.remove(str, completableFuture);
                        });
                        completableFuture.completeExceptionally(th2);
                    });
                    return null;
                });
            }).exceptionally(th2 -> {
                log.warn("CheckTopicNsOwnership fail when createNonPersistentTopic! {}", str, th2.getCause());
                completableFuture.complete(Optional.of(nonPersistentTopic));
                this.pulsar.getExecutor().execute(() -> {
                    this.topics.remove(str, completableFuture);
                });
                return null;
            });
            return completableFuture;
        } catch (Throwable th3) {
            log.warn("Failed to create topic {}", str, th3);
            completableFuture.completeExceptionally(th3);
            return completableFuture;
        }
    }

    public PulsarClient getReplicationClient(String str, Optional<ClusterData> optional) {
        PulsarClient pulsarClient = (PulsarClient) this.replicationClients.get(str);
        return pulsarClient != null ? pulsarClient : (PulsarClient) this.replicationClients.computeIfAbsent(str, str2 -> {
            try {
                ClusterData clusterData = (ClusterData) optional.orElseThrow(() -> {
                    return new MetadataStoreException.NotFoundException(str);
                });
                ClientBuilderImpl statsInterval = PulsarClient.builder().enableTcpNoDelay(false).connectionsPerBroker(this.pulsar.getConfiguration().getReplicationConnectionsPerBroker()).statsInterval(0L, TimeUnit.SECONDS);
                statsInterval.memoryLimit(0L, SizeUnit.BYTES);
                statsInterval.loadConf(PropertiesUtils.filterAndMapProperties(this.pulsar.getConfiguration().getProperties(), "brokerClient_"));
                statsInterval.connectionMaxIdleSeconds(-1);
                if (clusterData.getAuthenticationPlugin() != null && clusterData.getAuthenticationParameters() != null) {
                    statsInterval.authentication(clusterData.getAuthenticationPlugin(), clusterData.getAuthenticationParameters());
                } else if (this.pulsar.getConfiguration().isAuthenticationEnabled()) {
                    statsInterval.authentication(this.pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(), this.pulsar.getConfiguration().getBrokerClientAuthenticationParameters());
                }
                String brokerServiceUrlTls = StringUtils.isNotBlank(clusterData.getBrokerServiceUrlTls()) ? clusterData.getBrokerServiceUrlTls() : clusterData.getServiceUrlTls();
                if (clusterData.isBrokerClientTlsEnabled()) {
                    configTlsSettings(statsInterval, brokerServiceUrlTls, clusterData.isBrokerClientTlsEnabledWithKeyStore(), clusterData.isTlsAllowInsecureConnection(), clusterData.getBrokerClientTlsTrustStoreType(), clusterData.getBrokerClientTlsTrustStore(), clusterData.getBrokerClientTlsTrustStorePassword(), clusterData.getBrokerClientTlsKeyStoreType(), clusterData.getBrokerClientTlsKeyStore(), clusterData.getBrokerClientTlsKeyStorePassword(), clusterData.getBrokerClientTrustCertsFilePath(), clusterData.getBrokerClientKeyFilePath(), clusterData.getBrokerClientCertificateFilePath(), this.pulsar.getConfiguration().isTlsHostnameVerificationEnabled());
                } else if (this.pulsar.getConfiguration().isBrokerClientTlsEnabled()) {
                    configTlsSettings(statsInterval, brokerServiceUrlTls, this.pulsar.getConfiguration().isBrokerClientTlsEnabledWithKeyStore(), this.pulsar.getConfiguration().isTlsAllowInsecureConnection(), this.pulsar.getConfiguration().getBrokerClientTlsTrustStoreType(), this.pulsar.getConfiguration().getBrokerClientTlsTrustStore(), this.pulsar.getConfiguration().getBrokerClientTlsTrustStorePassword(), this.pulsar.getConfiguration().getBrokerClientTlsKeyStoreType(), this.pulsar.getConfiguration().getBrokerClientTlsKeyStore(), this.pulsar.getConfiguration().getBrokerClientTlsKeyStorePassword(), this.pulsar.getConfiguration().getBrokerClientTrustCertsFilePath(), this.pulsar.getConfiguration().getBrokerClientKeyFilePath(), this.pulsar.getConfiguration().getBrokerClientCertificateFilePath(), this.pulsar.getConfiguration().isTlsHostnameVerificationEnabled());
                } else {
                    statsInterval.serviceUrl(StringUtils.isNotBlank(clusterData.getBrokerServiceUrl()) ? clusterData.getBrokerServiceUrl() : clusterData.getServiceUrl());
                }
                if (clusterData.getProxyProtocol() != null && StringUtils.isNotBlank(clusterData.getProxyServiceUrl())) {
                    statsInterval.proxyServiceUrl(clusterData.getProxyServiceUrl(), clusterData.getProxyProtocol());
                    log.info("Configuring proxy-url {} with protocol {}", clusterData.getProxyServiceUrl(), clusterData.getProxyProtocol());
                }
                if (StringUtils.isNotBlank(clusterData.getListenerName())) {
                    statsInterval.listenerName(clusterData.getListenerName());
                    log.info("Configuring listenerName {}", clusterData.getListenerName());
                }
                return this.pulsar.createClientImpl(statsInterval.getClientConfigurationData());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    private void configTlsSettings(ClientBuilder clientBuilder, String str, boolean z, boolean z2, String str2, String str3, String str4, String str5, String str6, String str7, String str8, String str9, String str10, boolean z3) {
        clientBuilder.serviceUrl(str).allowTlsInsecureConnection(z2).enableTlsHostnameVerification(z3);
        if (z) {
            clientBuilder.useKeyStoreTls(true).tlsTrustStoreType(str2).tlsTrustStorePath(str3).tlsTrustStorePassword(str4).tlsKeyStoreType(str5).tlsKeyStorePath(str6).tlsKeyStorePassword(str7);
        } else {
            clientBuilder.tlsTrustCertsFilePath(str8).tlsKeyFilePath(str9).tlsCertificateFilePath(str10);
        }
    }

    private void configAdminTlsSettings(PulsarAdminBuilder pulsarAdminBuilder, boolean z, boolean z2, String str, String str2, String str3, String str4, String str5, String str6, String str7, String str8, String str9, boolean z3) {
        if (z) {
            pulsarAdminBuilder.useKeyStoreTls(true).tlsTrustStoreType(str).tlsTrustStorePath(str2).tlsTrustStorePassword(str3).tlsKeyStoreType(str4).tlsKeyStorePath(str5).tlsKeyStorePassword(str6);
        } else {
            pulsarAdminBuilder.tlsTrustCertsFilePath(str7).tlsKeyFilePath(str8).tlsCertificateFilePath(str9);
        }
        pulsarAdminBuilder.allowTlsInsecureConnection(z2).enableTlsHostnameVerification(z3);
    }

    public PulsarAdmin getClusterPulsarAdmin(String str, Optional<ClusterData> optional) {
        PulsarAdmin pulsarAdmin = (PulsarAdmin) this.clusterAdmins.get(str);
        return pulsarAdmin != null ? pulsarAdmin : (PulsarAdmin) this.clusterAdmins.computeIfAbsent(str, str2 -> {
            try {
                ClusterData clusterData = (ClusterData) optional.orElseThrow(() -> {
                    return new MetadataStoreException.NotFoundException(str);
                });
                PulsarAdminBuilder builder = PulsarAdmin.builder();
                ServiceConfiguration config = this.pulsar.getConfig();
                builder.loadConf(PropertiesUtils.filterAndMapProperties(config.getProperties(), "brokerClient_"));
                if (clusterData.getAuthenticationPlugin() == null || clusterData.getAuthenticationParameters() == null) {
                    builder.authentication(this.pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(), this.pulsar.getConfiguration().getBrokerClientAuthenticationParameters());
                } else {
                    builder.authentication(clusterData.getAuthenticationPlugin(), clusterData.getAuthenticationParameters());
                }
                boolean z = clusterData.isBrokerClientTlsEnabled() || config.isBrokerClientTlsEnabled();
                if (z && StringUtils.isEmpty(clusterData.getServiceUrlTls())) {
                    throw new IllegalArgumentException("serviceUrlTls is empty, brokerClientTlsEnabled: " + z);
                }
                if (StringUtils.isEmpty(clusterData.getServiceUrl())) {
                    throw new IllegalArgumentException("serviceUrl is empty, brokerClientTlsEnabled: " + z);
                }
                String serviceUrlTls = z ? clusterData.getServiceUrlTls() : clusterData.getServiceUrl();
                builder.serviceHttpUrl(serviceUrlTls);
                if (clusterData.isBrokerClientTlsEnabled()) {
                    configAdminTlsSettings(builder, clusterData.isBrokerClientTlsEnabledWithKeyStore(), clusterData.isTlsAllowInsecureConnection(), clusterData.getBrokerClientTlsTrustStoreType(), clusterData.getBrokerClientTlsTrustStore(), clusterData.getBrokerClientTlsTrustStorePassword(), clusterData.getBrokerClientTlsKeyStoreType(), clusterData.getBrokerClientTlsKeyStore(), clusterData.getBrokerClientTlsKeyStorePassword(), clusterData.getBrokerClientTrustCertsFilePath(), clusterData.getBrokerClientKeyFilePath(), clusterData.getBrokerClientCertificateFilePath(), this.pulsar.getConfiguration().isTlsHostnameVerificationEnabled());
                } else if (config.isBrokerClientTlsEnabled()) {
                    configAdminTlsSettings(builder, config.isBrokerClientTlsEnabledWithKeyStore(), config.isTlsAllowInsecureConnection(), config.getBrokerClientTlsTrustStoreType(), config.getBrokerClientTlsTrustStore(), config.getBrokerClientTlsTrustStorePassword(), config.getBrokerClientTlsKeyStoreType(), config.getBrokerClientTlsKeyStore(), config.getBrokerClientTlsKeyStorePassword(), config.getBrokerClientTrustCertsFilePath(), config.getBrokerClientKeyFilePath(), config.getBrokerClientCertificateFilePath(), this.pulsar.getConfiguration().isTlsHostnameVerificationEnabled());
                }
                builder.readTimeout(config.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
                PulsarAdmin build = builder.build();
                log.info("created admin with url {} ", serviceUrlTls);
                return build;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(String str, boolean z, Map<String, String> map, @Nullable TopicPolicies topicPolicies) {
        CompletableFuture<Optional<Topic>> createFutureWithTimeout = FutureUtil.createFutureWithTimeout(Duration.ofSeconds(this.pulsar.getConfiguration().getTopicLoadTimeoutSeconds()), executor(), () -> {
            return FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION;
        });
        if (this.pulsar.getConfiguration().isEnablePersistentTopics()) {
            checkTopicNsOwnership(str).thenRun(() -> {
                Semaphore semaphore = this.topicLoadRequestSemaphore.get();
                if (semaphore.tryAcquire()) {
                    checkOwnershipAndCreatePersistentTopic(str, z, createFutureWithTimeout, map, topicPolicies);
                    createFutureWithTimeout.handle((optional, th) -> {
                        semaphore.release();
                        if (th == null || !(th.getCause() instanceof BrokerServiceException.TopicMigratedException)) {
                            createPendingLoadTopic();
                            return null;
                        }
                        createFutureWithTimeout.completeExceptionally(th.getCause());
                        return null;
                    });
                } else {
                    this.pendingTopicLoadingQueue.add(new TopicLoadingContext(str, z, createFutureWithTimeout, map, topicPolicies));
                    if (log.isDebugEnabled()) {
                        log.debug("topic-loading for {} added into pending queue", str);
                    }
                }
            }).exceptionally(th -> {
                createFutureWithTimeout.completeExceptionally(th.getCause());
                return null;
            });
            return createFutureWithTimeout;
        }
        if (log.isDebugEnabled()) {
            log.debug("Broker is unable to load persistent topic {}", str);
        }
        createFutureWithTimeout.completeExceptionally(new BrokerServiceException.NotAllowedException("Broker is unable to load persistent topic"));
        return createFutureWithTimeout;
    }

    @VisibleForTesting
    protected CompletableFuture<Map<String, String>> fetchTopicPropertiesAsync(TopicName topicName) {
        return !topicName.isPartitioned() ? this.managedLedgerFactory.getManagedLedgerPropertiesAsync(topicName.getPersistenceNamingEncoding()) : fetchPartitionedTopicMetadataAsync(TopicName.get(topicName.getPartitionedTopicName())).thenCompose(partitionedTopicMetadata -> {
            if (partitionedTopicMetadata.partitions == 0) {
                return this.managedLedgerFactory.getManagedLedgerPropertiesAsync(topicName.getPersistenceNamingEncoding());
            }
            if (MapUtils.getString(partitionedTopicMetadata.properties, "PULSAR.SHADOW_SOURCE") == null) {
                return CompletableFuture.completedFuture(null);
            }
            String str = (String) partitionedTopicMetadata.properties.get("PULSAR.SHADOW_SOURCE");
            HashMap hashMap = new HashMap();
            hashMap.put("PULSAR.SHADOW_SOURCE", TopicName.getTopicPartitionNameString(str, topicName.getPartitionIndex()));
            return CompletableFuture.completedFuture(hashMap);
        });
    }

    private void checkOwnershipAndCreatePersistentTopic(String str, boolean z, CompletableFuture<Optional<Topic>> completableFuture, Map<String, String> map, @Nullable TopicPolicies topicPolicies) {
        TopicName topicName = TopicName.get(str);
        this.pulsar.getNamespaceService().isServiceUnitActiveAsync(topicName).thenAccept(bool -> {
            if (bool.booleanValue()) {
                (map == null ? fetchTopicPropertiesAsync(topicName) : CompletableFuture.completedFuture(map)).thenAccept(map2 -> {
                    createPersistentTopic(str, z, completableFuture, map2, topicPolicies);
                }).exceptionally(th -> {
                    log.warn("[{}] Read topic property failed", str, th);
                    this.pulsar.getExecutor().execute(() -> {
                        this.topics.remove(str, completableFuture);
                    });
                    completableFuture.completeExceptionally(th);
                    return null;
                });
                return;
            }
            String format = String.format("Namespace is being unloaded, cannot add topic %s", str);
            log.warn(format);
            this.pulsar.getExecutor().execute(() -> {
                this.topics.remove(str, completableFuture);
            });
            completableFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(format));
        }).exceptionally(th -> {
            completableFuture.completeExceptionally(th);
            return null;
        });
    }

    @VisibleForTesting
    public void createPersistentTopic0(String str, boolean z, CompletableFuture<Optional<Topic>> completableFuture, Map<String, String> map) {
        createPersistentTopic(str, z, completableFuture, map, null);
    }

    private void createPersistentTopic(String str, boolean z, CompletableFuture<Optional<Topic>> completableFuture, Map<String, String> map, @Nullable TopicPolicies topicPolicies) {
        TopicName topicName = TopicName.get(str);
        long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
        completableFuture.exceptionally(th -> {
            this.pulsarStats.recordTopicLoadFailed();
            return null;
        });
        if (!SystemTopicNames.isTransactionInternalName(topicName)) {
            CompletableFuture<Void> checkMaxTopicsPerNamespace = z ? checkMaxTopicsPerNamespace(topicName, 1) : CompletableFuture.completedFuture(null);
            CompletableFuture<Void> checkTopicAlreadyMigrated = checkTopicAlreadyMigrated(topicName);
            checkMaxTopicsPerNamespace.thenCompose(r3 -> {
                return checkTopicAlreadyMigrated;
            }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) r7 -> {
                return getManagedLedgerConfig(topicName, topicPolicies);
            }).thenAccept(managedLedgerConfig -> {
                if (isBrokerEntryMetadataEnabled() || isBrokerPayloadProcessorEnabled()) {
                    HashSet hashSet = new HashSet();
                    for (BrokerEntryMetadataInterceptor brokerEntryMetadataInterceptor : this.brokerEntryMetadataInterceptors) {
                        if (brokerEntryMetadataInterceptor instanceof AppendIndexMetadataInterceptor) {
                            hashSet.add(new AppendIndexMetadataInterceptor());
                        } else {
                            hashSet.add(brokerEntryMetadataInterceptor);
                        }
                    }
                    managedLedgerConfig.setManagedLedgerInterceptor(new ManagedLedgerInterceptorImpl(hashSet, this.brokerEntryPayloadProcessors));
                }
                managedLedgerConfig.setCreateIfMissing(z);
                managedLedgerConfig.setProperties(map);
                String shadowSource = managedLedgerConfig.getShadowSource();
                if (shadowSource != null) {
                    managedLedgerConfig.setShadowSourceName(TopicName.get(shadowSource).getPersistenceNamingEncoding());
                }
                this.topicEventsDispatcher.notify(str, TopicEventsListener.TopicEvent.LOAD, TopicEventsListener.EventStage.BEFORE);
                final CompletableFuture completableFuture2 = new CompletableFuture();
                completableFuture.whenComplete((optional, th2) -> {
                    if (th2 == null) {
                        completableFuture2.complete(null);
                    } else {
                        completableFuture2.completeExceptionally(th2);
                    }
                });
                if (z) {
                    this.topicEventsDispatcher.notify(str, TopicEventsListener.TopicEvent.CREATE, TopicEventsListener.EventStage.BEFORE);
                    this.topicEventsDispatcher.notifyOnCompletion(completableFuture, str, TopicEventsListener.TopicEvent.CREATE);
                }
                this.topicEventsDispatcher.notifyOnCompletion(completableFuture2, str, TopicEventsListener.TopicEvent.LOAD);
                this.managedLedgerFactory.asyncOpen(topicName.getPersistenceNamingEncoding(), managedLedgerConfig, new AsyncCallbacks.OpenLedgerCallback() { // from class: org.apache.pulsar.broker.service.BrokerService.2
                    public void openLedgerComplete(ManagedLedger managedLedger, Object obj) {
                        try {
                            PersistentTopic systemTopic = BrokerService.this.isSystemTopic(str) ? new SystemTopic(str, managedLedger, BrokerService.this) : (PersistentTopic) BrokerService.this.newTopic(str, managedLedger, BrokerService.this, PersistentTopic.class);
                            CompletableFuture thenCompose = systemTopic.initialize().thenCompose(r32 -> {
                                return systemTopic.preCreateSubscriptionForCompactionIfNeeded();
                            }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) r33 -> {
                                return systemTopic.checkReplication();
                            }).thenCompose(r34 -> {
                                return systemTopic.checkDeduplicationStatus();
                            });
                            String str2 = str;
                            long j = millis;
                            CompletableFuture completableFuture3 = completableFuture;
                            TopicName topicName2 = topicName;
                            CompletableFuture<Void> thenRun = thenCompose.thenRun(() -> {
                                BrokerService.log.info("Created topic {} - dedup is {}", str2, systemTopic.isDeduplicationEnabled() ? "enabled" : "disabled");
                                BrokerService.this.pulsarStats.recordTopicLoadTimeValue(str2, TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - j);
                                if (completableFuture3.isCompletedExceptionally()) {
                                    BrokerService.log.warn("{} future is already completed with failure {}, closing the topic", str2, FutureUtil.getException(completableFuture3));
                                    BrokerService.this.executor().submit(() -> {
                                        systemTopic.close().whenComplete((r6, th3) -> {
                                            if (th3 != null) {
                                                BrokerService.log.warn("[{}] Get an error when closing topic.", str2, th3);
                                            }
                                        });
                                    });
                                } else {
                                    BrokerService.this.addTopicToStatsMaps(topicName2, systemTopic);
                                    completableFuture3.complete(Optional.of(systemTopic));
                                }
                            });
                            String str3 = str;
                            CompletableFuture completableFuture4 = completableFuture;
                            thenRun.exceptionally(th3 -> {
                                BrokerService.log.warn("Replication or dedup check failed. Removing topic from topics list {}, {}", str3, th3);
                                BrokerService.this.executor().submit(() -> {
                                    systemTopic.close().whenComplete((r8, th3) -> {
                                        if (th3 != null) {
                                            BrokerService.log.warn("[{}] Get an error when closing topic.", str3, th3);
                                        }
                                        completableFuture4.completeExceptionally(th3);
                                    });
                                });
                                return null;
                            });
                        } catch (PulsarServerException e) {
                            BrokerService.log.warn("Failed to create topic {}: {}", str, e.getMessage());
                            ScheduledExecutorService executor = BrokerService.this.pulsar.getExecutor();
                            String str4 = str;
                            CompletableFuture completableFuture5 = completableFuture;
                            executor.execute(() -> {
                                BrokerService.this.topics.remove(str4, completableFuture5);
                            });
                            completableFuture.completeExceptionally(e);
                        }
                    }

                    public void openLedgerFailed(ManagedLedgerException managedLedgerException, Object obj) {
                        if (!z && (managedLedgerException instanceof ManagedLedgerException.ManagedLedgerNotFoundException)) {
                            completableFuture2.completeExceptionally(managedLedgerException);
                            completableFuture.complete(Optional.empty());
                            return;
                        }
                        BrokerService.log.warn("Failed to create topic {}", str, managedLedgerException);
                        ScheduledExecutorService executor = BrokerService.this.pulsar.getExecutor();
                        String str2 = str;
                        CompletableFuture completableFuture3 = completableFuture;
                        executor.execute(() -> {
                            BrokerService.this.topics.remove(str2, completableFuture3);
                        });
                        completableFuture.completeExceptionally(new BrokerServiceException.PersistenceException((Throwable) managedLedgerException));
                    }
                }, () -> {
                    return isTopicNsOwnedByBrokerAsync(topicName);
                }, (Object) null);
            }).exceptionally(th2 -> {
                log.warn("[{}] Failed to get topic configuration: {}", new Object[]{str, th2.getMessage(), th2});
                this.pulsar.getExecutor().execute(() -> {
                    this.topics.remove(str, completableFuture);
                });
                completableFuture.completeExceptionally(th2);
                return null;
            });
        } else {
            String format = String.format("Can not create transaction system topic %s", str);
            log.warn(format);
            this.pulsar.getExecutor().execute(() -> {
                this.topics.remove(str, completableFuture);
            });
            completableFuture.completeExceptionally(new BrokerServiceException.NotAllowedException(format));
        }
    }

    private CompletableFuture<Void> checkTopicAlreadyMigrated(TopicName topicName) {
        if (ExtensibleLoadManagerImpl.isInternalTopic(topicName.toString())) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        AbstractTopic.isClusterMigrationEnabled(this.pulsar, topicName.toString()).handle((bool, th) -> {
            if (bool.booleanValue()) {
                completableFuture.completeExceptionally(new BrokerServiceException.TopicMigratedException(topicName + " already migrated"));
                return null;
            }
            completableFuture.complete(null);
            return null;
        });
        return completableFuture;
    }

    public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(@Nonnull TopicName topicName) {
        return getTopicPoliciesBypassSystemTopic(topicName).thenCompose(optional -> {
            return getManagedLedgerConfig(topicName, (TopicPolicies) optional.orElse(null));
        });
    }

    private CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(@Nonnull TopicName topicName, @Nullable TopicPolicies topicPolicies) {
        Objects.requireNonNull(topicName);
        NamespaceName namespaceObject = topicName.getNamespaceObject();
        ServiceConfiguration configuration = this.pulsar.getConfiguration();
        return this.pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespaceObject).thenCombine((CompletionStage) this.pulsar.getPulsarResources().getLocalPolicies().getLocalPoliciesAsync(namespaceObject), (optional, optional2) -> {
            PersistencePolicies persistencePolicies = null;
            RetentionPolicies retentionPolicies = null;
            OffloadPoliciesImpl offloadPoliciesImpl = null;
            if (topicPolicies != null) {
                persistencePolicies = topicPolicies.getPersistence();
                retentionPolicies = topicPolicies.getRetentionPolicies();
                offloadPoliciesImpl = topicPolicies.getOffloadPolicies();
            }
            if (persistencePolicies == null) {
                persistencePolicies = (PersistencePolicies) optional.map(policies -> {
                    return policies.persistence;
                }).orElseGet(() -> {
                    return new PersistencePolicies(configuration.getManagedLedgerDefaultEnsembleSize(), configuration.getManagedLedgerDefaultWriteQuorum(), configuration.getManagedLedgerDefaultAckQuorum(), configuration.getManagedLedgerDefaultMarkDeleteRateLimit());
                });
            }
            if (retentionPolicies == null) {
                if (SystemTopicNames.isSystemTopic(topicName)) {
                    if (log.isDebugEnabled()) {
                        log.debug("{} Disable data retention policy for system topic.", topicName);
                    }
                    retentionPolicies = new RetentionPolicies(0, 0L);
                } else {
                    retentionPolicies = (RetentionPolicies) optional.map(policies2 -> {
                        return policies2.retention_policies;
                    }).orElseGet(() -> {
                        return new RetentionPolicies(configuration.getDefaultRetentionTimeInMinutes(), configuration.getDefaultRetentionSizeInMB());
                    });
                }
            }
            ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
            managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
            managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
            managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
            if (configuration.isStrictBookieAffinityEnabled()) {
                managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyClassName(IsolatedBookieEnsemblePlacementPolicy.class);
                if (optional2.isPresent() && ((LocalPolicies) optional2.get()).bookieAffinityGroup != null) {
                    HashMap hashMap = new HashMap();
                    hashMap.put("isolationBookieGroups", ((LocalPolicies) optional2.get()).bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
                    hashMap.put("secondaryIsolationBookieGroups", ((LocalPolicies) optional2.get()).bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
                    managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(hashMap);
                } else if (isSystemTopic(topicName)) {
                    HashMap hashMap2 = new HashMap();
                    hashMap2.put("isolationBookieGroups", "*");
                    hashMap2.put("secondaryIsolationBookieGroups", "*");
                    managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(hashMap2);
                } else {
                    HashMap hashMap3 = new HashMap();
                    hashMap3.put("isolationBookieGroups", "");
                    hashMap3.put("secondaryIsolationBookieGroups", "");
                    managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(hashMap3);
                }
            } else if (optional2.isPresent() && ((LocalPolicies) optional2.get()).bookieAffinityGroup != null) {
                managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyClassName(IsolatedBookieEnsemblePlacementPolicy.class);
                HashMap hashMap4 = new HashMap();
                hashMap4.put("isolationBookieGroups", ((LocalPolicies) optional2.get()).bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
                hashMap4.put("secondaryIsolationBookieGroups", ((LocalPolicies) optional2.get()).bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
                managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(hashMap4);
            }
            managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
            managedLedgerConfig.setDigestType(configuration.getManagedLedgerDigestType());
            managedLedgerConfig.setPassword(configuration.getManagedLedgerPassword());
            managedLedgerConfig.setMaxUnackedRangesToPersist(configuration.getManagedLedgerMaxUnackedRangesToPersist());
            managedLedgerConfig.setPersistentUnackedRangesWithMultipleEntriesEnabled(configuration.isPersistentUnackedRangesWithMultipleEntriesEnabled());
            managedLedgerConfig.setMaxUnackedRangesToPersistInMetadataStore(configuration.getManagedLedgerMaxUnackedRangesToPersistInMetadataStore());
            managedLedgerConfig.setMaxEntriesPerLedger(configuration.getManagedLedgerMaxEntriesPerLedger());
            managedLedgerConfig.setMinimumRolloverTime(configuration.getManagedLedgerMinLedgerRolloverTimeMinutes(), TimeUnit.MINUTES);
            managedLedgerConfig.setMaximumRolloverTime(configuration.getManagedLedgerMaxLedgerRolloverTimeMinutes(), TimeUnit.MINUTES);
            managedLedgerConfig.setMaxSizePerLedgerMb(configuration.getManagedLedgerMaxSizePerLedgerMbytes());
            managedLedgerConfig.setMetadataOperationsTimeoutSeconds(configuration.getManagedLedgerMetadataOperationsTimeoutSeconds());
            managedLedgerConfig.setReadEntryTimeoutSeconds(configuration.getManagedLedgerReadEntryTimeoutSeconds());
            managedLedgerConfig.setAddEntryTimeoutSeconds(configuration.getManagedLedgerAddEntryTimeoutSeconds());
            managedLedgerConfig.setMetadataEnsembleSize(configuration.getManagedLedgerDefaultEnsembleSize());
            managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(configuration.isManagedLedgerUnackedRangesOpenCacheSetEnabled());
            managedLedgerConfig.setMetadataWriteQuorumSize(configuration.getManagedLedgerDefaultWriteQuorum());
            managedLedgerConfig.setMetadataAckQuorumSize(configuration.getManagedLedgerDefaultAckQuorum());
            managedLedgerConfig.setMetadataMaxEntriesPerLedger(configuration.getManagedLedgerCursorMaxEntriesPerLedger());
            managedLedgerConfig.setLedgerRolloverTimeout(configuration.getManagedLedgerCursorRolloverTimeInSeconds());
            managedLedgerConfig.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES);
            managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
            managedLedgerConfig.setAutoSkipNonRecoverableData(configuration.isAutoSkipNonRecoverableData());
            managedLedgerConfig.setLazyCursorRecovery(configuration.isLazyCursorRecovery());
            managedLedgerConfig.setInactiveLedgerRollOverTime(configuration.getManagedLedgerInactiveLedgerRolloverTimeSeconds(), TimeUnit.SECONDS);
            managedLedgerConfig.setCacheEvictionByMarkDeletedPosition(configuration.isCacheEvictionByMarkDeletedPosition());
            managedLedgerConfig.setMinimumBacklogCursorsForCaching(configuration.getManagedLedgerMinimumBacklogCursorsForCaching());
            managedLedgerConfig.setMinimumBacklogEntriesForCaching(configuration.getManagedLedgerMinimumBacklogEntriesForCaching());
            managedLedgerConfig.setMaxBacklogBetweenCursorsForCaching(configuration.getManagedLedgerMaxBacklogBetweenCursorsForCaching());
            OffloadPoliciesImpl mergeConfiguration = OffloadPoliciesImpl.mergeConfiguration(offloadPoliciesImpl, OffloadPoliciesImpl.oldPoliciesCompatible((OffloadPoliciesImpl) optional.map(policies3 -> {
                return policies3.offload_policies;
            }).orElse(null), (Policies) optional.orElse(null)), getPulsar().getConfig().getProperties());
            if (NamespaceService.isSystemServiceNamespace(namespaceObject.toString()) || SystemTopicNames.isSystemTopic(topicName)) {
                managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE);
            } else if (offloadPoliciesImpl != null) {
                try {
                    managedLedgerConfig.setLedgerOffloader(pulsar().createManagedLedgerOffloader(mergeConfiguration));
                } catch (PulsarServerException e) {
                    throw new RuntimeException((Throwable) e);
                }
            } else {
                managedLedgerConfig.setLedgerOffloader(this.pulsar.getManagedLedgerOffloader(namespaceObject, mergeConfiguration));
            }
            managedLedgerConfig.setTriggerOffloadOnTopicLoad(configuration.isTriggerOffloadOnTopicLoad());
            managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(configuration.isAcknowledgmentAtBatchIndexLevelEnabled());
            managedLedgerConfig.setNewEntriesCheckDelayInMillis(configuration.getManagedLedgerNewEntriesCheckDelayInMillis());
            return managedLedgerConfig;
        });
    }

    private void addTopicToStatsMaps(TopicName topicName, Topic topic) {
        this.pulsar.getNamespaceService().getBundleAsync(topicName).thenAccept(namespaceBundle -> {
            if (namespaceBundle != null) {
                synchronized (this.multiLayerTopicsMap) {
                    ((ConcurrentOpenHashMap) ((ConcurrentOpenHashMap) this.multiLayerTopicsMap.computeIfAbsent(topicName.getNamespace(), str -> {
                        return ConcurrentOpenHashMap.newBuilder().build();
                    })).computeIfAbsent(namespaceBundle.toString(), str2 -> {
                        return ConcurrentOpenHashMap.newBuilder().build();
                    })).put(topicName.toString(), topic);
                }
            }
            invalidateOfflineTopicStatCache(topicName);
        }).exceptionally(th -> {
            log.warn("Got exception when retrieving bundle name during create persistent topic", th);
            return null;
        });
    }

    public void refreshTopicToStatsMaps(NamespaceBundle namespaceBundle) {
        Objects.requireNonNull(namespaceBundle);
        try {
            List<Topic> allTopicsFromNamespaceBundle = getAllTopicsFromNamespaceBundle(namespaceBundle.getNamespaceObject().toString(), namespaceBundle.toString());
            if (!CollectionUtils.isEmpty(allTopicsFromNamespaceBundle)) {
                allTopicsFromNamespaceBundle.stream().forEach(topic -> {
                    addTopicToStatsMaps(TopicName.get(topic.getName()), topic);
                });
                synchronized (this.multiLayerTopicsMap) {
                    ((ConcurrentOpenHashMap) this.multiLayerTopicsMap.get(namespaceBundle.getNamespaceObject().toString())).remove(namespaceBundle.toString());
                    this.pulsarStats.invalidBundleStats(namespaceBundle.toString());
                }
            }
        } catch (Exception e) {
            log.warn("Got exception while refreshing topicStats map", e);
        }
    }

    public PersistentOfflineTopicStats getOfflineTopicStat(TopicName topicName) {
        return (PersistentOfflineTopicStats) this.offlineTopicStatCache.get(topicName);
    }

    public void cacheOfflineTopicStats(TopicName topicName, PersistentOfflineTopicStats persistentOfflineTopicStats) {
        this.offlineTopicStatCache.put(topicName, persistentOfflineTopicStats);
    }

    public void invalidateOfflineTopicStatCache(TopicName topicName) {
        if (((PersistentOfflineTopicStats) this.offlineTopicStatCache.remove(topicName)) != null) {
            log.info("Removed cached offline topic stat for {} ", topicName.getPersistenceNamingEncoding());
        }
    }

    public Optional<Topic> getTopicReference(String str) {
        CompletableFuture completableFuture = (CompletableFuture) this.topics.get(str);
        return (completableFuture == null || !completableFuture.isDone() || completableFuture.isCompletedExceptionally()) ? Optional.empty() : (Optional) completableFuture.join();
    }

    public void updateRates() {
        synchronized (this.pulsarStats) {
            this.pulsarStats.updateStats(this.multiLayerTopicsMap);
            Summary.rotateLatencyCollection();
        }
    }

    public void getDimensionMetrics(java.util.function.Consumer<ByteBuf> consumer) {
        this.pulsarStats.getDimensionMetrics(consumer);
    }

    public List<Metrics> getTopicMetrics() {
        return this.pulsarStats.getTopicMetrics();
    }

    public Map<String, NamespaceBundleStats> getBundleStats() {
        return this.pulsarStats.getBundleStats();
    }

    public Semaphore getLookupRequestSemaphore() {
        return this.lookupRequestSemaphore.get();
    }

    public void checkGC() {
        forEachTopic((v0) -> {
            v0.checkGC();
        });
    }

    public void checkClusterMigration() {
        forEachTopic((v0) -> {
            v0.checkClusterMigration();
        });
    }

    public void checkMessageExpiry() {
        forEachTopic((v0) -> {
            v0.checkMessageExpiry();
        });
    }

    public void checkReplicationPolicies() {
        forEachTopic((v0) -> {
            v0.checkReplication();
        });
    }

    public void checkCompaction() {
        forEachTopic(topic -> {
            if (topic instanceof PersistentTopic) {
                ((PersistentTopic) topic).checkCompaction();
            }
        });
    }

    private void checkConsumedLedgers() {
        forEachTopic(topic -> {
            if (topic instanceof PersistentTopic) {
                Optional.ofNullable(((PersistentTopic) topic).getManagedLedger()).ifPresent(managedLedger -> {
                    managedLedger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
                });
            }
        });
    }

    public void checkMessageDeduplicationInfo() {
        forEachTopic((v0) -> {
            v0.checkMessageDeduplicationInfo();
        });
    }

    public void checkInactiveSubscriptions() {
        forEachTopic((v0) -> {
            v0.checkInactiveSubscriptions();
        });
    }

    public void forEachTopic(java.util.function.Consumer<Topic> consumer) {
        this.topics.forEach((str, completableFuture) -> {
            Optional<Topic> extractTopic = extractTopic(completableFuture);
            Objects.requireNonNull(consumer);
            extractTopic.ifPresent((v1) -> {
                r1.accept(v1);
            });
        });
    }

    public void forEachPersistentTopic(java.util.function.Consumer<PersistentTopic> consumer) {
        this.topics.values().stream().map(BrokerService::extractTopic).map(optional -> {
            return optional.filter(topic -> {
                return topic instanceof PersistentTopic;
            });
        }).forEach(optional2 -> {
            optional2.ifPresent(topic -> {
                consumer.accept((PersistentTopic) topic);
            });
        });
    }

    public BacklogQuotaManager getBacklogQuotaManager() {
        return this.backlogQuotaManager;
    }

    public void monitorBacklogQuota() {
        forEachPersistentTopic(persistentTopic -> {
            if (persistentTopic.isSizeBacklogExceeded()) {
                getBacklogQuotaManager().handleExceededBacklogQuota(persistentTopic, BacklogQuota.BacklogQuotaType.destination_storage, false);
            } else {
                persistentTopic.checkTimeBacklogExceeded().thenAccept(bool -> {
                    if (bool.booleanValue()) {
                        getBacklogQuotaManager().handleExceededBacklogQuota(persistentTopic, BacklogQuota.BacklogQuotaType.message_age, this.pulsar.getConfiguration().isPreciseTimeBasedBacklogQuotaCheck());
                    } else if (log.isDebugEnabled()) {
                        log.debug("quota not exceeded for [{}]", persistentTopic.getName());
                    }
                }).exceptionally(th -> {
                    log.error("Error when checkTimeBacklogExceeded({}) in monitorBacklogQuota", persistentTopic.getName(), th);
                    return null;
                });
            }
        });
    }

    public CompletableFuture<Boolean> isTopicNsOwnedByBrokerAsync(TopicName topicName) {
        return this.pulsar.getNamespaceService().isServiceUnitOwnedAsync(topicName).handle((bool, th) -> {
            if (th == null) {
                return bool;
            }
            log.warn("Failed to check the ownership of the topic: {}, {}", topicName, th.getMessage());
            return false;
        });
    }

    public CompletableFuture<Void> checkTopicNsOwnership(String str) {
        TopicName topicName = TopicName.get(str);
        return this.pulsar.getNamespaceService().checkTopicOwnership(topicName).thenCompose(bool -> {
            if (bool.booleanValue()) {
                return CompletableFuture.completedFuture(null);
            }
            String format = String.format("Namespace bundle for topic (%s) not served by this instance:%s. Please redo the lookup. Request is denied: namespace=%s", str, this.pulsar.getBrokerId(), topicName.getNamespace());
            log.warn(format);
            return FutureUtil.failedFuture(new BrokerServiceException.ServiceUnitNotReadyException(format));
        });
    }

    public CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle namespaceBundle, boolean z, boolean z2, long j, TimeUnit timeUnit) {
        CompletableFuture<Integer> unloadServiceUnit = unloadServiceUnit(namespaceBundle, z, z2);
        ScheduledFuture schedule = executor().schedule(() -> {
            if (unloadServiceUnit.isDone()) {
                return;
            }
            log.warn("Unloading of {} has timed out", namespaceBundle);
            unloadServiceUnit.complete(0);
        }, j, timeUnit);
        unloadServiceUnit.whenComplete((num, th) -> {
            schedule.cancel(true);
        });
        return unloadServiceUnit;
    }

    private CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle namespaceBundle, boolean z, boolean z2) {
        ArrayList arrayList = new ArrayList();
        this.topics.forEach((str, completableFuture) -> {
            TopicName topicName = TopicName.get(str);
            if (namespaceBundle.includes(topicName)) {
                log.info("[{}] Unloading topic", topicName);
                if (completableFuture.isCompletedExceptionally()) {
                    try {
                        completableFuture.get();
                    } catch (InterruptedException | ExecutionException e) {
                        if (e.getCause() instanceof BrokerServiceException.ServiceUnitNotReadyException) {
                            if (log.isDebugEnabled()) {
                                log.debug("[{}] Topic was already unloaded", topicName);
                                return;
                            }
                            return;
                        }
                        log.warn("[{}] Got exception when closing topic", topicName, e);
                    }
                }
                arrayList.add(completableFuture.thenCompose(optional -> {
                    return optional.isPresent() ? ((Topic) optional.get()).close(z, z2) : CompletableFuture.completedFuture(null);
                }).exceptionally(th -> {
                    if (!(th.getCause() instanceof BrokerServiceException.ServiceUnitNotReadyException) || !th.getMessage().contains("Please redo the lookup")) {
                        throw FutureUtil.wrapToCompletionException(th);
                    }
                    log.warn("[{}] Topic ownership check failed. Skipping it", topicName);
                    return null;
                }));
            }
        });
        if (getPulsar().getConfig().isTransactionCoordinatorEnabled() && namespaceBundle.getNamespaceObject().equals(NamespaceName.SYSTEM_NAMESPACE)) {
            TransactionMetadataStoreService transactionMetadataStoreService = getPulsar().getTransactionMetadataStoreService();
            getPulsar().getTransactionMetadataStoreService().getStores().values().stream().filter(transactionMetadataStore -> {
                return namespaceBundle.includes(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartition((int) transactionMetadataStore.getTransactionCoordinatorID().getId()));
            }).map((v0) -> {
                return v0.getTransactionCoordinatorID();
            }).forEach(transactionCoordinatorID -> {
                arrayList.add(transactionMetadataStoreService.removeTransactionMetadataStore(transactionCoordinatorID));
            });
        }
        return FutureUtil.waitForAll(arrayList).thenApply(r3 -> {
            return Integer.valueOf(arrayList.size());
        });
    }

    public void cleanUnloadedTopicFromCache(NamespaceBundle namespaceBundle) {
        for (String str : this.topics.keys()) {
            TopicName topicName = TopicName.get(str);
            if (namespaceBundle.includes(topicName) && getTopicReference(str).isPresent()) {
                log.info("[{}][{}] Clean unloaded topic from cache.", namespaceBundle.toString(), str);
                this.pulsar.getBrokerService().removeTopicFromCache(topicName.toString(), namespaceBundle, null);
            }
        }
    }

    public AuthorizationService getAuthorizationService() {
        return this.authorizationService;
    }

    public CompletableFuture<Void> removeTopicFromCache(Topic topic) {
        Optional<CompletableFuture<Optional<Topic>>> findTopicFutureInCache = findTopicFutureInCache(topic);
        return findTopicFutureInCache.isEmpty() ? CompletableFuture.completedFuture(null) : removeTopicFutureFromCache(topic.getName(), findTopicFutureInCache.get());
    }

    private Optional<CompletableFuture<Optional<Topic>>> findTopicFutureInCache(Topic topic) {
        CompletableFuture completableFuture;
        if (topic != null && (completableFuture = (CompletableFuture) this.topics.get(topic.getName())) != null && completableFuture.isDone() && !completableFuture.isCompletedExceptionally()) {
            Topic topic2 = (Topic) ((Optional) completableFuture.join()).orElse(null);
            return (topic2 == null || topic2 != topic) ? Optional.empty() : Optional.of(completableFuture);
        }
        return Optional.empty();
    }

    private CompletableFuture<Void> removeTopicFutureFromCache(String str, CompletableFuture<Optional<Topic>> completableFuture) {
        return this.pulsar.getNamespaceService().getBundleAsync(TopicName.get(str)).thenAccept(namespaceBundle -> {
            removeTopicFromCache(str, namespaceBundle, completableFuture);
        });
    }

    private void removeTopicFromCache(String str, NamespaceBundle namespaceBundle, CompletableFuture<Optional<Topic>> completableFuture) {
        String namespaceBundle2 = namespaceBundle.toString();
        String namespaceName = TopicName.get(str).getNamespaceObject().toString();
        this.topicEventsDispatcher.notify(str, TopicEventsListener.TopicEvent.UNLOAD, TopicEventsListener.EventStage.BEFORE);
        synchronized (this.multiLayerTopicsMap) {
            ConcurrentOpenHashMap concurrentOpenHashMap = (ConcurrentOpenHashMap) this.multiLayerTopicsMap.get(namespaceName);
            if (concurrentOpenHashMap != null) {
                ConcurrentOpenHashMap concurrentOpenHashMap2 = (ConcurrentOpenHashMap) concurrentOpenHashMap.get(namespaceBundle2);
                if (concurrentOpenHashMap2 != null) {
                    concurrentOpenHashMap2.remove(str);
                    if (concurrentOpenHashMap2.isEmpty()) {
                        concurrentOpenHashMap.remove(namespaceBundle2);
                    }
                }
                if (concurrentOpenHashMap.isEmpty()) {
                    this.multiLayerTopicsMap.remove(namespaceName);
                    ClusterReplicationMetrics clusterReplicationMetrics = this.pulsarStats.getClusterReplicationMetrics();
                    this.replicationClients.forEach((str2, pulsarClient) -> {
                        clusterReplicationMetrics.remove(clusterReplicationMetrics.getKeyName(namespaceName, str2));
                    });
                }
            }
        }
        if (completableFuture == null) {
            this.topics.remove(str);
        } else {
            this.topics.remove(str, completableFuture);
        }
        Compactor nullableCompactor = this.pulsar.getNullableCompactor();
        if (nullableCompactor != null) {
            nullableCompactor.getStats().removeTopic(str);
        }
        this.topicEventsDispatcher.notify(str, TopicEventsListener.TopicEvent.UNLOAD, TopicEventsListener.EventStage.SUCCESS);
    }

    public int getNumberOfNamespaceBundles() {
        this.numberOfNamespaceBundles = 0;
        this.multiLayerTopicsMap.forEach((str, concurrentOpenHashMap) -> {
            this.numberOfNamespaceBundles = (int) (this.numberOfNamespaceBundles + concurrentOpenHashMap.size());
        });
        return this.numberOfNamespaceBundles;
    }

    public ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> getTopics() {
        return this.topics;
    }

    private void handleMetadataChanges(Notification notification) {
        if (notification.getType() == NotificationType.Modified && NamespaceResources.pathIsFromNamespace(notification.getPath())) {
            handlePoliciesUpdates(NamespaceResources.namespaceFromPath(notification.getPath()));
            return;
        }
        if (notification.getType() == NotificationType.Modified && NamespaceResources.pathIsNamespaceLocalPolicies(notification.getPath())) {
            handleLocalPoliciesUpdates(NamespaceResources.namespaceFromLocalPoliciesPath(notification.getPath()));
        } else if (pulsar().getPulsarResources().getDynamicConfigResources().isDynamicConfigurationPath(notification.getPath())) {
            handleDynamicConfigurationUpdates();
        }
    }

    private void handleLocalPoliciesUpdates(NamespaceName namespaceName) {
        this.pulsar.getPulsarResources().getLocalPolicies().getLocalPoliciesAsync(namespaceName).thenAcceptAsync(optional -> {
            if (optional.isPresent()) {
                log.info("[{}] updating with {}", namespaceName, (LocalPolicies) optional.get());
                this.topics.forEach((str, completableFuture) -> {
                    if (namespaceName.includes(TopicName.get(str))) {
                        completableFuture.thenAccept(optional -> {
                            if (log.isDebugEnabled()) {
                                log.debug("Notifying topic that local policies have changed: {}", str);
                            }
                            optional.ifPresent(topic -> {
                                if (topic instanceof PersistentTopic) {
                                    ((PersistentTopic) topic).onLocalPoliciesUpdate();
                                }
                            });
                        });
                    }
                });
            }
        }, (Executor) this.pulsar.getExecutor());
    }

    private void handlePoliciesUpdates(NamespaceName namespaceName) {
        this.pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespaceName).thenAcceptAsync(optional -> {
            if (optional.isPresent()) {
                Policies policies = (Policies) optional.get();
                log.info("[{}] updating with {}", namespaceName, policies);
                this.topics.forEach((str, completableFuture) -> {
                    if (namespaceName.includes(TopicName.get(str))) {
                        completableFuture.thenAccept(optional -> {
                            if (log.isDebugEnabled()) {
                                log.debug("Notifying topic that policies have changed: {}", str);
                            }
                            optional.ifPresent(topic -> {
                                topic.onPoliciesUpdate(policies);
                            });
                        });
                    }
                });
                unloadDeletedReplNamespace(policies, namespaceName);
            }
        }, (Executor) this.pulsar.getExecutor());
    }

    private void handleDynamicConfigurationUpdates() {
        DynamicConfigurationResources dynamicConfigurationResources = null;
        try {
            dynamicConfigurationResources = pulsar().getPulsarResources().getDynamicConfigResources();
        } catch (Exception e) {
            log.warn("Failed to read dynamic broker configuration", e);
        }
        if (dynamicConfigurationResources != null) {
            dynamicConfigurationResources.getDynamicConfigurationAsync().thenAccept(optional -> {
                this.dynamicConfigurationMap.forEach((str, configField) -> {
                    boolean z = optional.isEmpty() || !((Map) optional.get()).containsKey(str);
                    if (configField.lastDynamicValue == null || !z) {
                        return;
                    }
                    configValueChanged(str, null);
                });
                if (optional.isPresent()) {
                    ((Map) optional.get()).forEach((str2, str3) -> {
                        configValueChanged(str2, str3);
                    });
                }
            });
        }
    }

    private void configValueChanged(String str, String str2) {
        Object obj;
        Object obj2;
        ConfigField configField = (ConfigField) this.dynamicConfigurationMap.get(str);
        if (configField == null) {
            log.warn("{} does not exist in dynamicConfigurationMap, skip this config.", str);
            return;
        }
        java.util.function.Consumer consumer = (java.util.function.Consumer) this.configRegisteredListeners.get(str);
        try {
            if (configField.field != null) {
                obj2 = StringUtils.isBlank(str2) ? configField.defaultValue : FieldParser.value(str2, configField.field);
                obj = configField.field.get(this.pulsar.getConfiguration());
                configField.field.set(this.pulsar.getConfiguration(), obj2);
            } else {
                log.info("Skip update customized dynamic configuration {}/{} in memory, only trigger an event listeners.", str, str2);
                obj = configField.lastDynamicValue;
                obj2 = str2 == null ? configField.defaultValue : str2;
            }
            configField.lastDynamicValue = str2;
            if (str2 == null) {
                log.info("Successfully remove the dynamic configuration {}, and revert to the default value", str);
            } else {
                log.info("Successfully updated configuration {}/{}", str, str2);
            }
            if (consumer != null && !Objects.equals(obj, obj2)) {
                consumer.accept(obj2);
            }
        } catch (Exception e) {
            log.error("Failed to update config {}", str, e);
        }
    }

    private void unloadDeletedReplNamespace(Policies policies, NamespaceName namespaceName) {
        if (namespaceName.isGlobal()) {
            String clusterName = this.pulsar.getConfiguration().getClusterName();
            if (policies.replication_clusters.contains(clusterName)) {
                return;
            }
            pulsar().getNamespaceService().getNamespaceBundleFactory().getBundlesAsync(namespaceName).thenAccept(namespaceBundles -> {
                namespaceBundles.getBundles().forEach(namespaceBundle -> {
                    this.pulsar.getNamespaceService().isNamespaceBundleOwned(namespaceBundle).thenAccept(bool -> {
                        if (bool.booleanValue()) {
                            pulsar().getExecutor().execute(() -> {
                                try {
                                    pulsar().getAdminClient().namespaces().unloadNamespaceBundle(namespaceName.toString(), namespaceBundle.getBundleRange());
                                } catch (Exception e) {
                                    log.error("Failed to unload namespace-bundle {}-{} that not owned by {}, {}", new Object[]{namespaceName.toString(), namespaceBundle.toString(), clusterName, e.getMessage()});
                                }
                            });
                        }
                    });
                });
            });
        }
    }

    public PulsarService pulsar() {
        return this.pulsar;
    }

    public EventLoopGroup executor() {
        return this.workerGroup;
    }

    public ConcurrentOpenHashMap<String, PulsarClient> getReplicationClients() {
        return this.replicationClients;
    }

    public boolean isAuthenticationEnabled() {
        return this.pulsar.getConfiguration().isAuthenticationEnabled();
    }

    public boolean isAuthorizationEnabled() {
        return this.pulsar.getConfiguration().isAuthorizationEnabled();
    }

    public int getKeepAliveIntervalSeconds() {
        return this.keepAliveIntervalSeconds;
    }

    public String generateUniqueProducerName() {
        return this.producerNameGenerator.getNextId();
    }

    public Map<String, TopicStatsImpl> getTopicStats() {
        HashMap hashMap = new HashMap();
        forEachTopic(topic -> {
            hashMap.put(topic.getName(), topic.mo308getStats(false, false, false));
        });
        return hashMap;
    }

    public AuthenticationService getAuthenticationService() {
        return this.authenticationService;
    }

    public List<Topic> getAllTopicsFromNamespaceBundle(String str, String str2) {
        ConcurrentOpenHashMap concurrentOpenHashMap;
        ConcurrentOpenHashMap concurrentOpenHashMap2 = (ConcurrentOpenHashMap) this.multiLayerTopicsMap.get(str);
        if (concurrentOpenHashMap2 != null && (concurrentOpenHashMap = (ConcurrentOpenHashMap) concurrentOpenHashMap2.get(str2)) != null) {
            return concurrentOpenHashMap.values();
        }
        return Collections.emptyList();
    }

    private void updateConfigurationAndRegisterListeners() {
        addDynamicConfigValidator("loadManagerClassName", str -> {
            try {
                Class.forName(str);
                return true;
            } catch (ClassNotFoundException | NoClassDefFoundError e) {
                log.warn("Configured load-manager class {} not found {}", str, e.getMessage());
                return false;
            }
        });
        registerConfigurationListener("maxConcurrentLookupRequest", obj -> {
            this.lookupRequestSemaphore.set(new Semaphore(((Integer) obj).intValue(), false));
        });
        registerConfigurationListener("maxConcurrentTopicLoadRequest", obj2 -> {
            this.topicLoadRequestSemaphore.set(new Semaphore(((Integer) obj2).intValue(), false));
        });
        registerConfigurationListener("loadManagerClassName", obj3 -> {
            this.pulsar.getExecutor().execute(() -> {
                LoadManager loadManager = null;
                try {
                    loadManager = LoadManager.create(this.pulsar);
                    log.info("Created load manager: {}", obj3);
                    this.pulsar.getLoadManager().get().stop();
                    loadManager.start();
                } catch (Exception e) {
                    log.warn("Failed to change load manager", e);
                    if (loadManager != null) {
                        try {
                            loadManager.stop();
                            loadManager = null;
                        } catch (PulsarServerException e2) {
                            log.warn("Failed to close created load manager", e2);
                        }
                    }
                }
                if (loadManager != null) {
                    this.pulsar.getLoadManager().set(loadManager);
                }
            });
        });
        registerConfigurationListener("managedLedgerCacheSizeMB", obj4 -> {
            this.managedLedgerFactory.getEntryCacheManager().updateCacheSizeAndThreshold(((Integer) obj4).intValue() * 1024 * 1024);
        });
        registerConfigurationListener("managedLedgerCacheEvictionWatermark", obj5 -> {
            this.managedLedgerFactory.getEntryCacheManager().updateCacheEvictionWatermark(((Double) obj5).doubleValue());
        });
        registerConfigurationListener("managedLedgerCacheEvictionTimeThresholdMillis", obj6 -> {
            this.managedLedgerFactory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS.toNanos(((Long) obj6).longValue()));
        });
        registerConfigurationListener("dispatchThrottlingRatePerTopicInMsg", obj7 -> {
            updateTopicMessageDispatchRate();
        });
        registerConfigurationListener("dispatchThrottlingRatePerTopicInByte", obj8 -> {
            updateTopicMessageDispatchRate();
        });
        registerConfigurationListener("autoSkipNonRecoverableData", obj9 -> {
            updateManagedLedgerConfig();
        });
        registerConfigurationListener("dispatchThrottlingRatePerSubscriptionInMsg", obj10 -> {
            updateSubscriptionMessageDispatchRate();
        });
        registerConfigurationListener("dispatchThrottlingRatePerSubscriptionInByte", obj11 -> {
            updateSubscriptionMessageDispatchRate();
        });
        registerConfigurationListener("dispatchThrottlingRatePerReplicatorInMsg", obj12 -> {
            updateReplicatorMessageDispatchRate();
        });
        registerConfigurationListener("dispatchThrottlingRatePerReplicatorInByte", obj13 -> {
            updateReplicatorMessageDispatchRate();
        });
        registerConfigurationListener("maxPublishRatePerTopicInMessages", obj14 -> {
            updateMaxPublishRatePerTopicInMessages();
        });
        registerConfigurationListener("maxPublishRatePerTopicInBytes", obj15 -> {
            updateMaxPublishRatePerTopicInMessages();
        });
        registerConfigurationListener("subscribeThrottlingRatePerConsumer", obj16 -> {
            updateSubscribeRate();
        });
        registerConfigurationListener("subscribeRatePeriodPerConsumerInSecond", obj17 -> {
            updateSubscribeRate();
        });
        registerConfigurationListener("brokerPublisherThrottlingMaxMessageRate", obj18 -> {
            updateBrokerPublisherThrottlingMaxRate();
        });
        registerConfigurationListener("brokerPublisherThrottlingMaxByteRate", obj19 -> {
            updateBrokerPublisherThrottlingMaxRate();
        });
        registerConfigurationListener("dispatchThrottlingRateInMsg", obj20 -> {
            updateBrokerDispatchThrottlingMaxRate();
        });
        registerConfigurationListener("dispatchThrottlingRateInByte", obj21 -> {
            updateBrokerDispatchThrottlingMaxRate();
        });
        registerConfigurationListener("subscriptionTypesEnabled", this::updateBrokerSubscriptionTypesEnabled);
        registerConfigurationListener("defaultNumPartitions", obj22 -> {
            updateDefaultNumPartitions(((Integer) obj22).intValue());
        });
        registerConfigurationListener("maxNumPartitionsPerPartitionedTopic", obj23 -> {
            updateMaxNumPartitionsPerPartitionedTopic(((Integer) obj23).intValue());
        });
        registerConfigurationListener("httpRequestsFailOnUnknownPropertiesEnabled", obj24 -> {
            this.pulsar.getWebService().updateHttpRequestsFailOnUnknownPropertiesEnabled(((Boolean) obj24).booleanValue());
        });
        createDynamicConfigPathIfNotExist();
        handleDynamicConfigurationUpdates();
    }

    private void updateDefaultNumPartitions(int i) {
        int maxNumPartitionsPerPartitionedTopic = this.pulsar.getConfiguration().getMaxNumPartitionsPerPartitionedTopic();
        if (maxNumPartitionsPerPartitionedTopic == 0 || maxNumPartitionsPerPartitionedTopic > i) {
            this.pulsar.getConfiguration().setDefaultNumPartitions(i);
        } else {
            this.pulsar.getConfiguration().setDefaultNumPartitions(maxNumPartitionsPerPartitionedTopic);
        }
    }

    private void updateMaxNumPartitionsPerPartitionedTopic(int i) {
        if (i == 0) {
            this.pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(i);
            return;
        }
        if (this.pulsar.getConfiguration().getDefaultNumPartitions() > i) {
            this.pulsar.getConfiguration().setDefaultNumPartitions(i);
        }
        this.pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(i);
    }

    private void updateBrokerDispatchThrottlingMaxRate() {
        if (this.brokerDispatchRateLimiter == null) {
            this.brokerDispatchRateLimiter = new DispatchRateLimiter(this);
        } else {
            this.brokerDispatchRateLimiter.updateDispatchRate();
        }
    }

    private void updateMaxPublishRatePerTopicInMessages() {
        pulsar().getExecutor().execute(() -> {
            forEachTopic(topic -> {
                if (topic instanceof AbstractTopic) {
                    ((AbstractTopic) topic).updateBrokerPublishRate();
                    ((AbstractTopic) topic).updatePublishDispatcher();
                }
            });
        });
    }

    private void updateSubscribeRate() {
        pulsar().getExecutor().execute(() -> {
            forEachTopic(topic -> {
                if (topic instanceof PersistentTopic) {
                    ((PersistentTopic) topic).updateBrokerSubscribeRate();
                    ((PersistentTopic) topic).updateSubscribeRateLimiter();
                }
            });
        });
    }

    private void updateBrokerPublisherThrottlingMaxRate() {
        PublishRate publishRate = new PublishRate(this.pulsar.getConfiguration().getBrokerPublisherThrottlingMaxMessageRate(), this.pulsar.getConfiguration().getBrokerPublisherThrottlingMaxByteRate());
        log.info("Update broker publish rate limiting {}", publishRate);
        this.brokerPublishRateLimiter.update(publishRate);
    }

    private void updateTopicMessageDispatchRate() {
        pulsar().getExecutor().execute(() -> {
            forEachTopic(topic -> {
                if (topic instanceof AbstractTopic) {
                    ((AbstractTopic) topic).updateBrokerDispatchRate();
                    ((AbstractTopic) topic).updateDispatchRateLimiter();
                }
            });
        });
    }

    private void updateBrokerSubscriptionTypesEnabled(Object obj) {
        pulsar().getExecutor().execute(() -> {
            forEachTopic(topic -> {
                if (topic instanceof AbstractTopic) {
                    ((AbstractTopic) topic).updateBrokerSubscriptionTypesEnabled();
                }
            });
        });
    }

    private void updateSubscriptionMessageDispatchRate() {
        pulsar().getExecutor().execute(() -> {
            forEachTopic(topic -> {
                if (topic instanceof AbstractTopic) {
                    ((AbstractTopic) topic).updateBrokerSubscriptionDispatchRate();
                }
                topic.getSubscriptions().forEach((str, subscription) -> {
                    Dispatcher dispatcher = subscription.getDispatcher();
                    if (dispatcher != null) {
                        dispatcher.updateRateLimiter();
                    }
                });
            });
        });
    }

    private void updateReplicatorMessageDispatchRate() {
        pulsar().getExecutor().execute(() -> {
            forEachTopic(topic -> {
                if (topic instanceof AbstractTopic) {
                    ((AbstractTopic) topic).updateBrokerReplicatorDispatchRate();
                }
                topic.getReplicators().forEach((str, replicator) -> {
                    replicator.updateRateLimiter();
                });
                topic.getShadowReplicators().forEach((str2, replicator2) -> {
                    replicator2.updateRateLimiter();
                });
            });
        });
    }

    private void updateManagedLedgerConfig() {
        pulsar().getExecutor().execute(() -> {
            forEachTopic(topic -> {
                try {
                    if (topic instanceof PersistentTopic) {
                        ((PersistentTopic) topic).getManagedLedger().getConfig().setAutoSkipNonRecoverableData(this.pulsar.getConfiguration().isAutoSkipNonRecoverableData());
                    }
                } catch (Exception e) {
                    log.warn("[{}] failed to update managed-ledger config", topic.getName(), e);
                }
            });
        });
    }

    public <T> void registerConfigurationListener(String str, java.util.function.Consumer<T> consumer) {
        validateConfigKey(str);
        this.configRegisteredListeners.put(str, consumer);
    }

    private void addDynamicConfigValidator(String str, Predicate<String> predicate) {
        validateConfigKey(str);
        ((ConfigField) this.dynamicConfigurationMap.get(str)).validator = predicate;
    }

    private void validateConfigKey(String str) {
        if (!this.dynamicConfigurationMap.containsKey(str)) {
            throw new IllegalArgumentException(str + " doesn't exits in the dynamicConfigurationMap");
        }
    }

    public void registerCustomDynamicConfiguration(String str, Predicate<String> predicate) {
        if (this.dynamicConfigurationMap.containsKey(str)) {
            throw new IllegalArgumentException(str + " already exists in the dynamicConfigurationMap");
        }
        ConfigField newCustomConfigField = ConfigField.newCustomConfigField(null);
        newCustomConfigField.validator = predicate;
        this.dynamicConfigurationMap.put(str, newCustomConfigField);
    }

    private void createDynamicConfigPathIfNotExist() {
        try {
            if (!pulsar().getPulsarResources().getDynamicConfigResources().getDynamicConfiguration().isPresent()) {
                pulsar().getPulsarResources().getDynamicConfigResources().setDynamicConfigurationWithCreate(optional -> {
                    return new HashMap();
                });
            }
        } catch (Exception e) {
            log.warn("Failed to read dynamic broker configuration", e);
        }
    }

    private void updateDynamicServiceConfiguration() {
        Optional empty = Optional.empty();
        try {
            empty = pulsar().getPulsarResources().getDynamicConfigResources().getDynamicConfiguration();
            if (!empty.isPresent()) {
                pulsar().getPulsarResources().getDynamicConfigResources().setDynamicConfigurationWithCreate(optional -> {
                    return new HashMap();
                });
            }
        } catch (Exception e) {
            log.warn("Failed to read dynamic broker configuration", e);
        }
        empty.ifPresent(map -> {
            map.forEach((str, str2) -> {
                if (this.dynamicConfigurationMap.containsKey(str) && ((ConfigField) this.dynamicConfigurationMap.get(str)).validator != null && !((ConfigField) this.dynamicConfigurationMap.get(str)).validator.test(str2)) {
                    log.error("Failed to validate dynamic config {} with value {}", str, str2);
                    throw new IllegalArgumentException(String.format("Failed to validate dynamic-config %s/%s", str, str2));
                }
                try {
                    Field declaredField = ServiceConfiguration.class.getDeclaredField(str);
                    if (declaredField != null && declaredField.isAnnotationPresent(FieldContext.class)) {
                        declaredField.setAccessible(true);
                        declaredField.set(pulsar().getConfiguration(), FieldParser.value(str2, declaredField));
                        log.info("Successfully updated {}/{}", str, str2);
                    }
                } catch (Exception e2) {
                    log.warn("Failed to update service configuration {}/{}, {}", new Object[]{str, str2, e2.getMessage()});
                }
            });
        });
    }

    public DelayedDeliveryTrackerFactory getDelayedDeliveryTrackerFactory() {
        return this.delayedDeliveryTrackerFactory;
    }

    public List<String> getDynamicConfiguration() {
        return this.dynamicConfigurationMap.keys();
    }

    public Map<String, String> getRuntimeConfiguration() {
        HashMap hashMap = new HashMap();
        getRuntimeConfigurationMap().forEach((str, obj) -> {
            hashMap.put(str, String.valueOf(obj));
        });
        return hashMap;
    }

    public boolean isDynamicConfiguration(String str) {
        return this.dynamicConfigurationMap.containsKey(str);
    }

    public boolean validateDynamicConfiguration(String str, String str2) {
        if (!this.dynamicConfigurationMap.containsKey(str) || ((ConfigField) this.dynamicConfigurationMap.get(str)).validator == null) {
            return true;
        }
        return ((ConfigField) this.dynamicConfigurationMap.get(str)).validator.test(str2);
    }

    private ConcurrentOpenHashMap<String, ConfigField> prepareDynamicConfigurationMap() {
        ConcurrentOpenHashMap<String, ConfigField> build = ConcurrentOpenHashMap.newBuilder().build();
        try {
            for (Field field : ServiceConfiguration.class.getDeclaredFields()) {
                if (field != null && field.isAnnotationPresent(FieldContext.class)) {
                    field.setAccessible(true);
                    if (field.getAnnotation(FieldContext.class).dynamic()) {
                        build.put(field.getName(), new ConfigField(field, field.get(this.pulsar.getConfiguration())));
                    }
                }
            }
            return build;
        } catch (IllegalAccessException | IllegalArgumentException e) {
            log.error("Failed to initialize dynamic configuration map", e);
            throw new RuntimeException(e);
        }
    }

    private ConcurrentOpenHashMap<String, Object> getRuntimeConfigurationMap() {
        ConcurrentOpenHashMap<String, Object> build = ConcurrentOpenHashMap.newBuilder().build();
        for (Field field : ServiceConfiguration.class.getDeclaredFields()) {
            if (field != null && field.isAnnotationPresent(FieldContext.class)) {
                field.setAccessible(true);
                try {
                    Object obj = field.get(this.pulsar.getConfiguration());
                    build.put(field.getName(), obj == null ? "" : obj);
                } catch (Exception e) {
                    log.error("Failed to get value of field {}, {}", field.getName(), e.getMessage());
                }
            }
        }
        return build;
    }

    private void createPendingLoadTopic() {
        TopicLoadingContext poll = this.pendingTopicLoadingQueue.poll();
        if (poll == null) {
            return;
        }
        String topic = poll.getTopic();
        checkTopicNsOwnership(topic).thenRun(() -> {
            CompletableFuture<Optional<Topic>> topicFuture = poll.getTopicFuture();
            Semaphore semaphore = this.topicLoadRequestSemaphore.get();
            boolean tryAcquire = semaphore.tryAcquire();
            checkOwnershipAndCreatePersistentTopic(topic, poll.isCreateIfMissing(), topicFuture, poll.getProperties(), poll.getTopicPolicies());
            topicFuture.handle((optional, th) -> {
                if (tryAcquire) {
                    semaphore.release();
                }
                createPendingLoadTopic();
                return null;
            });
        }).exceptionally(th -> {
            log.error("Failed to create pending topic {}", topic, th);
            poll.getTopicFuture().completeExceptionally((!(th instanceof RuntimeException) || th.getCause() == null) ? th : th.getCause());
            this.inactivityMonitor.schedule(this::createPendingLoadTopic, 100L, TimeUnit.MILLISECONDS);
            return null;
        });
    }

    public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(TopicName topicName) {
        return this.pulsar.getNamespaceService() == null ? FutureUtil.failedFuture(new BrokerServiceException.NamingException("namespace service is not ready")) : this.pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject()).thenCompose(optional -> {
            return this.pulsar.getNamespaceService().checkTopicExists(topicName).thenCompose(bool -> {
                return fetchPartitionedTopicMetadataAsync(topicName).thenCompose(partitionedTopicMetadata -> {
                    CompletableFuture completableFuture = new CompletableFuture();
                    this.pulsar.getExecutor().execute(() -> {
                        if (partitionedTopicMetadata.partitions != 0 || bool.booleanValue() || topicName.isPartitioned() || !this.pulsar.getBrokerService().isDefaultTopicTypePartitioned(topicName, optional)) {
                            completableFuture.complete(partitionedTopicMetadata);
                        } else {
                            isAllowAutoTopicCreationAsync(topicName, optional).thenAccept(bool -> {
                                if (bool.booleanValue()) {
                                    this.pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName, optional).thenAccept(partitionedTopicMetadata -> {
                                        completableFuture.complete(partitionedTopicMetadata);
                                    }).exceptionally(th -> {
                                        if (th.getCause() instanceof MetadataStoreException.AlreadyExistsException) {
                                            log.info("[{}] The partitioned topic is already created, try to refresh the cache and read again.", topicName);
                                            fetchPartitionedTopicMetadataAsync(topicName, true).whenComplete((partitionedTopicMetadata2, th) -> {
                                                if (th == null) {
                                                    completableFuture.complete(partitionedTopicMetadata2);
                                                } else {
                                                    completableFuture.completeExceptionally(th);
                                                }
                                            });
                                            return null;
                                        }
                                        log.error("[{}] operation of creating partitioned topic metadata failed", topicName, th);
                                        completableFuture.completeExceptionally(th);
                                        return null;
                                    });
                                } else {
                                    completableFuture.complete(partitionedTopicMetadata);
                                }
                            }).exceptionally(th -> {
                                completableFuture.completeExceptionally(th);
                                return null;
                            });
                        }
                    });
                    return completableFuture;
                });
            });
        });
    }

    private CompletableFuture<PartitionedTopicMetadata> createDefaultPartitionedTopicAsync(TopicName topicName, Optional<Policies> optional) {
        int defaultNumPartitions = this.pulsar.getBrokerService().getDefaultNumPartitions(topicName, optional);
        int maxNumPartitionsPerPartitionedTopic = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
        Preconditions.checkArgument(defaultNumPartitions > 0, "Default number of partitions should be more than 0");
        Preconditions.checkArgument(maxNumPartitionsPerPartitionedTopic <= 0 || defaultNumPartitions <= maxNumPartitionsPerPartitionedTopic, "Number of partitions should be less than or equal to " + maxNumPartitionsPerPartitionedTopic);
        PartitionedTopicMetadata partitionedTopicMetadata = new PartitionedTopicMetadata(defaultNumPartitions);
        return checkMaxTopicsPerNamespace(topicName, defaultNumPartitions).thenCompose(r7 -> {
            return this.pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources().createPartitionedTopicAsync(topicName, partitionedTopicMetadata).thenApply(r6 -> {
                log.info("partitioned metadata successfully created for {}", topicName);
                return partitionedTopicMetadata;
            });
        });
    }

    public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataAsync(TopicName topicName) {
        return fetchPartitionedTopicMetadataAsync(topicName, false);
    }

    public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataAsync(TopicName topicName, boolean z) {
        return this.pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources().getPartitionedTopicMetadataAsync(topicName, z).thenApply(optional -> {
            return (PartitionedTopicMetadata) optional.orElseGet(() -> {
                return new PartitionedTopicMetadata();
            });
        });
    }

    public OrderedExecutor getTopicOrderedExecutor() {
        return this.topicOrderedExecutor;
    }

    public ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>>> getMultiLayerTopicMap() {
        return this.multiLayerTopicsMap;
    }

    public void addUnAckedMessages(PersistentDispatcherMultipleConsumers persistentDispatcherMultipleConsumers, int i) {
        if (this.maxUnackedMessages > 0) {
            this.totalUnackedMessages.add(i);
            if (!this.blockedDispatcherOnHighUnackedMsgs.get() || persistentDispatcherMultipleConsumers.isBlockedDispatcherOnUnackedMsgs() || persistentDispatcherMultipleConsumers.getTotalUnackedMessages() <= this.maxUnackedMsgsPerDispatcher) {
                return;
            }
            this.lock.readLock().lock();
            try {
                log.info("[{}] dispatcher reached to max unack msg limit on blocked-broker {}", persistentDispatcherMultipleConsumers.getName(), Integer.valueOf(persistentDispatcherMultipleConsumers.getTotalUnackedMessages()));
                persistentDispatcherMultipleConsumers.blockDispatcherOnUnackedMsgs();
                this.blockedDispatchers.add(persistentDispatcherMultipleConsumers);
            } finally {
                this.lock.readLock().unlock();
            }
        }
    }

    public void checkUnAckMessageDispatching() {
        if (this.maxUnackedMessages <= 0) {
            return;
        }
        long sum = this.totalUnackedMessages.sum();
        if (sum >= this.maxUnackedMessages && this.blockedDispatcherOnHighUnackedMsgs.compareAndSet(false, true)) {
            log.info("Starting blocking dispatchers with unacked msgs {} due to reached max broker limit {}", Integer.valueOf(this.maxUnackedMessages), Integer.valueOf(this.maxUnackedMsgsPerDispatcher));
            executor().execute(() -> {
                blockDispatchersWithLargeUnAckMessages();
            });
        } else if (this.blockedDispatcherOnHighUnackedMsgs.get() && sum < this.maxUnackedMessages / 2 && this.blockedDispatcherOnHighUnackedMsgs.compareAndSet(true, false)) {
            unblockDispatchersOnUnAckMessages(this.blockedDispatchers.values());
        }
    }

    public boolean isBrokerDispatchingBlocked() {
        return this.blockedDispatcherOnHighUnackedMsgs.get();
    }

    private void blockDispatchersWithLargeUnAckMessages() {
        this.lock.readLock().lock();
        try {
            forEachTopic(topic -> {
                topic.getSubscriptions().forEach((str, subscription) -> {
                    if (subscription.getDispatcher() instanceof PersistentDispatcherMultipleConsumers) {
                        PersistentDispatcherMultipleConsumers persistentDispatcherMultipleConsumers = (PersistentDispatcherMultipleConsumers) subscription.getDispatcher();
                        if (persistentDispatcherMultipleConsumers.getTotalUnackedMessages() > this.maxUnackedMsgsPerDispatcher) {
                            log.info("[{}] Blocking dispatcher due to reached max broker limit {}", persistentDispatcherMultipleConsumers.getName(), Integer.valueOf(persistentDispatcherMultipleConsumers.getTotalUnackedMessages()));
                            persistentDispatcherMultipleConsumers.blockDispatcherOnUnackedMsgs();
                            this.blockedDispatchers.add(persistentDispatcherMultipleConsumers);
                        }
                    }
                });
            });
        } finally {
            this.lock.readLock().unlock();
        }
    }

    public void unblockDispatchersOnUnAckMessages(List<PersistentDispatcherMultipleConsumers> list) {
        this.lock.writeLock().lock();
        try {
            list.forEach(persistentDispatcherMultipleConsumers -> {
                persistentDispatcherMultipleConsumers.unBlockDispatcherOnUnackedMsgs();
                executor().execute(() -> {
                    persistentDispatcherMultipleConsumers.readMoreEntries();
                });
                log.info("[{}] Dispatcher is unblocked", persistentDispatcherMultipleConsumers.getName());
                this.blockedDispatchers.remove(persistentDispatcherMultipleConsumers);
            });
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    public static Optional<Topic> extractTopic(CompletableFuture<Optional<Topic>> completableFuture) {
        return (!completableFuture.isDone() || completableFuture.isCompletedExceptionally()) ? Optional.empty() : completableFuture.join();
    }

    public Optional<Integer> getListenPort() {
        return this.listenChannel != null ? Optional.of(Integer.valueOf(((InetSocketAddress) this.listenChannel.localAddress()).getPort())) : Optional.empty();
    }

    public Optional<Integer> getListenPortTls() {
        return this.listenChannelTls != null ? Optional.of(Integer.valueOf(((InetSocketAddress) this.listenChannelTls.localAddress()).getPort())) : Optional.empty();
    }

    public CompletableFuture<Boolean> isAllowAutoTopicCreationAsync(String str) {
        return isAllowAutoTopicCreationAsync(TopicName.get(str));
    }

    public CompletableFuture<Boolean> isAllowAutoTopicCreationAsync(TopicName topicName) {
        return this.pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject()).thenCompose(optional -> {
            return isAllowAutoTopicCreationAsync(topicName, optional);
        });
    }

    private CompletableFuture<Boolean> isAllowAutoTopicCreationAsync(TopicName topicName, Optional<Policies> optional) {
        if (optional.isPresent() && optional.get().deleted) {
            log.info("Preventing AutoTopicCreation on a namespace that is being deleted {}", topicName.getNamespaceObject());
            return CompletableFuture.completedFuture(false);
        }
        if (ExtensibleLoadManagerImpl.isInternalTopic(topicName.toString())) {
            return CompletableFuture.completedFuture(false);
        }
        if (this.pulsar.getConfiguration().isSystemTopicEnabled() && isSystemTopic(topicName)) {
            return CompletableFuture.completedFuture(true);
        }
        AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName, optional);
        boolean isAllowAutoTopicCreation = autoTopicCreationOverride != null ? autoTopicCreationOverride.isAllowAutoTopicCreation() : this.pulsar.getConfiguration().isAllowAutoTopicCreation();
        return (isAllowAutoTopicCreation && topicName.isPartitioned()) ? this.pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources().isPartitionedTopicBeingDeletedAsync(topicName).thenApply(bool -> {
            return Boolean.valueOf(!bool.booleanValue());
        }) : CompletableFuture.completedFuture(Boolean.valueOf(isAllowAutoTopicCreation));
    }

    public boolean isDefaultTopicTypePartitioned(TopicName topicName, Optional<Policies> optional) {
        AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName, optional);
        return autoTopicCreationOverride != null ? TopicType.PARTITIONED.toString().equals(autoTopicCreationOverride.getTopicType()) : this.pulsar.getConfiguration().isDefaultTopicTypePartitioned();
    }

    public int getDefaultNumPartitions(TopicName topicName, Optional<Policies> optional) {
        AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName, optional);
        return autoTopicCreationOverride != null ? autoTopicCreationOverride.getDefaultNumPartitions().intValue() : this.pulsar.getConfiguration().getDefaultNumPartitions();
    }

    private AutoTopicCreationOverride getAutoTopicCreationOverride(TopicName topicName, Optional<Policies> optional) {
        if (optional.isPresent() && optional.get().autoTopicCreationOverride != null) {
            return optional.get().autoTopicCreationOverride;
        }
        log.debug("No autoTopicCreateOverride policy found for {}", topicName);
        return null;
    }

    @Deprecated
    public boolean isAllowAutoSubscriptionCreation(String str) {
        return isAllowAutoSubscriptionCreation(TopicName.get(str));
    }

    @Deprecated
    public boolean isAllowAutoSubscriptionCreation(TopicName topicName) {
        AutoSubscriptionCreationOverride autoSubscriptionCreationOverride = getAutoSubscriptionCreationOverride(topicName);
        return autoSubscriptionCreationOverride != null ? autoSubscriptionCreationOverride.isAllowAutoSubscriptionCreation() : this.pulsar.getConfiguration().isAllowAutoSubscriptionCreation();
    }

    @Deprecated
    private AutoSubscriptionCreationOverride getAutoSubscriptionCreationOverride(TopicName topicName) {
        Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
        if (topicPolicies.isPresent() && topicPolicies.get().getAutoSubscriptionCreationOverride() != null) {
            return topicPolicies.get().getAutoSubscriptionCreationOverride();
        }
        Optional policiesIfCached = this.pulsar.getPulsarResources().getNamespaceResources().getPoliciesIfCached(topicName.getNamespaceObject());
        if (policiesIfCached.isPresent() && ((Policies) policiesIfCached.get()).autoSubscriptionCreationOverride != null) {
            return ((Policies) policiesIfCached.get()).autoSubscriptionCreationOverride;
        }
        log.debug("No autoSubscriptionCreateOverride policy found for {}", topicName);
        return null;
    }

    @Nonnull
    public CompletionStage<Boolean> isAllowAutoSubscriptionCreationAsync(@Nonnull TopicName topicName) {
        Objects.requireNonNull(topicName);
        Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
        return (!topicPolicies.isPresent() || topicPolicies.get().getAutoSubscriptionCreationOverride() == null) ? this.pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject()).thenApply(optional -> {
            return (!optional.isPresent() || ((Policies) optional.get()).autoSubscriptionCreationOverride == null) ? Boolean.valueOf(this.pulsar.getConfiguration().isAllowAutoSubscriptionCreation()) : Boolean.valueOf(((Policies) optional.get()).autoSubscriptionCreationOverride.isAllowAutoSubscriptionCreation());
        }) : CompletableFuture.completedFuture(Boolean.valueOf(topicPolicies.get().getAutoSubscriptionCreationOverride().isAllowAutoSubscriptionCreation()));
    }

    public boolean isSystemTopic(String str) {
        return isSystemTopic(TopicName.get(str));
    }

    public boolean isSystemTopic(TopicName topicName) {
        return NamespaceService.isSystemServiceNamespace(topicName.getNamespace()) || SystemTopicNames.isSystemTopic(topicName);
    }

    public Optional<TopicPolicies> getTopicPolicies(TopicName topicName) {
        return !pulsar().getConfig().isTopicLevelPoliciesEnabled() ? Optional.empty() : Optional.ofNullable(this.pulsar.getTopicPoliciesService().getTopicPoliciesIfExists(topicName));
    }

    public CompletableFuture<Void> deleteTopicPolicies(TopicName topicName) {
        return !pulsar().getConfig().isTopicLevelPoliciesEnabled() ? CompletableFuture.completedFuture(null) : this.pulsar.getTopicPoliciesService().deleteTopicPoliciesAsync(TopicName.get(topicName.getPartitionedTopicName()));
    }

    public CompletableFuture<SchemaVersion> deleteSchema(TopicName topicName) {
        if (topicName.isPartitioned()) {
            return CompletableFuture.completedFuture(null);
        }
        String schemaName = TopicName.get(topicName.getPartitionedTopicName()).getSchemaName();
        return getPulsar().getSchemaRegistryService().deleteSchemaStorage(schemaName).whenComplete((schemaVersion, th) -> {
            if (schemaVersion == null || th != null) {
                return;
            }
            log.info("Deleted schema storage of id: {}", schemaName);
        });
    }

    private CompletableFuture<Void> checkMaxTopicsPerNamespace(TopicName topicName, int i) {
        return this.pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject()).thenCompose(optional -> {
            int intValue = ((Integer) optional.map(policies -> {
                return policies.max_topics_per_namespace;
            }).orElse(Integer.valueOf(this.pulsar.getConfig().getMaxTopicsPerNamespace()))).intValue();
            return (intValue <= 0 || isSystemTopic(topicName)) ? CompletableFuture.completedFuture(null) : pulsar().getPulsarResources().getTopicResources().getExistingPartitions(topicName).thenCompose(list -> {
                if (list.stream().filter(str -> {
                    return !isSystemTopic(TopicName.get(str));
                }).count() + i <= intValue) {
                    return CompletableFuture.completedFuture(null);
                }
                log.error("Failed to create persistent topic {}, exceed maximum number of topics in namespace", topicName);
                return FutureUtil.failedFuture(new BrokerServiceException.NotAllowedException("Exceed maximum number of topics in namespace."));
            });
        });
    }

    public void setInterceptor(BrokerInterceptor brokerInterceptor) {
        this.interceptor = brokerInterceptor;
    }

    public Set<BrokerEntryMetadataInterceptor> getBrokerEntryMetadataInterceptors() {
        return this.brokerEntryMetadataInterceptors;
    }

    public boolean isBrokerEntryMetadataEnabled() {
        return !this.brokerEntryMetadataInterceptors.isEmpty();
    }

    public boolean isBrokerPayloadProcessorEnabled() {
        return !this.brokerEntryPayloadProcessors.isEmpty();
    }

    public void pausedConnections(int i) {
        this.pausedConnections.add(i);
    }

    public void resumedConnections(int i) {
        this.pausedConnections.add(-i);
    }

    public long getPausedConnections() {
        return this.pausedConnections.longValue();
    }

    @VisibleForTesting
    public <T extends Topic> T newTopic(String str, ManagedLedger managedLedger, BrokerService brokerService, Class<T> cls) throws PulsarServerException {
        if (this.topicFactory != null) {
            try {
                T t = (T) this.topicFactory.create(str, managedLedger, brokerService, cls);
                if (t != null) {
                    return t;
                }
            } catch (Throwable th) {
                log.warn("Failed to create persistent topic using factory {}", str, th);
                throw new PulsarServerException("Topic factory failed to create topic ", th);
            }
        }
        return cls == NonPersistentTopic.class ? new NonPersistentTopic(str, this) : new PersistentTopic(str, managedLedger, brokerService);
    }

    private TopicFactory createPersistentTopicFactory() throws Exception {
        String topicFactoryClassName = this.pulsar.getConfig().getTopicFactoryClassName();
        if (!StringUtils.isNotBlank(topicFactoryClassName)) {
            return null;
        }
        try {
            return (TopicFactory) Class.forName(topicFactoryClassName).newInstance();
        } catch (Exception e) {
            log.warn("Failed to initialize topic factory class {}", topicFactoryClassName, e);
            throw e;
        }
    }

    @VisibleForTesting
    public void setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory factory) {
        this.pulsarChannelInitFactory = factory;
    }

    public PulsarService getPulsar() {
        return this.pulsar;
    }

    public ManagedLedgerFactory getManagedLedgerFactory() {
        return this.managedLedgerFactory;
    }

    public ConcurrentOpenHashMap<String, PulsarAdmin> getClusterAdmins() {
        return this.clusterAdmins;
    }

    public ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>>> getMultiLayerTopicsMap() {
        return this.multiLayerTopicsMap;
    }

    public EventLoopGroup getAcceptorGroup() {
        return this.acceptorGroup;
    }

    public EventLoopGroup getWorkerGroup() {
        return this.workerGroup;
    }

    public ConcurrentOpenHashMap<TopicName, PersistentOfflineTopicStats> getOfflineTopicStatCache() {
        return this.offlineTopicStatCache;
    }

    public ConcurrentOpenHashMap<String, ConfigField> getDynamicConfigurationMap() {
        return this.dynamicConfigurationMap;
    }

    public ConcurrentOpenHashMap<String, java.util.function.Consumer<?>> getConfigRegisteredListeners() {
        return this.configRegisteredListeners;
    }

    public ConcurrentLinkedQueue<TopicLoadingContext> getPendingTopicLoadingQueue() {
        return this.pendingTopicLoadingQueue;
    }

    public ScheduledExecutorService getStatsUpdater() {
        return this.statsUpdater;
    }

    public AtomicReference<Semaphore> getTopicLoadRequestSemaphore() {
        return this.topicLoadRequestSemaphore;
    }

    public ObserverGauge getPendingLookupRequests() {
        return this.pendingLookupRequests;
    }

    public ObserverGauge getPendingTopicLoadRequests() {
        return this.pendingTopicLoadRequests;
    }

    public ScheduledExecutorService getInactivityMonitor() {
        return this.inactivityMonitor;
    }

    public ScheduledExecutorService getMessageExpiryMonitor() {
        return this.messageExpiryMonitor;
    }

    public ScheduledExecutorService getCompactionMonitor() {
        return this.compactionMonitor;
    }

    public ScheduledExecutorService getConsumedLedgersMonitor() {
        return this.consumedLedgersMonitor;
    }

    public ScheduledExecutorService getDeduplicationSnapshotMonitor() {
        return this.deduplicationSnapshotMonitor;
    }

    public PublishRateLimiter getBrokerPublishRateLimiter() {
        return this.brokerPublishRateLimiter;
    }

    public DispatchRateLimiter getBrokerDispatchRateLimiter() {
        return this.brokerDispatchRateLimiter;
    }

    public DistributedIdGenerator getProducerNameGenerator() {
        return this.producerNameGenerator;
    }

    public PulsarStats getPulsarStats() {
        return this.pulsarStats;
    }

    public LongAdder getTotalUnackedMessages() {
        return this.totalUnackedMessages;
    }

    public int getMaxUnackedMessages() {
        return this.maxUnackedMessages;
    }

    public int getMaxUnackedMsgsPerDispatcher() {
        return this.maxUnackedMsgsPerDispatcher;
    }

    public AtomicBoolean getBlockedDispatcherOnHighUnackedMsgs() {
        return this.blockedDispatcherOnHighUnackedMsgs;
    }

    public ConcurrentOpenHashSet<PersistentDispatcherMultipleConsumers> getBlockedDispatchers() {
        return this.blockedDispatchers;
    }

    public ReadWriteLock getLock() {
        return this.lock;
    }

    public ServerBootstrap getDefaultServerBootstrap() {
        return this.defaultServerBootstrap;
    }

    public List<EventLoopGroup> getProtocolHandlersWorkerGroups() {
        return this.protocolHandlersWorkerGroups;
    }

    public PulsarChannelInitializer.Factory getPulsarChannelInitFactory() {
        return this.pulsarChannelInitFactory;
    }

    public List<Channel> getListenChannels() {
        return this.listenChannels;
    }

    public Channel getListenChannel() {
        return this.listenChannel;
    }

    public Channel getListenChannelTls() {
        return this.listenChannelTls;
    }

    public boolean isPreciseTopicPublishRateLimitingEnable() {
        return this.preciseTopicPublishRateLimitingEnable;
    }

    public BrokerInterceptor getInterceptor() {
        return this.interceptor;
    }

    public EntryFilterProvider getEntryFilterProvider() {
        return this.entryFilterProvider;
    }

    public TopicFactory getTopicFactory() {
        return this.topicFactory;
    }

    public Set<ManagedLedgerPayloadProcessor> getBrokerEntryPayloadProcessors() {
        return this.brokerEntryPayloadProcessors;
    }

    public TopicEventsDispatcher getTopicEventsDispatcher() {
        return this.topicEventsDispatcher;
    }

    public boolean isUnloaded() {
        return this.unloaded;
    }

    protected void setNumberOfNamespaceBundles(int i) {
        this.numberOfNamespaceBundles = i;
    }

    protected void setAuthorizationService(AuthorizationService authorizationService) {
        this.authorizationService = authorizationService;
    }

    protected void setDeduplicationSnapshotMonitor(ScheduledExecutorService scheduledExecutorService) {
        this.deduplicationSnapshotMonitor = scheduledExecutorService;
    }

    protected void setBrokerDispatchRateLimiter(DispatchRateLimiter dispatchRateLimiter) {
        this.brokerDispatchRateLimiter = dispatchRateLimiter;
    }

    protected void setProducerNameGenerator(DistributedIdGenerator distributedIdGenerator) {
        this.producerNameGenerator = distributedIdGenerator;
    }

    protected void setPulsarChannelInitFactory(PulsarChannelInitializer.Factory factory) {
        this.pulsarChannelInitFactory = factory;
    }

    protected void setListenChannel(Channel channel) {
        this.listenChannel = channel;
    }

    protected void setListenChannelTls(Channel channel) {
        this.listenChannelTls = channel;
    }

    protected void setPreciseTopicPublishRateLimitingEnable(boolean z) {
        this.preciseTopicPublishRateLimitingEnable = z;
    }

    protected void setTopicFactory(TopicFactory topicFactory) {
        this.topicFactory = topicFactory;
    }

    protected void setBrokerEntryMetadataInterceptors(Set<BrokerEntryMetadataInterceptor> set) {
        this.brokerEntryMetadataInterceptors = set;
    }

    protected void setBrokerEntryPayloadProcessors(Set<ManagedLedgerPayloadProcessor> set) {
        this.brokerEntryPayloadProcessors = set;
    }

    protected void setUnloaded(boolean z) {
        this.unloaded = z;
    }

    public ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<Integer>> getOwningTopics() {
        return this.owningTopics;
    }

    public ScheduledExecutorService getBacklogQuotaChecker() {
        return this.backlogQuotaChecker;
    }

    public BundlesQuotas getBundlesQuotas() {
        return this.bundlesQuotas;
    }
}
