package org.apache.pulsar.broker;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.ZookeeperSessionExpiredHandlers;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.admin.impl.NamespacesBase;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.intercept.BrokerInterceptors;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.LoadReportUpdaterTask;
import org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask;
import org.apache.pulsar.broker.loadbalance.LoadSheddingTask;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.protocol.ProtocolHandlers;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TopicPoliciesService;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.stats.MetricsGenerator;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
import org.apache.pulsar.broker.validator.MultipleListenerValidator;
import org.apache.pulsar.broker.web.WebService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.internal.PropertiesUtils;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.TwoPhaseCompactor;
import org.apache.pulsar.functions.worker.ErrorNotifier;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.collect.ImmutableMap;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.com.google.common.collect.Maps;
import org.apache.pulsar.shade.com.google.common.collect.Sets;
import org.apache.pulsar.shade.io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.BookKeeper;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.shade.org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.pulsar.shade.org.apache.bookkeeper.http.HttpRouter;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.offload.OffloadersCache;
import org.apache.pulsar.shade.org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.shade.org.apache.commons.configuration.ConfigurationException;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.shade.org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.apache.pulsar.shade.org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.shade.org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.shade.org.apache.pulsar.common.configuration.VipStatus;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamedEntity;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.schema.SchemaStorage;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.shade.org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
import org.apache.pulsar.shade.org.apache.zookeeper.CreateMode;
import org.apache.pulsar.shade.org.apache.zookeeper.KeeperException;
import org.apache.pulsar.shade.org.apache.zookeeper.ZooDefs;
import org.apache.pulsar.shade.org.apache.zookeeper.ZooKeeper;
import org.apache.pulsar.shade.org.apache.zookeeper.data.Stat;
import org.apache.pulsar.shade.org.apache.zookeeper.server.quorum.QuorumStats;
import org.apache.pulsar.shade.org.eclipse.jetty.servlet.ServletHolder;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
import org.apache.pulsar.websocket.WebSocketConsumerServlet;
import org.apache.pulsar.websocket.WebSocketProducerServlet;
import org.apache.pulsar.websocket.WebSocketReaderServlet;
import org.apache.pulsar.websocket.WebSocketService;
import org.apache.pulsar.websocket.admin.WebSocketWebResource;
import org.apache.pulsar.zookeeper.GlobalZooKeeperCache;
import org.apache.pulsar.zookeeper.LocalZooKeeperCache;
import org.apache.pulsar.zookeeper.LocalZooKeeperConnectionService;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperCacheListener;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher;
import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl;
import org.apache.pulsar.zookeeper.ZookeeperSessionExpiredHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/PulsarService.class */
public class PulsarService implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarService.class);
    private ServiceConfiguration config;
    private NamespaceService nsService;
    private ManagedLedgerClientFactory managedLedgerClientFactory;
    private LeaderElectionService leaderElectionService;
    private BrokerService brokerService;
    private WebService webService;
    private WebSocketService webSocketService;
    private ConfigurationCacheService configurationCacheService;
    private LocalZooKeeperCacheService localZkCacheService;
    private TopicPoliciesService topicPoliciesService;
    private BookKeeperClientFactory bkClientFactory;
    private ZooKeeperCache localZkCache;
    private GlobalZooKeeperCache globalZkCache;
    private LocalZooKeeperConnectionService localZooKeeperConnectionProvider;
    private Compactor compactor;
    private final ScheduledExecutorService executor;
    private final ScheduledExecutorService cacheExecutor;
    private OrderedExecutor orderedExecutor;
    private final ScheduledExecutorService loadManagerExecutor;
    private ScheduledExecutorService compactorExecutor;
    private OrderedScheduler offloaderScheduler;
    private OffloadersCache offloadersCache;
    private LedgerOffloader defaultOffloader;
    private Map<NamespaceName, LedgerOffloader> ledgerOffloaderMap;
    private ScheduledFuture<?> loadReportTask;
    private ScheduledFuture<?> loadSheddingTask;
    private ScheduledFuture<?> loadResourceQuotaTask;
    private final AtomicReference<LoadManager> loadManager;
    private PulsarAdmin adminClient;
    private PulsarClient client;
    private ZooKeeperClientFactory zkClientFactory;
    private final String bindAddress;
    private final String advertisedAddress;
    private String webServiceAddress;
    private String webServiceAddressTls;
    private String brokerServiceUrl;
    private String brokerServiceUrlTls;
    private final String brokerVersion;
    private SchemaStorage schemaStorage;
    private SchemaRegistryService schemaRegistryService;
    private final Optional<WorkerService> functionWorkerService;
    private ProtocolHandlers protocolHandlers;
    private final ZooKeeperSessionWatcher.ShutdownService shutdownService;
    private MetricsGenerator metricsGenerator;
    private TransactionMetadataStoreService transactionMetadataStoreService;
    private TransactionBufferProvider transactionBufferProvider;
    private TransactionBufferClient transactionBufferClient;
    private BrokerInterceptor brokerInterceptor;
    private PrometheusMetricsServlet metricsServlet;
    private List<PrometheusRawMetricsProvider> pendingMetricsProviders;
    private volatile State state;
    private final ReentrantLock mutex;
    private final Condition isClosedCondition;
    private Map<String, AdvertisedListener> advertisedListeners;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/pulsar/broker/PulsarService$DeleteClusterListener.class */
    public class DeleteClusterListener implements ZooKeeperCacheListener {
        DeleteClusterListener() {
        }

        @Override // org.apache.pulsar.zookeeper.ZooKeeperCacheListener
        public void onUpdate(String str, Object obj, Stat stat) {
        }

        @Override // org.apache.pulsar.zookeeper.ZooKeeperCacheListener
        public void onDelete(String str) {
            if (str.startsWith(ConfigurationCacheService.CLUSTERS_ROOT)) {
                PulsarService.this.getBrokerService().closeAndRemoveReplicationClient(str.substring(ConfigurationCacheService.CLUSTERS_ROOT.length() + 1));
            }
        }
    }

    /* loaded from: input_file:org/apache/pulsar/broker/PulsarService$State.class */
    public enum State {
        Init,
        Started,
        Closed
    }

    public PulsarService(ServiceConfiguration serviceConfiguration) {
        this(serviceConfiguration, Optional.empty(), num -> {
            LOG.info("Process termination requested with code {}. Ignoring, as this constructor is intended for tests. ", num);
        });
    }

    public PulsarService(ServiceConfiguration serviceConfiguration, Optional<WorkerService> optional, Consumer<Integer> consumer) {
        this.config = null;
        this.nsService = null;
        this.managedLedgerClientFactory = null;
        this.leaderElectionService = null;
        this.brokerService = null;
        this.webService = null;
        this.webSocketService = null;
        this.configurationCacheService = null;
        this.localZkCacheService = null;
        this.topicPoliciesService = TopicPoliciesService.DISABLED;
        this.offloadersCache = new OffloadersCache();
        this.ledgerOffloaderMap = new ConcurrentHashMap();
        this.loadReportTask = null;
        this.loadSheddingTask = null;
        this.loadResourceQuotaTask = null;
        this.loadManager = new AtomicReference<>();
        this.adminClient = null;
        this.client = null;
        this.zkClientFactory = null;
        this.schemaStorage = null;
        this.schemaRegistryService = null;
        this.protocolHandlers = null;
        this.mutex = new ReentrantLock();
        this.isClosedCondition = this.mutex.newCondition();
        PulsarConfigurationLoader.isComplete(serviceConfiguration);
        this.advertisedListeners = MultipleListenerValidator.validateAndAnalysisAdvertisedListener(serviceConfiguration);
        this.advertisedAddress = ServiceConfigurationUtils.getAppliedAdvertisedAddress(serviceConfiguration);
        this.state = State.Init;
        this.bindAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(serviceConfiguration.getBindAddress());
        this.brokerVersion = PulsarVersion.getVersion();
        this.config = serviceConfiguration;
        this.shutdownService = new MessagingServiceShutdownHook(this, consumer);
        this.loadManagerExecutor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-load-manager"));
        this.functionWorkerService = optional;
        this.executor = Executors.newScheduledThreadPool(serviceConfiguration.getNumExecutorThreadPoolSize(), new DefaultThreadFactory("pulsar"));
        this.cacheExecutor = Executors.newScheduledThreadPool(serviceConfiguration.getNumCacheExecutorThreadPoolSize(), new DefaultThreadFactory("zk-cache-callback"));
    }

    @Override // java.lang.AutoCloseable
    public void close() throws PulsarServerException {
        this.mutex.lock();
        try {
            try {
                if (this.state == State.Closed) {
                    return;
                }
                if (this.webService != null) {
                    try {
                        this.webService.close();
                        this.webService = null;
                    } catch (Exception e) {
                        LOG.error("Web service closing failed", e);
                    }
                }
                if (this.webSocketService != null) {
                    this.webSocketService.close();
                }
                if (this.brokerService != null) {
                    this.brokerService.close();
                    this.brokerService = null;
                }
                if (this.managedLedgerClientFactory != null) {
                    this.managedLedgerClientFactory.close();
                    this.managedLedgerClientFactory = null;
                }
                if (this.bkClientFactory != null) {
                    this.bkClientFactory.close();
                    this.bkClientFactory = null;
                }
                if (this.leaderElectionService != null) {
                    this.leaderElectionService.stop();
                    this.leaderElectionService = null;
                }
                this.loadManagerExecutor.shutdown();
                if (this.globalZkCache != null) {
                    this.globalZkCache.close();
                    this.globalZkCache = null;
                    this.localZooKeeperConnectionProvider.close();
                    this.localZooKeeperConnectionProvider = null;
                }
                this.configurationCacheService = null;
                this.localZkCacheService = null;
                if (this.localZkCache != null) {
                    this.localZkCache.stop();
                    this.localZkCache = null;
                }
                if (this.adminClient != null) {
                    this.adminClient.close();
                    this.adminClient = null;
                }
                if (this.client != null) {
                    this.client.close();
                    this.client = null;
                }
                this.nsService = null;
                if (this.compactorExecutor != null) {
                    this.compactorExecutor.shutdown();
                }
                if (this.offloaderScheduler != null) {
                    this.offloaderScheduler.shutdown();
                }
                if (this.executor != null) {
                    this.executor.shutdown();
                }
                if (this.orderedExecutor != null) {
                    this.orderedExecutor.shutdown();
                }
                this.cacheExecutor.shutdown();
                LoadManager loadManager = this.loadManager.get();
                if (loadManager != null) {
                    loadManager.stop();
                }
                if (this.schemaRegistryService != null) {
                    this.schemaRegistryService.close();
                }
                this.offloadersCache.close();
                if (this.protocolHandlers != null) {
                    this.protocolHandlers.close();
                    this.protocolHandlers = null;
                }
                if (this.transactionBufferClient != null) {
                    this.transactionBufferClient.close();
                }
                this.state = State.Closed;
                this.isClosedCondition.signalAll();
                this.mutex.unlock();
            } catch (Exception e2) {
                throw new PulsarServerException(e2);
            }
        } finally {
            this.mutex.unlock();
        }
    }

    public ServiceConfiguration getConfiguration() {
        return this.config;
    }

    public Optional<WorkerConfig> getWorkerConfig() {
        return this.functionWorkerService.map(workerService -> {
            return workerService.getWorkerConfig();
        });
    }

    public Map<String, String> getProtocolDataToAdvertise() {
        return null == this.protocolHandlers ? Collections.emptyMap() : this.protocolHandlers.getProtocolDataToAdvertise();
    }

    public void start() throws PulsarServerException {
        ZookeeperSessionExpiredHandler shutdownWhenZookeeperSessionExpired;
        this.mutex.lock();
        LOG.info("Starting Pulsar Broker service; version: '{}'", this.brokerVersion != null ? this.brokerVersion : QuorumStats.Provider.UNKNOWN_STATE);
        LOG.info("Git Revision {}", PulsarVersion.getGitSha());
        LOG.info("Built by {} on {} at {}", new Object[]{PulsarVersion.getBuildUser(), PulsarVersion.getBuildHost(), PulsarVersion.getBuildTime()});
        try {
            try {
                if (this.state != State.Init) {
                    throw new PulsarServerException("Cannot start the service once it was stopped");
                }
                if (!this.config.getWebServicePort().isPresent() && !this.config.getWebServicePortTls().isPresent()) {
                    throw new IllegalArgumentException("webServicePort/webServicePortTls must be present");
                }
                if (!this.config.getBrokerServicePort().isPresent() && !this.config.getBrokerServicePortTls().isPresent()) {
                    throw new IllegalArgumentException("brokerServicePort/brokerServicePortTls must be present");
                }
                this.orderedExecutor = OrderedExecutor.newBuilder().numThreads(this.config.getNumOrderedExecutorThreads()).name("pulsar-ordered").build();
                this.protocolHandlers = ProtocolHandlers.load(this.config);
                this.protocolHandlers.initialize(this.config);
                this.localZooKeeperConnectionProvider = new LocalZooKeeperConnectionService(getZooKeeperClientFactory(), this.config.getZookeeperServers(), this.config.getZooKeeperSessionTimeoutMillis());
                if (ZookeeperSessionExpiredHandlers.RECONNECT_POLICY.equals(this.config.getZookeeperSessionExpiredPolicy())) {
                    shutdownWhenZookeeperSessionExpired = ZookeeperSessionExpiredHandlers.reconnectWhenZookeeperSessionExpired(this, this.shutdownService);
                } else {
                    if (!ZookeeperSessionExpiredHandlers.SHUTDOWN_POLICY.equals(this.config.getZookeeperSessionExpiredPolicy())) {
                        throw new IllegalArgumentException("Invalid zookeeper session expired policy " + this.config.getZookeeperSessionExpiredPolicy());
                    }
                    shutdownWhenZookeeperSessionExpired = ZookeeperSessionExpiredHandlers.shutdownWhenZookeeperSessionExpired(this.shutdownService);
                }
                this.localZooKeeperConnectionProvider.start(shutdownWhenZookeeperSessionExpired);
                startZkCacheService();
                this.bkClientFactory = newBookKeeperClientFactory();
                this.managedLedgerClientFactory = new ManagedLedgerClientFactory(this.config, getZkClient(), this.bkClientFactory);
                this.brokerService = new BrokerService(this);
                this.loadManager.set(LoadManager.create(this));
                startNamespaceService();
                this.schemaStorage = createAndStartSchemaStorage();
                this.schemaRegistryService = SchemaRegistryService.create(this.schemaStorage, this.config.getSchemaRegistryCompatibilityCheckers());
                this.defaultOffloader = createManagedLedgerOffloader(OffloadPolicies.create(getConfiguration().getProperties()));
                this.brokerInterceptor = BrokerInterceptors.load(this.config);
                this.brokerService.setInterceptor(getBrokerInterceptor());
                this.brokerInterceptor.initialize(this);
                this.brokerService.start();
                this.webService = new WebService(this);
                HashMap newHashMap = Maps.newHashMap();
                newHashMap.put("pulsar", this);
                HashMap newHashMap2 = Maps.newHashMap();
                newHashMap2.put(VipStatus.ATTRIBUTE_STATUS_FILE_PATH, this.config.getStatusFilePath());
                newHashMap2.put(VipStatus.ATTRIBUTE_IS_READY_PROBE, new Supplier<Boolean>() { // from class: org.apache.pulsar.broker.PulsarService.1
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.util.function.Supplier
                    public Boolean get() {
                        return Boolean.valueOf(PulsarService.this.state == State.Started);
                    }
                });
                this.webService.addRestResources("/", VipStatus.class.getPackage().getName(), false, newHashMap2);
                this.webService.addRestResources("/", "org.apache.pulsar.broker.web", false, newHashMap);
                this.webService.addRestResources(WebSocketWebResource.ADMIN_PATH_V1, "org.apache.pulsar.broker.admin.v1", true, newHashMap);
                this.webService.addRestResources(WebSocketWebResource.ADMIN_PATH_V2, "org.apache.pulsar.broker.admin.v2", true, newHashMap);
                this.webService.addRestResources("/admin/v3", "org.apache.pulsar.broker.admin.v3", true, newHashMap);
                this.webService.addRestResources("/lookup", "org.apache.pulsar.broker.lookup", true, newHashMap);
                this.metricsServlet = new PrometheusMetricsServlet(this, this.config.isExposeTopicLevelMetricsInPrometheus(), this.config.isExposeConsumerLevelMetricsInPrometheus());
                if (this.pendingMetricsProviders != null) {
                    this.pendingMetricsProviders.forEach(prometheusRawMetricsProvider -> {
                        this.metricsServlet.addRawMetricsProvider(prometheusRawMetricsProvider);
                    });
                    this.pendingMetricsProviders = null;
                }
                this.webService.addServlet(HttpRouter.METRICS, new ServletHolder(this.metricsServlet), false, newHashMap);
                if (this.config.isWebSocketServiceEnabled()) {
                    this.webSocketService = new WebSocketService(null, this.config);
                    this.webSocketService.start();
                    WebSocketProducerServlet webSocketProducerServlet = new WebSocketProducerServlet(this.webSocketService);
                    this.webService.addServlet(WebSocketProducerServlet.SERVLET_PATH, new ServletHolder(webSocketProducerServlet), true, newHashMap);
                    this.webService.addServlet(WebSocketProducerServlet.SERVLET_PATH_V2, new ServletHolder(webSocketProducerServlet), true, newHashMap);
                    WebSocketConsumerServlet webSocketConsumerServlet = new WebSocketConsumerServlet(this.webSocketService);
                    this.webService.addServlet(WebSocketConsumerServlet.SERVLET_PATH, new ServletHolder(webSocketConsumerServlet), true, newHashMap);
                    this.webService.addServlet(WebSocketConsumerServlet.SERVLET_PATH_V2, new ServletHolder(webSocketConsumerServlet), true, newHashMap);
                    WebSocketReaderServlet webSocketReaderServlet = new WebSocketReaderServlet(this.webSocketService);
                    this.webService.addServlet(WebSocketReaderServlet.SERVLET_PATH, new ServletHolder(webSocketReaderServlet), true, newHashMap);
                    this.webService.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2, new ServletHolder(webSocketReaderServlet), true, newHashMap);
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Attempting to add static directory");
                }
                this.webService.addStaticResources("/static", "/static");
                this.webService.start();
                this.webServiceAddress = webAddress(this.config);
                this.webServiceAddressTls = webAddressTls(this.config);
                this.brokerServiceUrl = brokerUrl(this.config);
                this.brokerServiceUrlTls = brokerUrlTls(this.config);
                if (null != this.webSocketService) {
                    this.webSocketService.setLocalCluster(new ClusterData(this.webServiceAddress, this.webServiceAddressTls, this.brokerServiceUrl, this.brokerServiceUrlTls));
                }
                this.nsService.initialize();
                if (this.config.isTopicLevelPoliciesEnabled() && this.config.isSystemTopicEnabled()) {
                    this.topicPoliciesService = new SystemTopicBasedTopicPoliciesService(this);
                }
                this.topicPoliciesService.start();
                startLeaderElectionService();
                this.nsService.registerBootstrapNamespaces();
                if (this.config.isTransactionCoordinatorEnabled()) {
                    this.transactionBufferClient = TransactionBufferClientImpl.create(getNamespaceService(), ((PulsarClientImpl) getClient()).getCnxPool());
                    this.transactionMetadataStoreService = new TransactionMetadataStoreService(TransactionMetadataStoreProvider.newProvider(this.config.getTransactionMetadataStoreProviderClassName()), this, this.transactionBufferClient);
                    this.transactionMetadataStoreService.start();
                    this.transactionBufferProvider = TransactionBufferProvider.newProvider(this.config.getTransactionBufferProviderClassName());
                }
                this.metricsGenerator = new MetricsGenerator(this);
                startLoadManagementService();
                this.protocolHandlers.start(this.brokerService);
                this.brokerService.startProtocolHandlers(this.protocolHandlers.newChannelInitializers());
                acquireSLANamespace();
                startWorkerService(this.brokerService.getAuthenticationService(), this.brokerService.getAuthorizationService());
                String str = "bootstrap service " + (this.config.getWebServicePort().isPresent() ? "port = " + this.config.getWebServicePort().get() : "") + (this.config.getWebServicePortTls().isPresent() ? ", tls-port = " + this.config.getWebServicePortTls() : "") + (this.config.getBrokerServicePort().isPresent() ? ", broker url= " + this.brokerServiceUrl : "") + (this.config.getBrokerServicePortTls().isPresent() ? ", broker tls url= " + this.brokerServiceUrlTls : "");
                LOG.info("messaging service is ready");
                LOG.info("messaging service is ready, {}, cluster={}, configs={}", new Object[]{str, this.config.getClusterName(), ReflectionToStringBuilder.toString(this.config)});
                this.state = State.Started;
                this.mutex.unlock();
            } catch (Exception e) {
                LOG.error(e.getMessage(), e);
                throw new PulsarServerException(e);
            }
        } catch (Throwable th) {
            this.mutex.unlock();
            throw th;
        }
    }

    protected void startLeaderElectionService() {
        this.leaderElectionService = new LeaderElectionService(this, new LeaderElectionService.LeaderListener() { // from class: org.apache.pulsar.broker.PulsarService.2
            @Override // org.apache.pulsar.broker.loadbalance.LeaderElectionService.LeaderListener
            public synchronized void brokerIsTheLeaderNow() {
                if (PulsarService.this.getConfiguration().isLoadBalancerEnabled()) {
                    long millis = TimeUnit.MINUTES.toMillis(PulsarService.this.getConfiguration().getLoadBalancerSheddingIntervalMinutes());
                    long millis2 = TimeUnit.MINUTES.toMillis(PulsarService.this.getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes());
                    PulsarService.this.loadSheddingTask = PulsarService.this.loadManagerExecutor.scheduleAtFixedRate(new LoadSheddingTask(PulsarService.this.loadManager), millis, millis, TimeUnit.MILLISECONDS);
                    PulsarService.this.loadResourceQuotaTask = PulsarService.this.loadManagerExecutor.scheduleAtFixedRate(new LoadResourceQuotaUpdaterTask(PulsarService.this.loadManager), millis2, millis2, TimeUnit.MILLISECONDS);
                }
            }

            @Override // org.apache.pulsar.broker.loadbalance.LeaderElectionService.LeaderListener
            public synchronized void brokerIsAFollowerNow() {
                if (PulsarService.this.loadSheddingTask != null) {
                    PulsarService.this.loadSheddingTask.cancel(false);
                    PulsarService.this.loadSheddingTask = null;
                }
                if (PulsarService.this.loadResourceQuotaTask != null) {
                    PulsarService.this.loadResourceQuotaTask.cancel(false);
                    PulsarService.this.loadResourceQuotaTask = null;
                }
            }
        });
        this.leaderElectionService.start();
    }

    protected void acquireSLANamespace() {
        boolean z;
        try {
            String sLAMonitorNamespace = NamespaceService.getSLAMonitorNamespace(getAdvertisedAddress(), this.config);
            if (!this.globalZkCache.exists(AdminResource.path("policies") + "/" + sLAMonitorNamespace)) {
                LOG.info("SLA Namespace = {} doesn't exist.", sLAMonitorNamespace);
                return;
            }
            try {
                z = this.nsService.registerSLANamespace();
                LOG.info("Register SLA Namespace = {}, returned - {}.", sLAMonitorNamespace, Boolean.valueOf(z));
            } catch (PulsarServerException e) {
                z = false;
            }
            if (!z) {
                this.nsService.unloadSLANamespace();
            }
        } catch (Exception e2) {
            LOG.warn("Exception while trying to unload the SLA namespace, will try to unload the namespace again after 1 minute. Exception:", e2);
            this.executor.schedule(this::acquireSLANamespace, 1L, TimeUnit.MINUTES);
        } catch (Throwable th) {
            LOG.warn("Exception while trying to unload the SLA namespace, will not try to unload the namespace again. Exception:", th);
        }
    }

    public void waitUntilClosed() throws InterruptedException {
        this.mutex.lock();
        while (this.state != State.Closed) {
            try {
                this.isClosedCondition.await();
            } finally {
                this.mutex.unlock();
            }
        }
    }

    protected void startZkCacheService() throws PulsarServerException {
        LOG.info("starting configuration cache service");
        this.localZkCache = new LocalZooKeeperCache(getZkClient(), this.config.getZooKeeperOperationTimeoutSeconds(), getOrderedExecutor());
        this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(), (int) this.config.getZooKeeperSessionTimeoutMillis(), this.config.getZooKeeperOperationTimeoutSeconds(), this.config.getConfigurationStoreServers(), getOrderedExecutor(), this.cacheExecutor, this.config.getZooKeeperCacheExpirySeconds());
        try {
            this.globalZkCache.start();
            this.configurationCacheService = new ConfigurationCacheService(this.globalZkCache, this.config.getClusterName());
            this.localZkCacheService = new LocalZooKeeperCacheService(getLocalZkCache(), this.configurationCacheService);
            this.configurationCacheService.clustersCache().registerListener(new DeleteClusterListener());
        } catch (IOException e) {
            throw new PulsarServerException(e);
        }
    }

    protected void startNamespaceService() throws PulsarServerException {
        LOG.info("Starting name space service, bootstrap namespaces=" + this.config.getBootstrapNamespaces());
        this.nsService = getNamespaceServiceProvider().get();
    }

    public Supplier<NamespaceService> getNamespaceServiceProvider() throws PulsarServerException {
        return () -> {
            return new NamespaceService(this);
        };
    }

    protected void startLoadManagementService() throws PulsarServerException {
        LOG.info("Starting load management service ...");
        this.loadManager.get().start();
        if (this.config.isLoadBalancerEnabled()) {
            LOG.info("Starting load balancer");
            if (this.loadReportTask == null) {
                long j = LoadManagerShared.LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL;
                this.loadReportTask = this.loadManagerExecutor.scheduleAtFixedRate(new LoadReportUpdaterTask(this.loadManager), j, j, TimeUnit.MILLISECONDS);
            }
        }
    }

    public void loadNamespaceTopics(NamespaceBundle namespaceBundle) {
        this.executor.submit(() -> {
            CompletableFuture<Topic> orCreateTopic;
            LOG.info("Loading all topics on bundle: {}", namespaceBundle);
            NamespaceName namespaceObject = namespaceBundle.getNamespaceObject();
            ArrayList newArrayList = Lists.newArrayList();
            long nanoTime = System.nanoTime();
            for (String str : getNamespaceService().getListOfPersistentTopics(namespaceObject).join()) {
                try {
                    if (namespaceBundle.includes(TopicName.get(str)) && (orCreateTopic = this.brokerService.getOrCreateTopic(str)) != null) {
                        newArrayList.add(orCreateTopic);
                    }
                } catch (Throwable th) {
                    LOG.warn("Failed to preload topic {}", str, th);
                }
            }
            if (newArrayList.isEmpty()) {
                return null;
            }
            FutureUtil.waitForAll(newArrayList).thenRun(() -> {
                LOG.info("Loaded {} topics on {} -- time taken: {} seconds", new Object[]{Integer.valueOf(newArrayList.size()), namespaceBundle, Double.valueOf(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime) / 1000.0d)});
            });
            return null;
        });
    }

    public String getStatusFilePath() {
        if (this.config == null) {
            return null;
        }
        return this.config.getStatusFilePath();
    }

    public ZooKeeper getZkClient() {
        return this.localZooKeeperConnectionProvider.getLocalZooKeeper();
    }

    public String getMetadataServiceUri() {
        return bookieMetadataServiceUri(getConfiguration());
    }

    public InternalConfigurationData getInternalConfigurationData() {
        String metadataServiceUri = getMetadataServiceUri();
        if (StringUtils.isNotBlank(this.config.getBookkeeperMetadataServiceUri())) {
            metadataServiceUri = getConfiguration().getBookkeeperMetadataServiceUri();
        }
        return new InternalConfigurationData(getConfiguration().getZookeeperServers(), getConfiguration().getConfigurationStoreServers(), new ClientConfiguration().getZkLedgersRootPath(), metadataServiceUri, (String) getWorkerConfig().map(workerConfig -> {
            return workerConfig.getStateStorageServiceUrl();
        }).orElse(null));
    }

    public ConfigurationCacheService getConfigurationCache() {
        return this.configurationCacheService;
    }

    public State getState() {
        return this.state;
    }

    public LeaderElectionService getLeaderElectionService() {
        return this.leaderElectionService;
    }

    public NamespaceService getNamespaceService() {
        return this.nsService;
    }

    public WorkerService getWorkerService() throws UnsupportedOperationException {
        return this.functionWorkerService.orElseThrow(() -> {
            return new UnsupportedOperationException("Pulsar Function Worker is not enabled, probably functionsWorkerEnabled is set to false");
        });
    }

    public BrokerService getBrokerService() {
        return this.brokerService;
    }

    public BookKeeper getBookKeeperClient() {
        return this.managedLedgerClientFactory.getBookKeeperClient();
    }

    public ManagedLedgerFactory getManagedLedgerFactory() {
        return this.managedLedgerClientFactory.getManagedLedgerFactory();
    }

    public ManagedLedgerClientFactory getManagedLedgerClientFactory() {
        return this.managedLedgerClientFactory;
    }

    public LedgerOffloader getManagedLedgerOffloader(NamespaceName namespaceName, OffloadPolicies offloadPolicies) {
        return offloadPolicies == null ? getDefaultOffloader() : this.ledgerOffloaderMap.compute(namespaceName, (namespaceName2, ledgerOffloader) -> {
            if (ledgerOffloader != null) {
                try {
                    if (Objects.equals(ledgerOffloader.getOffloadPolicies(), offloadPolicies)) {
                        return ledgerOffloader;
                    }
                } catch (PulsarServerException e) {
                    LOG.error("create ledgerOffloader failed for namespace {}", namespaceName.toString(), e);
                    return new NullLedgerOffloader();
                }
            }
            if (ledgerOffloader != null) {
                ledgerOffloader.close();
            }
            return createManagedLedgerOffloader(offloadPolicies);
        });
    }

    public synchronized LedgerOffloader createManagedLedgerOffloader(OffloadPolicies offloadPolicies) throws PulsarServerException {
        try {
            if (!StringUtils.isNotBlank(offloadPolicies.getManagedLedgerOffloadDriver())) {
                LOG.info("No ledger offloader configured, using NULL instance");
                return NullLedgerOffloader.INSTANCE;
            }
            Preconditions.checkNotNull(offloadPolicies.getOffloadersDirectory(), "Offloader driver is configured to be '%s' but no offloaders directory is configured.", offloadPolicies.getManagedLedgerOffloadDriver());
            try {
                return this.offloadersCache.getOrLoadOffloaders(offloadPolicies.getOffloadersDirectory(), this.config.getNarExtractionDirectory()).getOffloaderFactory(offloadPolicies.getManagedLedgerOffloadDriver()).create(offloadPolicies, ImmutableMap.of(LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(), LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha()), this.schemaStorage, getOffloaderScheduler(offloadPolicies));
            } catch (IOException e) {
                throw new PulsarServerException(e.getMessage(), e.getCause());
            }
        } catch (Throwable th) {
            throw new PulsarServerException(th);
        }
    }

    private SchemaStorage createAndStartSchemaStorage() throws Exception {
        Class<?> cls = Class.forName(this.config.getSchemaRegistryStorageClassName());
        SchemaStorage schemaStorage = (SchemaStorage) cls.getMethod("create", PulsarService.class).invoke(cls.newInstance(), this);
        schemaStorage.start();
        return schemaStorage;
    }

    public ZooKeeperCache getLocalZkCache() {
        return this.localZkCache;
    }

    public ZooKeeperCache getGlobalZkCache() {
        return this.globalZkCache;
    }

    public ScheduledExecutorService getExecutor() {
        return this.executor;
    }

    public ScheduledExecutorService getCacheExecutor() {
        return this.cacheExecutor;
    }

    public ScheduledExecutorService getLoadManagerExecutor() {
        return this.loadManagerExecutor;
    }

    public OrderedExecutor getOrderedExecutor() {
        return this.orderedExecutor;
    }

    public LocalZooKeeperCacheService getLocalZkCacheService() {
        return this.localZkCacheService;
    }

    public ZooKeeperClientFactory getZooKeeperClientFactory() {
        if (this.zkClientFactory == null) {
            this.zkClientFactory = new ZookeeperBkClientFactoryImpl(this.orderedExecutor);
        }
        return this.zkClientFactory;
    }

    public BookKeeperClientFactory newBookKeeperClientFactory() {
        return new BookKeeperClientFactoryImpl();
    }

    public BookKeeperClientFactory getBookKeeperClientFactory() {
        return this.bkClientFactory;
    }

    protected synchronized ScheduledExecutorService getCompactorExecutor() {
        if (this.compactorExecutor == null) {
            this.compactorExecutor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("compaction"));
        }
        return this.compactorExecutor;
    }

    public Compactor newCompactor() throws PulsarServerException {
        return new TwoPhaseCompactor(getConfiguration(), getClient(), getBookKeeperClient(), getCompactorExecutor());
    }

    public synchronized Compactor getCompactor() throws PulsarServerException {
        if (this.compactor == null) {
            this.compactor = newCompactor();
        }
        return this.compactor;
    }

    protected synchronized OrderedScheduler getOffloaderScheduler(OffloadPolicies offloadPolicies) {
        if (this.offloaderScheduler == null) {
            this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder().numThreads(offloadPolicies.getManagedLedgerOffloadMaxThreads().intValue()).name("offloader").build();
        }
        return this.offloaderScheduler;
    }

    public synchronized PulsarClient getClient() throws PulsarServerException {
        if (this.client == null) {
            try {
                ClientBuilder serviceUrl = PulsarClient.builder().serviceUrl(getConfiguration().isBrokerClientTlsEnabled() ? this.brokerServiceUrlTls : this.brokerServiceUrl);
                serviceUrl.loadConf(PropertiesUtils.filterAndMapProperties(getConfiguration().getProperties(), "brokerClient_"));
                if (getConfiguration().isBrokerClientTlsEnabled()) {
                    serviceUrl.allowTlsInsecureConnection(getConfiguration().isTlsAllowInsecureConnection());
                    if (getConfiguration().isBrokerClientTlsEnabledWithKeyStore()) {
                        serviceUrl.useKeyStoreTls(true).tlsTrustStoreType(getConfiguration().getBrokerClientTlsTrustStoreType()).tlsTrustStorePath(getConfiguration().getBrokerClientTlsTrustStore()).tlsTrustStorePassword(getConfiguration().getBrokerClientTlsTrustStorePassword());
                    } else {
                        serviceUrl.tlsTrustCertsFilePath(StringUtils.isNotBlank(getConfiguration().getBrokerClientTrustCertsFilePath()) ? getConfiguration().getBrokerClientTrustCertsFilePath() : getConfiguration().getTlsCertificateFilePath());
                    }
                }
                if (StringUtils.isNotBlank(getConfiguration().getBrokerClientAuthenticationPlugin())) {
                    serviceUrl.authentication(getConfiguration().getBrokerClientAuthenticationPlugin(), getConfiguration().getBrokerClientAuthenticationParameters());
                }
                this.client = serviceUrl.build();
            } catch (Exception e) {
                throw new PulsarServerException(e);
            }
        }
        return this.client;
    }

    public synchronized PulsarAdmin getAdminClient() throws PulsarServerException {
        if (this.adminClient == null) {
            try {
                ServiceConfiguration configuration = getConfiguration();
                String str = configuration.isBrokerClientTlsEnabled() ? this.webServiceAddressTls : this.webServiceAddress;
                if (str == null) {
                    throw new IllegalArgumentException("Web service address was not set properly , isBrokerClientTlsEnabled: " + configuration.isBrokerClientTlsEnabled() + ", webServiceAddressTls: " + this.webServiceAddressTls + ", webServiceAddress: " + this.webServiceAddress);
                }
                PulsarAdminBuilder serviceHttpUrl = PulsarAdmin.builder().serviceHttpUrl(str);
                serviceHttpUrl.loadConf(PropertiesUtils.filterAndMapProperties(this.config.getProperties(), "brokerClient_"));
                serviceHttpUrl.authentication(configuration.getBrokerClientAuthenticationPlugin(), configuration.getBrokerClientAuthenticationParameters());
                if (configuration.isBrokerClientTlsEnabled()) {
                    if (configuration.isBrokerClientTlsEnabledWithKeyStore()) {
                        serviceHttpUrl.useKeyStoreTls(true).tlsTrustStoreType(configuration.getBrokerClientTlsTrustStoreType()).tlsTrustStorePath(configuration.getBrokerClientTlsTrustStore()).tlsTrustStorePassword(configuration.getBrokerClientTlsTrustStorePassword());
                    } else {
                        serviceHttpUrl.tlsTrustCertsFilePath(configuration.getBrokerClientTrustCertsFilePath());
                    }
                    serviceHttpUrl.allowTlsInsecureConnection(configuration.isTlsAllowInsecureConnection());
                }
                serviceHttpUrl.readTimeout(configuration.getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS);
                this.adminClient = serviceHttpUrl.build();
                LOG.info("created admin with url {} ", str);
            } catch (Exception e) {
                throw new PulsarServerException(e);
            }
        }
        return this.adminClient;
    }

    public MetricsGenerator getMetricsGenerator() {
        return this.metricsGenerator;
    }

    public TransactionMetadataStoreService getTransactionMetadataStoreService() {
        return this.transactionMetadataStoreService;
    }

    public TransactionBufferProvider getTransactionBufferProvider() {
        return this.transactionBufferProvider;
    }

    public TransactionBufferClient getTransactionBufferClient() {
        return this.transactionBufferClient;
    }

    public ZooKeeperSessionWatcher.ShutdownService getShutdownService() {
        return this.shutdownService;
    }

    protected String brokerUrl(ServiceConfiguration serviceConfiguration) {
        if (serviceConfiguration.getBrokerServicePort().isPresent()) {
            return brokerUrl(ServiceConfigurationUtils.getAppliedAdvertisedAddress(serviceConfiguration), getBrokerListenPort().get().intValue());
        }
        return null;
    }

    public static String brokerUrl(String str, int i) {
        return String.format("pulsar://%s:%d", str, Integer.valueOf(i));
    }

    public String brokerUrlTls(ServiceConfiguration serviceConfiguration) {
        if (serviceConfiguration.getBrokerServicePortTls().isPresent()) {
            return brokerUrlTls(ServiceConfigurationUtils.getAppliedAdvertisedAddress(serviceConfiguration), getBrokerListenPortTls().get().intValue());
        }
        return null;
    }

    public static String brokerUrlTls(String str, int i) {
        return String.format("pulsar+ssl://%s:%d", str, Integer.valueOf(i));
    }

    public String webAddress(ServiceConfiguration serviceConfiguration) {
        if (serviceConfiguration.getWebServicePort().isPresent()) {
            return webAddress(ServiceConfigurationUtils.getAppliedAdvertisedAddress(serviceConfiguration), getListenPortHTTP().get().intValue());
        }
        return null;
    }

    public static String webAddress(String str, int i) {
        return String.format("http://%s:%d", str, Integer.valueOf(i));
    }

    public String webAddressTls(ServiceConfiguration serviceConfiguration) {
        if (serviceConfiguration.getWebServicePortTls().isPresent()) {
            return webAddressTls(ServiceConfigurationUtils.getAppliedAdvertisedAddress(serviceConfiguration), getListenPortHTTPS().get().intValue());
        }
        return null;
    }

    public static String webAddressTls(String str, int i) {
        return String.format("https://%s:%d", str, Integer.valueOf(i));
    }

    public String getSafeWebServiceAddress() {
        return this.webServiceAddress != null ? this.webServiceAddress : this.webServiceAddressTls;
    }

    public String getSafeBrokerServiceUrl() {
        return this.brokerServiceUrl != null ? this.brokerServiceUrl : this.brokerServiceUrlTls;
    }

    public static String bookieMetadataServiceUri(ServiceConfiguration serviceConfiguration) {
        ClientConfiguration clientConfiguration = new ClientConfiguration();
        String str = null;
        try {
            clientConfiguration.setZkServers(serviceConfiguration.getZookeeperServers());
            str = clientConfiguration.getMetadataServiceUri();
        } catch (ConfigurationException e) {
            LOG.error("Failed to get bookkeeper metadata service uri", e);
        }
        return str;
    }

    public TopicPoliciesService getTopicPoliciesService() {
        return this.topicPoliciesService;
    }

    public void addPrometheusRawMetricsProvider(PrometheusRawMetricsProvider prometheusRawMetricsProvider) {
        if (this.metricsServlet != null) {
            this.metricsServlet.addRawMetricsProvider(prometheusRawMetricsProvider);
            return;
        }
        if (this.pendingMetricsProviders == null) {
            this.pendingMetricsProviders = new LinkedList();
        }
        this.pendingMetricsProviders.add(prometheusRawMetricsProvider);
    }

    private void startWorkerService(AuthenticationService authenticationService, AuthorizationService authorizationService) throws InterruptedException, IOException, KeeperException {
        if (this.functionWorkerService.isPresent()) {
            LOG.info("Starting function worker service");
            WorkerConfig workerConfig = this.functionWorkerService.get().getWorkerConfig();
            if (workerConfig.isUseTls() || this.brokerServiceUrl == null) {
                workerConfig.setPulsarServiceUrl(this.brokerServiceUrlTls);
            } else {
                workerConfig.setPulsarServiceUrl(this.brokerServiceUrl);
            }
            if (workerConfig.isUseTls() || this.webServiceAddress == null) {
                workerConfig.setPulsarWebServiceUrl(this.webServiceAddressTls);
                workerConfig.setFunctionWebServiceUrl(this.webServiceAddressTls);
            } else {
                workerConfig.setPulsarWebServiceUrl(this.webServiceAddress);
                workerConfig.setFunctionWebServiceUrl(this.webServiceAddress);
            }
            LOG.info("Starting function worker service: serviceUrl = {}, webServiceUrl = {}, functionWebServiceUrl = {}", new Object[]{workerConfig.getPulsarServiceUrl(), workerConfig.getPulsarWebServiceUrl(), workerConfig.getFunctionWebServiceUrl()});
            String pulsarFunctionsNamespace = this.functionWorkerService.get().getWorkerConfig().getPulsarFunctionsNamespace();
            String str = this.functionWorkerService.get().getWorkerConfig().getPulsarFunctionsNamespace().split("/")[0];
            String pulsarFunctionsCluster = this.functionWorkerService.get().getWorkerConfig().getPulsarFunctionsCluster();
            try {
                NamedEntity.checkName(str);
                getGlobalZkCache().getZooKeeper().create(AdminResource.path("policies", str), ObjectMapperFactory.getThreadLocal().writeValueAsBytes(new TenantInfo(Sets.newHashSet(this.config.getSuperUserRoles()), Sets.newHashSet(pulsarFunctionsCluster))), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                LOG.info("Created property {} for function worker", str);
            } catch (IllegalArgumentException e) {
                LOG.error("Failed to create property with invalid name {} for function worker service", pulsarFunctionsCluster, e);
                throw e;
            } catch (KeeperException.NodeExistsException e2) {
                LOG.debug("Failed to create already existing property {} for function worker service", pulsarFunctionsCluster, e2);
            } catch (Exception e3) {
                LOG.error("Failed to create property {} for function worker", pulsarFunctionsCluster, e3);
                throw e3;
            }
            try {
                NamedEntity.checkName(pulsarFunctionsCluster);
                getGlobalZkCache().getZooKeeper().create(AdminResource.path("clusters", pulsarFunctionsCluster), ObjectMapperFactory.getThreadLocal().writeValueAsBytes(new ClusterData(getSafeWebServiceAddress(), null, this.brokerServiceUrl, null)), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                LOG.info("Created cluster {} for function worker", pulsarFunctionsCluster);
            } catch (IllegalArgumentException e4) {
                LOG.error("Failed to create cluster with invalid name {} for function worker service", pulsarFunctionsCluster, e4);
                throw e4;
            } catch (KeeperException.NodeExistsException e5) {
                LOG.debug("Failed to create already existing cluster {} for function worker service", pulsarFunctionsCluster, e5);
            } catch (Exception e6) {
                LOG.error("Failed to create cluster {} for function worker service", pulsarFunctionsCluster, e6);
                throw e6;
            }
            try {
                Policies policies = new Policies();
                policies.retention_policies = new RetentionPolicies(-1, -1);
                policies.replication_clusters = Collections.singleton(this.functionWorkerService.get().getWorkerConfig().getPulsarFunctionsCluster());
                policies.bundles = NamespacesBase.getBundles(getConfiguration().getDefaultNumberOfNamespaceBundles());
                getConfigurationCache().policiesCache().invalidate(AdminResource.path("policies", pulsarFunctionsNamespace));
                ZkUtils.createFullPathOptimistic(getGlobalZkCache().getZooKeeper(), AdminResource.path("policies", pulsarFunctionsNamespace), ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                LOG.info("Created namespace {} for function worker service", pulsarFunctionsNamespace);
            } catch (KeeperException.NodeExistsException e7) {
                LOG.debug("Failed to create already existing namespace {} for function worker service", pulsarFunctionsNamespace);
            } catch (Exception e8) {
                LOG.error("Failed to create namespace {}", pulsarFunctionsNamespace, e8);
                throw e8;
            }
            InternalConfigurationData internalConfigurationData = getInternalConfigurationData();
            try {
                URI initializeDlogNamespace = WorkerUtils.initializeDlogNamespace(internalConfigurationData);
                LOG.info("Function worker service setup completed");
                this.functionWorkerService.get().start(initializeDlogNamespace, authenticationService, authorizationService, ErrorNotifier.getShutdownServiceImpl(this.shutdownService));
                LOG.info("Function worker service started");
            } catch (IOException e9) {
                LOG.error("Failed to initialize dlog namespace with zookeeper {} at at metadata service uri {} for storing function packages", new Object[]{internalConfigurationData.getZookeeperServers(), internalConfigurationData.getBookkeeperMetadataServiceUri(), e9});
                throw e9;
            }
        }
    }

    public Optional<Integer> getListenPortHTTP() {
        return this.webService.getListenPortHTTP();
    }

    public Optional<Integer> getListenPortHTTPS() {
        return this.webService.getListenPortHTTPS();
    }

    public Optional<Integer> getBrokerListenPort() {
        return this.brokerService.getListenPort();
    }

    public Optional<Integer> getBrokerListenPortTls() {
        return this.brokerService.getListenPortTls();
    }

    public CompletableFuture<Set<String>> getAvailableBookiesAsync() {
        return this.localZkCacheService.availableBookiesCache().getAsync();
    }

    public ServiceConfiguration getConfig() {
        return this.config;
    }

    public NamespaceService getNsService() {
        return this.nsService;
    }

    public WebService getWebService() {
        return this.webService;
    }

    public WebSocketService getWebSocketService() {
        return this.webSocketService;
    }

    public ConfigurationCacheService getConfigurationCacheService() {
        return this.configurationCacheService;
    }

    public BookKeeperClientFactory getBkClientFactory() {
        return this.bkClientFactory;
    }

    public LocalZooKeeperConnectionService getLocalZooKeeperConnectionProvider() {
        return this.localZooKeeperConnectionProvider;
    }

    public OrderedScheduler getOffloaderScheduler() {
        return this.offloaderScheduler;
    }

    public OffloadersCache getOffloadersCache() {
        return this.offloadersCache;
    }

    public LedgerOffloader getDefaultOffloader() {
        return this.defaultOffloader;
    }

    public Map<NamespaceName, LedgerOffloader> getLedgerOffloaderMap() {
        return this.ledgerOffloaderMap;
    }

    public ScheduledFuture<?> getLoadReportTask() {
        return this.loadReportTask;
    }

    public ScheduledFuture<?> getLoadSheddingTask() {
        return this.loadSheddingTask;
    }

    public ScheduledFuture<?> getLoadResourceQuotaTask() {
        return this.loadResourceQuotaTask;
    }

    public AtomicReference<LoadManager> getLoadManager() {
        return this.loadManager;
    }

    public ZooKeeperClientFactory getZkClientFactory() {
        return this.zkClientFactory;
    }

    public String getBindAddress() {
        return this.bindAddress;
    }

    public String getAdvertisedAddress() {
        return this.advertisedAddress;
    }

    public String getWebServiceAddress() {
        return this.webServiceAddress;
    }

    public String getWebServiceAddressTls() {
        return this.webServiceAddressTls;
    }

    public String getBrokerServiceUrl() {
        return this.brokerServiceUrl;
    }

    public String getBrokerServiceUrlTls() {
        return this.brokerServiceUrlTls;
    }

    public String getBrokerVersion() {
        return this.brokerVersion;
    }

    public SchemaStorage getSchemaStorage() {
        return this.schemaStorage;
    }

    public SchemaRegistryService getSchemaRegistryService() {
        return this.schemaRegistryService;
    }

    public Optional<WorkerService> getFunctionWorkerService() {
        return this.functionWorkerService;
    }

    public ProtocolHandlers getProtocolHandlers() {
        return this.protocolHandlers;
    }

    public BrokerInterceptor getBrokerInterceptor() {
        return this.brokerInterceptor;
    }

    public PrometheusMetricsServlet getMetricsServlet() {
        return this.metricsServlet;
    }

    public List<PrometheusRawMetricsProvider> getPendingMetricsProviders() {
        return this.pendingMetricsProviders;
    }

    public ReentrantLock getMutex() {
        return this.mutex;
    }

    public Condition getIsClosedCondition() {
        return this.isClosedCondition;
    }

    public Map<String, AdvertisedListener> getAdvertisedListeners() {
        return this.advertisedListeners;
    }

    protected void setConfig(ServiceConfiguration serviceConfiguration) {
        this.config = serviceConfiguration;
    }

    protected void setNsService(NamespaceService namespaceService) {
        this.nsService = namespaceService;
    }

    protected void setManagedLedgerClientFactory(ManagedLedgerClientFactory managedLedgerClientFactory) {
        this.managedLedgerClientFactory = managedLedgerClientFactory;
    }

    protected void setLeaderElectionService(LeaderElectionService leaderElectionService) {
        this.leaderElectionService = leaderElectionService;
    }

    protected void setBrokerService(BrokerService brokerService) {
        this.brokerService = brokerService;
    }

    protected void setWebService(WebService webService) {
        this.webService = webService;
    }

    protected void setWebSocketService(WebSocketService webSocketService) {
        this.webSocketService = webSocketService;
    }

    protected void setConfigurationCacheService(ConfigurationCacheService configurationCacheService) {
        this.configurationCacheService = configurationCacheService;
    }

    protected void setLocalZkCacheService(LocalZooKeeperCacheService localZooKeeperCacheService) {
        this.localZkCacheService = localZooKeeperCacheService;
    }

    protected void setTopicPoliciesService(TopicPoliciesService topicPoliciesService) {
        this.topicPoliciesService = topicPoliciesService;
    }

    protected void setBkClientFactory(BookKeeperClientFactory bookKeeperClientFactory) {
        this.bkClientFactory = bookKeeperClientFactory;
    }

    protected void setLocalZkCache(ZooKeeperCache zooKeeperCache) {
        this.localZkCache = zooKeeperCache;
    }

    protected void setGlobalZkCache(GlobalZooKeeperCache globalZooKeeperCache) {
        this.globalZkCache = globalZooKeeperCache;
    }

    protected void setLocalZooKeeperConnectionProvider(LocalZooKeeperConnectionService localZooKeeperConnectionService) {
        this.localZooKeeperConnectionProvider = localZooKeeperConnectionService;
    }

    protected void setCompactor(Compactor compactor) {
        this.compactor = compactor;
    }

    protected void setOrderedExecutor(OrderedExecutor orderedExecutor) {
        this.orderedExecutor = orderedExecutor;
    }

    protected void setCompactorExecutor(ScheduledExecutorService scheduledExecutorService) {
        this.compactorExecutor = scheduledExecutorService;
    }

    protected void setOffloaderScheduler(OrderedScheduler orderedScheduler) {
        this.offloaderScheduler = orderedScheduler;
    }

    protected void setOffloadersCache(OffloadersCache offloadersCache) {
        this.offloadersCache = offloadersCache;
    }

    protected void setDefaultOffloader(LedgerOffloader ledgerOffloader) {
        this.defaultOffloader = ledgerOffloader;
    }

    protected void setLedgerOffloaderMap(Map<NamespaceName, LedgerOffloader> map) {
        this.ledgerOffloaderMap = map;
    }

    protected void setLoadReportTask(ScheduledFuture<?> scheduledFuture) {
        this.loadReportTask = scheduledFuture;
    }

    protected void setLoadSheddingTask(ScheduledFuture<?> scheduledFuture) {
        this.loadSheddingTask = scheduledFuture;
    }

    protected void setLoadResourceQuotaTask(ScheduledFuture<?> scheduledFuture) {
        this.loadResourceQuotaTask = scheduledFuture;
    }

    protected void setAdminClient(PulsarAdmin pulsarAdmin) {
        this.adminClient = pulsarAdmin;
    }

    protected void setClient(PulsarClient pulsarClient) {
        this.client = pulsarClient;
    }

    protected void setZkClientFactory(ZooKeeperClientFactory zooKeeperClientFactory) {
        this.zkClientFactory = zooKeeperClientFactory;
    }

    protected void setWebServiceAddress(String str) {
        this.webServiceAddress = str;
    }

    protected void setWebServiceAddressTls(String str) {
        this.webServiceAddressTls = str;
    }

    protected void setBrokerServiceUrl(String str) {
        this.brokerServiceUrl = str;
    }

    protected void setBrokerServiceUrlTls(String str) {
        this.brokerServiceUrlTls = str;
    }

    protected void setSchemaStorage(SchemaStorage schemaStorage) {
        this.schemaStorage = schemaStorage;
    }

    protected void setSchemaRegistryService(SchemaRegistryService schemaRegistryService) {
        this.schemaRegistryService = schemaRegistryService;
    }

    protected void setProtocolHandlers(ProtocolHandlers protocolHandlers) {
        this.protocolHandlers = protocolHandlers;
    }

    protected void setMetricsGenerator(MetricsGenerator metricsGenerator) {
        this.metricsGenerator = metricsGenerator;
    }

    protected void setTransactionMetadataStoreService(TransactionMetadataStoreService transactionMetadataStoreService) {
        this.transactionMetadataStoreService = transactionMetadataStoreService;
    }

    protected void setTransactionBufferProvider(TransactionBufferProvider transactionBufferProvider) {
        this.transactionBufferProvider = transactionBufferProvider;
    }

    protected void setTransactionBufferClient(TransactionBufferClient transactionBufferClient) {
        this.transactionBufferClient = transactionBufferClient;
    }

    protected void setBrokerInterceptor(BrokerInterceptor brokerInterceptor) {
        this.brokerInterceptor = brokerInterceptor;
    }

    protected void setMetricsServlet(PrometheusMetricsServlet prometheusMetricsServlet) {
        this.metricsServlet = prometheusMetricsServlet;
    }

    protected void setPendingMetricsProviders(List<PrometheusRawMetricsProvider> list) {
        this.pendingMetricsProviders = list;
    }

    protected void setState(State state) {
        this.state = state;
    }

    protected void setAdvertisedListeners(Map<String, AdvertisedListener> map) {
        this.advertisedListeners = map;
    }
}
