package org.apache.pulsar.broker.loadbalance.extensions;

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.class */
/**
 * {@link BrokerRegistry} implementation that publishes this broker's {@link BrokerLookupData} under
 * {@code LoadManager.LOADBALANCE_BROKERS_ROOT/<brokerId>} in the local metadata store and reads the
 * entries of the other brokers from the same location.
 *
 * <p>The node is written with {@link CreateOption#Ephemeral}. When a {@code Deleted} notification for
 * this broker's own node is observed while the registry is started, the registry registers itself
 * again (with retries) before forwarding the notification to the listeners.
 *
 * <p>Life cycle: {@code Init -> Started -> Registered -> (Unregistering -> Started)* -> Closed}.
 */
public class BrokerRegistryImpl implements BrokerRegistry {
    private static final Logger log = LoggerFactory.getLogger(BrokerRegistryImpl.class);

    /** Upper bound, in milliseconds, of the delay between two self-registration attempts. */
    private static final int MAX_REGISTER_RETRY_DELAY_IN_MILLIS = 1000;

    private final PulsarService pulsar;
    private final ServiceConfiguration conf;

    /** The data describing this broker; built once in the constructor and written on every registration. */
    private final BrokerLookupData brokerLookupData;
    private final MetadataCache<BrokerLookupData> brokerLookupDataMetadataCache;

    /** Metadata store path of this broker's own node. */
    private final String brokerIdKeyPath;

    /** Executor on which the registered listeners are invoked. */
    private final ScheduledExecutorService scheduler;

    // NOTE(review): this is a plain ArrayList. It is mutated under the instance monitor (addListener, close)
    // but iterated on the scheduler thread without that monitor in handleMetadataStoreNotification, so a
    // concurrent add/clear could surface as a ConcurrentModificationException. Left unchanged here; confirm
    // whether a concurrent list is warranted.
    private final List<BiConsumer<String, NotificationType>> listeners;

    @VisibleForTesting
    final AtomicReference<State> state;

    /** Life-cycle states of the registry. */
    public enum State {
        /** Constructed, {@link #start()} not yet called. */
        Init,
        /** Started but this broker's node is not (or no longer) known to be present in the metadata store. */
        Started,
        /** The last write of this broker's node completed successfully. */
        Registered,
        /** {@link #unregister()} is currently deleting this broker's node. */
        Unregistering,
        /** {@link #close()} has completed; the registry must not be used any more. */
        Closed
    }

    /**
     * Creates a registry backed by the given metadata cache.
     *
     * @param pulsarService the broker service supplying the configuration, executors, broker id and the
     *                      addresses that are advertised in the lookup data
     * @param metadataCache the cache used to read and write {@link BrokerLookupData} nodes
     */
    @VisibleForTesting
    BrokerRegistryImpl(PulsarService pulsarService, MetadataCache<BrokerLookupData> metadataCache) {
        this.state = new AtomicReference<>(State.Init);
        this.pulsar = pulsarService;
        this.conf = pulsarService.getConfiguration();
        this.brokerLookupDataMetadataCache = metadataCache;
        this.scheduler = pulsarService.getLoadManagerExecutor();
        this.listeners = new ArrayList<>();
        this.brokerIdKeyPath = keyPath(pulsarService.getBrokerId());
        this.brokerLookupData = new BrokerLookupData(
                pulsarService.getWebServiceAddress(),
                pulsarService.getWebServiceAddressTls(),
                pulsarService.getBrokerServiceUrl(),
                pulsarService.getBrokerServiceUrlTls(),
                pulsarService.getAdvertisedListeners(),
                pulsarService.getProtocolDataToAdvertise(),
                pulsarService.getConfiguration().isEnablePersistentTopics(),
                pulsarService.getConfiguration().isEnableNonPersistentTopics(),
                this.conf.getLoadManagerClassName(),
                System.currentTimeMillis(),
                pulsarService.getBrokerVersion());
    }

    /**
     * Creates a registry that uses a {@link BrokerLookupData} metadata cache obtained from the local
     * metadata store of the given broker service.
     *
     * @param pulsarService the broker service this registry belongs to
     */
    public BrokerRegistryImpl(PulsarService pulsarService) {
        this(pulsarService, pulsarService.getLocalMetadataStore().getMetadataCache(BrokerLookupData.class));
    }

    /**
     * Moves the registry from {@code Init} to {@code Started}, subscribes to metadata store notifications
     * and blocks until this broker's node has been written or the configured metadata store operation
     * timeout elapses.
     *
     * @throws PulsarServerException if the registry is not in the {@code Init} state, or if the initial
     *                               registration fails, times out or is interrupted
     */
    @Override // org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry
    public synchronized void start() throws PulsarServerException {
        if (!this.state.compareAndSet(State.Init, State.Started)) {
            throw new PulsarServerException("Cannot start the broker registry in state " + this.state.get());
        }
        this.pulsar.getLocalMetadataStore().registerListener(this::handleMetadataStoreNotification);
        try {
            registerAsync().get(this.conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt status so that callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            throw PulsarServerException.from(e);
        } catch (ExecutionException | TimeoutException e) {
            throw PulsarServerException.from(e);
        }
    }

    /**
     * @return {@code true} if the state is {@code Started} or {@code Registered}
     */
    @Override // org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry
    public boolean isStarted() {
        State current = this.state.get();
        return current == State.Started || current == State.Registered;
    }

    /**
     * @return {@code true} if the state is {@code Registered}
     */
    @Override // org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry
    public boolean isRegistered() {
        return this.state.get() == State.Registered;
    }

    /**
     * Writes this broker's lookup data as an ephemeral node, provided the registry is in the
     * {@code Started} or {@code Registered} state. In any other state nothing is written and an already
     * completed future is returned.
     *
     * @return a future that completes once the write has finished; it fails if the write fails or does not
     *         finish within the configured metadata store operation timeout. On success the state is set
     *         to {@code Registered}.
     */
    @Override // org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry
    public CompletableFuture<Void> registerAsync() {
        State current = this.state.get();
        if (current != State.Started && current != State.Registered) {
            log.info("[{}] Skip registering self because the state is {}", getBrokerId(), current);
            return CompletableFuture.completedFuture(null);
        }
        log.info("[{}] Started registering self to {} (state: {})", getBrokerId(), this.brokerIdKeyPath, current);
        return this.brokerLookupDataMetadataCache
                .put(this.brokerIdKeyPath, this.brokerLookupData, EnumSet.of(CreateOption.Ephemeral))
                .orTimeout(this.pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(),
                        TimeUnit.SECONDS)
                .whenComplete((ignored, error) -> {
                    if (error != null) {
                        log.error("[{}] Failed registering self", getBrokerId(), error);
                    } else {
                        this.state.set(State.Registered);
                        log.info("[{}] Finished registering self", getBrokerId());
                    }
                });
    }

    /**
     * Schedules one registration attempt on the broker executor and, if it fails, schedules the next one.
     * The delay before attempt {@code retry} is {@code retry^2 * 50} ms, capped at
     * {@link #MAX_REGISTER_RETRY_DELAY_IN_MILLIS}.
     *
     * @param retry  zero-based number of the attempt being scheduled
     * @param result completed (with {@code null}) as soon as one attempt succeeds
     */
    private void doRegisterAsyncWithRetries(int retry, CompletableFuture<Void> result) {
        // Compute in long: retry * retry * 50 overflows int for large retry counts, which would produce a
        // negative delay and turn the capped back-off into a tight retry loop.
        long delayMillis = Math.min((long) MAX_REGISTER_RETRY_DELAY_IN_MILLIS, (long) retry * retry * 50L);
        this.pulsar.getExecutor().schedule(() -> {
            registerAsync().whenComplete((ignored, error) -> {
                if (error != null) {
                    doRegisterAsyncWithRetries(retry + 1, result);
                } else {
                    result.complete(null);
                }
            });
        }, delayMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Registers this broker, retrying for as long as {@link #registerAsync()} keeps failing.
     *
     * @return a future completed once an attempt has succeeded (or was skipped because of the state)
     */
    private CompletableFuture<Void> registerAsyncWithRetries() {
        CompletableFuture<Void> result = new CompletableFuture<>();
        doRegisterAsyncWithRetries(0, result);
        return result;
    }

    /**
     * Deletes this broker's node if the registry is currently {@code Registered}; otherwise does nothing.
     * Whatever the outcome of the deletion, the state is set back to {@code Started} afterwards. A node
     * that is already gone is logged and treated as success.
     *
     * @throws MetadataStoreException if the deletion fails for a reason other than the node being absent,
     *                                times out or is interrupted
     */
    @Override // org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry
    public synchronized void unregister() throws MetadataStoreException {
        if (!this.state.compareAndSet(State.Registered, State.Unregistering)) {
            return;
        }
        try {
            this.brokerLookupDataMetadataCache.delete(this.brokerIdKeyPath)
                    .get(this.conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            if (!(e.getCause() instanceof MetadataStoreException.NotFoundException)) {
                throw MetadataStoreException.unwrap(e);
            }
            log.warn("{} has already been unregistered", this.brokerIdKeyPath);
        } catch (InterruptedException e) {
            // Restore the interrupt status so that callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            throw MetadataStoreException.unwrap(e);
        } catch (TimeoutException e) {
            throw MetadataStoreException.unwrap(e);
        } finally {
            this.state.set(State.Started);
        }
    }

    /**
     * @return the id of this broker, as reported by the broker service
     */
    @Override // org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry
    public String getBrokerId() {
        return this.pulsar.getBrokerId();
    }

    /**
     * Lists the children of {@code LoadManager.LOADBALANCE_BROKERS_ROOT}.
     *
     * @return a future holding the child names, i.e. the ids of the brokers that currently have a node
     * @throws IllegalStateException if the registry is closed
     */
    @Override // org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry
    public CompletableFuture<List<String>> getAvailableBrokersAsync() {
        checkState();
        return this.brokerLookupDataMetadataCache.getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT);
    }

    /**
     * Reads the lookup data stored for the given broker.
     *
     * @param brokerId the id of the broker to look up
     * @return a future holding the lookup data, or an empty optional if the cache has no value for it
     * @throws IllegalStateException if the registry is closed
     */
    @Override // org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry
    public CompletableFuture<Optional<BrokerLookupData>> lookupAsync(String brokerId) {
        checkState();
        return this.brokerLookupDataMetadataCache.get(keyPath(brokerId));
    }

    /**
     * Lists the available brokers and looks up each of them. Brokers whose lookup yields no data are
     * logged and left out of the result.
     *
     * @return a future holding a map from broker id to lookup data
     * @throws IllegalStateException if the registry is closed
     */
    @Override // org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry
    public CompletableFuture<Map<String, BrokerLookupData>> getAvailableBrokerLookupDataAsync() {
        checkState();
        return getAvailableBrokersAsync().thenCompose(brokerIds -> {
            // The individual lookups may complete on different threads, hence the concurrent map.
            Map<String, BrokerLookupData> lookupDataByBroker = new ConcurrentHashMap<>();
            List<CompletableFuture<Void>> lookups = new ArrayList<>();
            for (String brokerId : brokerIds) {
                lookups.add(lookupAsync(brokerId).thenAccept(lookupData -> {
                    if (lookupData.isPresent()) {
                        lookupDataByBroker.put(brokerId, lookupData.get());
                    } else {
                        log.warn("Got an empty lookup data, brokerId: {}", brokerId);
                    }
                }));
            }
            return FutureUtil.waitForAll(lookups).thenApply(ignored -> lookupDataByBroker);
        });
    }

    /**
     * Adds a listener that is called with the broker id and the notification type for every accepted
     * notification below the brokers root.
     *
     * @param listener the callback to add
     * @throws IllegalStateException if the registry is closed
     */
    @Override // org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry
    public synchronized void addListener(BiConsumer<String, NotificationType> listener) {
        checkState();
        this.listeners.add(listener);
    }

    /**
     * Removes all listeners, unregisters this broker and marks the registry as {@code Closed}. Calling it
     * on an already closed registry is a no-op. A failure while unregistering is logged, not propagated;
     * the registry ends up {@code Closed} either way.
     */
    @Override // org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry, java.lang.AutoCloseable
    public synchronized void close() throws PulsarServerException {
        // Check BEFORE touching the state: marking the registry Closed first would make this guard always
        // hit and skip the cleanup below.
        if (this.state.get() == State.Closed) {
            return;
        }
        try {
            this.listeners.clear();
            unregister();
        } catch (Exception e) {
            log.error("Unexpected error when unregistering the broker registry", e);
        } finally {
            this.state.set(State.Closed);
        }
    }

    /**
     * Metadata store callback. Notifications are ignored unless the registry is started and the path lies
     * below the brokers root. If the notification reports the deletion of this broker's own node, the
     * broker is registered again first; the listeners are then invoked on the scheduler.
     *
     * @param notification the metadata store notification
     */
    private void handleMetadataStoreNotification(Notification notification) {
        if (!isStarted() || !isVerifiedNotification(notification)) {
            return;
        }
        try {
            if (log.isDebugEnabled()) {
                log.debug("Handle notification: [{}]", notification);
            }
            String brokerId = notification.getPath().substring(LoadManager.LOADBALANCE_BROKERS_ROOT.length() + 1);

            CompletableFuture<Void> registration;
            if (notification.getType() == NotificationType.Deleted && getBrokerId().equals(brokerId)) {
                // Our own node is gone although the registry is still started (the node is ephemeral;
                // presumably the metadata session expired), so put it back.
                this.state.set(State.Started);
                registration = registerAsyncWithRetries();
            } else {
                registration = CompletableFuture.completedFuture(null);
            }

            // Run the listeners only after the (possible) re-registration has finished.
            registration.thenAccept(ignored -> {
                if (this.listeners.isEmpty()) {
                    return;
                }
                this.scheduler.submit(() -> {
                    for (BiConsumer<String, NotificationType> listener : this.listeners) {
                        listener.accept(brokerId, notification.getType());
                    }
                });
            });
        } catch (RejectedExecutionException rejected) {
            // Deliberately not propagated: presumably the executor is shutting down, in which case there
            // is nothing useful left to do with this notification.
            log.debug("Ignored notification [{}] because an executor rejected the task", notification, rejected);
        }
    }

    /**
     * @param notification the notification to check
     * @return {@code true} if the path starts with the brokers root followed by {@code '/'} and at least
     *         one more character
     */
    @VisibleForTesting
    protected static boolean isVerifiedNotification(Notification notification) {
        String path = notification.getPath();
        return path.startsWith(LoadManager.LOADBALANCE_BROKERS_ROOT + "/")
                && path.length() > LoadManager.LOADBALANCE_BROKERS_ROOT.length() + 1;
    }

    /**
     * @param brokerId a broker id
     * @return the metadata store path of that broker's node, {@code <brokers root>/<brokerId>}
     */
    @VisibleForTesting
    protected static String keyPath(String brokerId) {
        return String.format("%s/%s", LoadManager.LOADBALANCE_BROKERS_ROOT, brokerId);
    }

    /**
     * @throws IllegalStateException if the registry has been closed
     */
    private void checkState() throws IllegalStateException {
        if (this.state.get() == State.Closed) {
            throw new IllegalStateException("The registry already closed.");
        }
    }
}
