package org.apache.pulsar.broker;

import com.google.common.annotations.VisibleForTesting;
import io.netty.channel.EventLoopGroup;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
import org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.internal.PropertiesUtils;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping;
import org.apache.pulsar.zookeeper.ZkIsolatedBookieEnsemblePlacementPolicy;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/BookKeeperClientFactoryImpl.class */
/**
 * {@link BookKeeperClientFactory} implementation that builds a BookKeeper client from the
 * broker's {@link ServiceConfiguration}.
 *
 * <p>The factory lazily creates up to three {@link ZooKeeperCache} instances (one per
 * {@link AtomicReference} field below) that are handed to the ensemble placement policy through
 * the {@code zk_cache_instance} client property. Each cache is created at most once per factory
 * and is stopped by {@link #close()}.
 */
public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
    private static final Logger log = LoggerFactory.getLogger(BookKeeperClientFactoryImpl.class);

    /** Client property under which the {@link ZooKeeperCache} is passed to the placement policy. */
    private static final String ZK_CACHE_INSTANCE = "zk_cache_instance";
    /** Client property naming the DNS-to-rack resolver class. */
    private static final String REPP_DNS_RESOLVER_CLASS = "reppDnsResolverClass";
    /** Client property naming the network topology script file. */
    private static final String NET_TOPOLOGY_SCRIPT_FILE_NAME = "networkTopologyScriptFileName";
    /** Prefix of broker properties that are forwarded verbatim to the BookKeeper client. */
    private static final String BOOKKEEPER_PROPERTY_PREFIX = "bookkeeper_";
    /**
     * Extra bytes added to the max message size when sizing the Netty frame limit.
     * NOTE(review): presumably headroom for protocol/metadata overhead - confirm.
     */
    private static final int NETTY_FRAME_SIZE_HEADROOM_BYTES = 10240;

    private final AtomicReference<ZooKeeperCache> rackawarePolicyZkCache = new AtomicReference<>();
    private final AtomicReference<ZooKeeperCache> clientIsolationZkCache = new AtomicReference<>();
    private final AtomicReference<ZooKeeperCache> zkCache = new AtomicReference<>();

    /**
     * Creates a BookKeeper client that reports no stats; delegates to the six-argument overload
     * with {@link NullStatsLogger#INSTANCE}.
     *
     * @param serviceConfiguration broker configuration the client settings are derived from
     * @param zooKeeper ZooKeeper handle used when a {@link ZooKeeperCache} has to be created
     * @param eventLoopGroup Netty event loop group handed to the client builder
     * @param optional explicit ensemble placement policy class; when empty the default policy
     *        selection in {@link #setDefaultEnsemblePlacementPolicy} is used
     * @param map extra client properties applied on top of the derived configuration; may be
     *        {@code null}
     * @return the built client
     * @throws IOException if building the client fails or the calling thread is interrupted
     */
    @Override
    public BookKeeper create(ServiceConfiguration serviceConfiguration, ZooKeeper zooKeeper, EventLoopGroup eventLoopGroup, Optional<Class<? extends EnsemblePlacementPolicy>> optional, Map<String, Object> map) throws IOException {
        return create(serviceConfiguration, zooKeeper, eventLoopGroup, optional, map, NullStatsLogger.INSTANCE);
    }

    /**
     * Creates a BookKeeper client.
     *
     * @param serviceConfiguration broker configuration the client settings are derived from
     * @param zooKeeper ZooKeeper handle used when a {@link ZooKeeperCache} has to be created
     * @param eventLoopGroup Netty event loop group handed to the client builder
     * @param optional explicit ensemble placement policy class; when empty the default policy
     *        selection in {@link #setDefaultEnsemblePlacementPolicy} is used
     * @param map extra client properties applied on top of the derived configuration; may be
     *        {@code null}
     * @param statsLogger stats logger handed to the client builder
     * @return the built client
     * @throws IOException wrapping a {@link BKException}, or wrapping an
     *         {@link InterruptedException} (in which case the thread's interrupt flag is restored
     *         before throwing)
     */
    @Override
    public BookKeeper create(ServiceConfiguration serviceConfiguration, ZooKeeper zooKeeper, EventLoopGroup eventLoopGroup, Optional<Class<? extends EnsemblePlacementPolicy>> optional, Map<String, Object> map, StatsLogger statsLogger) throws IOException {
        ClientConfiguration bkConf = createBkClientConfiguration(serviceConfiguration);
        if (map != null) {
            map.forEach((key, value) -> bkConf.setProperty(key, value));
        }
        if (optional.isPresent()) {
            setEnsemblePlacementPolicy(bkConf, serviceConfiguration, zooKeeper, optional.get());
        } else {
            setDefaultEnsemblePlacementPolicy(this.rackawarePolicyZkCache, this.clientIsolationZkCache, bkConf, serviceConfiguration, zooKeeper);
        }
        try {
            return BookKeeper.forConfig(bkConf)
                    .allocator(PulsarByteBufAllocator.DEFAULT)
                    .eventLoopGroup(eventLoopGroup)
                    .statsLogger(statsLogger)
                    .build();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new IOException(e);
        } catch (BKException e) {
            throw new IOException(e);
        }
    }

    /**
     * Translates the broker configuration into a BookKeeper {@link ClientConfiguration}.
     *
     * <p>Broker properties carrying the {@code bookkeeper_} prefix are applied last, so they
     * override any value set earlier in this method.
     *
     * @param serviceConfiguration broker configuration to read from
     * @return a freshly created client configuration
     */
    @VisibleForTesting
    ClientConfiguration createBkClientConfiguration(ServiceConfiguration serviceConfiguration) {
        ClientConfiguration clientConfiguration = new ClientConfiguration();
        applyAuthentication(clientConfiguration, serviceConfiguration);
        applyTls(clientConfiguration, serviceConfiguration);

        clientConfiguration.setBusyWaitEnabled(serviceConfiguration.isEnableBusyWait());
        clientConfiguration.setNumWorkerThreads(serviceConfiguration.getBookkeeperClientNumWorkerThreads());
        clientConfiguration.setThrottleValue(serviceConfiguration.getBookkeeperClientThrottleValue());
        // The same broker timeout is used for both add and read entry operations.
        clientConfiguration.setAddEntryTimeout((int) serviceConfiguration.getBookkeeperClientTimeoutInSeconds());
        clientConfiguration.setReadEntryTimeout((int) serviceConfiguration.getBookkeeperClientTimeoutInSeconds());
        clientConfiguration.setSpeculativeReadTimeout(serviceConfiguration.getBookkeeperClientSpeculativeReadTimeoutInMillis());
        clientConfiguration.setNumChannelsPerBookie(serviceConfiguration.getBookkeeperNumberOfChannelsPerBookie());
        clientConfiguration.setUseV2WireProtocol(serviceConfiguration.isBookkeeperUseV2WireProtocol());
        clientConfiguration.setEnableDigestTypeAutodetection(true);
        clientConfiguration.setStickyReadsEnabled(serviceConfiguration.isBookkeeperEnableStickyReads());
        clientConfiguration.setNettyMaxFrameSizeBytes(serviceConfiguration.getMaxMessageSize() + NETTY_FRAME_SIZE_HEADROOM_BYTES);
        clientConfiguration.setDiskWeightBasedPlacementEnabled(serviceConfiguration.isBookkeeperDiskWeightBasedPlacementEnabled());

        // An explicitly configured metadata service URI wins; otherwise derive it from the broker config.
        if (StringUtils.isNotBlank(serviceConfiguration.getBookkeeperMetadataServiceUri())) {
            clientConfiguration.setMetadataServiceUri(serviceConfiguration.getBookkeeperMetadataServiceUri());
        } else {
            clientConfiguration.setMetadataServiceUri(PulsarService.bookieMetadataServiceUri(serviceConfiguration));
        }

        applyHealthCheck(clientConfiguration, serviceConfiguration);

        clientConfiguration.setReorderReadSequenceEnabled(serviceConfiguration.isBookkeeperClientReorderReadSequenceEnabled());
        clientConfiguration.setExplictLacInterval(serviceConfiguration.getBookkeeperExplicitLacIntervalInMills());
        clientConfiguration.setGetBookieInfoIntervalSeconds(serviceConfiguration.getBookkeeperClientGetBookieInfoIntervalSeconds(), TimeUnit.SECONDS);
        clientConfiguration.setGetBookieInfoRetryIntervalSeconds(serviceConfiguration.getBookkeeperClientGetBookieInfoRetryIntervalSeconds(), TimeUnit.SECONDS);

        PropertiesUtils.filterAndMapProperties(serviceConfiguration.getProperties(), BOOKKEEPER_PROPERTY_PREFIX).forEach((key, value) -> {
            log.info("Applying BookKeeper client configuration setting {}={}", key, value);
            clientConfiguration.setProperty(key, value);
        });
        return clientConfiguration;
    }

    /** Copies the client authentication plugin and its parameters when a non-blank plugin is configured. */
    private static void applyAuthentication(ClientConfiguration clientConfiguration, ServiceConfiguration serviceConfiguration) {
        String authPlugin = serviceConfiguration.getBookkeeperClientAuthenticationPlugin();
        if (authPlugin == null || authPlugin.trim().isEmpty()) {
            return;
        }
        clientConfiguration.setClientAuthProviderFactoryClass(authPlugin);
        clientConfiguration.setProperty(serviceConfiguration.getBookkeeperClientAuthenticationParametersName(), serviceConfiguration.getBookkeeperClientAuthenticationParameters());
    }

    /** Copies the TLS settings when TLS client authentication is enabled in the broker configuration. */
    private static void applyTls(ClientConfiguration clientConfiguration, ServiceConfiguration serviceConfiguration) {
        if (!serviceConfiguration.isBookkeeperTLSClientAuthentication()) {
            return;
        }
        clientConfiguration.setTLSClientAuthentication(true);
        clientConfiguration.setTLSCertificatePath(serviceConfiguration.getBookkeeperTLSCertificateFilePath());
        clientConfiguration.setTLSKeyStore(serviceConfiguration.getBookkeeperTLSKeyFilePath());
        clientConfiguration.setTLSKeyStoreType(serviceConfiguration.getBookkeeperTLSKeyFileType());
        clientConfiguration.setTLSKeyStorePasswordPath(serviceConfiguration.getBookkeeperTLSKeyStorePasswordPath());
        clientConfiguration.setTLSProviderFactoryClass(serviceConfiguration.getBookkeeperTLSProviderFactoryClass());
        clientConfiguration.setTLSTrustStore(serviceConfiguration.getBookkeeperTLSTrustCertsFilePath());
        clientConfiguration.setTLSTrustStoreType(serviceConfiguration.getBookkeeperTLSTrustCertTypes());
        clientConfiguration.setTLSTrustStorePasswordPath(serviceConfiguration.getBookkeeperTLSTrustStorePasswordPath());
        clientConfiguration.setTLSCertFilesRefreshDurationSeconds(serviceConfiguration.getBookkeeperTlsCertFilesRefreshDurationSeconds());
    }

    /** Enables and configures the bookie health check when it is turned on in the broker configuration. */
    private static void applyHealthCheck(ClientConfiguration clientConfiguration, ServiceConfiguration serviceConfiguration) {
        if (!serviceConfiguration.isBookkeeperClientHealthCheckEnabled()) {
            return;
        }
        clientConfiguration.enableBookieHealthCheck();
        clientConfiguration.setBookieHealthCheckInterval(serviceConfiguration.getBookkeeperHealthCheckIntervalSec(), TimeUnit.SECONDS);
        clientConfiguration.setBookieErrorThresholdPerInterval(serviceConfiguration.getBookkeeperClientHealthCheckErrorThresholdPerInterval());
        clientConfiguration.setBookieQuarantineTime((int) serviceConfiguration.getBookkeeperClientHealthCheckQuarantineTimeInSeconds(), TimeUnit.SECONDS);
        clientConfiguration.setBookieQuarantineRatio(serviceConfiguration.getBookkeeperClientQuarantineRatio());
    }

    /**
     * Selects the ensemble placement policy from the broker configuration when the caller did not
     * supply one.
     *
     * <ul>
     *   <li>If the rack-aware or region-aware policy is enabled, the corresponding policy class is
     *       set (region-aware takes precedence), the rack/region related properties are copied and
     *       the cache held by {@code atomicReference} is published as {@code zk_cache_instance}.</li>
     *   <li>If isolation groups are configured, the policy class is then replaced by
     *       {@link ZkIsolatedBookieEnsemblePlacementPolicy}; the cache held by
     *       {@code atomicReference2} is used only when no {@code zk_cache_instance} was set
     *       before.</li>
     * </ul>
     *
     * @param atomicReference holder of the cache used for the rack-aware/region-aware policy;
     *        populated on first use
     * @param atomicReference2 holder of the cache used for the isolation policy; populated on
     *        first use
     * @param clientConfiguration client configuration to update in place
     * @param serviceConfiguration broker configuration to read from
     * @param zooKeeper ZooKeeper handle used when a cache has to be created
     */
    public static void setDefaultEnsemblePlacementPolicy(AtomicReference<ZooKeeperCache> atomicReference, AtomicReference<ZooKeeperCache> atomicReference2, ClientConfiguration clientConfiguration, ServiceConfiguration serviceConfiguration, ZooKeeper zooKeeper) {
        boolean regionAware = serviceConfiguration.isBookkeeperClientRegionawarePolicyEnabled();
        if (serviceConfiguration.isBookkeeperClientRackawarePolicyEnabled() || regionAware) {
            if (regionAware) {
                clientConfiguration.setEnsemblePlacementPolicy(RegionAwareEnsemblePlacementPolicy.class);
                clientConfiguration.setProperty("reppEnableValidation", serviceConfiguration.getProperties().getProperty("reppEnableValidation", "true"));
                clientConfiguration.setProperty("reppRegionsToWrite", serviceConfiguration.getProperties().getProperty("reppRegionsToWrite", null));
                clientConfiguration.setProperty("reppMinimumRegionsForDurability", serviceConfiguration.getProperties().getProperty("reppMinimumRegionsForDurability", "2"));
                clientConfiguration.setProperty("reppEnableDurabilityEnforcementInReplace", serviceConfiguration.getProperties().getProperty("reppEnableDurabilityEnforcementInReplace", "true"));
            } else {
                clientConfiguration.setEnsemblePlacementPolicy(RackawareEnsemblePlacementPolicy.class);
            }
            clientConfiguration.setMinNumRacksPerWriteQuorum(serviceConfiguration.getBookkeeperClientMinNumRacksPerWriteQuorum());
            clientConfiguration.setEnforceMinNumRacksPerWriteQuorum(serviceConfiguration.isBookkeeperClientEnforceMinNumRacksPerWriteQuorum());
            applyDnsResolverSettings(clientConfiguration, serviceConfiguration);
            clientConfiguration.setProperty(ZK_CACHE_INSTANCE, getOrCreateZkCache(atomicReference, "bookies-racks", zooKeeper, serviceConfiguration));
        }

        if (serviceConfiguration.getBookkeeperClientIsolationGroups() == null || serviceConfiguration.getBookkeeperClientIsolationGroups().isEmpty()) {
            return;
        }
        clientConfiguration.setEnsemblePlacementPolicy(ZkIsolatedBookieEnsemblePlacementPolicy.class);
        clientConfiguration.setProperty("isolationBookieGroups", serviceConfiguration.getBookkeeperClientIsolationGroups());
        clientConfiguration.setProperty("secondaryIsolationBookieGroups", serviceConfiguration.getBookkeeperClientSecondaryIsolationGroups());
        // Reuse the cache published by the rack/region-aware branch, if any.
        if (clientConfiguration.getProperty(ZK_CACHE_INSTANCE) == null) {
            clientConfiguration.setProperty(ZK_CACHE_INSTANCE, getOrCreateZkCache(atomicReference2, "bookies-isolation", zooKeeper, serviceConfiguration));
        }
    }

    /**
     * Applies a caller-supplied ensemble placement policy class and makes sure a
     * {@code zk_cache_instance} is available to it.
     */
    private void setEnsemblePlacementPolicy(ClientConfiguration clientConfiguration, ServiceConfiguration serviceConfiguration, ZooKeeper zooKeeper, Class<? extends EnsemblePlacementPolicy> cls) {
        clientConfiguration.setEnsemblePlacementPolicy(cls);
        // A cache passed in through the extra client properties takes precedence.
        if (clientConfiguration.getProperty(ZK_CACHE_INSTANCE) == null) {
            clientConfiguration.setProperty(ZK_CACHE_INSTANCE, getOrCreateZkCache(this.zkCache, "bookies-rackaware", zooKeeper, serviceConfiguration));
        }
        if (serviceConfiguration.isBookkeeperClientRackawarePolicyEnabled() || serviceConfiguration.isBookkeeperClientRegionawarePolicyEnabled()) {
            applyDnsResolverSettings(clientConfiguration, serviceConfiguration);
        }
    }

    /**
     * Copies the DNS resolver class and topology script settings from the broker properties,
     * defaulting to {@link ZkBookieRackAffinityMapping} and an empty script name.
     */
    private static void applyDnsResolverSettings(ClientConfiguration clientConfiguration, ServiceConfiguration serviceConfiguration) {
        clientConfiguration.setProperty(REPP_DNS_RESOLVER_CLASS, serviceConfiguration.getProperties().getProperty(REPP_DNS_RESOLVER_CLASS, ZkBookieRackAffinityMapping.class.getName()));
        clientConfiguration.setProperty(NET_TOPOLOGY_SCRIPT_FILE_NAME, serviceConfiguration.getProperties().getProperty(NET_TOPOLOGY_SCRIPT_FILE_NAME, ""));
    }

    /**
     * Returns the cache stored in {@code holder}, creating and publishing one if the holder is
     * still empty.
     *
     * <p>The holder is checked before constructing anything so that repeated calls do not build a
     * cache only to stop it again. If another thread publishes a cache between the check and the
     * compare-and-set, the locally created instance is stopped and the published one is returned.
     */
    private static ZooKeeperCache getOrCreateZkCache(AtomicReference<ZooKeeperCache> holder, String cacheName, ZooKeeper zooKeeper, ServiceConfiguration serviceConfiguration) {
        ZooKeeperCache existing = holder.get();
        if (existing != null) {
            return existing;
        }
        ZooKeeperCache created = new ZooKeeperCache(cacheName, zooKeeper, serviceConfiguration.getZooKeeperOperationTimeoutSeconds()) {
        };
        if (holder.compareAndSet(null, created)) {
            return created;
        }
        created.stop();
        return holder.get();
    }

    /** Stops every {@link ZooKeeperCache} this factory has created so far. */
    @Override
    public void close() {
        stopIfPresent(this.rackawarePolicyZkCache);
        stopIfPresent(this.clientIsolationZkCache);
        stopIfPresent(this.zkCache);
    }

    /** Stops the cache held by {@code holder}, if any; the reference is read only once. */
    private static void stopIfPresent(AtomicReference<ZooKeeperCache> holder) {
        ZooKeeperCache cache = holder.get();
        if (cache != null) {
            cache.stop();
        }
    }
}
