/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.metadata.impl;

import java.io.File;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.AddWatchMode;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.AsyncCallback;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.CreateMode;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.KeeperException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.WatchedEvent;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.Watcher;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.ZooDefs;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.ZooKeeper;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreLifecycle;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
import org.apache.pulsar.metadata.impl.PulsarZooKeeperClient;
import org.apache.pulsar.metadata.impl.ZKSessionWatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZKMetadataStore
extends AbstractMetadataStore
implements MetadataStoreExtended,
MetadataStoreLifecycle {
    private static final Logger log = LoggerFactory.getLogger(ZKMetadataStore.class);
    private final String metadataURL;
    private final MetadataStoreConfig metadataStoreConfig;
    private final boolean isZkManaged;
    private final ZooKeeper zkc;
    private Optional<ZKSessionWatcher> sessionWatcher;

    /**
     * Creates a store that builds and owns its own ZooKeeper client.
     *
     * <p>The client is connected to {@code metadataURL}, a persistent recursive watch is
     * registered on {@code "/"} and, when requested, a {@link ZKSessionWatcher} is attached.
     *
     * @param metadataURL ZooKeeper connect string handed to the client builder
     * @param metadataStoreConfig supplies the session timeout and the read-only-mode flag
     * @param enableSessionWatcher whether a {@link ZKSessionWatcher} should be created
     * @throws MetadataStoreException wrapping any failure raised while setting the store up
     */
    public ZKMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig, boolean enableSessionWatcher) throws MetadataStoreException {
        // Kept in a local so the failure path can release the client even though the
        // final field may not be assigned yet.
        ZooKeeper client = null;
        try {
            this.metadataURL = metadataURL;
            this.metadataStoreConfig = metadataStoreConfig;
            this.isZkManaged = true;
            client = PulsarZooKeeperClient.newBuilder().connectString(metadataURL).connectRetryPolicy(new BoundExponentialBackoffRetryPolicy(100L, 60000L, Integer.MAX_VALUE)).allowReadOnlyMode(metadataStoreConfig.isAllowReadOnlyOperations()).sessionTimeoutMs(metadataStoreConfig.getSessionTimeoutMillis()).watchers(Collections.singleton(event -> {
                // Events can arrive before the sessionWatcher field has been assigned below.
                if (this.sessionWatcher != null) {
                    this.sessionWatcher.ifPresent(sw -> sw.process(event));
                }
            })).build();
            this.zkc = client;
            this.zkc.addWatch("/", this::handleWatchEvent, AddWatchMode.PERSISTENT_RECURSIVE);
            this.sessionWatcher = enableSessionWatcher ? Optional.of(new ZKSessionWatcher(this.zkc, this::receivedSessionEvent)) : Optional.empty();
        }
        catch (Throwable t) {
            // The store owns the client, so a half-initialized store must not leak it.
            if (client != null) {
                try {
                    client.close();
                }
                catch (Throwable closeFailure) {
                    t.addSuppressed(closeFailure);
                }
            }
            // Wrapping the exception must not hide the interruption from the calling thread.
            if (t instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new MetadataStoreException(t);
        }
    }

    /**
     * Creates a store on top of an externally managed ZooKeeper client.
     *
     * <p>The client is not closed by {@link #close()} ({@code isZkManaged} is {@code false}).
     * A session watcher is always attached and a persistent recursive watch is registered
     * on {@code "/"}.
     *
     * @param zkc the ZooKeeper client to use
     * @throws IllegalStateException if the persistent recursive watch cannot be registered
     */
    @VisibleForTesting
    public ZKMetadataStore(ZooKeeper zkc) {
        this.metadataURL = null;
        this.metadataStoreConfig = null;
        this.isZkManaged = false;
        this.zkc = zkc;
        this.sessionWatcher = Optional.of(new ZKSessionWatcher(zkc, this::receivedSessionEvent));
        // The synchronous addWatch declares checked exceptions; the constructor signature
        // is kept free of them, so they are surfaced as unchecked failures.
        try {
            zkc.addWatch("/", this::handleWatchEvent, AddWatchMode.PERSISTENT_RECURSIVE);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while registering the persistent watch on ZooKeeper", e);
        }
        catch (KeeperException e) {
            throw new IllegalStateException("Failed to register the persistent watch on ZooKeeper", e);
        }
    }

    /**
     * Handles a session event coming from the session watcher.
     *
     * <p>Every event other than {@code SessionReestablished} is passed straight to the
     * superclass. When the session has been re-established, the persistent recursive watch
     * on {@code "/"} is registered again first, and the event is forwarded only after
     * ZooKeeper confirmed the registration. If the registration fails, the error is logged,
     * the session watcher (if any) is told the session is invalid and, when the client is a
     * {@link PulsarZooKeeperClient}, a synthetic {@code Expired} event is fed to it.
     *
     * @param event the session event to handle
     */
    @Override
    protected void receivedSessionEvent(SessionEvent event) {
        if (event != SessionEvent.SessionReestablished) {
            super.receivedSessionEvent(event);
            return;
        }

        AsyncCallback.VoidCallback onWatchRegistered = (rc, path, ctx) -> {
            if (rc != KeeperException.Code.OK.intValue()) {
                log.error("Failed to recreate persistent watch on ZooKeeper: {}", (Object) KeeperException.Code.get(rc));
                sessionWatcher.ifPresent(ZKSessionWatcher::setSessionInvalid);
                if (zkc instanceof PulsarZooKeeperClient) {
                    WatchedEvent expired = new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null);
                    ((PulsarZooKeeperClient) zkc).process(expired);
                }
                return;
            }
            super.receivedSessionEvent(event);
        };
        zkc.addWatch("/", this::handleWatchEvent, AddWatchMode.PERSISTENT_RECURSIVE, onWatchRegistered, null);
    }

    /**
     * Reads the value stored at {@code path}.
     *
     * <p>The returned future completes with the data and its {@link Stat} when the znode
     * exists, with {@link Optional#empty()} when ZooKeeper answers {@code NONODE}, and
     * exceptionally for every other result code or when the request cannot be issued.
     *
     * @param path the znode path to read
     * @return a future holding the value, or an empty optional if the znode does not exist
     */
    @Override
    public CompletableFuture<Optional<GetResult>> get(String path) {
        CompletableFuture<Optional<GetResult>> future = new CompletableFuture<>();
        try {
            this.zkc.getData(path, null, (rc, path1, ctx, data, stat) -> this.execute(() -> {
                KeeperException.Code code = KeeperException.Code.get(rc);
                if (code == KeeperException.Code.OK) {
                    future.complete(Optional.of(new GetResult(data, this.getStat(path1, stat))));
                } else if (code == KeeperException.Code.NONODE) {
                    // The previous code also started an exists()/re-read chain here, but then
                    // completed the future with empty right away, so the chain could never
                    // change the outcome and only cost extra ZooKeeper round trips.
                    future.complete(Optional.empty());
                } else {
                    future.completeExceptionally(ZKMetadataStore.getException(code, path));
                }
            }, future), null);
        }
        catch (Throwable t) {
            future.completeExceptionally(new MetadataStoreException(t));
        }
        return future;
    }

    /**
     * Lists the children of {@code path} directly from ZooKeeper.
     *
     * <p>On success the child names are sorted in place and returned. On {@code NONODE}
     * the existence of the znode is checked again: if it exists by then the listing is
     * retried, otherwise an empty list is returned. Any other result code, or a failure to
     * issue the request, completes the future exceptionally.
     *
     * @param path the parent znode path
     * @return a future holding the sorted child names
     */
    @Override
    public CompletableFuture<List<String>> getChildrenFromStore(String path) {
        final CompletableFuture<List<String>> result = new CompletableFuture<>();
        try {
            zkc.getChildren(path, null, (rc, znodePath, ctx, names) -> execute(() -> {
                final KeeperException.Code status = KeeperException.Code.get(rc);
                if (status == KeeperException.Code.NONODE) {
                    existsFromStore(path)
                            .thenAccept(present -> {
                                if (!present) {
                                    result.complete(Collections.emptyList());
                                    return;
                                }
                                // The znode showed up in the meantime: list it again.
                                getChildrenFromStore(path)
                                        .thenAccept(retried -> result.complete(retried))
                                        .exceptionally(failure -> {
                                            result.completeExceptionally(failure);
                                            return null;
                                        });
                            })
                            .exceptionally(failure -> {
                                result.completeExceptionally(failure);
                                return null;
                            });
                } else if (status == KeeperException.Code.OK) {
                    Collections.sort(names);
                    result.complete(names);
                } else {
                    result.completeExceptionally(getException(status, path));
                }
            }, result), null);
        }
        catch (Throwable t) {
            result.completeExceptionally(new MetadataStoreException(t));
        }
        return result;
    }

    /**
     * Checks directly against ZooKeeper whether a znode exists at {@code path}.
     *
     * @param path the znode path to check
     * @return a future holding {@code true} for result code {@code OK}, {@code false} for
     *         {@code NONODE}; it completes exceptionally for any other result code or when
     *         the request cannot be issued
     */
    @Override
    public CompletableFuture<Boolean> existsFromStore(String path) {
        final CompletableFuture<Boolean> result = new CompletableFuture<>();
        try {
            zkc.exists(path, null, (rc, znodePath, ctx, stat) -> execute(() -> {
                final KeeperException.Code status = KeeperException.Code.get(rc);
                final boolean found = status == KeeperException.Code.OK;
                if (found || status == KeeperException.Code.NONODE) {
                    result.complete(found);
                    return;
                }
                result.completeExceptionally(getException(status, path));
            }, result), result);
        }
        catch (Throwable t) {
            result.completeExceptionally(new MetadataStoreException(t));
        }
        return result;
    }

    /**
     * Stores {@code value} at {@code path} without any {@link CreateOption}.
     *
     * @param path the znode path to write
     * @param value the payload to store
     * @param optExpectedVersion the version the caller expects, if any
     * @return the future produced by the four-argument {@code put} overload
     */
    @Override
    public CompletableFuture<Stat> put(String path, byte[] value, Optional<Long> optExpectedVersion) {
        final EnumSet<CreateOption> noOptions = EnumSet.noneOf(CreateOption.class);
        return put(path, value, optExpectedVersion, noOptions);
    }

    /**
     * Writes {@code value} to ZooKeeper at {@code path}.
     *
     * <ul>
     *   <li>Expected version {@code -1}: the znode (and any missing parents) is created with
     *       the create mode derived from {@code options}; an already existing znode is
     *       reported as a bad version.</li>
     *   <li>Any other expected version: {@code setData} is issued with that version; a
     *       missing znode is reported as a bad version.</li>
     *   <li>No expected version: {@code setData} is issued unconditionally and, if the znode
     *       does not exist, the write falls back to creating it with the same
     *       {@code options}.</li>
     * </ul>
     *
     * @param path the znode path to write
     * @param value the payload to store
     * @param optExpectedVersion the version the caller expects, if any
     * @param options create options applied whenever the znode has to be created
     * @return a future holding the {@link Stat} of the written znode
     */
    @Override
    protected CompletableFuture<Stat> storePut(String path, byte[] value, Optional<Long> optExpectedVersion, EnumSet<CreateOption> options) {
        boolean hasVersion = optExpectedVersion.isPresent();
        int expectedVersion = optExpectedVersion.orElse(-1L).intValue();
        CompletableFuture<Stat> future = new CompletableFuture<>();
        try {
            if (hasVersion && expectedVersion == -1) {
                CreateMode createMode = ZKMetadataStore.getCreateMode(options);
                ZKMetadataStore.asyncCreateFullPathOptimistic(this.zkc, path, value, createMode, (rc, path1, ctx, name) -> this.execute(() -> {
                    KeeperException.Code code = KeeperException.Code.get(rc);
                    if (code == KeeperException.Code.OK) {
                        future.complete(new Stat(name, 0L, 0L, 0L, createMode.isEphemeral(), true));
                    } else if (code == KeeperException.Code.NODEEXISTS) {
                        // The caller asked for "must not exist yet"; report it as a version conflict.
                        future.completeExceptionally(ZKMetadataStore.getException(KeeperException.Code.BADVERSION, path));
                    } else {
                        future.completeExceptionally(ZKMetadataStore.getException(code, path));
                    }
                }, future));
            } else {
                this.zkc.setData(path, value, expectedVersion, (rc, path1, ctx, stat) -> this.execute(() -> {
                    KeeperException.Code code = KeeperException.Code.get(rc);
                    if (code == KeeperException.Code.OK) {
                        future.complete(this.getStat(path1, stat));
                    } else if (code == KeeperException.Code.NONODE) {
                        if (hasVersion) {
                            future.completeExceptionally(ZKMetadataStore.getException(KeeperException.Code.BADVERSION, path));
                        } else {
                            // The znode does not exist yet: create it. The options must be passed
                            // along, otherwise an ephemeral/sequential request would silently
                            // produce a plain persistent znode.
                            this.put(path, value, Optional.of(-1L), options).thenAccept(future::complete).exceptionally(ex -> {
                                Throwable cause = ex.getCause();
                                // Never hand null to completeExceptionally; fall back to ex itself.
                                future.completeExceptionally(cause != null ? cause : ex);
                                return null;
                            });
                        }
                    } else {
                        future.completeExceptionally(ZKMetadataStore.getException(code, path));
                    }
                }, future), null);
            }
        }
        catch (Throwable t) {
            future.completeExceptionally(new MetadataStoreException(t));
        }
        return future;
    }

    /**
     * Deletes the znode at {@code path} in ZooKeeper.
     *
     * @param path the znode path to delete
     * @param optExpectedVersion the version the znode must have; {@code -1} is sent to
     *        ZooKeeper when it is absent
     * @return a future that completes with {@code null} on result code {@code OK} and
     *         exceptionally for every other result code or when the request cannot be issued
     */
    @Override
    protected CompletableFuture<Void> storeDelete(String path, Optional<Long> optExpectedVersion) {
        final int version = optExpectedVersion.orElse(-1L).intValue();
        final CompletableFuture<Void> result = new CompletableFuture<>();
        try {
            zkc.delete(path, version, (rc, znodePath, ctx) -> execute(() -> {
                final KeeperException.Code status = KeeperException.Code.get(rc);
                if (status != KeeperException.Code.OK) {
                    result.completeExceptionally(getException(status, path));
                    return;
                }
                result.complete(null);
            }, result), null);
        }
        catch (Throwable t) {
            result.completeExceptionally(new MetadataStoreException(t));
        }
        return result;
    }

    /**
     * Releases the resources held by this store.
     *
     * <p>Closes the ZooKeeper client when this store owns it, then the session watcher (if
     * present), then the superclass. Every step is attempted even if an earlier one fails;
     * the first failure is rethrown at the end with later failures attached as suppressed
     * exceptions.
     *
     * @throws Exception the first failure raised by any of the close steps
     */
    @Override
    public void close() throws Exception {
        Exception failure = null;
        if (this.isZkManaged) {
            try {
                this.zkc.close();
            }
            catch (Exception e) {
                failure = e;
            }
        }
        if (this.sessionWatcher.isPresent()) {
            try {
                this.sessionWatcher.get().close();
            }
            catch (Exception e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        try {
            super.close();
        }
        catch (Exception e) {
            if (failure == null) {
                failure = e;
            } else {
                failure.addSuppressed(e);
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Converts a ZooKeeper stat into the metadata-store {@link Stat}.
     *
     * @param path the path the stat belongs to
     * @param zkStat the stat returned by ZooKeeper
     * @return a {@link Stat} carrying version, creation time, modification time, whether the
     *         znode is ephemeral and whether its owner is this client's session
     */
    private Stat getStat(String path, org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.data.Stat zkStat) {
        long owner = zkStat.getEphemeralOwner();
        // ZooKeeper reports an ephemeralOwner of 0 for non-ephemeral znodes, so comparing
        // against -1 only (as before) flagged every znode as ephemeral. Long.MIN_VALUE is the
        // owner marker ZooKeeper assigns to container znodes, which are not ephemeral either.
        // -1 is still treated as "not ephemeral" to stay compatible with the previous check.
        boolean ephemeral = owner != 0L && owner != -1L && owner != Long.MIN_VALUE;
        boolean ownedByThisSession = owner == this.zkc.getSessionId();
        return new Stat(path, zkStat.getVersion(), zkStat.getCtime(), zkStat.getMtime(), ephemeral, ownedByThisSession);
    }

    /**
     * Maps a ZooKeeper result code to the matching {@link MetadataStoreException}.
     *
     * @param code the ZooKeeper result code
     * @param path the path the failed operation targeted
     * @return a {@code BadVersionException}, {@code NotFoundException} or
     *         {@code AlreadyExistsException} for {@code BADVERSION}, {@code NONODE} and
     *         {@code NODEEXISTS} respectively, otherwise a plain
     *         {@link MetadataStoreException}; each wraps the ZooKeeper exception
     */
    private static MetadataStoreException getException(KeeperException.Code code, String path) {
        final KeeperException zkFailure = KeeperException.create(code, path);
        if (code == KeeperException.Code.BADVERSION) {
            return new MetadataStoreException.BadVersionException(zkFailure);
        }
        if (code == KeeperException.Code.NONODE) {
            return new MetadataStoreException.NotFoundException(zkFailure);
        }
        if (code == KeeperException.Code.NODEEXISTS) {
            return new MetadataStoreException.AlreadyExistsException(zkFailure);
        }
        return new MetadataStoreException(zkFailure);
    }

    /**
     * Translates a ZooKeeper watch event into store notifications.
     *
     * <p>Events without a path and event types other than created, data changed, children
     * changed and deleted are ignored. A notification for the znode itself is delivered
     * first; for created and deleted znodes a {@code ChildrenChanged} notification for the
     * parent follows when a parent path is available.
     *
     * @param event the watch event received from ZooKeeper
     */
    private void handleWatchEvent(WatchedEvent event) {
        if (log.isDebugEnabled()) {
            log.debug("Received ZK watch : {}", (Object) event);
        }

        final String znode = event.getPath();
        if (znode == null) {
            return;
        }
        final String parentPath = ZKMetadataStore.parent(znode);

        final NotificationType kind;
        final boolean parentAffected;
        switch (event.getType()) {
            case NodeCreated:
                kind = NotificationType.Created;
                parentAffected = true;
                break;
            case NodeDeleted:
                kind = NotificationType.Deleted;
                parentAffected = true;
                break;
            case NodeDataChanged:
                kind = NotificationType.Modified;
                parentAffected = false;
                break;
            case NodeChildrenChanged:
                kind = NotificationType.ChildrenChanged;
                parentAffected = false;
                break;
            default:
                return;
        }

        receivedNotification(new Notification(kind, znode));
        if (parentAffected && parentPath != null) {
            receivedNotification(new Notification(NotificationType.ChildrenChanged, parentPath));
        }
    }

    /**
     * Picks the ZooKeeper {@link CreateMode} matching the given create options.
     *
     * @param options the options requested by the caller
     * @return the ephemeral or persistent mode, in its sequential variant when
     *         {@link CreateOption#Sequential} is present
     */
    private static CreateMode getCreateMode(EnumSet<CreateOption> options) {
        final boolean ephemeral = options.contains(CreateOption.Ephemeral);
        final boolean sequential = options.contains(CreateOption.Sequential);
        if (ephemeral) {
            return sequential ? CreateMode.EPHEMERAL_SEQUENTIAL : CreateMode.EPHEMERAL;
        }
        return sequential ? CreateMode.PERSISTENT_SEQUENTIAL : CreateMode.PERSISTENT;
    }

    /**
     * Returns the session id reported by the underlying ZooKeeper client.
     *
     * @return the current ZooKeeper session id
     */
    public long getZkSessionId() {
        final long sessionId = zkc.getSessionId();
        return sessionId;
    }

    /**
     * Exposes the ZooKeeper client this store operates on.
     *
     * @return the underlying ZooKeeper client
     */
    public ZooKeeper getZkClient() {
        final ZooKeeper client = zkc;
        return client;
    }

    /**
     * Makes sure the chroot path contained in the metadata URL exists.
     *
     * <p>When the URL contains a {@code /}, everything from the first {@code /} on is taken as
     * the chroot path. A temporary client is opened against the part before it and the chroot
     * path is created if missing. The work is done synchronously; the returned future is
     * already completed when this method returns.
     *
     * @return a completed future on success (or when there is no chroot), a failed future
     *         when the URL or configuration is missing or the chroot cannot be created
     */
    @Override
    public CompletableFuture<Void> initializeCluster() {
        if (this.metadataURL == null) {
            return FutureUtil.failedFuture(new MetadataStoreException("metadataURL is not set"));
        }
        if (this.metadataStoreConfig == null) {
            return FutureUtil.failedFuture(new MetadataStoreException("metadataStoreConfig is not set"));
        }
        int chrootIndex = this.metadataURL.indexOf("/");
        if (chrootIndex > 0) {
            String chrootPath = this.metadataURL.substring(chrootIndex);
            String zkConnectForChrootCreation = this.metadataURL.substring(0, chrootIndex);
            try (ZooKeeper chrootZk = PulsarZooKeeperClient.newBuilder().connectString(zkConnectForChrootCreation).sessionTimeoutMs(this.metadataStoreConfig.getSessionTimeoutMillis()).connectRetryPolicy(new BoundExponentialBackoffRetryPolicy(this.metadataStoreConfig.getSessionTimeoutMillis(), this.metadataStoreConfig.getSessionTimeoutMillis(), 0)).build();){
                if (chrootZk.exists(chrootPath, false) == null) {
                    try {
                        ZKMetadataStore.createFullPathOptimistic(chrootZk, chrootPath, new byte[0], CreateMode.PERSISTENT);
                        log.info("Created zookeeper chroot path {} successfully", (Object)chrootPath);
                    }
                    catch (KeeperException.NodeExistsException e) {
                        // Someone else created the chroot between the exists() check and our
                        // create(); the path is there, which is all this method has to ensure.
                        log.info("Zookeeper chroot path {} already exists", (Object)chrootPath);
                    }
                }
            }
            catch (InterruptedException e) {
                // Report the failure through the future, but keep the interruption visible.
                Thread.currentThread().interrupt();
                return FutureUtil.failedFuture(e);
            }
            catch (Exception e) {
                return FutureUtil.failedFuture(e);
            }
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Creates {@code originalPath}, creating missing parents on demand.
     *
     * <p>The create is attempted first. Only when ZooKeeper answers {@code NONODE} is the
     * parent created (recursively, as a {@code CONTAINER} znode with empty data), after which
     * the original create is retried. Every other result code is handed to {@code callback}.
     *
     * @param zk the ZooKeeper client to use
     * @param originalPath the path to create
     * @param data the payload of the znode at {@code originalPath}
     * @param createMode the create mode of the znode at {@code originalPath}
     * @param callback receives the final result code
     */
    private static void asyncCreateFullPathOptimistic(ZooKeeper zk, String originalPath, byte[] data, CreateMode createMode, AsyncCallback.StringCallback callback) {
        zk.create(originalPath, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, createMode, (rc, path, ctx, name) -> {
            if (rc != KeeperException.Code.NONODE.intValue()) {
                callback.processResult(rc, path, ctx, name);
                return;
            }
            String rawParent = new File(originalPath).getParent();
            if (rawParent == null) {
                // There is no parent left to create (e.g. NONODE for "/" itself). Report the
                // failure instead of throwing a NullPointerException inside the ZooKeeper
                // callback, which would leave the caller's callback uninvoked.
                callback.processResult(rc, path, ctx, name);
                return;
            }
            // File uses the platform separator; ZooKeeper paths always use '/'.
            String parent = rawParent.replace("\\", "/");
            ZKMetadataStore.asyncCreateFullPathOptimistic(zk, parent, new byte[0], CreateMode.CONTAINER, (rc1, path1, ctx1, name1) -> {
                if (rc1 != KeeperException.Code.OK.intValue() && rc1 != KeeperException.Code.NODEEXISTS.intValue()) {
                    callback.processResult(rc1, path1, ctx1, name1);
                } else {
                    // The parent is in place now (created by us or by someone else): retry.
                    ZKMetadataStore.asyncCreateFullPathOptimistic(zk, originalPath, data, createMode, callback);
                }
            });
        }, null);
    }

    /**
     * Blocking variant of {@code asyncCreateFullPathOptimistic}: waits until the create has
     * finished.
     *
     * @param zkc the ZooKeeper client to use
     * @param path the path to create
     * @param data the payload of the znode
     * @param createMode the create mode of the znode
     * @throws KeeperException if ZooKeeper reports any result code other than {@code OK};
     *         the exception carries {@code path}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private static void createFullPathOptimistic(ZooKeeper zkc, String path, byte[] data, CreateMode createMode) throws KeeperException, InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        AtomicInteger resultCode = new AtomicInteger(KeeperException.Code.OK.intValue());
        ZKMetadataStore.asyncCreateFullPathOptimistic(zkc, path, data, createMode, (rc, path1, ctx, name) -> {
            resultCode.set(rc);
            done.countDown();
        });
        done.await();
        int rc = resultCode.get();
        if (rc != KeeperException.Code.OK.intValue()) {
            // Attach the path so the failure can be traced back to the znode involved.
            throw KeeperException.create(KeeperException.Code.get(rc), path);
        }
    }
}

