/*
 * Decompiled with CFR 0.152.
 */
package dlshade.org.apache.bookkeeper.discover;

import dlshade.com.google.common.annotations.VisibleForTesting;
import dlshade.com.google.common.collect.Sets;
import dlshade.org.apache.bookkeeper.client.BKException;
import dlshade.org.apache.bookkeeper.common.concurrent.FutureUtils;
import dlshade.org.apache.bookkeeper.discover.BookieServiceInfo;
import dlshade.org.apache.bookkeeper.discover.BookieServiceInfoUtils;
import dlshade.org.apache.bookkeeper.discover.RegistrationClient;
import dlshade.org.apache.bookkeeper.net.BookieId;
import dlshade.org.apache.bookkeeper.proto.DataFormats;
import dlshade.org.apache.bookkeeper.versioning.LongVersion;
import dlshade.org.apache.bookkeeper.versioning.Version;
import dlshade.org.apache.bookkeeper.versioning.Versioned;
import dlshade.org.apache.zookeeper.KeeperException;
import dlshade.org.apache.zookeeper.WatchedEvent;
import dlshade.org.apache.zookeeper.Watcher;
import dlshade.org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZKRegistrationClient
implements RegistrationClient {
    private static final Logger log = LoggerFactory.getLogger(ZKRegistrationClient.class);
    static final int ZK_CONNECT_BACKOFF_MS = 200;
    private final ZooKeeper zk;
    private final ScheduledExecutorService scheduler;
    private WatchTask watchWritableBookiesTask = null;
    private WatchTask watchReadOnlyBookiesTask = null;
    private final ConcurrentHashMap<BookieId, Versioned<BookieServiceInfo>> bookieServiceInfoCache = new ConcurrentHashMap();
    private final Watcher bookieServiceInfoCacheInvalidation;
    private final boolean bookieAddressTracking;
    private final String bookieRegistrationPath;
    private final String bookieAllRegistrationPath;
    private final String bookieReadonlyRegistrationPath;

    public ZKRegistrationClient(ZooKeeper zk, String ledgersRootPath, ScheduledExecutorService scheduler, boolean bookieAddressTracking) {
        this.zk = zk;
        this.scheduler = scheduler;
        this.bookieAddressTracking = bookieAddressTracking;
        this.bookieServiceInfoCacheInvalidation = bookieAddressTracking ? new BookieServiceInfoCacheInvalidationWatcher() : null;
        this.bookieRegistrationPath = ledgersRootPath + "/" + "available";
        this.bookieAllRegistrationPath = ledgersRootPath + "/" + "cookies";
        this.bookieReadonlyRegistrationPath = this.bookieRegistrationPath + "/" + "readonly";
    }

    @Override
    public void close() {
    }

    public boolean isBookieAddressTracking() {
        return this.bookieAddressTracking;
    }

    public ZooKeeper getZk() {
        return this.zk;
    }

    @Override
    public CompletableFuture<Versioned<Set<BookieId>>> getWritableBookies() {
        return this.getChildren(this.bookieRegistrationPath, null);
    }

    @Override
    public CompletableFuture<Versioned<Set<BookieId>>> getAllBookies() {
        return this.getChildren(this.bookieAllRegistrationPath, null);
    }

    @Override
    public CompletableFuture<Versioned<Set<BookieId>>> getReadOnlyBookies() {
        return this.getChildren(this.bookieReadonlyRegistrationPath, null);
    }

    @Override
    public CompletableFuture<Versioned<BookieServiceInfo>> getBookieServiceInfo(BookieId bookieId) {
        Versioned<BookieServiceInfo> resultFromCache = this.bookieServiceInfoCache.get(bookieId);
        if (log.isDebugEnabled()) {
            log.debug("getBookieServiceInfo {} -> {}", (Object)bookieId, resultFromCache);
        }
        if (resultFromCache != null) {
            return CompletableFuture.completedFuture(resultFromCache);
        }
        return FutureUtils.exception(new BKException.BKBookieHandleNotAvailableException());
    }

    private CompletableFuture<Versioned<BookieServiceInfo>> readBookieServiceInfoAsync(BookieId bookieId) {
        String pathAsWritable = this.bookieRegistrationPath + "/" + bookieId;
        String pathAsReadonly = this.bookieReadonlyRegistrationPath + "/" + bookieId;
        CompletableFuture<Versioned<BookieServiceInfo>> promise = new CompletableFuture<Versioned<BookieServiceInfo>>();
        this.zk.getData(pathAsWritable, this.bookieServiceInfoCacheInvalidation, (rc, path, o, bytes, stat) -> {
            if (KeeperException.Code.OK.intValue() == rc) {
                try {
                    BookieServiceInfo bookieServiceInfo = ZKRegistrationClient.deserializeBookieServiceInfo(bookieId, bytes);
                    Versioned<BookieServiceInfo> result = new Versioned<BookieServiceInfo>(bookieServiceInfo, new LongVersion(stat.getCversion()));
                    log.info("Update BookieInfoCache (writable bookie) {} -> {}", (Object)bookieId, (Object)result.getValue());
                    this.bookieServiceInfoCache.put(bookieId, result);
                    promise.complete(result);
                }
                catch (IOException ex) {
                    log.error("Cannot update BookieInfo for ", (Throwable)ex);
                    promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path).initCause(ex));
                    return;
                }
            } else if (KeeperException.Code.NONODE.intValue() == rc) {
                this.zk.getData(pathAsReadonly, this.bookieServiceInfoCacheInvalidation, (rc2, path2, o2, bytes2, stat2) -> {
                    if (KeeperException.Code.OK.intValue() == rc2) {
                        try {
                            BookieServiceInfo bookieServiceInfo = ZKRegistrationClient.deserializeBookieServiceInfo(bookieId, bytes2);
                            Versioned<BookieServiceInfo> result = new Versioned<BookieServiceInfo>(bookieServiceInfo, new LongVersion(stat2.getCversion()));
                            log.info("Update BookieInfoCache (readonly bookie) {} -> {}", (Object)bookieId, (Object)result.getValue());
                            this.bookieServiceInfoCache.put(bookieId, result);
                            promise.complete(result);
                        }
                        catch (IOException ex) {
                            log.error("Cannot update BookieInfo for ", (Throwable)ex);
                            promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc2), path2).initCause(ex));
                            return;
                        }
                    } else {
                        promise.completeExceptionally(BKException.create(-3));
                    }
                }, null);
            } else {
                promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path));
            }
        }, null);
        return promise;
    }

    @VisibleForTesting
    static BookieServiceInfo deserializeBookieServiceInfo(BookieId bookieId, byte[] bookieServiceInfo) throws IOException {
        if (bookieServiceInfo == null || bookieServiceInfo.length == 0) {
            return BookieServiceInfoUtils.buildLegacyBookieServiceInfo(bookieId.toString());
        }
        DataFormats.BookieServiceInfoFormat builder = DataFormats.BookieServiceInfoFormat.parseFrom(bookieServiceInfo);
        BookieServiceInfo bsi = new BookieServiceInfo();
        List<BookieServiceInfo.Endpoint> endpoints = builder.getEndpointsList().stream().map(e -> {
            BookieServiceInfo.Endpoint endpoint = new BookieServiceInfo.Endpoint();
            endpoint.setId(e.getId());
            endpoint.setPort(e.getPort());
            endpoint.setHost(e.getHost());
            endpoint.setProtocol(e.getProtocol());
            endpoint.setAuth(e.getAuthList());
            endpoint.setExtensions(e.getExtensionsList());
            return endpoint;
        }).collect(Collectors.toList());
        bsi.setEndpoints(endpoints);
        bsi.setProperties(builder.getPropertiesMap());
        return bsi;
    }

    private CompletableFuture<Versioned<Set<BookieId>>> getChildren(String regPath, Watcher watcher) {
        CompletableFuture<Versioned<Set<BookieId>>> future = FutureUtils.createFuture();
        this.zk.getChildren(regPath, watcher, (rc, path, ctx, children, stat) -> {
            if (KeeperException.Code.OK.intValue() != rc) {
                BKException.ZKException zke = new BKException.ZKException(KeeperException.create(KeeperException.Code.get(rc), path));
                future.completeExceptionally(zke.fillInStackTrace());
                return;
            }
            LongVersion version = new LongVersion(stat.getCversion());
            HashSet<BookieId> bookies = ZKRegistrationClient.convertToBookieAddresses(children);
            ArrayList bookieInfoUpdated = new ArrayList(bookies.size());
            for (BookieId id : bookies) {
                if (this.bookieServiceInfoCache.containsKey(id)) continue;
                bookieInfoUpdated.add(this.readBookieServiceInfoAsync(id));
            }
            if (bookieInfoUpdated.isEmpty()) {
                future.complete(new Versioned<HashSet<BookieId>>(bookies, version));
            } else {
                FutureUtils.collect(bookieInfoUpdated).whenComplete((info, error) -> future.complete(new Versioned<Set>(bookies, version)));
            }
        }, null);
        return future;
    }

    @Override
    public synchronized CompletableFuture<Void> watchWritableBookies(RegistrationClient.RegistrationListener listener) {
        CompletionStage f;
        if (null == this.watchWritableBookiesTask) {
            f = new CompletableFuture();
            this.watchWritableBookiesTask = new WatchTask(this.bookieRegistrationPath, (CompletableFuture<Void>)f);
            f = f.whenComplete((value, cause) -> {
                if (null != cause) {
                    this.unwatchWritableBookies(listener);
                }
            });
        } else {
            f = this.watchWritableBookiesTask.firstRunFuture;
        }
        this.watchWritableBookiesTask.addListener(listener);
        if (this.watchWritableBookiesTask.getNumListeners() == 1) {
            this.watchWritableBookiesTask.watch();
        }
        return f;
    }

    @Override
    public synchronized void unwatchWritableBookies(RegistrationClient.RegistrationListener listener) {
        if (null == this.watchWritableBookiesTask) {
            return;
        }
        this.watchWritableBookiesTask.removeListener(listener);
        if (this.watchWritableBookiesTask.getNumListeners() == 0) {
            this.watchWritableBookiesTask.close();
            this.watchWritableBookiesTask = null;
        }
    }

    @Override
    public synchronized CompletableFuture<Void> watchReadOnlyBookies(RegistrationClient.RegistrationListener listener) {
        CompletionStage f;
        if (null == this.watchReadOnlyBookiesTask) {
            f = new CompletableFuture();
            this.watchReadOnlyBookiesTask = new WatchTask(this.bookieReadonlyRegistrationPath, (CompletableFuture<Void>)f);
            f = f.whenComplete((value, cause) -> {
                if (null != cause) {
                    this.unwatchReadOnlyBookies(listener);
                }
            });
        } else {
            f = this.watchReadOnlyBookiesTask.firstRunFuture;
        }
        this.watchReadOnlyBookiesTask.addListener(listener);
        if (this.watchReadOnlyBookiesTask.getNumListeners() == 1) {
            this.watchReadOnlyBookiesTask.watch();
        }
        return f;
    }

    @Override
    public synchronized void unwatchReadOnlyBookies(RegistrationClient.RegistrationListener listener) {
        if (null == this.watchReadOnlyBookiesTask) {
            return;
        }
        this.watchReadOnlyBookiesTask.removeListener(listener);
        if (this.watchReadOnlyBookiesTask.getNumListeners() == 0) {
            this.watchReadOnlyBookiesTask.close();
            this.watchReadOnlyBookiesTask = null;
        }
    }

    private static HashSet<BookieId> convertToBookieAddresses(List<String> children) {
        HashSet<BookieId> newBookieAddrs = Sets.newHashSet();
        for (String bookieAddrString : children) {
            if ("readonly".equals(bookieAddrString)) continue;
            BookieId bookieAddr = BookieId.parse(bookieAddrString);
            newBookieAddrs.add(bookieAddr);
        }
        return newBookieAddrs;
    }

    private static BookieId stripBookieIdFromPath(String path) {
        if (path == null) {
            return null;
        }
        int slash = path.lastIndexOf(47);
        if (slash >= 0) {
            try {
                return BookieId.parse(path.substring(slash + 1));
            }
            catch (IllegalArgumentException e) {
                log.warn("Cannot decode bookieId from {}", (Object)path, (Object)e);
            }
        }
        return null;
    }

    WatchTask getWatchWritableBookiesTask() {
        return this.watchWritableBookiesTask;
    }

    WatchTask getWatchReadOnlyBookiesTask() {
        return this.watchReadOnlyBookiesTask;
    }

    private class BookieServiceInfoCacheInvalidationWatcher
    implements Watcher {
        private BookieServiceInfoCacheInvalidationWatcher() {
        }

        @Override
        public void process(WatchedEvent we) {
            if (log.isDebugEnabled()) {
                log.debug("zk event {} for {} state {}", new Object[]{we.getType(), we.getPath(), we.getState()});
            }
            if (we.getState() == Watcher.Event.KeeperState.Expired) {
                log.info("zk session expired, invalidating cache");
                ZKRegistrationClient.this.bookieServiceInfoCache.clear();
                return;
            }
            BookieId bookieId = ZKRegistrationClient.stripBookieIdFromPath(we.getPath());
            if (bookieId == null) {
                return;
            }
            switch (we.getType()) {
                case NodeDeleted: {
                    log.info("Invalidate cache for {}", (Object)bookieId);
                    ZKRegistrationClient.this.bookieServiceInfoCache.remove(bookieId);
                    break;
                }
                case NodeDataChanged: {
                    log.info("refresh cache for {}", (Object)bookieId);
                    ZKRegistrationClient.this.readBookieServiceInfoAsync(bookieId);
                    break;
                }
                default: {
                    if (!log.isDebugEnabled()) break;
                    log.debug("ignore cache event {} for {}", (Object)we.getType(), (Object)bookieId);
                }
            }
        }
    }

    class WatchTask
    implements Runnable,
    Watcher,
    BiConsumer<Versioned<Set<BookieId>>, Throwable>,
    AutoCloseable {
        private final String regPath;
        private final Set<RegistrationClient.RegistrationListener> listeners;
        private volatile boolean closed = false;
        private Set<BookieId> bookies = null;
        private Version version = Version.NEW;
        private final CompletableFuture<Void> firstRunFuture;

        WatchTask(String regPath, CompletableFuture<Void> firstRunFuture) {
            this.regPath = regPath;
            this.listeners = new CopyOnWriteArraySet<RegistrationClient.RegistrationListener>();
            this.firstRunFuture = firstRunFuture;
        }

        public int getNumListeners() {
            return this.listeners.size();
        }

        public boolean addListener(RegistrationClient.RegistrationListener listener) {
            if (this.listeners.add(listener) && null != this.bookies) {
                ZKRegistrationClient.this.scheduler.execute(() -> listener.onBookiesChanged(new Versioned<Set<BookieId>>(this.bookies, this.version)));
            }
            return true;
        }

        public boolean removeListener(RegistrationClient.RegistrationListener listener) {
            return this.listeners.remove(listener);
        }

        void watch() {
            this.scheduleWatchTask(0L);
        }

        private void scheduleWatchTask(long delayMs) {
            try {
                ZKRegistrationClient.this.scheduler.schedule(this, delayMs, TimeUnit.MILLISECONDS);
            }
            catch (RejectedExecutionException ree) {
                log.warn("Failed to schedule watch bookies task", (Throwable)ree);
            }
        }

        @Override
        public void run() {
            if (this.isClosed()) {
                return;
            }
            ZKRegistrationClient.this.getChildren(this.regPath, this).whenCompleteAsync((BiConsumer)this, (Executor)ZKRegistrationClient.this.scheduler);
        }

        @Override
        public void accept(Versioned<Set<BookieId>> bookieSet, Throwable throwable) {
            if (throwable != null) {
                if (this.firstRunFuture.isDone()) {
                    this.scheduleWatchTask(200L);
                } else {
                    this.firstRunFuture.completeExceptionally(throwable);
                }
                return;
            }
            if (this.version.compare(bookieSet.getVersion()) == Version.Occurred.BEFORE) {
                this.version = bookieSet.getVersion();
                this.bookies = bookieSet.getValue();
                if (!this.listeners.isEmpty()) {
                    for (RegistrationClient.RegistrationListener listener : this.listeners) {
                        listener.onBookiesChanged(bookieSet);
                    }
                }
            }
            FutureUtils.complete(this.firstRunFuture, null);
        }

        @Override
        public void process(WatchedEvent event) {
            if (Watcher.Event.EventType.None == event.getType()) {
                if (Watcher.Event.KeeperState.Expired == event.getState()) {
                    this.scheduleWatchTask(200L);
                }
                return;
            }
            this.scheduleWatchTask(0L);
        }

        boolean isClosed() {
            return this.closed;
        }

        @Override
        public void close() {
            this.closed = true;
        }
    }
}

