/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.BookKeeperClientFactory;
import org.apache.pulsar.broker.BookKeeperClientFactoryImpl;
import org.apache.pulsar.broker.MetadataSessionExpiredPolicy;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.broker.ShutdownService;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.intercept.BrokerInterceptors;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.LoadReportUpdaterTask;
import org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask;
import org.apache.pulsar.broker.loadbalance.LoadSheddingTask;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.protocol.ProtocolHandlers;
import org.apache.pulsar.broker.resourcegroup.ResourceGroupService;
import org.apache.pulsar.broker.resourcegroup.ResourceUsageTopicTransportManager;
import org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager;
import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.GracefulExecutorServicesShutdown;
import org.apache.pulsar.broker.service.SystemTopicBaseTxnBufferSnapshotService;
import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TopicPoliciesService;
import org.apache.pulsar.broker.service.TransactionBufferSnapshotService;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.stats.MetricsGenerator;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
import org.apache.pulsar.broker.validator.MultipleListenerValidator;
import org.apache.pulsar.broker.web.WebService;
import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlet;
import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithClassLoader;
import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithPulsarService;
import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
import org.apache.pulsar.client.internal.PropertiesUtils;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.client.util.ScheduledExecutorProvider;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.TwoPhaseCompactor;
import org.apache.pulsar.functions.worker.ErrorNotifier;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.coordination.CoordinationService;
import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.packages.management.core.PackagesManagement;
import org.apache.pulsar.packages.management.core.PackagesStorage;
import org.apache.pulsar.packages.management.core.PackagesStorageProvider;
import org.apache.pulsar.packages.management.core.impl.DefaultPackagesStorageConfiguration;
import org.apache.pulsar.packages.management.core.impl.PackagesManagementImpl;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.collect.ImmutableMap;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.com.google.common.collect.Maps;
import org.apache.pulsar.shade.io.netty.channel.ChannelInitializer;
import org.apache.pulsar.shade.io.netty.channel.EventLoopGroup;
import org.apache.pulsar.shade.io.netty.channel.socket.SocketChannel;
import org.apache.pulsar.shade.io.netty.util.HashedWheelTimer;
import org.apache.pulsar.shade.io.netty.util.Timer;
import org.apache.pulsar.shade.io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.pulsar.shade.javax.servlet.ServletException;
import org.apache.pulsar.shade.javax.websocket.DeploymentException;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.BookKeeper;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.shade.org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.offload.Offloaders;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.offload.OffloadersCache;
import org.apache.pulsar.shade.org.apache.commons.configuration.ConfigurationException;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.shade.org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.apache.pulsar.shade.org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.shade.org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.shade.org.apache.pulsar.common.configuration.VipStatus;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.schema.SchemaStorage;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.shade.org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
import org.apache.pulsar.shade.org.apache.zookeeper.ZooKeeper;
import org.apache.pulsar.shade.org.eclipse.jetty.servlet.ServletHolder;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
import org.apache.pulsar.websocket.WebSocketConsumerServlet;
import org.apache.pulsar.websocket.WebSocketPingPongServlet;
import org.apache.pulsar.websocket.WebSocketProducerServlet;
import org.apache.pulsar.websocket.WebSocketReaderServlet;
import org.apache.pulsar.websocket.WebSocketService;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarService
implements AutoCloseable,
ShutdownService {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarService.class);
    // Fraction of brokerShutdownTimeoutMs that closeAsync() allows for the graceful executor shutdown.
    private static final double GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT = 0.5;

    // ---- Core services; most are created in start() and closed (and usually nulled) in closeAsync(). ----
    private ServiceConfiguration config = null;
    private NamespaceService nsService = null;
    private ManagedLedgerStorage managedLedgerClientFactory = null;
    private LeaderElectionService leaderElectionService = null;
    private BrokerService brokerService = null;
    private WebService webService = null;
    private WebSocketService webSocketService = null;
    // Starts as the DISABLED implementation; start() swaps in the system-topic based service when
    // both topic-level policies and system topics are enabled.
    private TopicPoliciesService topicPoliciesService = TopicPoliciesService.DISABLED;
    private BookKeeperClientFactory bkClientFactory;
    private Compactor compactor;
    private ResourceUsageTransportManager resourceUsageTransportManager;
    private ResourceGroupService resourceGroupServiceManager;

    // ---- Executors. The final ones are created in the constructor; orderedExecutor is built in start(). ----
    // All of these are handed to the graceful executor shutdown in closeAsync().
    private final ScheduledExecutorService executor;
    private final ScheduledExecutorService cacheExecutor;
    private OrderedExecutor orderedExecutor;
    private final ScheduledExecutorService loadManagerExecutor;
    private ScheduledExecutorService compactorExecutor;
    private OrderedScheduler offloaderScheduler;

    // ---- Ledger offloading. ----
    private OffloadersCache offloadersCache = new OffloadersCache();
    private LedgerOffloader defaultOffloader;
    private Map<NamespaceName, LedgerOffloader> ledgerOffloaderMap = new ConcurrentHashMap<NamespaceName, LedgerOffloader>();

    // ---- Load management. ----
    private ScheduledFuture<?> loadReportTask = null;
    private ScheduledFuture<?> loadSheddingTask = null;
    private ScheduledFuture<?> loadResourceQuotaTask = null;
    // Populated in start() via LoadManager.create(this); stopped in closeAsync() when non-null.
    private final AtomicReference<LoadManager> loadManager = new AtomicReference();

    // ---- Clients and addressing. ----
    private PulsarAdmin adminClient = null;
    private PulsarClient client = null;
    private ZooKeeperClientFactory zkClientFactory = null;
    // bindAddress / advertisedAddress are resolved once in the constructor from the configuration.
    private final String bindAddress;
    private final String advertisedAddress;
    // The four URLs below are computed in start() after the web service has been started.
    private String webServiceAddress;
    private String webServiceAddressTls;
    private String brokerServiceUrl;
    private String brokerServiceUrlTls;
    // Taken from PulsarVersion.getVersion() in the constructor.
    private final String brokerVersion;

    // ---- Schema, functions worker and protocol handlers. ----
    private SchemaStorage schemaStorage = null;
    private SchemaRegistryService schemaRegistryService = null;
    private final WorkerConfig workerConfig;
    private final Optional<WorkerService> functionWorkerService;
    private ProtocolHandlers protocolHandlers = null;
    // Callback supplied by the creator of this service; receives an exit code.
    private final Consumer<Integer> processTerminator;
    protected final EventLoopGroup ioEventLoopGroup;

    // ---- Resources created in the constructor and shut down in closeAsync(). ----
    private final ExecutorProvider brokerClientSharedInternalExecutorProvider;
    private final ExecutorProvider brokerClientSharedExternalExecutorProvider;
    private final ScheduledExecutorProvider brokerClientSharedScheduledExecutorProvider;
    private final Timer brokerClientSharedTimer;

    // ---- Metrics and transactions. ----
    private MetricsGenerator metricsGenerator;
    private TransactionMetadataStoreService transactionMetadataStoreService;
    private TransactionBufferProvider transactionBufferProvider;
    private TransactionBufferClient transactionBufferClient;
    private HashedWheelTimer transactionTimer;
    private BrokerInterceptor brokerInterceptor;
    private AdditionalServlets brokerAdditionalServlets;
    private Optional<PackagesManagement> packagesManagement = Optional.empty();
    private PrometheusMetricsServlet metricsServlet;
    // When non-null at start(), each provider is registered with metricsServlet and the list is cleared.
    private List<PrometheusRawMetricsProvider> pendingMetricsProviders;

    // ---- Metadata stores. ----
    private MetadataStoreExtended localMetadataStore;
    private CoordinationService coordinationService;
    private TransactionBufferSnapshotService transactionBufferSnapshotService;
    // Either a dedicated store or the same instance as localMetadataStore (decided in start()).
    private MetadataStore configurationMetadataStore;
    // True only when start() created a dedicated configuration store, so closeAsync() must close it separately.
    private boolean shouldShutdownConfigurationMetadataStore;
    private PulsarResources pulsarResources;
    private TransactionPendingAckStoreProvider transactionPendingAckStoreProvider;
    // Null unless the transaction coordinator is enabled in the configuration (see constructor).
    private final ExecutorProvider transactionExecutorProvider;

    // ---- Lifecycle. ----
    // Init at construction; closeAsync() sets Closing, and its completion callback sets Closed.
    private volatile State state;
    // Held by start() and closeAsync() while they run.
    private final ReentrantLock mutex = new ReentrantLock();
    // Signalled from the closeAsync() completion callback once the state becomes Closed.
    private final Condition isClosedCondition = this.mutex.newCondition();
    // Assigned by closeAsync() once shutdown has been initiated; later closeAsync() calls return it as-is.
    private volatile CompletableFuture<Void> closeFuture;
    // Result of MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config) in the constructor.
    private Map<String, AdvertisedListener> advertisedListeners;
    // Both heartbeat namespaces are derived in start() from advertisedAddress and the configuration.
    private NamespaceName heartbeatNamespaceV2;
    private NamespaceName heartbeatNamespaceV1;

    /**
     * Creates a broker service without a function worker.
     *
     * <p>The process terminator installed here only logs the requested exit code; as the log
     * message states, this constructor is intended for tests.
     *
     * @param config the broker configuration
     */
    public PulsarService(ServiceConfiguration config) {
        this(
                config,
                Optional.empty(),
                exitCode -> {
                    LOG.info("Process termination requested with code {}. Ignoring, as this constructor is intended for tests. ", exitCode);
                });
    }

    /**
     * Creates a broker service with a freshly constructed, default {@link WorkerConfig}.
     *
     * @param config                the broker configuration
     * @param functionWorkerService the optional function worker service
     * @param processTerminator     callback that receives an exit code
     */
    public PulsarService(
            ServiceConfiguration config,
            Optional<WorkerService> functionWorkerService,
            Consumer<Integer> processTerminator) {
        this(
                config,
                new WorkerConfig(),
                functionWorkerService,
                processTerminator);
    }

    public PulsarService(ServiceConfiguration config, WorkerConfig workerConfig, Optional<WorkerService> functionWorkerService, Consumer<Integer> processTerminator) {
        this.state = State.Init;
        PulsarConfigurationLoader.isComplete(config);
        this.advertisedListeners = MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
        this.advertisedAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress());
        this.bindAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getBindAddress());
        this.brokerVersion = PulsarVersion.getVersion();
        this.config = config;
        this.processTerminator = processTerminator;
        this.loadManagerExecutor = Executors.newSingleThreadScheduledExecutor(new ExecutorProvider.ExtendedThreadFactory("pulsar-load-manager"));
        this.workerConfig = workerConfig;
        this.functionWorkerService = functionWorkerService;
        this.executor = Executors.newScheduledThreadPool(config.getNumExecutorThreadPoolSize(), new ExecutorProvider.ExtendedThreadFactory("pulsar"));
        this.cacheExecutor = Executors.newScheduledThreadPool(config.getNumCacheExecutorThreadPoolSize(), new ExecutorProvider.ExtendedThreadFactory("zk-cache-callback"));
        this.transactionExecutorProvider = config.isTransactionCoordinatorEnabled() ? new ExecutorProvider(this.getConfiguration().getNumTransactionReplayThreadPoolSize(), "pulsar-transaction-executor") : null;
        this.ioEventLoopGroup = EventLoopUtil.newEventLoopGroup(config.getNumIOThreads(), config.isEnableBusyWait(), new DefaultThreadFactory("pulsar-io"));
        this.brokerClientSharedInternalExecutorProvider = new ExecutorProvider(1, "broker-client-shared-internal-executor");
        this.brokerClientSharedExternalExecutorProvider = new ExecutorProvider(1, "broker-client-shared-external-executor");
        this.brokerClientSharedScheduledExecutorProvider = new ScheduledExecutorProvider(1, "broker-client-shared-scheduled-executor");
        this.brokerClientSharedTimer = new HashedWheelTimer(new DefaultThreadFactory("broker-client-shared-timer"), 1L, TimeUnit.MILLISECONDS);
    }

    /**
     * Creates a metadata store client for the configuration store servers named in the broker
     * configuration, using the configured ZooKeeper session timeout and read-only setting.
     *
     * @return the newly created configuration metadata store
     * @throws MetadataStoreException if the store cannot be created
     */
    public MetadataStore createConfigurationMetadataStore() throws MetadataStoreException {
        String configurationServers = this.config.getConfigurationStoreServers();
        MetadataStoreConfig storeConfig = MetadataStoreConfig.builder()
                .sessionTimeoutMillis((int) this.config.getZooKeeperSessionTimeoutMillis())
                .allowReadOnlyOperations(this.config.isZookeeperAllowReadOnlyOperations())
                .build();
        return MetadataStoreFactory.create(configurationServers, storeConfig);
    }

    /**
     * Closes the local metadata store.
     *
     * <p>The store is only assigned in {@link #start()}, so this must not be called before the
     * service has been started.
     *
     * @throws Exception if closing the store fails
     */
    public void closeMetadataServiceSession() throws Exception {
        MetadataStoreExtended store = this.localMetadataStore;
        store.close();
    }

    /**
     * Closes the service and blocks until {@link #closeAsync()} has completed.
     *
     * <p>A failure caused by a timeout or cancellation is ignored when the configured broker
     * shutdown timeout is {@code 0}; every other failure is rethrown. If the calling thread is
     * interrupted while waiting, the interrupt flag is restored and the method returns.
     *
     * @throws PulsarServerException if shutdown failed; a non-{@code PulsarServerException} cause is wrapped
     */
    @Override
    public void close() throws PulsarServerException {
        try {
            this.closeAsync().get();
        }
        catch (ExecutionException e) {
            Throwable failure = e.getCause();
            if (failure instanceof PulsarServerException) {
                throw (PulsarServerException)failure;
            }
            boolean timedOutOrCancelled = failure instanceof TimeoutException || failure instanceof CancellationException;
            if (this.getConfiguration().getBrokerShutdownTimeoutMs() == 0L && timedOutOrCancelled) {
                // A zero shutdown timeout means a timeout/cancellation is expected; nothing to report.
                return;
            }
            throw new PulsarServerException(failure);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Initiates shutdown of the broker and returns a future tracking its completion.
     *
     * <p>The first successful call switches the state to {@code Closing}, closes the owned
     * components one after another while holding {@code mutex}, and stores the resulting future
     * in {@code closeFuture}. Subsequent calls return that stored future. Once the future
     * completes (normally, by timeout, by cancellation or with an error) the state becomes
     * {@code Closed} and threads waiting on {@code isClosedCondition} are signalled.
     *
     * <p>If a component throws synchronously while being closed, the remaining components are
     * not closed, {@code closeFuture} is left unset and an already-failed future is returned.
     *
     * @return the future that completes when the asynchronous part of the shutdown is done, or a
     *         future failed with a {@link PulsarServerException} if closing threw synchronously
     */
    public CompletableFuture<Void> closeAsync() {
        this.mutex.lock();
        try {
            if (this.closeFuture != null) {
                // Shutdown was already initiated; hand out the same future.
                return this.closeFuture;
            }
            LOG.info("Closing PulsarService");
            this.state = State.Closing;
            if (this.resourceUsageTransportManager != null) {
                this.resourceUsageTransportManager.close();
                this.resourceUsageTransportManager = null;
            }
            if (this.resourceGroupServiceManager != null) {
                try {
                    this.resourceGroupServiceManager.close();
                }
                catch (Exception e) {
                    LOG.warn("ResourceGroupServiceManager closing failed {}", e.getMessage());
                }
                this.resourceGroupServiceManager = null;
            }
            if (this.webService != null) {
                try {
                    this.webService.close();
                    this.webService = null;
                }
                catch (Exception e) {
                    LOG.error("Web service closing failed", e);
                }
            }
            this.metricsServlet = null;
            if (this.webSocketService != null) {
                this.webSocketService.close();
            }
            if (this.brokerAdditionalServlets != null) {
                this.brokerAdditionalServlets.close();
                this.brokerAdditionalServlets = null;
            }

            // Executors get a fixed fraction of the total shutdown timeout to terminate gracefully.
            long gracefulTimeoutMs = (long)(GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT * (double)this.getConfiguration().getBrokerShutdownTimeoutMs());
            GracefulExecutorServicesShutdown executorServicesShutdown = GracefulExecutorServicesShutdown.initiate().timeout(Duration.ofMillis(gracefulTimeoutMs));
            executorServicesShutdown.shutdown(this.loadManagerExecutor);
            LoadManager currentLoadManager = this.loadManager.get();
            if (currentLoadManager != null) {
                currentLoadManager.stop();
            }

            List<CompletableFuture<Void>> asyncCloseFutures = new ArrayList<>();
            if (this.brokerService != null) {
                CompletableFuture<Void> brokerCloseFuture = this.brokerService.closeAsync();
                if (this.transactionMetadataStoreService != null) {
                    // The transaction metadata store service is closed only after the broker service has finished closing.
                    asyncCloseFutures.add(brokerCloseFuture.whenComplete((__, ___) -> {
                        this.transactionMetadataStoreService.close();
                        this.transactionMetadataStoreService = null;
                    }));
                } else {
                    asyncCloseFutures.add(brokerCloseFuture);
                }
                this.brokerService = null;
            }
            if (this.managedLedgerClientFactory != null) {
                this.managedLedgerClientFactory.close();
                this.managedLedgerClientFactory = null;
            }
            if (this.bkClientFactory != null) {
                this.bkClientFactory.close();
                this.bkClientFactory = null;
            }
            if (this.leaderElectionService != null) {
                this.leaderElectionService.close();
                this.leaderElectionService = null;
            }
            if (this.adminClient != null) {
                this.adminClient.close();
                this.adminClient = null;
            }
            if (this.transactionBufferSnapshotService != null) {
                this.transactionBufferSnapshotService.close();
                this.transactionBufferSnapshotService = null;
            }
            if (this.client != null) {
                this.client.close();
                this.client = null;
            }
            if (this.nsService != null) {
                this.nsService.close();
                this.nsService = null;
            }
            executorServicesShutdown.shutdown(this.compactorExecutor);
            executorServicesShutdown.shutdown(this.offloaderScheduler);
            executorServicesShutdown.shutdown(this.executor);
            executorServicesShutdown.shutdown(this.orderedExecutor);
            executorServicesShutdown.shutdown(this.cacheExecutor);
            if (this.schemaRegistryService != null) {
                this.schemaRegistryService.close();
            }
            this.offloadersCache.close();
            if (this.protocolHandlers != null) {
                this.protocolHandlers.close();
                this.protocolHandlers = null;
            }
            if (this.transactionBufferClient != null) {
                this.transactionBufferClient.close();
            }
            if (this.coordinationService != null) {
                this.coordinationService.close();
            }
            if (this.localMetadataStore != null) {
                this.localMetadataStore.close();
            }
            // The configuration store is closed separately only when it is not the local store instance.
            if (this.configurationMetadataStore != null && this.shouldShutdownConfigurationMetadataStore) {
                this.configurationMetadataStore.close();
            }
            if (this.transactionExecutorProvider != null) {
                this.transactionExecutorProvider.shutdownNow();
            }
            this.brokerClientSharedExternalExecutorProvider.shutdownNow();
            this.brokerClientSharedInternalExecutorProvider.shutdownNow();
            this.brokerClientSharedScheduledExecutorProvider.shutdownNow();
            this.brokerClientSharedTimer.stop();
            this.ioEventLoopGroup.shutdownGracefully();
            asyncCloseFutures.add(executorServicesShutdown.handle());

            this.closeFuture = this.addTimeoutHandling(FutureUtil.waitForAllAndSupportCancel(asyncCloseFutures));
            this.closeFuture.handle((v, t) -> {
                if (t == null) {
                    LOG.info("Closed");
                } else if (t instanceof CancellationException) {
                    LOG.info("Closed (shutdown cancelled)");
                } else if (t instanceof TimeoutException) {
                    LOG.info("Closed (shutdown timeout)");
                } else {
                    LOG.warn("Closed with errors", t);
                }
                // This callback usually runs on a thread that does not own the mutex. Signalling a
                // ReentrantLock condition without owning the lock throws IllegalMonitorStateException,
                // which would be swallowed here and leave waiters blocked, so take the lock first.
                // The lock is reentrant, so the synchronous (already-completed) case is fine too.
                this.mutex.lock();
                try {
                    this.state = State.Closed;
                    this.isClosedCondition.signalAll();
                }
                finally {
                    this.mutex.unlock();
                }
                return null;
            });
            return this.closeFuture;
        }
        catch (Exception e) {
            // Surface the underlying MetadataStoreException when it is wrapped in a CompletionException
            // (directly, or one level further down); otherwise wrap the exception as-is.
            PulsarServerException pse;
            if (e instanceof CompletionException && e.getCause() instanceof MetadataStoreException) {
                pse = new PulsarServerException(MetadataStoreException.unwrap(e));
            } else if (e.getCause() instanceof CompletionException && e.getCause().getCause() instanceof MetadataStoreException) {
                pse = new PulsarServerException(MetadataStoreException.unwrap(e.getCause()));
            } else {
                pse = new PulsarServerException(e);
            }
            return FutureUtil.failedFuture(pse);
        }
        finally {
            this.mutex.unlock();
        }
    }

    /**
     * Arms a timeout on the given close future using a dedicated single-thread scheduler, and
     * shuts that scheduler down as soon as the future completes for any reason.
     *
     * <p>The timeout is the configured broker shutdown timeout, but never less than 1 ms.
     *
     * @param future the future to guard with a timeout
     * @return the same {@code future} instance that was passed in
     */
    private CompletableFuture<Void> addTimeoutHandling(CompletableFuture<Void> future) {
        String schedulerThreadName = this.getClass().getSimpleName() + "-shutdown";
        ScheduledExecutorService timeoutScheduler = Executors.newSingleThreadScheduledExecutor(
                new ExecutorProvider.ExtendedThreadFactory(schedulerThreadName));
        Duration timeout = Duration.ofMillis(Math.max(1L, this.getConfiguration().getBrokerShutdownTimeoutMs()));
        FutureUtil.addTimeoutHandling(
                future,
                timeout,
                timeoutScheduler,
                () -> FutureUtil.createTimeoutException("Timeout in close", this.getClass(), "close"));
        // The scheduler exists only for this timeout; release its thread once the future is done.
        future.handle((ignoredValue, ignoredError) -> {
            timeoutScheduler.shutdownNow();
            return null;
        });
        return future;
    }

    /**
     * Returns the broker configuration this service was constructed with.
     *
     * @return the {@link ServiceConfiguration} instance held by this service
     */
    public ServiceConfiguration getConfiguration() {
        return config;
    }

    /**
     * Returns the function worker configuration, but only when a function worker service is present.
     *
     * @return the worker configuration, or an empty {@link Optional} if there is no function
     *         worker service (or the worker configuration is {@code null})
     */
    public Optional<WorkerConfig> getWorkerConfig() {
        if (!this.functionWorkerService.isPresent()) {
            return Optional.empty();
        }
        return Optional.ofNullable(this.workerConfig);
    }

    /**
     * Returns the protocol data reported by the loaded protocol handlers.
     *
     * @return the handlers' protocol data, or an empty map when no protocol handlers are set
     */
    public Map<String, String> getProtocolDataToAdvertise() {
        return this.protocolHandlers == null
                ? Collections.emptyMap()
                : this.protocolHandlers.getProtocolDataToAdvertise();
    }

    public void start() throws PulsarServerException {
        LOG.info("Starting Pulsar Broker service; version: '{}'", (Object)(this.brokerVersion != null ? this.brokerVersion : "unknown"));
        LOG.info("Git Revision {}", (Object)PulsarVersion.getGitSha());
        LOG.info("Git Branch {}", (Object)PulsarVersion.getGitBranch());
        LOG.info("Built by {} on {} at {}", new Object[]{PulsarVersion.getBuildUser(), PulsarVersion.getBuildHost(), PulsarVersion.getBuildTime()});
        long startTimestamp = System.currentTimeMillis();
        this.mutex.lock();
        try {
            if (this.state != State.Init) {
                throw new PulsarServerException("Cannot start the service once it was stopped");
            }
            if (!this.config.getWebServicePort().isPresent() && !this.config.getWebServicePortTls().isPresent()) {
                throw new IllegalArgumentException("webServicePort/webServicePortTls must be present");
            }
            if (this.config.isAuthorizationEnabled() && !this.config.isAuthenticationEnabled()) {
                throw new IllegalStateException("Invalid broker configuration. Authentication must be enabled with authenticationEnabled=true when authorization is enabled with authorizationEnabled=true.");
            }
            this.localMetadataStore = this.createLocalMetadataStore();
            this.localMetadataStore.registerSessionListener(this::handleMetadataSessionEvent);
            this.coordinationService = new CoordinationServiceImpl(this.localMetadataStore);
            if (!StringUtils.equals(this.config.getConfigurationStoreServers(), this.config.getZookeeperServers())) {
                this.configurationMetadataStore = this.createConfigurationMetadataStore();
                this.shouldShutdownConfigurationMetadataStore = true;
            } else {
                this.configurationMetadataStore = this.localMetadataStore;
                this.shouldShutdownConfigurationMetadataStore = false;
            }
            this.pulsarResources = new PulsarResources(this.localMetadataStore, this.configurationMetadataStore, this.config.getZooKeeperOperationTimeoutSeconds());
            this.pulsarResources.getClusterResources().getStore().registerListener(this::handleDeleteCluster);
            this.orderedExecutor = OrderedExecutor.newBuilder().numThreads(this.config.getNumOrderedExecutorThreads()).name("pulsar-ordered").build();
            this.protocolHandlers = ProtocolHandlers.load(this.config);
            this.protocolHandlers.initialize(this.config);
            this.bkClientFactory = this.newBookKeeperClientFactory();
            this.managedLedgerClientFactory = ManagedLedgerStorage.create(this.config, this.localMetadataStore, this.getZkClient(), this.bkClientFactory, this.ioEventLoopGroup);
            this.brokerService = this.newBrokerService(this);
            this.loadManager.set(LoadManager.create(this));
            this.startNamespaceService();
            this.schemaStorage = this.createAndStartSchemaStorage();
            this.schemaRegistryService = SchemaRegistryService.create(this.schemaStorage, this.config.getSchemaRegistryCompatibilityCheckers());
            this.defaultOffloader = this.createManagedLedgerOffloader(OffloadPoliciesImpl.create(this.getConfiguration().getProperties()));
            this.brokerInterceptor = BrokerInterceptors.load(this.config);
            BrokerInterceptor interceptor = this.getBrokerInterceptor();
            if (interceptor != null) {
                this.brokerService.setInterceptor(interceptor);
                interceptor.initialize(this);
            }
            this.brokerService.start();
            this.brokerAdditionalServlets = AdditionalServlets.load(this.config);
            this.webService = new WebService(this);
            this.metricsServlet = new PrometheusMetricsServlet(this, this.config.isExposeTopicLevelMetricsInPrometheus(), this.config.isExposeConsumerLevelMetricsInPrometheus(), this.config.isExposeProducerLevelMetricsInPrometheus(), this.config.isSplitTopicAndPartitionLabelInPrometheus());
            if (this.pendingMetricsProviders != null) {
                this.pendingMetricsProviders.forEach(provider -> this.metricsServlet.addRawMetricsProvider((PrometheusRawMetricsProvider)provider));
                this.pendingMetricsProviders = null;
            }
            this.addWebServerHandlers(this.webService, this.metricsServlet, this.config);
            this.webService.start();
            this.heartbeatNamespaceV1 = NamespaceService.getHeartbeatNamespace(this.advertisedAddress, this.config);
            this.heartbeatNamespaceV2 = NamespaceService.getHeartbeatNamespaceV2(this.advertisedAddress, this.config);
            if (this.config.getBrokerServicePort().equals(Optional.of(0))) {
                this.config.setBrokerServicePort(this.brokerService.getListenPort());
            }
            if (this.config.getBrokerServicePortTls().equals(Optional.of(0))) {
                this.config.setBrokerServicePortTls(this.brokerService.getListenPortTls());
            }
            this.webServiceAddress = this.webAddress(this.config);
            this.webServiceAddressTls = this.webAddressTls(this.config);
            this.brokerServiceUrl = this.brokerUrl(this.config);
            this.brokerServiceUrlTls = this.brokerUrlTls(this.config);
            if (null != this.webSocketService) {
                ClusterDataImpl clusterData = ClusterDataImpl.builder().serviceUrl(this.webServiceAddress).serviceUrlTls(this.webServiceAddressTls).brokerServiceUrl(this.brokerServiceUrl).brokerServiceUrlTls(this.brokerServiceUrlTls).build();
                this.webSocketService.setLocalCluster(clusterData);
            }
            this.nsService.initialize();
            if (this.config.isTopicLevelPoliciesEnabled() && this.config.isSystemTopicEnabled()) {
                this.topicPoliciesService = new SystemTopicBasedTopicPoliciesService(this);
            }
            this.topicPoliciesService.start();
            this.startLeaderElectionService();
            this.nsService.registerBootstrapNamespaces();
            if (this.config.isTransactionCoordinatorEnabled()) {
                this.transactionBufferSnapshotService = new SystemTopicBaseTxnBufferSnapshotService(this.getClient());
                this.transactionTimer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-transaction-timer"));
                this.transactionBufferClient = TransactionBufferClientImpl.create(this, this.transactionTimer, this.config.getTransactionBufferClientMaxConcurrentRequests(), this.config.getTransactionBufferClientOperationTimeoutInMills());
                this.transactionMetadataStoreService = new TransactionMetadataStoreService(TransactionMetadataStoreProvider.newProvider(this.config.getTransactionMetadataStoreProviderClassName()), this, this.transactionBufferClient, this.transactionTimer);
                this.transactionBufferProvider = TransactionBufferProvider.newProvider(this.config.getTransactionBufferProviderClassName());
                this.transactionPendingAckStoreProvider = TransactionPendingAckStoreProvider.newProvider(this.config.getTransactionPendingAckStoreProviderClassName());
            }
            this.metricsGenerator = new MetricsGenerator(this);
            this.startLoadManagementService();
            this.protocolHandlers.start(this.brokerService);
            Map<String, Map<InetSocketAddress, ChannelInitializer<SocketChannel>>> protocolHandlerChannelInitializers = this.protocolHandlers.newChannelInitializers();
            this.brokerService.startProtocolHandlers(protocolHandlerChannelInitializers);
            this.acquireSLANamespace();
            this.startWorkerService(this.brokerService.getAuthenticationService(), this.brokerService.getAuthorizationService());
            if (this.config.isEnablePackagesManagement()) {
                this.startPackagesManagementService();
            }
            this.resourceUsageTransportManager = ResourceUsageTransportManager.DISABLE_RESOURCE_USAGE_TRANSPORT_MANAGER;
            if (StringUtils.isNotBlank(this.config.getResourceUsageTransportClassName())) {
                Class<?> clazz = Class.forName(this.config.getResourceUsageTransportClassName());
                Constructor<?> ctor = clazz.getConstructor(PulsarService.class);
                Object object = ctor.newInstance(this);
                this.resourceUsageTransportManager = (ResourceUsageTopicTransportManager)object;
            }
            this.resourceGroupServiceManager = new ResourceGroupService(this);
            long currentTimestamp = System.currentTimeMillis();
            long bootstrapTimeSeconds = TimeUnit.MILLISECONDS.toSeconds(currentTimestamp - startTimestamp);
            String bootstrapMessage = "bootstrap service " + (this.config.getWebServicePort().isPresent() ? "port = " + this.config.getWebServicePort().get() : "") + (this.config.getWebServicePortTls().isPresent() ? ", tls-port = " + this.config.getWebServicePortTls() : "") + (StringUtils.isNotEmpty(this.brokerServiceUrl) ? ", broker url= " + this.brokerServiceUrl : "") + (StringUtils.isNotEmpty(this.brokerServiceUrlTls) ? ", broker tls url= " + this.brokerServiceUrlTls : "");
            LOG.info("messaging service is ready, bootstrap_seconds={}", (Object)bootstrapTimeSeconds);
            LOG.info("messaging service is ready, {}, cluster={}, configs={}", new Object[]{bootstrapMessage, this.config.getClusterName(), ReflectionToStringBuilder.toString(this.config)});
            this.state = State.Started;
        }
        catch (Exception e) {
            LOG.error("Failed to start Pulsar service: {}", (Object)e.getMessage(), (Object)e);
            throw new PulsarServerException(e);
        }
        finally {
            this.mutex.unlock();
        }
    }

    /**
     * Registers the broker's HTTP endpoints on the given web service: the VIP status resources,
     * the REST resource packages (web, admin v1/v2/v3, lookup, topics), the metrics servlet,
     * the optional WebSocket servlets, the static resources and any additional servlets.
     *
     * @param webService     web service that receives the resources and servlets
     * @param metricsServlet servlet mounted under {@code /metrics}
     * @param config         broker configuration; supplies the status file path and is passed on
     *                       to the WebSocket and additional-servlet registration helpers
     */
    private void addWebServerHandlers(WebService webService, PrometheusMetricsServlet metricsServlet, ServiceConfiguration config) throws PulsarServerException, PulsarClientException, MalformedURLException, ServletException, DeploymentException {
        HashMap<String, Object> attributeMap = Maps.newHashMap();
        attributeMap.put("pulsar", this);

        // A lambda needs a functional-interface target type. The attribute map stores plain
        // Objects, so the probe is bound to a Supplier first and then stored.
        Supplier<Boolean> isReadyProbe = () -> this.state == State.Started;
        HashMap<String, Object> vipAttributeMap = Maps.newHashMap();
        vipAttributeMap.put("statusFilePath", config.getStatusFilePath());
        vipAttributeMap.put("isReadyProbe", isReadyProbe);

        webService.addRestResources("/", VipStatus.class.getPackage().getName(), false, vipAttributeMap);
        webService.addRestResources("/", "org.apache.pulsar.broker.web", false, attributeMap);
        webService.addRestResources("/admin", "org.apache.pulsar.broker.admin.v1", true, attributeMap);
        webService.addRestResources("/admin/v2", "org.apache.pulsar.broker.admin.v2", true, attributeMap);
        webService.addRestResources("/admin/v3", "org.apache.pulsar.broker.admin.v3", true, attributeMap);
        webService.addRestResources("/lookup", "org.apache.pulsar.broker.lookup", true, attributeMap);
        webService.addRestResources("/topics", "org.apache.pulsar.broker.rest", true, attributeMap);
        webService.addServlet("/metrics", new ServletHolder(metricsServlet), false, attributeMap);
        this.addWebSocketServiceHandler(webService, attributeMap, config);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Attempting to add static directory");
        }
        webService.addStaticResources("/static", "/static");
        this.addBrokerAdditionalServlets(webService, attributeMap, config);
    }

    /**
     * Reacts to a metadata-service session event. The event is always logged; when the session
     * was lost and the configured expiry policy is {@code shutdown}, the broker is shut down
     * immediately.
     *
     * @param e the session event that was received
     */
    private void handleMetadataSessionEvent(SessionEvent e) {
        LOG.info("Received metadata service session event: {}", e);
        if (e != SessionEvent.SessionLost) {
            return;
        }
        if (this.config.getZookeeperSessionExpiredPolicy() != MetadataSessionExpiredPolicy.shutdown) {
            return;
        }
        LOG.warn("The session with metadata service was lost. Shutting down.");
        this.shutdownNow();
    }

    /**
     * Mounts every loaded additional servlet on the web service. Each servlet first receives the
     * broker configuration and, if it implements {@code AdditionalServletWithPulsarService},
     * a reference to this service. Does nothing when no additional servlets were loaded.
     *
     * @param webService   web service to register the servlets on
     * @param attributeMap attributes handed to each registered servlet
     * @param config       broker configuration loaded into each servlet
     */
    private void addBrokerAdditionalServlets(WebService webService, Map<String, Object> attributeMap, ServiceConfiguration config) {
        if (this.getBrokerAdditionalServlets() == null) {
            return;
        }
        Collection<AdditionalServletWithClassLoader> loadedServlets = this.getBrokerAdditionalServlets().getServlets().values();
        for (AdditionalServletWithClassLoader wrapper : loadedServlets) {
            wrapper.loadConfig(config);
            AdditionalServlet servlet = wrapper.getServlet();
            if (servlet instanceof AdditionalServletWithPulsarService) {
                AdditionalServletWithPulsarService pulsarAware = (AdditionalServletWithPulsarService)servlet;
                pulsarAware.setPulsarService(this);
            }
            webService.addServlet(wrapper.getBasePath(), wrapper.getServletHolder(), config.isAuthenticationEnabled(), attributeMap);
            LOG.info("Broker add additional servlet basePath {} ", wrapper.getBasePath());
        }
    }

    /**
     * Creates and starts the embedded WebSocket service and mounts its producer, consumer,
     * reader and ping-pong servlets under both the {@code /ws} and {@code /ws/v2} prefixes.
     * Does nothing when the WebSocket service is disabled in the configuration.
     *
     * @param webService   web service to register the servlets on
     * @param attributeMap attributes handed to each registered servlet
     * @param config       broker configuration, used for the enable flag and the WebSocket service
     */
    private void addWebSocketServiceHandler(WebService webService, Map<String, Object> attributeMap, ServiceConfiguration config) throws PulsarServerException, PulsarClientException, MalformedURLException, ServletException, DeploymentException {
        if (!config.isWebSocketServiceEnabled()) {
            return;
        }
        this.webSocketService = new WebSocketService(null, config);
        this.webSocketService.start();

        // Each servlet instance is shared between its legacy and v2 paths.
        WebSocketProducerServlet producerServlet = new WebSocketProducerServlet(this.webSocketService);
        webService.addServlet("/ws/producer", new ServletHolder(producerServlet), true, attributeMap);
        webService.addServlet("/ws/v2/producer", new ServletHolder(producerServlet), true, attributeMap);

        WebSocketConsumerServlet consumerServlet = new WebSocketConsumerServlet(this.webSocketService);
        webService.addServlet("/ws/consumer", new ServletHolder(consumerServlet), true, attributeMap);
        webService.addServlet("/ws/v2/consumer", new ServletHolder(consumerServlet), true, attributeMap);

        WebSocketReaderServlet readerServlet = new WebSocketReaderServlet(this.webSocketService);
        webService.addServlet("/ws/reader", new ServletHolder(readerServlet), true, attributeMap);
        webService.addServlet("/ws/v2/reader", new ServletHolder(readerServlet), true, attributeMap);

        WebSocketPingPongServlet pingPongServlet = new WebSocketPingPongServlet(this.webSocketService);
        webService.addServlet("/ws/pingpong", new ServletHolder(pingPongServlet), true, attributeMap);
        webService.addServlet("/ws/v2/pingpong", new ServletHolder(pingPongServlet), true, attributeMap);
    }

    /**
     * Handles a metadata notification about a cluster path: when the path names a cluster and
     * the notification type is {@code Deleted}, the replication client for that cluster is
     * closed and removed. All other notifications are ignored.
     *
     * @param notification the metadata notification to inspect
     */
    private void handleDeleteCluster(Notification notification) {
        if (!ClusterResources.pathRepresentsClusterName(notification.getPath())) {
            return;
        }
        if (notification.getType() != NotificationType.Deleted) {
            return;
        }
        String deletedCluster = ClusterResources.clusterNameFromPath(notification.getPath());
        this.getBrokerService().closeAndRemoveReplicationClient(deletedCluster);
    }

    /**
     * Creates the local metadata store from the configured ZooKeeper servers, session timeout
     * and read-only-operations flag.
     *
     * @return a newly created metadata store
     * @throws MetadataStoreException if the store cannot be created
     */
    public MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
        String servers = this.config.getZookeeperServers();
        MetadataStoreConfig storeConfig = MetadataStoreConfig.builder()
                .sessionTimeoutMillis((int)this.config.getZooKeeperSessionTimeoutMillis())
                .allowReadOnlyOperations(this.config.isZookeeperAllowReadOnlyOperations())
                .build();
        return MetadataStoreExtended.create(servers, storeConfig);
    }

    /**
     * Creates and starts the leader election service. The registered listener schedules the
     * load-shedding and resource-quota tasks when this broker becomes leader (and the load
     * balancer is enabled), and cancels and clears them for any other election state.
     */
    protected void startLeaderElectionService() {
        this.leaderElectionService = new LeaderElectionService(this.coordinationService, this.getSafeWebServiceAddress(), state -> {
            if (state != LeaderElectionState.Leading) {
                // The listener can fire before the field assignment above has completed.
                if (this.leaderElectionService != null) {
                    LOG.info("This broker is a follower. Current leader is {}", this.leaderElectionService.getCurrentLeader());
                }
                if (this.loadSheddingTask != null) {
                    this.loadSheddingTask.cancel(false);
                    this.loadSheddingTask = null;
                }
                if (this.loadResourceQuotaTask != null) {
                    this.loadResourceQuotaTask.cancel(false);
                    this.loadResourceQuotaTask = null;
                }
                return;
            }

            LOG.info("This broker was elected leader");
            if (!this.getConfiguration().isLoadBalancerEnabled()) {
                return;
            }
            long sheddingPeriodMs = TimeUnit.MINUTES.toMillis(this.getConfiguration().getLoadBalancerSheddingIntervalMinutes());
            long quotaUpdatePeriodMs = TimeUnit.MINUTES.toMillis(this.getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes());
            // Cancel any previously scheduled tasks before scheduling replacements.
            if (this.loadSheddingTask != null) {
                this.loadSheddingTask.cancel(false);
            }
            if (this.loadResourceQuotaTask != null) {
                this.loadResourceQuotaTask.cancel(false);
            }
            this.loadSheddingTask = this.loadManagerExecutor.scheduleAtFixedRate(new LoadSheddingTask(this.loadManager), sheddingPeriodMs, sheddingPeriodMs, TimeUnit.MILLISECONDS);
            this.loadResourceQuotaTask = this.loadManagerExecutor.scheduleAtFixedRate(new LoadResourceQuotaUpdaterTask(this.loadManager), quotaUpdatePeriodMs, quotaUpdatePeriodMs, TimeUnit.MILLISECONDS);
        });
        this.leaderElectionService.start();
    }

    /**
     * Tries to register (acquire) the SLA monitor namespace for this broker.
     *
     * <p>If the namespace does not exist nothing is done. If registration returns {@code false}
     * or fails with a {@link PulsarServerException}, the SLA namespace is unloaded instead.
     * Any other {@link Exception} schedules a retry of this method on the broker executor after
     * one minute; any other {@link Throwable} is logged and not retried.
     */
    protected void acquireSLANamespace() {
        try {
            NamespaceName nsName = NamespaceService.getSLAMonitorNamespace(this.getAdvertisedAddress(), this.config);
            if (!this.pulsarResources.getNamespaceResources().namespaceExists(nsName)) {
                LOG.info("SLA Namespace = {} doesn't exist.", nsName);
                return;
            }
            boolean acquiredSLANamespace;
            try {
                acquiredSLANamespace = this.nsService.registerSLANamespace();
                LOG.info("Register SLA Namespace = {}, returned - {}.", nsName, acquiredSLANamespace);
            }
            catch (PulsarServerException e) {
                // Still best-effort, but record why registration failed instead of dropping it silently.
                LOG.warn("Failed to register SLA Namespace = {}, it will be unloaded instead.", nsName, e);
                acquiredSLANamespace = false;
            }
            if (!acquiredSLANamespace) {
                this.nsService.unloadSLANamespace();
            }
        }
        catch (Exception ex) {
            LOG.warn("Exception while trying to unload the SLA namespace, will try to unload the namespace again after 1 minute. Exception:", (Throwable)ex);
            this.executor.schedule(this::acquireSLANamespace, 1L, TimeUnit.MINUTES);
        }
        catch (Throwable ex) {
            LOG.warn("Exception while trying to unload the SLA namespace, will not try to unload the namespace again. Exception:", ex);
        }
    }

    /**
     * Blocks the calling thread until the service state is {@code Closed}. The state is checked
     * while holding the service mutex, and the thread waits on the closed-condition between checks.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public void waitUntilClosed() throws InterruptedException {
        this.mutex.lock();
        try {
            for (;;) {
                if (this.state == State.Closed) {
                    break;
                }
                this.isClosedCondition.await();
            }
        }
        finally {
            this.mutex.unlock();
        }
    }

    /**
     * Logs the configured bootstrap namespaces and instantiates the namespace service through
     * {@link #getNamespaceServiceProvider()}.
     *
     * @throws PulsarServerException if the namespace service provider cannot be obtained
     */
    protected void startNamespaceService() throws PulsarServerException {
        LOG.info("Starting name space service, bootstrap namespaces=" + this.config.getBootstrapNamespaces());
        Supplier<NamespaceService> provider = this.getNamespaceServiceProvider();
        this.nsService = provider.get();
    }

    /**
     * Returns a supplier that builds a new {@code NamespaceService} bound to this service each
     * time it is invoked.
     *
     * @return the namespace service factory
     * @throws PulsarServerException declared for overriding implementations; not thrown here
     */
    public Supplier<NamespaceService> getNamespaceServiceProvider() throws PulsarServerException {
        Supplier<NamespaceService> provider = () -> new NamespaceService(this);
        return provider;
    }

    /**
     * Starts the load manager and, when the load balancer is enabled and no load-report task is
     * scheduled yet, schedules the periodic load-report updater at the configured minimum
     * interval.
     *
     * @throws PulsarServerException if the load manager fails to start
     */
    protected void startLoadManagementService() throws PulsarServerException {
        LOG.info("Starting load management service ...");
        this.loadManager.get().start();
        if (!this.config.isLoadBalancerEnabled()) {
            return;
        }
        LOG.info("Starting load balancer");
        if (this.loadReportTask != null) {
            // Already scheduled; keep the existing task.
            return;
        }
        long reportIntervalMs = this.config.getLoadBalancerReportUpdateMinIntervalMillis();
        this.loadReportTask = this.loadManagerExecutor.scheduleAtFixedRate(new LoadReportUpdaterTask(this.loadManager), reportIntervalMs, reportIntervalMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Asynchronously pre-loads the persistent topics of the bundle's namespace that belong to
     * the bundle, skipping transaction system topics. The work is submitted to the broker
     * executor; once all topic futures complete, the number of loaded topics and the elapsed
     * time are logged.
     *
     * @param bundle the namespace bundle whose topics should be loaded
     */
    public void loadNamespaceTopics(NamespaceBundle bundle) {
        this.executor.submit(() -> {
            LOG.info("Loading all topics on bundle: {}", bundle);
            NamespaceName namespace = bundle.getNamespaceObject();
            List<CompletableFuture<Optional<Topic>>> pendingLoads = new ArrayList<>();
            long startNanos = System.nanoTime();
            for (String topic : this.getNamespaceService().getListOfPersistentTopics(namespace).get(this.config.getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS)) {
                try {
                    TopicName topicName = TopicName.get(topic);
                    if (bundle.includes(topicName) && !PulsarService.isTransactionSystemTopic(topicName)) {
                        CompletableFuture<Optional<Topic>> load = this.brokerService.getTopicIfExists(topic);
                        if (load != null) {
                            pendingLoads.add(load);
                        }
                    }
                }
                catch (Throwable t) {
                    // A single failing topic must not abort pre-loading of the remaining ones.
                    LOG.warn("Failed to preload topic {}", topic, t);
                }
            }
            if (pendingLoads.isEmpty()) {
                return null;
            }
            FutureUtil.waitForAll(pendingLoads).thenRun(() -> {
                double elapsedSeconds = (double)TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos) / 1000.0;
                long loadedCount = pendingLoads.stream()
                        .filter(load -> load.getNow(Optional.empty()).isPresent())
                        .count();
                LOG.info("Loaded {} topics on {} -- time taken: {} seconds", new Object[]{loadedCount, bundle, elapsedSeconds});
            });
            return null;
        });
    }

    /**
     * Returns the status file path from the broker configuration.
     *
     * @return the configured status file path, or {@code null} when no configuration is set
     */
    public String getStatusFilePath() {
        return this.config != null ? this.config.getStatusFilePath() : null;
    }

    /**
     * Derives the BookKeeper metadata service URI from this broker's configuration via
     * {@link #bookieMetadataServiceUri(ServiceConfiguration)}.
     *
     * @return the metadata service URI, or {@code null} if it could not be derived
     */
    public String getMetadataServiceUri() {
        ServiceConfiguration brokerConfig = this.getConfiguration();
        return PulsarService.bookieMetadataServiceUri(brokerConfig);
    }

    /**
     * Assembles the internal configuration data: the ZooKeeper servers, the configuration store
     * servers, the default BookKeeper ledgers root path, the BookKeeper metadata service URI and
     * the worker's state storage service URL (or {@code null} without a worker config).
     *
     * <p>An explicitly configured, non-blank BookKeeper metadata service URI takes precedence
     * over the one derived from the ZooKeeper servers.
     *
     * @return the assembled internal configuration data
     */
    public InternalConfigurationData getInternalConfigurationData() {
        String bkMetadataUri = this.getMetadataServiceUri();
        if (StringUtils.isNotBlank(this.config.getBookkeeperMetadataServiceUri())) {
            bkMetadataUri = this.getConfiguration().getBookkeeperMetadataServiceUri();
        }
        return new InternalConfigurationData(
                this.getConfiguration().getZookeeperServers(),
                this.getConfiguration().getConfigurationStoreServers(),
                new ClientConfiguration().getZkLedgersRootPath(),
                bkMetadataUri,
                this.getWorkerConfig().map(wc -> wc.getStateStorageServiceUrl()).orElse(null));
    }

    /**
     * @return the current lifecycle state of this service
     */
    public State getState() {
        return state;
    }

    /**
     * @return the leader election service, or {@code null} if it has not been created yet
     */
    public LeaderElectionService getLeaderElectionService() {
        return leaderElectionService;
    }

    /**
     * @return the namespace service held by this broker
     */
    public NamespaceService getNamespaceService() {
        return nsService;
    }

    /**
     * @return the function worker service wrapped in an {@link Optional}, exactly as held by
     *         this broker
     */
    public Optional<WorkerService> getWorkerServiceOpt() {
        return functionWorkerService;
    }

    /**
     * Returns the function worker service.
     *
     * @return the function worker service
     * @throws UnsupportedOperationException if no function worker service is present
     */
    public WorkerService getWorkerService() throws UnsupportedOperationException {
        if (this.functionWorkerService.isPresent()) {
            return this.functionWorkerService.get();
        }
        throw new UnsupportedOperationException("Pulsar Function Worker is not enabled, probably functionsWorkerEnabled is set to false");
    }

    /**
     * @return the broker service held by this instance
     */
    public BrokerService getBrokerService() {
        return brokerService;
    }

    /**
     * @return the BookKeeper client provided by the managed ledger client factory
     */
    public BookKeeper getBookKeeperClient() {
        ManagedLedgerStorage storage = this.managedLedgerClientFactory;
        return storage.getBookKeeperClient();
    }

    /**
     * @return the managed ledger factory provided by the managed ledger client factory
     */
    public ManagedLedgerFactory getManagedLedgerFactory() {
        ManagedLedgerStorage storage = this.managedLedgerClientFactory;
        return storage.getManagedLedgerFactory();
    }

    /**
     * @return the managed ledger storage (client factory) held by this broker
     */
    public ManagedLedgerStorage getManagedLedgerClientFactory() {
        return managedLedgerClientFactory;
    }

    /**
     * Returns the ledger offloader for a namespace, creating or replacing the cached one as
     * needed.
     *
     * <p>Without offload policies the default offloader is returned. Otherwise the cached
     * offloader is reused when its policies equal the requested ones; a cached offloader with
     * different policies is closed and replaced. If creation fails, the failure is logged and a
     * {@code NullLedgerOffloader} is cached instead.
     *
     * @param namespaceName   namespace the offloader is cached under
     * @param offloadPolicies offload policies to apply, or {@code null} for the default offloader
     * @return the offloader to use for the namespace
     */
    public LedgerOffloader getManagedLedgerOffloader(NamespaceName namespaceName, OffloadPoliciesImpl offloadPolicies) {
        if (offloadPolicies == null) {
            return this.getDefaultOffloader();
        }
        return this.ledgerOffloaderMap.compute(namespaceName, (ns, cached) -> {
            try {
                if (cached != null) {
                    if (Objects.equals(cached.getOffloadPolicies(), offloadPolicies)) {
                        return cached;
                    }
                    // Policies changed: release the stale offloader before building a new one.
                    cached.close();
                }
                return this.createManagedLedgerOffloader(offloadPolicies);
            }
            catch (PulsarServerException e) {
                LOG.error("create ledgerOffloader failed for namespace {}", namespaceName.toString(), e);
                return new NullLedgerOffloader();
            }
        });
    }

    /**
     * Creates a ledger offloader for the given offload policies.
     *
     * <p>When no offload driver is configured the shared {@code NullLedgerOffloader} instance
     * is returned. Otherwise the offloaders are loaded (or taken from the cache) from the
     * configured offloaders directory and the factory matching the driver creates the
     * offloader. Loading and creation run while holding this instance's monitor.
     *
     * @param offloadPolicies offload policies naming the driver and the offloaders directory
     * @return the created offloader, or the null offloader when no driver is configured
     * @throws PulsarServerException wrapping any failure raised while loading or creating the
     *                               offloader
     */
    public LedgerOffloader createManagedLedgerOffloader(OffloadPoliciesImpl offloadPolicies) throws PulsarServerException {
        try {
            if (StringUtils.isNotBlank(offloadPolicies.getManagedLedgerOffloadDriver())) {
                Preconditions.checkNotNull(offloadPolicies.getOffloadersDirectory(), "Offloader driver is configured to be '%s' but no offloaders directory is configured.", (Object)offloadPolicies.getManagedLedgerOffloadDriver());
                synchronized (this) {
                    Offloaders offloaders = this.offloadersCache.getOrLoadOffloaders(offloadPolicies.getOffloadersDirectory(), this.config.getNarExtractionDirectory());
                    LedgerOffloaderFactory offloaderFactory = offloaders.getOffloaderFactory(offloadPolicies.getManagedLedgerOffloadDriver());
                    try {
                        return offloaderFactory.create(offloadPolicies, ImmutableMap.of("S3ManagedLedgerOffloaderSoftwareVersion".toLowerCase(), PulsarVersion.getVersion(), "S3ManagedLedgerOffloaderSoftwareGitSha".toLowerCase(), PulsarVersion.getGitSha(), "pulsarClusterName".toLowerCase(), this.config.getClusterName()), this.schemaStorage, this.getOffloaderScheduler(offloadPolicies));
                    }
                    catch (IOException ioe) {
                        // Chain the IOException itself. Passing ioe.getCause() dropped its stack
                        // trace and left no cause at all when the IOException had none.
                        throw new PulsarServerException(ioe.getMessage(), ioe);
                    }
                }
            }
            LOG.debug("No ledger offloader configured, using NULL instance");
            return NullLedgerOffloader.INSTANCE;
        }
        catch (Throwable t) {
            throw new PulsarServerException(t);
        }
    }

    /**
     * Reflectively instantiates the configured schema registry storage factory, invokes its
     * {@code create(PulsarService, ZooKeeper)} method and starts the resulting storage.
     *
     * @return the started schema storage
     * @throws Exception if the class cannot be loaded or instantiated, or if creating or
     *                   starting the storage fails
     */
    private SchemaStorage createAndStartSchemaStorage() throws Exception {
        String factoryClassName = this.config.getSchemaRegistryStorageClassName();
        Class<?> factoryClass = Class.forName(factoryClassName);
        Object factory = factoryClass.getDeclaredConstructor().newInstance();
        Method create = factoryClass.getMethod("create", PulsarService.class, ZooKeeper.class);
        SchemaStorage storage = (SchemaStorage)create.invoke(factory, this, this.getZkClient());
        storage.start();
        return storage;
    }

    /**
     * @return the broker's general-purpose scheduled executor
     */
    public ScheduledExecutorService getExecutor() {
        return executor;
    }

    /**
     * @return the scheduled executor held in the {@code cacheExecutor} field
     */
    public ScheduledExecutorService getCacheExecutor() {
        return cacheExecutor;
    }

    /**
     * @return the executor provider held in the {@code transactionExecutorProvider} field
     */
    public ExecutorProvider getTransactionExecutorProvider() {
        return transactionExecutorProvider;
    }

    /**
     * @return the scheduled executor on which the load manager tasks are scheduled
     */
    public ScheduledExecutorService getLoadManagerExecutor() {
        return loadManagerExecutor;
    }

    /**
     * @return the ordered executor held by this broker
     */
    public OrderedExecutor getOrderedExecutor() {
        return orderedExecutor;
    }

    /**
     * Returns the ZooKeeper client factory, creating it on first use from the ordered executor.
     *
     * @return the ZooKeeper client factory
     */
    public ZooKeeperClientFactory getZooKeeperClientFactory() {
        if (this.zkClientFactory != null) {
            return this.zkClientFactory;
        }
        this.zkClientFactory = new ZookeeperBkClientFactoryImpl(this.orderedExecutor);
        return this.zkClientFactory;
    }

    /**
     * Creates a new BookKeeper client factory.
     *
     * @return a fresh {@link BookKeeperClientFactoryImpl}
     */
    public BookKeeperClientFactory newBookKeeperClientFactory() {
        BookKeeperClientFactory factory = new BookKeeperClientFactoryImpl();
        return factory;
    }

    /**
     * @return the BookKeeper client factory held by this broker
     */
    public BookKeeperClientFactory getBookKeeperClientFactory() {
        return bkClientFactory;
    }

    /**
     * Returns the compaction executor, lazily creating a single-threaded scheduled executor
     * whose thread factory is named {@code compaction}.
     *
     * @return the compaction executor
     */
    protected synchronized ScheduledExecutorService getCompactorExecutor() {
        if (this.compactorExecutor != null) {
            return this.compactorExecutor;
        }
        this.compactorExecutor = Executors.newSingleThreadScheduledExecutor(new ExecutorProvider.ExtendedThreadFactory("compaction"));
        return this.compactorExecutor;
    }

    /**
     * Builds a new two-phase compactor from the broker configuration, the broker's Pulsar
     * client, the BookKeeper client and the compaction executor.
     *
     * @return a new compactor
     * @throws PulsarServerException if the Pulsar client cannot be obtained
     */
    public Compactor newCompactor() throws PulsarServerException {
        return new TwoPhaseCompactor(
                this.getConfiguration(),
                this.getClient(),
                this.getBookKeeperClient(),
                this.getCompactorExecutor());
    }

    /**
     * Returns the compactor, creating it through {@link #newCompactor()} on first use.
     *
     * @return the compactor
     * @throws PulsarServerException if the compactor cannot be created
     */
    public synchronized Compactor getCompactor() throws PulsarServerException {
        if (this.compactor != null) {
            return this.compactor;
        }
        this.compactor = this.newCompactor();
        return this.compactor;
    }

    /**
     * Returns the compactor without creating it.
     *
     * @return the compactor, or {@code null} if none has been created yet
     */
    public Compactor getNullableCompactor() {
        return compactor;
    }

    /**
     * Returns the offloader scheduler, creating it on first use with the thread count taken from
     * the given offload policies. Later calls return the existing scheduler regardless of the
     * policies passed in.
     *
     * @param offloadPolicies policies supplying the maximum offload thread count on first use
     * @return the offloader scheduler
     */
    protected synchronized OrderedScheduler getOffloaderScheduler(OffloadPoliciesImpl offloadPolicies) {
        if (this.offloaderScheduler != null) {
            return this.offloaderScheduler;
        }
        this.offloaderScheduler = (OrderedScheduler)OrderedScheduler.newSchedulerBuilder()
                .numThreads(offloadPolicies.getManagedLedgerOffloadMaxThreads())
                .name("offloader")
                .build();
        return this.offloaderScheduler;
    }

    /**
     * Builds a Pulsar client from the given configuration, using the broker's IO event loop
     * group, shared client timer and shared internal, external and scheduled executor providers.
     *
     * @param clientConf the client configuration to build from
     * @return the newly built client
     * @throws PulsarClientException if the client cannot be built
     */
    public PulsarClientImpl createClientImpl(ClientConfigurationData clientConf) throws PulsarClientException {
        return PulsarClientImpl.builder()
                .conf(clientConf)
                .eventLoopGroup(this.ioEventLoopGroup)
                .timer(this.brokerClientSharedTimer)
                .internalExecutorProvider(this.brokerClientSharedInternalExecutorProvider)
                .externalExecutorProvider(this.brokerClientSharedExternalExecutorProvider)
                .scheduledExecutorProvider(this.brokerClientSharedScheduledExecutorProvider)
                .build();
    }

    /**
     * Returns the broker's internal Pulsar client, creating it on first use.
     *
     * <p>The client configuration starts with stats reporting disabled, is overlaid with the
     * {@code brokerClient_}-prefixed broker properties, and points at this broker's own service
     * URL (the TLS URL when broker-client TLS is enabled). TLS and authentication settings are
     * copied from the broker configuration.
     *
     * @return the broker's Pulsar client
     * @throws PulsarServerException wrapping any failure while building the client
     */
    public synchronized PulsarClient getClient() throws PulsarServerException {
        if (this.client != null) {
            return this.client;
        }
        try {
            ClientConfigurationData baseConf = new ClientConfigurationData();
            baseConf.setStatsIntervalSeconds(0L);
            Map<String, Object> overrides = PropertiesUtils.filterAndMapProperties(this.getConfiguration().getProperties(), "brokerClient_");
            ClientConfigurationData conf = ConfigurationDataUtils.loadData(overrides, baseConf, ClientConfigurationData.class);

            boolean tlsEnabled = this.getConfiguration().isBrokerClientTlsEnabled();
            if (tlsEnabled) {
                conf.setServiceUrl(this.brokerServiceUrlTls);
                conf.setTlsCiphers(this.getConfiguration().getBrokerClientTlsCiphers());
                conf.setTlsProtocols(this.getConfiguration().getBrokerClientTlsProtocols());
                conf.setTlsAllowInsecureConnection(this.getConfiguration().isTlsAllowInsecureConnection());
                conf.setTlsHostnameVerificationEnable(this.getConfiguration().isTlsHostnameVerificationEnabled());
                if (this.getConfiguration().isBrokerClientTlsEnabledWithKeyStore()) {
                    conf.setUseKeyStoreTls(true);
                    conf.setTlsTrustStoreType(this.getConfiguration().getBrokerClientTlsTrustStoreType());
                    conf.setTlsTrustStorePath(this.getConfiguration().getBrokerClientTlsTrustStore());
                    conf.setTlsTrustStorePassword(this.getConfiguration().getBrokerClientTlsTrustStorePassword());
                } else if (StringUtils.isNotBlank(this.getConfiguration().getBrokerClientTrustCertsFilePath())) {
                    conf.setTlsTrustCertsFilePath(this.getConfiguration().getBrokerClientTrustCertsFilePath());
                } else {
                    // No dedicated client trust certs configured: fall back to the broker's certificate file.
                    conf.setTlsTrustCertsFilePath(this.getConfiguration().getTlsCertificateFilePath());
                }
            } else {
                conf.setServiceUrl(this.brokerServiceUrl);
            }

            if (StringUtils.isNotBlank(this.getConfiguration().getBrokerClientAuthenticationPlugin())) {
                conf.setAuthPluginClassName(this.getConfiguration().getBrokerClientAuthenticationPlugin());
                conf.setAuthParams(this.getConfiguration().getBrokerClientAuthenticationParameters());
                conf.setAuthParamMap(null);
                conf.setAuthentication(AuthenticationFactory.create(this.getConfiguration().getBrokerClientAuthenticationPlugin(), this.getConfiguration().getBrokerClientAuthenticationParameters()));
            }
            this.client = this.createClientImpl(conf);
        }
        catch (Exception e) {
            throw new PulsarServerException(e);
        }
        return this.client;
    }

    /**
     * Returns the broker's internal admin client, creating it on first use.
     *
     * <p>The client targets this broker's own web service address (the TLS address when
     * broker-client TLS is enabled), is overlaid with the {@code brokerClient_}-prefixed broker
     * properties and uses the broker-client authentication and TLS settings. Its read timeout is
     * the configured ZooKeeper operation timeout.
     *
     * @return the broker's admin client
     * @throws PulsarServerException wrapping any failure, including a missing web service address
     */
    public synchronized PulsarAdmin getAdminClient() throws PulsarServerException {
        if (this.adminClient != null) {
            return this.adminClient;
        }
        try {
            ServiceConfiguration conf = this.getConfiguration();
            String adminApiUrl;
            if (conf.isBrokerClientTlsEnabled()) {
                adminApiUrl = this.webServiceAddressTls;
            } else {
                adminApiUrl = this.webServiceAddress;
            }
            if (adminApiUrl == null) {
                throw new IllegalArgumentException("Web service address was not set properly , isBrokerClientTlsEnabled: " + conf.isBrokerClientTlsEnabled() + ", webServiceAddressTls: " + this.webServiceAddressTls + ", webServiceAddress: " + this.webServiceAddress);
            }

            PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl);
            builder.loadConf(PropertiesUtils.filterAndMapProperties(this.config.getProperties(), "brokerClient_"));
            builder.authentication(conf.getBrokerClientAuthenticationPlugin(), conf.getBrokerClientAuthenticationParameters());

            if (conf.isBrokerClientTlsEnabled()) {
                builder.tlsCiphers(this.config.getBrokerClientTlsCiphers());
                builder.tlsProtocols(this.config.getBrokerClientTlsProtocols());
                if (conf.isBrokerClientTlsEnabledWithKeyStore()) {
                    builder.useKeyStoreTls(true);
                    builder.tlsTrustStoreType(conf.getBrokerClientTlsTrustStoreType());
                    builder.tlsTrustStorePath(conf.getBrokerClientTlsTrustStore());
                    builder.tlsTrustStorePassword(conf.getBrokerClientTlsTrustStorePassword());
                } else {
                    builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath());
                }
                builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection());
                builder.enableTlsHostnameVerification(conf.isTlsHostnameVerificationEnabled());
            }

            builder.readTimeout(conf.getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS);
            this.adminClient = builder.build();
            LOG.info("created admin with url {} ", adminApiUrl);
        }
        catch (Exception e) {
            throw new PulsarServerException(e);
        }
        return this.adminClient;
    }

    /**
     * @return the metrics generator held by this broker
     */
    public MetricsGenerator getMetricsGenerator() {
        return metricsGenerator;
    }

    /**
     * @return the transaction metadata store service held by this broker
     */
    public TransactionMetadataStoreService getTransactionMetadataStoreService() {
        return transactionMetadataStoreService;
    }

    /**
     * @return the transaction buffer provider held by this broker
     */
    public TransactionBufferProvider getTransactionBufferProvider() {
        return transactionBufferProvider;
    }

    /**
     * @return the transaction buffer client held by this broker
     */
    public TransactionBufferClient getTransactionBufferClient() {
        return transactionBufferClient;
    }

    /**
     * Resolves the broker service URL of the internal listener for the {@code pulsar} scheme.
     *
     * @param config the broker configuration to resolve the internal listener from
     * @return the broker service URL as a string, or {@code null} if the listener has none
     */
    protected String brokerUrl(ServiceConfiguration config) {
        AdvertisedListener listener = ServiceConfigurationUtils.getInternalListener(config, "pulsar");
        if (listener.getBrokerServiceUrl() == null) {
            return null;
        }
        return listener.getBrokerServiceUrl().toString();
    }

    /**
     * Builds a broker service URL for the given host and port by delegating to
     * {@code ServiceConfigurationUtils.brokerUrl}.
     *
     * @param host the broker host
     * @param port the broker service port
     * @return the broker service URL
     */
    public static String brokerUrl(String host, int port) {
        String url = ServiceConfigurationUtils.brokerUrl(host, port);
        return url;
    }

    /**
     * Resolves the TLS broker service URL of the internal listener for the {@code pulsar+ssl}
     * scheme.
     *
     * @param config the broker configuration to resolve the internal listener from
     * @return the TLS broker service URL as a string, or {@code null} if the listener has none
     */
    public String brokerUrlTls(ServiceConfiguration config) {
        AdvertisedListener listener = ServiceConfigurationUtils.getInternalListener(config, "pulsar+ssl");
        if (listener.getBrokerServiceUrlTls() == null) {
            return null;
        }
        return listener.getBrokerServiceUrlTls().toString();
    }

    /**
     * Builds a TLS broker service URL for the given host and port by delegating to
     * {@code ServiceConfigurationUtils.brokerUrlTls}.
     *
     * @param host the broker host
     * @param port the TLS broker service port
     * @return the TLS broker service URL
     */
    public static String brokerUrlTls(String host, int port) {
        String url = ServiceConfigurationUtils.brokerUrlTls(host, port);
        return url;
    }

    /**
     * Resolves the HTTP web service address of this broker.
     *
     * <p>The internal listener's HTTP URL is preferred; when it has none, the address is built
     * from the configured web service address and the actual HTTP listen port.
     *
     * @param config the broker configuration
     * @return the HTTP address, or {@code null} when no web service port is configured
     */
    public String webAddress(ServiceConfiguration config) {
        if (!config.getWebServicePort().isPresent()) {
            return null;
        }
        AdvertisedListener listener = ServiceConfigurationUtils.getInternalListener(config, "http");
        if (listener.getBrokerHttpUrl() != null) {
            return listener.getBrokerHttpUrl().toString();
        }
        return PulsarService.webAddress(ServiceConfigurationUtils.getWebServiceAddress(config), this.getListenPortHTTP().get());
    }

    /**
     * Formats an HTTP address of the form {@code http://host:port}.
     *
     * @param host the host name or address
     * @param port the HTTP port
     * @return the formatted HTTP address
     */
    public static String webAddress(String host, int port) {
        final String address = String.format("http://%s:%d", host, port);
        return address;
    }

    /**
     * Resolves the HTTPS web service address of this broker.
     *
     * <p>The internal listener's HTTPS URL is preferred; when it has none, the address is built
     * from the configured web service address and the actual HTTPS listen port.
     *
     * @param config the broker configuration
     * @return the HTTPS address, or {@code null} when no TLS web service port is configured
     */
    public String webAddressTls(ServiceConfiguration config) {
        if (!config.getWebServicePortTls().isPresent()) {
            return null;
        }
        AdvertisedListener listener = ServiceConfigurationUtils.getInternalListener(config, "https");
        if (listener.getBrokerHttpsUrl() != null) {
            return listener.getBrokerHttpsUrl().toString();
        }
        return PulsarService.webAddressTls(ServiceConfigurationUtils.getWebServiceAddress(config), this.getListenPortHTTPS().get());
    }

    /**
     * Builds an {@code https://host:port} address.
     *
     * <p>Plain concatenation is used instead of {@code String.format("%d", ...)} because the formatter
     * localizes digits according to the default locale, which could put non-ASCII digits in the URL.
     *
     * @param host host name or address (a {@code null} host is rendered as the text {@code "null"})
     * @param port port number
     * @return the HTTPS address string
     */
    public static String webAddressTls(String host, int port) {
        return "https://" + host + ":" + port;
    }

    /**
     * Returns the plaintext web service address when it is set, otherwise the TLS one.
     *
     * @return {@code webServiceAddress} if non-null, else {@code webServiceAddressTls} (which may itself be null)
     */
    public String getSafeWebServiceAddress() {
        if (webServiceAddress != null) {
            return webServiceAddress;
        }
        return webServiceAddressTls;
    }

    /**
     * Returns the plaintext broker service URL when it is set, otherwise the TLS one.
     *
     * @return {@code brokerServiceUrl} if non-null, else {@code brokerServiceUrlTls} (which may itself be null)
     */
    @Deprecated
    public String getSafeBrokerServiceUrl() {
        if (brokerServiceUrl != null) {
            return brokerServiceUrl;
        }
        return brokerServiceUrlTls;
    }

    /**
     * Derives the BookKeeper metadata service URI from the broker's ZooKeeper servers setting.
     *
     * <p>The ZooKeeper servers are applied to a fresh {@code ClientConfiguration}, which is then asked
     * for its metadata service URI.
     *
     * @param config broker configuration supplying the ZooKeeper servers
     * @return the metadata service URI, or {@code null} if a {@code ConfigurationException} was raised
     *         (the failure is logged at error level)
     */
    public static String bookieMetadataServiceUri(ServiceConfiguration config) {
        ClientConfiguration clientConf = new ClientConfiguration();
        try {
            clientConf.setZkServers(config.getZookeeperServers());
            return clientConf.getMetadataServiceUri();
        }
        catch (ConfigurationException e) {
            LOG.error("Failed to get bookkeeper metadata service uri", (Throwable)e);
            return null;
        }
    }

    /** Returns the {@link TopicPoliciesService} stored in the {@code topicPoliciesService} field. */
    public TopicPoliciesService getTopicPoliciesService() {
        return topicPoliciesService;
    }

    /** Returns the {@link ResourceUsageTransportManager} stored in the {@code resourceUsageTransportManager} field. */
    public ResourceUsageTransportManager getResourceUsageTransportManager() {
        return resourceUsageTransportManager;
    }

    /**
     * Registers a raw Prometheus metrics provider.
     *
     * <p>When the metrics servlet already exists the provider is handed to it directly. Otherwise the
     * provider is appended to the pending list, which is created on first use.
     *
     * @param metricsProvider provider to register
     */
    public void addPrometheusRawMetricsProvider(PrometheusRawMetricsProvider metricsProvider) {
        if (metricsServlet != null) {
            metricsServlet.addRawMetricsProvider(metricsProvider);
            return;
        }
        // No servlet yet: queue the provider.
        if (pendingMetricsProviders == null) {
            pendingMetricsProviders = new LinkedList<>();
        }
        pendingMetricsProviders.add(metricsProvider);
    }

    /**
     * Configures and starts the function worker service, if one is present.
     *
     * <p>The worker is pointed at this broker's TLS endpoints when the worker config requests TLS or when
     * the corresponding plaintext endpoint is {@code null}; otherwise the plaintext endpoints are used.
     *
     * @param authenticationService authentication service passed to the worker on start
     * @param authorizationService authorization service passed to the worker on start
     * @throws Exception propagated from worker initialization or start
     */
    private void startWorkerService(AuthenticationService authenticationService, AuthorizationService authorizationService) throws Exception {
        if (!functionWorkerService.isPresent()) {
            return;
        }
        String serviceUrl = (workerConfig.isUseTls() || brokerServiceUrl == null) ? brokerServiceUrlTls : brokerServiceUrl;
        workerConfig.setPulsarServiceUrl(serviceUrl);

        // The same address serves as both the Pulsar web URL and the function web URL.
        String webUrl = (workerConfig.isUseTls() || webServiceAddress == null) ? webServiceAddressTls : webServiceAddress;
        workerConfig.setPulsarWebServiceUrl(webUrl);
        workerConfig.setFunctionWebServiceUrl(webUrl);

        LOG.info("Starting function worker service: serviceUrl = {}, webServiceUrl = {}, functionWebServiceUrl = {}", workerConfig.getPulsarServiceUrl(), workerConfig.getPulsarWebServiceUrl(), workerConfig.getFunctionWebServiceUrl());
        WorkerService worker = functionWorkerService.get();
        worker.initInBroker(config, workerConfig, pulsarResources, getInternalConfigurationData());
        worker.start(authenticationService, authorizationService, ErrorNotifier.getShutdownServiceImpl(this));
        LOG.info("Function worker service started");
    }

    /**
     * Returns the packages management service.
     *
     * @return the service held in the {@code packagesManagement} optional
     * @throws UnsupportedOperationException if the optional is empty
     */
    public PackagesManagement getPackagesManagement() throws UnsupportedOperationException {
        if (!packagesManagement.isPresent()) {
            throw new UnsupportedOperationException("Package Management Service is not enabled in the broker.");
        }
        return packagesManagement.get();
    }

    /**
     * Creates the packages management service, publishes it in the {@code packagesManagement} field and
     * initializes it with storage obtained from the configured storage provider.
     *
     * @throws IOException propagated from the provider, storage or service calls
     */
    private void startPackagesManagementService() throws IOException {
        PackagesManagementImpl service = new PackagesManagementImpl();
        // The field is assigned before the storage is set up, so the order below is significant.
        packagesManagement = Optional.of(service);

        PackagesStorageProvider provider = PackagesStorageProvider.newProvider(config.getPackagesManagementStorageProvider());
        DefaultPackagesStorageConfiguration storageConf = new DefaultPackagesStorageConfiguration();
        storageConf.setProperty(config.getProperties());

        PackagesStorage packagesStorage = provider.getStorage(storageConf);
        packagesStorage.initialize();
        service.initialize(packagesStorage);
    }

    /** Returns the HTTP listen port as reported by the web service. */
    public Optional<Integer> getListenPortHTTP() {
        return webService.getListenPortHTTP();
    }

    /** Returns the HTTPS listen port as reported by the web service. */
    public Optional<Integer> getListenPortHTTPS() {
        return webService.getListenPortHTTPS();
    }

    /** Returns the plaintext listen port as reported by the broker service. */
    public Optional<Integer> getBrokerListenPort() {
        return brokerService.getListenPort();
    }

    /** Returns the TLS listen port as reported by the broker service. */
    public Optional<Integer> getBrokerListenPortTls() {
        return brokerService.getListenPortTls();
    }

    /** Returns the {@link MetadataStoreExtended} stored in the {@code localMetadataStore} field. */
    public MetadataStoreExtended getLocalMetadataStore() {
        return localMetadataStore;
    }

    /** Returns the {@link CoordinationService} stored in the {@code coordinationService} field. */
    public CoordinationService getCoordinationService() {
        return coordinationService;
    }

    /**
     * Loads a function worker configuration from a file and overrides parts of it with values taken
     * from the broker configuration.
     *
     * <p>Copied from the broker: web service ports (only when present), hostname, cluster name,
     * authentication and authorization settings, configuration store and ZooKeeper timeouts, TLS
     * settings, broker client authentication, and super user and proxy roles. The worker service NAR
     * package is inherited only when the worker file leaves it blank. Finally a worker id of the form
     * {@code c-<cluster>-fw-<hostname>-<port>} is assigned, using the TLS worker port when TLS is
     * enabled on the worker config and the plain worker port otherwise.
     *
     * @param brokerConfig broker configuration to inherit from
     * @param workerConfigFile path handed to {@code WorkerConfig.load}
     * @return the populated worker configuration
     * @throws IOException if loading the worker configuration file fails
     */
    public static WorkerConfig initializeWorkerConfigFromBrokerConfig(ServiceConfiguration brokerConfig, String workerConfigFile) throws IOException {
        WorkerConfig workerConfig = WorkerConfig.load(workerConfigFile);
        // ifPresent rather than map: the setters are called for their side effect only.
        brokerConfig.getWebServicePort().ifPresent(port -> workerConfig.setWorkerPort((Integer)port));
        brokerConfig.getWebServicePortTls().ifPresent(port -> workerConfig.setWorkerPortTls((Integer)port));
        String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(brokerConfig.getAdvertisedAddress());
        workerConfig.setWorkerHostname(hostname);
        workerConfig.setPulsarFunctionsCluster(brokerConfig.getClusterName());
        // Authentication / authorization settings are inherited from the broker.
        workerConfig.setAuthenticationEnabled(brokerConfig.isAuthenticationEnabled());
        workerConfig.setAuthenticationProviders(brokerConfig.getAuthenticationProviders());
        workerConfig.setAuthorizationEnabled(brokerConfig.isAuthorizationEnabled());
        workerConfig.setAuthorizationProvider(brokerConfig.getAuthorizationProvider());
        workerConfig.setConfigurationStoreServers(brokerConfig.getConfigurationStoreServers());
        workerConfig.setZooKeeperSessionTimeoutMillis(brokerConfig.getZooKeeperSessionTimeoutMillis());
        workerConfig.setZooKeeperOperationTimeoutSeconds(brokerConfig.getZooKeeperOperationTimeoutSeconds());
        // TLS settings are inherited from the broker.
        workerConfig.setTlsAllowInsecureConnection(brokerConfig.isTlsAllowInsecureConnection());
        workerConfig.setTlsEnabled(brokerConfig.isTlsEnabled());
        workerConfig.setTlsEnableHostnameVerification(brokerConfig.isTlsHostnameVerificationEnabled());
        workerConfig.setBrokerClientTrustCertsFilePath(brokerConfig.getTlsTrustCertsFilePath());
        // Broker client authentication settings are copied as-is.
        workerConfig.setBrokerClientAuthenticationPlugin(brokerConfig.getBrokerClientAuthenticationPlugin());
        workerConfig.setBrokerClientAuthenticationParameters(brokerConfig.getBrokerClientAuthenticationParameters());
        workerConfig.setSuperUserRoles(brokerConfig.getSuperUserRoles());
        workerConfig.setProxyRoles(brokerConfig.getProxyRoles());
        // Keep a NAR package set in the worker file; fall back to the broker's only when blank.
        if (StringUtils.isBlank(workerConfig.getFunctionsWorkerServiceNarPackage())) {
            workerConfig.setFunctionsWorkerServiceNarPackage(brokerConfig.getFunctionsWorkerServiceNarPackage());
        }
        workerConfig.setWorkerId("c-" + brokerConfig.getClusterName() + "-fw-" + hostname + "-" + (workerConfig.getTlsEnabled() ? workerConfig.getWorkerPortTls() : workerConfig.getWorkerPort()));
        return workerConfig;
    }

    /**
     * Performs an immediate shutdown: attempts to close the metadata service session and then hands
     * exit code {@code -1} to the process terminator.
     *
     * <p>A failure to close the session is logged and does not prevent the terminator from running.
     */
    @Override
    public void shutdownNow() {
        LOG.info("Invoking Pulsar service immediate shutdown");
        try {
            this.closeMetadataServiceSession();
        }
        catch (Exception e) {
            // Pass the exception as the trailing argument so the stack trace is logged too,
            // not only the message.
            LOG.warn("Failed to close metadata service session: {}", e.getMessage(), e);
        }
        this.processTerminator.accept(-1);
    }

    /**
     * Tells whether a topic name matches one of the transaction system topic patterns.
     *
     * @param topicName topic to test
     * @return {@code true} if the name starts with the transaction coordinator assign or log topic name,
     *         or ends with {@code __transaction_pending_ack}
     */
    public static boolean isTransactionSystemTopic(TopicName topicName) {
        String name = topicName.toString();
        if (name.startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString())) {
            return true;
        }
        if (name.startsWith(TopicName.TRANSACTION_COORDINATOR_LOG.toString())) {
            return true;
        }
        return name.endsWith("__transaction_pending_ack");
    }

    /**
     * Tells whether a topic name matches one of the transaction-internal topic patterns.
     *
     * @param topicName topic to test
     * @return {@code true} if the name starts with the transaction coordinator log topic name or ends
     *         with {@code __transaction_pending_ack}
     */
    public static boolean isTransactionInternalName(TopicName topicName) {
        String name = topicName.toString();
        if (name.startsWith(TopicName.TRANSACTION_COORDINATOR_LOG.toString())) {
            return true;
        }
        return name.endsWith("__transaction_pending_ack");
    }

    /**
     * Creates a new {@link BrokerService} bound to the given Pulsar service and this instance's IO
     * event loop group.
     *
     * @param pulsar the Pulsar service handed to the broker service constructor
     * @return the newly constructed broker service
     * @throws Exception propagated from the {@code BrokerService} constructor
     */
    @VisibleForTesting
    protected BrokerService newBrokerService(PulsarService pulsar) throws Exception {
        BrokerService created = new BrokerService(pulsar, ioEventLoopGroup);
        return created;
    }

    /**
     * Returns the ZooKeeper client backing the local metadata store.
     *
     * @return the client obtained from the local metadata store
     * @throws RuntimeException if the local metadata store is not a {@code ZKMetadataStore}
     */
    private ZooKeeper getZkClient() {
        if (!(localMetadataStore instanceof ZKMetadataStore)) {
            throw new RuntimeException("MetadataStore implemenation is not based on ZooKeeper");
        }
        ZKMetadataStore zkStore = (ZKMetadataStore)localMetadataStore;
        return zkStore.getZkClient();
    }

    /** Returns the {@link ServiceConfiguration} stored in the {@code config} field. */
    public ServiceConfiguration getConfig() {
        return config;
    }

    /** Returns the {@link NamespaceService} stored in the {@code nsService} field. */
    public NamespaceService getNsService() {
        return nsService;
    }

    /** Returns the {@link WebService} stored in the {@code webService} field. */
    public WebService getWebService() {
        return webService;
    }

    /** Returns the {@link WebSocketService} stored in the {@code webSocketService} field. */
    public WebSocketService getWebSocketService() {
        return webSocketService;
    }

    /** Returns the {@link BookKeeperClientFactory} stored in the {@code bkClientFactory} field. */
    public BookKeeperClientFactory getBkClientFactory() {
        return bkClientFactory;
    }

    /** Returns the {@link ResourceGroupService} stored in the {@code resourceGroupServiceManager} field. */
    public ResourceGroupService getResourceGroupServiceManager() {
        return resourceGroupServiceManager;
    }

    /** Returns the {@link OrderedScheduler} stored in the {@code offloaderScheduler} field. */
    public OrderedScheduler getOffloaderScheduler() {
        return offloaderScheduler;
    }

    /** Returns the {@link OffloadersCache} stored in the {@code offloadersCache} field. */
    public OffloadersCache getOffloadersCache() {
        return offloadersCache;
    }

    /** Returns the {@link LedgerOffloader} stored in the {@code defaultOffloader} field. */
    public LedgerOffloader getDefaultOffloader() {
        return defaultOffloader;
    }

    /** Returns the namespace-to-offloader map stored in the {@code ledgerOffloaderMap} field (the same instance, not a copy). */
    public Map<NamespaceName, LedgerOffloader> getLedgerOffloaderMap() {
        return ledgerOffloaderMap;
    }

    /** Returns the scheduled future stored in the {@code loadReportTask} field. */
    public ScheduledFuture<?> getLoadReportTask() {
        return loadReportTask;
    }

    /** Returns the scheduled future stored in the {@code loadSheddingTask} field. */
    public ScheduledFuture<?> getLoadSheddingTask() {
        return loadSheddingTask;
    }

    /** Returns the scheduled future stored in the {@code loadResourceQuotaTask} field. */
    public ScheduledFuture<?> getLoadResourceQuotaTask() {
        return loadResourceQuotaTask;
    }

    /** Returns the atomic reference stored in the {@code loadManager} field (the reference holder itself, not its content). */
    public AtomicReference<LoadManager> getLoadManager() {
        return loadManager;
    }

    /** Returns the {@link ZooKeeperClientFactory} stored in the {@code zkClientFactory} field. */
    public ZooKeeperClientFactory getZkClientFactory() {
        return zkClientFactory;
    }

    /** Returns the string stored in the {@code bindAddress} field. */
    public String getBindAddress() {
        return bindAddress;
    }

    /** Returns the string stored in the {@code advertisedAddress} field. */
    public String getAdvertisedAddress() {
        return advertisedAddress;
    }

    /** Returns the string stored in the {@code webServiceAddress} field. */
    public String getWebServiceAddress() {
        return webServiceAddress;
    }

    /** Returns the string stored in the {@code webServiceAddressTls} field. */
    public String getWebServiceAddressTls() {
        return webServiceAddressTls;
    }

    /** Returns the string stored in the {@code brokerServiceUrl} field. */
    public String getBrokerServiceUrl() {
        return brokerServiceUrl;
    }

    /** Returns the string stored in the {@code brokerServiceUrlTls} field. */
    public String getBrokerServiceUrlTls() {
        return brokerServiceUrlTls;
    }

    /** Returns the string stored in the {@code brokerVersion} field. */
    public String getBrokerVersion() {
        return brokerVersion;
    }

    /** Returns the {@link SchemaStorage} stored in the {@code schemaStorage} field. */
    public SchemaStorage getSchemaStorage() {
        return schemaStorage;
    }

    /** Returns the {@link SchemaRegistryService} stored in the {@code schemaRegistryService} field. */
    public SchemaRegistryService getSchemaRegistryService() {
        return schemaRegistryService;
    }

    /** Returns the optional worker service stored in the {@code functionWorkerService} field. */
    public Optional<WorkerService> getFunctionWorkerService() {
        return functionWorkerService;
    }

    /** Returns the {@link ProtocolHandlers} stored in the {@code protocolHandlers} field. */
    public ProtocolHandlers getProtocolHandlers() {
        return protocolHandlers;
    }

    /** Returns the integer consumer stored in the {@code processTerminator} field. */
    public Consumer<Integer> getProcessTerminator() {
        return processTerminator;
    }

    /** Returns the {@link EventLoopGroup} stored in the {@code ioEventLoopGroup} field. */
    public EventLoopGroup getIoEventLoopGroup() {
        return ioEventLoopGroup;
    }

    /** Returns the {@link ExecutorProvider} stored in the {@code brokerClientSharedInternalExecutorProvider} field. */
    public ExecutorProvider getBrokerClientSharedInternalExecutorProvider() {
        return brokerClientSharedInternalExecutorProvider;
    }

    /** Returns the {@link ExecutorProvider} stored in the {@code brokerClientSharedExternalExecutorProvider} field. */
    public ExecutorProvider getBrokerClientSharedExternalExecutorProvider() {
        return brokerClientSharedExternalExecutorProvider;
    }

    /** Returns the {@link ScheduledExecutorProvider} stored in the {@code brokerClientSharedScheduledExecutorProvider} field. */
    public ScheduledExecutorProvider getBrokerClientSharedScheduledExecutorProvider() {
        return brokerClientSharedScheduledExecutorProvider;
    }

    /** Returns the {@link Timer} stored in the {@code brokerClientSharedTimer} field. */
    public Timer getBrokerClientSharedTimer() {
        return brokerClientSharedTimer;
    }

    /** Returns the {@link HashedWheelTimer} stored in the {@code transactionTimer} field. */
    public HashedWheelTimer getTransactionTimer() {
        return transactionTimer;
    }

    /** Returns the {@link BrokerInterceptor} stored in the {@code brokerInterceptor} field. */
    public BrokerInterceptor getBrokerInterceptor() {
        return brokerInterceptor;
    }

    /** Returns the {@link AdditionalServlets} stored in the {@code brokerAdditionalServlets} field. */
    public AdditionalServlets getBrokerAdditionalServlets() {
        return brokerAdditionalServlets;
    }

    /** Returns the {@link PrometheusMetricsServlet} stored in the {@code metricsServlet} field. */
    public PrometheusMetricsServlet getMetricsServlet() {
        return metricsServlet;
    }

    /** Returns the list stored in the {@code pendingMetricsProviders} field (the same instance, not a copy). */
    public List<PrometheusRawMetricsProvider> getPendingMetricsProviders() {
        return pendingMetricsProviders;
    }

    /** Returns the {@link TransactionBufferSnapshotService} stored in the {@code transactionBufferSnapshotService} field. */
    public TransactionBufferSnapshotService getTransactionBufferSnapshotService() {
        return transactionBufferSnapshotService;
    }

    /** Returns the {@link MetadataStore} stored in the {@code configurationMetadataStore} field. */
    public MetadataStore getConfigurationMetadataStore() {
        return configurationMetadataStore;
    }

    /** Returns the flag stored in the {@code shouldShutdownConfigurationMetadataStore} field. */
    public boolean isShouldShutdownConfigurationMetadataStore() {
        return shouldShutdownConfigurationMetadataStore;
    }

    /** Returns the {@link PulsarResources} stored in the {@code pulsarResources} field. */
    public PulsarResources getPulsarResources() {
        return pulsarResources;
    }

    /** Returns the {@link TransactionPendingAckStoreProvider} stored in the {@code transactionPendingAckStoreProvider} field. */
    public TransactionPendingAckStoreProvider getTransactionPendingAckStoreProvider() {
        return transactionPendingAckStoreProvider;
    }

    /** Returns the {@link ReentrantLock} stored in the {@code mutex} field. */
    public ReentrantLock getMutex() {
        return mutex;
    }

    /** Returns the {@link Condition} stored in the {@code isClosedCondition} field. */
    public Condition getIsClosedCondition() {
        return isClosedCondition;
    }

    /** Returns the future stored in the {@code closeFuture} field. */
    public CompletableFuture<Void> getCloseFuture() {
        return closeFuture;
    }

    /** Returns the name-to-listener map stored in the {@code advertisedListeners} field (the same instance, not a copy). */
    public Map<String, AdvertisedListener> getAdvertisedListeners() {
        return advertisedListeners;
    }

    /** Returns the {@link NamespaceName} stored in the {@code heartbeatNamespaceV2} field. */
    public NamespaceName getHeartbeatNamespaceV2() {
        return heartbeatNamespaceV2;
    }

    /** Returns the {@link NamespaceName} stored in the {@code heartbeatNamespaceV1} field. */
    public NamespaceName getHeartbeatNamespaceV1() {
        return heartbeatNamespaceV1;
    }

    /** Assigns the given {@link ServiceConfiguration} to the {@code config} field. */
    protected void setConfig(ServiceConfiguration config) {
        this.config = config;
    }

    /** Assigns the given {@link NamespaceService} to the {@code nsService} field. */
    protected void setNsService(NamespaceService nsService) {
        this.nsService = nsService;
    }

    /** Assigns the given {@link ManagedLedgerStorage} to the {@code managedLedgerClientFactory} field. */
    protected void setManagedLedgerClientFactory(ManagedLedgerStorage managedLedgerClientFactory) {
        this.managedLedgerClientFactory = managedLedgerClientFactory;
    }

    /** Assigns the given {@link LeaderElectionService} to the {@code leaderElectionService} field. */
    protected void setLeaderElectionService(LeaderElectionService leaderElectionService) {
        this.leaderElectionService = leaderElectionService;
    }

    /** Assigns the given {@link BrokerService} to the {@code brokerService} field. */
    protected void setBrokerService(BrokerService brokerService) {
        this.brokerService = brokerService;
    }

    /** Assigns the given {@link WebService} to the {@code webService} field. */
    protected void setWebService(WebService webService) {
        this.webService = webService;
    }

    /** Assigns the given {@link WebSocketService} to the {@code webSocketService} field. */
    protected void setWebSocketService(WebSocketService webSocketService) {
        this.webSocketService = webSocketService;
    }

    /** Assigns the given {@link TopicPoliciesService} to the {@code topicPoliciesService} field. */
    protected void setTopicPoliciesService(TopicPoliciesService topicPoliciesService) {
        this.topicPoliciesService = topicPoliciesService;
    }

    /** Assigns the given {@link BookKeeperClientFactory} to the {@code bkClientFactory} field. */
    protected void setBkClientFactory(BookKeeperClientFactory bkClientFactory) {
        this.bkClientFactory = bkClientFactory;
    }

    /** Assigns the given {@link Compactor} to the {@code compactor} field. */
    protected void setCompactor(Compactor compactor) {
        this.compactor = compactor;
    }

    /** Assigns the given {@link ResourceUsageTransportManager} to the {@code resourceUsageTransportManager} field. */
    protected void setResourceUsageTransportManager(ResourceUsageTransportManager resourceUsageTransportManager) {
        this.resourceUsageTransportManager = resourceUsageTransportManager;
    }

    /** Assigns the given {@link ResourceGroupService} to the {@code resourceGroupServiceManager} field. */
    protected void setResourceGroupServiceManager(ResourceGroupService resourceGroupServiceManager) {
        this.resourceGroupServiceManager = resourceGroupServiceManager;
    }

    /** Assigns the given {@link OrderedExecutor} to the {@code orderedExecutor} field. */
    protected void setOrderedExecutor(OrderedExecutor orderedExecutor) {
        this.orderedExecutor = orderedExecutor;
    }

    /** Assigns the given {@link ScheduledExecutorService} to the {@code compactorExecutor} field. */
    protected void setCompactorExecutor(ScheduledExecutorService compactorExecutor) {
        this.compactorExecutor = compactorExecutor;
    }

    /** Assigns the given {@link OrderedScheduler} to the {@code offloaderScheduler} field. */
    protected void setOffloaderScheduler(OrderedScheduler offloaderScheduler) {
        this.offloaderScheduler = offloaderScheduler;
    }

    /** Assigns the given {@link OffloadersCache} to the {@code offloadersCache} field. */
    protected void setOffloadersCache(OffloadersCache offloadersCache) {
        this.offloadersCache = offloadersCache;
    }

    /** Assigns the given {@link LedgerOffloader} to the {@code defaultOffloader} field. */
    protected void setDefaultOffloader(LedgerOffloader defaultOffloader) {
        this.defaultOffloader = defaultOffloader;
    }

    /** Assigns the given map to the {@code ledgerOffloaderMap} field; the map is stored as-is, not copied. */
    protected void setLedgerOffloaderMap(Map<NamespaceName, LedgerOffloader> ledgerOffloaderMap) {
        this.ledgerOffloaderMap = ledgerOffloaderMap;
    }

    /** Assigns the given scheduled future to the {@code loadReportTask} field. */
    protected void setLoadReportTask(ScheduledFuture<?> loadReportTask) {
        this.loadReportTask = loadReportTask;
    }

    /** Assigns the given scheduled future to the {@code loadSheddingTask} field. */
    protected void setLoadSheddingTask(ScheduledFuture<?> loadSheddingTask) {
        this.loadSheddingTask = loadSheddingTask;
    }

    /** Assigns the given scheduled future to the {@code loadResourceQuotaTask} field. */
    protected void setLoadResourceQuotaTask(ScheduledFuture<?> loadResourceQuotaTask) {
        this.loadResourceQuotaTask = loadResourceQuotaTask;
    }

    /** Assigns the given {@link PulsarAdmin} to the {@code adminClient} field. */
    protected void setAdminClient(PulsarAdmin adminClient) {
        this.adminClient = adminClient;
    }

    /** Assigns the given {@link PulsarClient} to the {@code client} field. */
    protected void setClient(PulsarClient client) {
        this.client = client;
    }

    /** Assigns the given {@link ZooKeeperClientFactory} to the {@code zkClientFactory} field. */
    protected void setZkClientFactory(ZooKeeperClientFactory zkClientFactory) {
        this.zkClientFactory = zkClientFactory;
    }

    /** Assigns the given string to the {@code webServiceAddress} field. */
    protected void setWebServiceAddress(String webServiceAddress) {
        this.webServiceAddress = webServiceAddress;
    }

    /** Assigns the given string to the {@code webServiceAddressTls} field. */
    protected void setWebServiceAddressTls(String webServiceAddressTls) {
        this.webServiceAddressTls = webServiceAddressTls;
    }

    /** Assigns the given string to the {@code brokerServiceUrl} field. */
    protected void setBrokerServiceUrl(String brokerServiceUrl) {
        this.brokerServiceUrl = brokerServiceUrl;
    }

    /** Assigns the given string to the {@code brokerServiceUrlTls} field. */
    protected void setBrokerServiceUrlTls(String brokerServiceUrlTls) {
        this.brokerServiceUrlTls = brokerServiceUrlTls;
    }

    /** Assigns the given {@link SchemaStorage} to the {@code schemaStorage} field. */
    protected void setSchemaStorage(SchemaStorage schemaStorage) {
        this.schemaStorage = schemaStorage;
    }

    /** Assigns the given {@link SchemaRegistryService} to the {@code schemaRegistryService} field. */
    protected void setSchemaRegistryService(SchemaRegistryService schemaRegistryService) {
        this.schemaRegistryService = schemaRegistryService;
    }

    /** Assigns the given {@link ProtocolHandlers} to the {@code protocolHandlers} field. */
    protected void setProtocolHandlers(ProtocolHandlers protocolHandlers) {
        this.protocolHandlers = protocolHandlers;
    }

    /** Assigns the given {@link MetricsGenerator} to the {@code metricsGenerator} field. */
    protected void setMetricsGenerator(MetricsGenerator metricsGenerator) {
        this.metricsGenerator = metricsGenerator;
    }

    /** Assigns the given {@link TransactionMetadataStoreService} to the {@code transactionMetadataStoreService} field. */
    protected void setTransactionMetadataStoreService(TransactionMetadataStoreService transactionMetadataStoreService) {
        this.transactionMetadataStoreService = transactionMetadataStoreService;
    }

    /** Assigns the given {@link TransactionBufferProvider} to the {@code transactionBufferProvider} field. */
    protected void setTransactionBufferProvider(TransactionBufferProvider transactionBufferProvider) {
        this.transactionBufferProvider = transactionBufferProvider;
    }

    /** Assigns the given {@link TransactionBufferClient} to the {@code transactionBufferClient} field. */
    protected void setTransactionBufferClient(TransactionBufferClient transactionBufferClient) {
        this.transactionBufferClient = transactionBufferClient;
    }

    /** Assigns the given {@link HashedWheelTimer} to the {@code transactionTimer} field. */
    protected void setTransactionTimer(HashedWheelTimer transactionTimer) {
        this.transactionTimer = transactionTimer;
    }

    /** Assigns the given {@link BrokerInterceptor} to the {@code brokerInterceptor} field. */
    protected void setBrokerInterceptor(BrokerInterceptor brokerInterceptor) {
        this.brokerInterceptor = brokerInterceptor;
    }

    /** Assigns the given {@link AdditionalServlets} to the {@code brokerAdditionalServlets} field. */
    protected void setBrokerAdditionalServlets(AdditionalServlets brokerAdditionalServlets) {
        this.brokerAdditionalServlets = brokerAdditionalServlets;
    }

    /** Assigns the given optional to the {@code packagesManagement} field. */
    protected void setPackagesManagement(Optional<PackagesManagement> packagesManagement) {
        this.packagesManagement = packagesManagement;
    }

    /** Assigns the given {@link PrometheusMetricsServlet} to the {@code metricsServlet} field. */
    protected void setMetricsServlet(PrometheusMetricsServlet metricsServlet) {
        this.metricsServlet = metricsServlet;
    }

    /** Assigns the given list to the {@code pendingMetricsProviders} field; the list is stored as-is, not copied. */
    protected void setPendingMetricsProviders(List<PrometheusRawMetricsProvider> pendingMetricsProviders) {
        this.pendingMetricsProviders = pendingMetricsProviders;
    }

    /** Assigns the given {@link MetadataStoreExtended} to the {@code localMetadataStore} field. */
    protected void setLocalMetadataStore(MetadataStoreExtended localMetadataStore) {
        this.localMetadataStore = localMetadataStore;
    }

    /** Assigns the given {@link CoordinationService} to the {@code coordinationService} field. */
    protected void setCoordinationService(CoordinationService coordinationService) {
        this.coordinationService = coordinationService;
    }

    /** Assigns the given {@link TransactionBufferSnapshotService} to the {@code transactionBufferSnapshotService} field. */
    protected void setTransactionBufferSnapshotService(TransactionBufferSnapshotService transactionBufferSnapshotService) {
        this.transactionBufferSnapshotService = transactionBufferSnapshotService;
    }

    /** Assigns the given {@link MetadataStore} to the {@code configurationMetadataStore} field. */
    protected void setConfigurationMetadataStore(MetadataStore configurationMetadataStore) {
        this.configurationMetadataStore = configurationMetadataStore;
    }

    /** Assigns the given flag to the {@code shouldShutdownConfigurationMetadataStore} field. */
    protected void setShouldShutdownConfigurationMetadataStore(boolean shouldShutdownConfigurationMetadataStore) {
        this.shouldShutdownConfigurationMetadataStore = shouldShutdownConfigurationMetadataStore;
    }

    /** Assigns the given {@link PulsarResources} to the {@code pulsarResources} field. */
    protected void setPulsarResources(PulsarResources pulsarResources) {
        this.pulsarResources = pulsarResources;
    }

    /** Assigns the given {@link TransactionPendingAckStoreProvider} to the {@code transactionPendingAckStoreProvider} field. */
    protected void setTransactionPendingAckStoreProvider(TransactionPendingAckStoreProvider transactionPendingAckStoreProvider) {
        this.transactionPendingAckStoreProvider = transactionPendingAckStoreProvider;
    }

    /** Assigns the given {@link State} to the {@code state} field. */
    protected void setState(State state) {
        this.state = state;
    }

    /** Assigns the given future to the {@code closeFuture} field. */
    protected void setCloseFuture(CompletableFuture<Void> closeFuture) {
        this.closeFuture = closeFuture;
    }

    /** Assigns the given map to the {@code advertisedListeners} field; the map is stored as-is, not copied. */
    protected void setAdvertisedListeners(Map<String, AdvertisedListener> advertisedListeners) {
        this.advertisedListeners = advertisedListeners;
    }

    /** Assigns the given {@link NamespaceName} to the {@code heartbeatNamespaceV2} field. */
    protected void setHeartbeatNamespaceV2(NamespaceName heartbeatNamespaceV2) {
        this.heartbeatNamespaceV2 = heartbeatNamespaceV2;
    }

    /** Assigns the given {@link NamespaceName} to the {@code heartbeatNamespaceV1} field. */
    protected void setHeartbeatNamespaceV1(NamespaceName heartbeatNamespaceV1) {
        this.heartbeatNamespaceV1 = heartbeatNamespaceV1;
    }

    public static enum State {
        Init,
        Started,
        Closing,
        Closed;

    }
}

