package org.apache.pulsar.broker.loadbalance.extensions;

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import lombok.Generated;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.coordination.LockManager;
import org.apache.pulsar.metadata.api.coordination.ResourceLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.class */
public class BrokerRegistryImpl implements BrokerRegistry {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(BrokerRegistryImpl.class);
    private final PulsarService pulsar;
    private final ServiceConfiguration conf;
    private final BrokerLookupData brokerLookupData;
    private final LockManager<BrokerLookupData> brokerLookupDataLockManager;
    private final String brokerId;
    private final ScheduledExecutorService scheduler;
    private volatile ResourceLock<BrokerLookupData> brokerLookupDataLock;
    private final List<BiConsumer<String, NotificationType>> listeners = new ArrayList();
    private State state = State.Init;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl$State.class */
    public enum State {
        Init,
        Started,
        Registered,
        Closed
    }

    public BrokerRegistryImpl(PulsarService pulsarService) {
        this.pulsar = pulsarService;
        this.conf = pulsarService.getConfiguration();
        this.brokerLookupDataLockManager = pulsarService.getCoordinationService().getLockManager(BrokerLookupData.class);
        this.scheduler = pulsarService.getLoadManagerExecutor();
        this.brokerId = pulsarService.getBrokerId();
        this.brokerLookupData = new BrokerLookupData(pulsarService.getWebServiceAddress(), pulsarService.getWebServiceAddressTls(), pulsarService.getBrokerServiceUrl(), pulsarService.getBrokerServiceUrlTls(), pulsarService.getAdvertisedListeners(), pulsarService.getProtocolDataToAdvertise(), pulsarService.getConfiguration().isEnablePersistentTopics(), pulsarService.getConfiguration().isEnableNonPersistentTopics(), this.conf.getLoadManagerClassName(), System.currentTimeMillis(), pulsarService.getBrokerVersion());
    }

    @Override // org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry
    public synchronized void start() throws PulsarServerException {
        if (this.state != State.Init) {
            return;
        }
        this.pulsar.getLocalMetadataStore().registerListener(this::handleMetadataStoreNotification);
        try {
            this.state = State.Started;
            register();
        } catch (MetadataStoreException e) {
            throw new PulsarServerException(e);
        }
    }

    @Override // org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry
    public boolean isStarted() {
        return this.state == State.Started || this.state == State.Registered;
    }

    @Override // org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry
    public synchronized void register() throws MetadataStoreException {
        if (this.state == State.Started) {
            try {
                this.brokerLookupDataLock = (ResourceLock) this.brokerLookupDataLockManager.acquireLock(keyPath(this.brokerId), this.brokerLookupData).get(this.conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
                this.state = State.Registered;
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                throw MetadataStoreException.unwrap(e);
            }
        }
    }

    @Override // org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry
    public synchronized void unregister() throws MetadataStoreException {
        if (this.state == State.Registered) {
            try {
                this.brokerLookupDataLock.release().get(this.conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
                this.state = State.Started;
            } catch (InterruptedException | CompletionException | ExecutionException | TimeoutException e) {
                throw MetadataStoreException.unwrap(e);
            }
        }
    }

    @Override // org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry
    public String getBrokerId() {
        return this.brokerId;
    }

    @Override // org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry
    public CompletableFuture<List<String>> getAvailableBrokersAsync() {
        checkState();
        return this.brokerLookupDataLockManager.listLocks(LoadManager.LOADBALANCE_BROKERS_ROOT).thenApply((v1) -> {
            return new ArrayList(v1);
        });
    }

    @Override // org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry
    public CompletableFuture<Optional<BrokerLookupData>> lookupAsync(String str) {
        checkState();
        return this.brokerLookupDataLockManager.readLock(keyPath(str));
    }

    @Override // org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry
    public CompletableFuture<Map<String, BrokerLookupData>> getAvailableBrokerLookupDataAsync() {
        checkState();
        return getAvailableBrokersAsync().thenCompose(list -> {
            ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
            ArrayList arrayList = new ArrayList();
            Iterator it = list.iterator();
            while (it.hasNext()) {
                String str = (String) it.next();
                arrayList.add(lookupAsync(str).thenAccept(optional -> {
                    if (optional.isPresent()) {
                        concurrentHashMap.put(str, (BrokerLookupData) optional.get());
                    } else {
                        log.warn("Got an empty lookup data, brokerId: {}", str);
                    }
                }));
            }
            return FutureUtil.waitForAll(arrayList).thenApply(r3 -> {
                return concurrentHashMap;
            });
        });
    }

    @Override // org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry
    public synchronized void addListener(BiConsumer<String, NotificationType> biConsumer) {
        checkState();
        this.listeners.add(biConsumer);
    }

    @Override // org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry, java.lang.AutoCloseable
    public synchronized void close() throws PulsarServerException {
        if (this.state == State.Closed) {
            return;
        }
        try {
            try {
                this.listeners.clear();
                unregister();
                this.brokerLookupDataLockManager.close();
                this.state = State.Closed;
            } catch (Exception e) {
                if (!(e.getCause() instanceof MetadataStoreException.NotFoundException)) {
                    throw new PulsarServerException(MetadataStoreException.unwrap(e));
                }
                throw new PulsarServerException.NotFoundException(MetadataStoreException.unwrap(e));
            }
        } catch (Throwable th) {
            this.state = State.Closed;
            throw th;
        }
    }

    private void handleMetadataStoreNotification(Notification notification) {
        if (isStarted() && isVerifiedNotification(notification)) {
            try {
                if (log.isDebugEnabled()) {
                    log.debug("Handle notification: [{}]", notification);
                }
                if (this.listeners.isEmpty()) {
                    return;
                }
                this.scheduler.submit(() -> {
                    String substring = notification.getPath().substring(LoadManager.LOADBALANCE_BROKERS_ROOT.length() + 1);
                    Iterator<BiConsumer<String, NotificationType>> it = this.listeners.iterator();
                    while (it.hasNext()) {
                        it.next().accept(substring, notification.getType());
                    }
                });
            } catch (RejectedExecutionException e) {
            }
        }
    }

    @VisibleForTesting
    protected static boolean isVerifiedNotification(Notification notification) {
        return notification.getPath().startsWith("/loadbalance/brokers/") && notification.getPath().length() > LoadManager.LOADBALANCE_BROKERS_ROOT.length() + 1;
    }

    @VisibleForTesting
    protected static String keyPath(String str) {
        return String.format("%s/%s", LoadManager.LOADBALANCE_BROKERS_ROOT, str);
    }

    private void checkState() throws IllegalStateException {
        if (this.state == State.Closed) {
            throw new IllegalStateException("The registry already closed.");
        }
    }
}
