/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.metadata.bookkeeper;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.runtime.shaded.io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.BKException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.discover.BookieServiceInfo;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.discover.RegistrationClient;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.net.BookieId;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.versioning.LongVersion;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.versioning.Version;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.versioning.Versioned;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.bookkeeper.BookieServiceInfoSerde;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarRegistrationClient
implements RegistrationClient {
    private static final Logger log = LoggerFactory.getLogger(PulsarRegistrationClient.class);
    private final MetadataStore store;
    private final String ledgersRootPath;
    private final String bookieRegistrationPath;
    private final String bookieAllRegistrationPath;
    private final String bookieReadonlyRegistrationPath;
    private final ConcurrentHashMap<BookieId, Versioned<BookieServiceInfo>> bookieServiceInfoCache = new ConcurrentHashMap();
    private final Set<RegistrationClient.RegistrationListener> writableBookiesWatchers = new CopyOnWriteArraySet<RegistrationClient.RegistrationListener>();
    private final Set<RegistrationClient.RegistrationListener> readOnlyBookiesWatchers = new CopyOnWriteArraySet<RegistrationClient.RegistrationListener>();
    private final MetadataCache<BookieServiceInfo> bookieServiceInfoMetadataCache;
    private final ScheduledExecutorService executor;

    public PulsarRegistrationClient(MetadataStore store, String ledgersRootPath) {
        this.store = store;
        this.ledgersRootPath = ledgersRootPath;
        this.bookieServiceInfoMetadataCache = store.getMetadataCache(BookieServiceInfoSerde.INSTANCE);
        this.bookieRegistrationPath = ledgersRootPath + "/" + "available";
        this.bookieAllRegistrationPath = ledgersRootPath + "/" + "cookies";
        this.bookieReadonlyRegistrationPath = this.bookieRegistrationPath + "/" + "readonly";
        this.executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-registration-client"));
        store.registerListener(this::updatedBookies);
    }

    @Override
    public void close() {
        this.executor.shutdownNow();
    }

    @Override
    public CompletableFuture<Versioned<Set<BookieId>>> getWritableBookies() {
        return this.getChildren(this.bookieRegistrationPath);
    }

    @Override
    public CompletableFuture<Versioned<Set<BookieId>>> getAllBookies() {
        CompletableFuture<Versioned<Set<BookieId>>> wb = this.getWritableBookies();
        CompletableFuture<Versioned<Set<BookieId>>> rb = this.getReadOnlyBookies();
        return wb.thenCombine(rb, (rw, ro) -> {
            HashSet res = new HashSet();
            res.addAll((Collection)rw.getValue());
            res.addAll((Collection)ro.getValue());
            return new Versioned(res, Version.NEW);
        });
    }

    @Override
    public CompletableFuture<Versioned<Set<BookieId>>> getReadOnlyBookies() {
        return this.getChildren(this.bookieReadonlyRegistrationPath);
    }

    private CompletableFuture<Versioned<Set<BookieId>>> getChildren(String path) {
        return ((CompletableFuture)this.store.getChildren(path).thenComposeAsync(children -> {
            Set<BookieId> bookieIds = PulsarRegistrationClient.convertToBookieAddresses(children);
            Set<BookieId> bookies = PulsarRegistrationClient.convertToBookieAddresses(children);
            ArrayList<CompletableFuture<Versioned<BookieServiceInfo>>> bookieInfoUpdated = new ArrayList<CompletableFuture<Versioned<BookieServiceInfo>>>(bookies.size());
            for (BookieId id : bookies) {
                if (this.bookieServiceInfoCache.containsKey(id)) continue;
                bookieInfoUpdated.add(this.readBookieServiceInfoAsync(id));
            }
            if (bookieInfoUpdated.isEmpty()) {
                return CompletableFuture.completedFuture(bookieIds);
            }
            return FutureUtil.waitForAll(bookieInfoUpdated).thenApply(___ -> bookieIds);
        })).thenApply(s -> new Versioned<Set>((Set)s, Version.NEW));
    }

    @Override
    public CompletableFuture<Void> watchWritableBookies(RegistrationClient.RegistrationListener registrationListener) {
        this.writableBookiesWatchers.add(registrationListener);
        return this.getWritableBookies().thenAcceptAsync(registrationListener::onBookiesChanged, (Executor)this.executor);
    }

    @Override
    public void unwatchWritableBookies(RegistrationClient.RegistrationListener registrationListener) {
        this.writableBookiesWatchers.remove(registrationListener);
    }

    @Override
    public CompletableFuture<Void> watchReadOnlyBookies(RegistrationClient.RegistrationListener registrationListener) {
        this.readOnlyBookiesWatchers.add(registrationListener);
        return this.getReadOnlyBookies().thenAcceptAsync(registrationListener::onBookiesChanged, (Executor)this.executor);
    }

    @Override
    public void unwatchReadOnlyBookies(RegistrationClient.RegistrationListener registrationListener) {
        this.readOnlyBookiesWatchers.remove(registrationListener);
    }

    private void updatedBookies(Notification n) {
        if (n.getType() == NotificationType.Created || n.getType() == NotificationType.Deleted) {
            if (n.getType() == NotificationType.Deleted) {
                BookieId bookieId = PulsarRegistrationClient.stripBookieIdFromPath(n.getPath());
                log.info("Bookie {} disappeared", (Object)bookieId);
                if (bookieId != null) {
                    this.bookieServiceInfoCache.remove(bookieId);
                }
            }
            if (n.getPath().startsWith(this.bookieReadonlyRegistrationPath)) {
                this.getReadOnlyBookies().thenAccept(bookies -> this.readOnlyBookiesWatchers.forEach(w -> this.executor.execute(() -> w.onBookiesChanged((Versioned<Set<BookieId>>)bookies))));
            } else if (n.getPath().startsWith(this.bookieRegistrationPath)) {
                this.getWritableBookies().thenAccept(bookies -> this.writableBookiesWatchers.forEach(w -> this.executor.execute(() -> w.onBookiesChanged((Versioned<Set<BookieId>>)bookies))));
            }
        }
    }

    private static BookieId stripBookieIdFromPath(String path) {
        if (path == null) {
            return null;
        }
        int slash = path.lastIndexOf(47);
        if (slash >= 0) {
            try {
                return BookieId.parse(path.substring(slash + 1));
            }
            catch (IllegalArgumentException e) {
                log.warn("Cannot decode bookieId from {}", (Object)path, (Object)e);
            }
        }
        return null;
    }

    private static Set<BookieId> convertToBookieAddresses(List<String> children) {
        HashSet<BookieId> newBookieAddrs = new HashSet<BookieId>();
        for (String bookieAddrString : children) {
            if ("readonly".equals(bookieAddrString)) continue;
            BookieId bookieAddr = BookieId.parse(bookieAddrString);
            newBookieAddrs.add(bookieAddr);
        }
        return newBookieAddrs;
    }

    @Override
    public CompletableFuture<Versioned<BookieServiceInfo>> getBookieServiceInfo(BookieId bookieId) {
        Versioned<BookieServiceInfo> resultFromCache = this.bookieServiceInfoCache.get(bookieId);
        if (log.isDebugEnabled()) {
            log.debug("getBookieServiceInfo {} -> {}", (Object)bookieId, resultFromCache);
        }
        if (resultFromCache != null) {
            return CompletableFuture.completedFuture(resultFromCache);
        }
        return FutureUtils.exception(new BKException.BKBookieHandleNotAvailableException());
    }

    public CompletableFuture<Versioned<BookieServiceInfo>> readBookieServiceInfoAsync(BookieId bookieId) {
        String asWritable = this.bookieRegistrationPath + "/" + bookieId;
        return this.bookieServiceInfoMetadataCache.get(asWritable).thenCompose(getResult -> {
            if (getResult.isPresent()) {
                Versioned<BookieServiceInfo> res = new Versioned<BookieServiceInfo>((BookieServiceInfo)getResult.get(), new LongVersion(-1L));
                log.info("Update BookieInfoCache (writable bookie) {} -> {}", (Object)bookieId, getResult.get());
                this.bookieServiceInfoCache.put(bookieId, res);
                return CompletableFuture.completedFuture(res);
            }
            return this.readBookieInfoAsReadonlyBookie(bookieId);
        });
    }

    final CompletableFuture<Versioned<BookieServiceInfo>> readBookieInfoAsReadonlyBookie(BookieId bookieId) {
        String asReadonly = this.bookieReadonlyRegistrationPath + "/" + bookieId;
        return this.bookieServiceInfoMetadataCache.get(asReadonly).thenApply(getResultAsReadOnly -> {
            if (getResultAsReadOnly.isPresent()) {
                Versioned<BookieServiceInfo> res = new Versioned<BookieServiceInfo>((BookieServiceInfo)getResultAsReadOnly.get(), new LongVersion(-1L));
                log.info("Update BookieInfoCache (readonly bookie) {} -> {}", (Object)bookieId, getResultAsReadOnly.get());
                this.bookieServiceInfoCache.put(bookieId, res);
                return res;
            }
            throw new CompletionException(new BKException.BKBookieHandleNotAvailableException());
        });
    }
}

