/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.metadata.impl;

import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.CreateMode;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.KeeperException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.WatchedEvent;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.Watcher;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.ZooDefs;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.ZooKeeper;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreLifecycle;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
import org.apache.pulsar.metadata.impl.ZKSessionWatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZKMetadataStore
extends AbstractMetadataStore
implements MetadataStoreExtended,
Watcher,
MetadataStoreLifecycle {
    private static final Logger log = LoggerFactory.getLogger(ZKMetadataStore.class);
    private final String metadataURL;
    private final MetadataStoreConfig metadataStoreConfig;
    private final boolean isZkManaged;
    private final ZooKeeper zkc;
    private ZKSessionWatcher sessionWatcher;

    public ZKMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig) throws MetadataStoreException {
        try {
            this.metadataURL = metadataURL;
            this.metadataStoreConfig = metadataStoreConfig;
            this.isZkManaged = true;
            this.zkc = ZooKeeperClient.newBuilder().connectString(metadataURL).connectRetryPolicy(new BoundExponentialBackoffRetryPolicy(100L, 60000L, Integer.MAX_VALUE)).allowReadOnlyMode(metadataStoreConfig.isAllowReadOnlyOperations()).sessionTimeoutMs(metadataStoreConfig.getSessionTimeoutMillis()).watchers(Collections.singleton(event -> {
                if (this.sessionWatcher != null) {
                    this.sessionWatcher.process(event);
                }
            })).build();
            this.sessionWatcher = new ZKSessionWatcher(this.zkc, this::receivedSessionEvent);
        }
        catch (Throwable t) {
            throw new MetadataStoreException(t);
        }
    }

    /**
     * Wraps an externally supplied ZooKeeper handle.
     *
     * <p>The store is marked as not managing the handle ({@code isZkManaged == false}), and no
     * metadata URL or configuration is recorded.
     *
     * @param zkc ZooKeeper handle used for all operations
     */
    @VisibleForTesting
    public ZKMetadataStore(ZooKeeper zkc) {
        this.zkc = zkc;
        this.isZkManaged = false;
        this.metadataStoreConfig = null;
        this.metadataURL = null;
        this.sessionWatcher = new ZKSessionWatcher(zkc, this::receivedSessionEvent);
    }

    /**
     * Asynchronously reads the value stored at {@code path}.
     *
     * @param path node to read
     * @return a future that is completed by {@code getInternal}
     */
    @Override
    public CompletableFuture<Optional<GetResult>> get(String path) {
        final CompletableFuture<Optional<GetResult>> result = new CompletableFuture<>();
        getInternal(path, result);
        return result;
    }

    /**
     * Issues an asynchronous ZooKeeper {@code getData} on {@code path}, registering this store as
     * the watcher, and completes {@code future} from the callback.
     *
     * <ul>
     *   <li>{@code OK}: completes with the data and the converted stat.</li>
     *   <li>{@code NONODE}: checks existence through {@code existsFromStore}; if the node exists
     *       by then, the read is repeated, otherwise the future completes with an empty
     *       {@link Optional}.</li>
     *   <li>{@code CONNECTIONLOSS}: the whole operation is rescheduled after 100ms.</li>
     *   <li>anything else: completes exceptionally with the mapped exception.</li>
     * </ul>
     *
     * @param path node to read
     * @param future future to complete with the outcome; reused across retries
     */
    private void getInternal(String path, CompletableFuture<Optional<GetResult>> future) {
        try {
            this.zkc.getData(path, this, (rc, path1, ctx, data, stat) -> this.execute(() -> {
                KeeperException.Code code = KeeperException.Code.get(rc);
                if (code == KeeperException.Code.OK) {
                    future.complete(Optional.of(new GetResult(data, this.getStat(path1, stat))));
                } else if (code == KeeperException.Code.NONODE) {
                    // existsFromStore passes this store as the watcher for the missing path.
                    // The future must only be completed from inside this chain: completing it
                    // eagerly here would discard the re-read result and any error.
                    this.existsFromStore(path).thenAccept(exists -> {
                        if (exists) {
                            // The node appeared between the two calls; read it again.
                            this.get(path).thenAccept(value -> future.complete(value)).exceptionally(ex -> {
                                future.completeExceptionally(ex);
                                return null;
                            });
                        } else {
                            future.complete(Optional.empty());
                        }
                    }).exceptionally(ex -> {
                        future.completeExceptionally(ex);
                        return null;
                    });
                } else if (code == KeeperException.Code.CONNECTIONLOSS) {
                    log.warn("Zookeeper connection loss, get {}, retry after 100ms", path);
                    this.executor.schedule(() -> this.getInternal(path, future), 100L, TimeUnit.MILLISECONDS);
                } else {
                    future.completeExceptionally(ZKMetadataStore.getException(code, path));
                }
            }, future), null);
        } catch (Throwable t) {
            future.completeExceptionally(new MetadataStoreException(t));
        }
    }

    /**
     * Asynchronously lists the children of {@code path}.
     *
     * @param path parent node
     * @return a future that is completed by {@code getChildrenFromStoreInternal}
     */
    @Override
    public CompletableFuture<List<String>> getChildrenFromStore(String path) {
        final CompletableFuture<List<String>> result = new CompletableFuture<>();
        getChildrenFromStoreInternal(path, result);
        return result;
    }

    /**
     * Issues an asynchronous ZooKeeper {@code getChildren} on {@code path}, registering this store
     * as the watcher, and completes {@code future} from the callback.
     *
     * <ul>
     *   <li>{@code OK}: the returned list is sorted in place and used as the result.</li>
     *   <li>{@code NONODE}: checks existence through {@code existsFromStore}; if the node exists
     *       by then, the listing is repeated, otherwise the future completes with an empty
     *       list.</li>
     *   <li>{@code CONNECTIONLOSS}: logs a warning and reschedules the operation after 100ms.</li>
     *   <li>anything else: completes exceptionally with the mapped exception.</li>
     * </ul>
     *
     * @param path parent node
     * @param future future to complete with the outcome; reused across retries
     */
    private void getChildrenFromStoreInternal(String path, CompletableFuture<List<String>> future) {
        try {
            this.zkc.getChildren(path, (Watcher)this, (rc, path1, ctx, children) -> this.execute(() -> {
                KeeperException.Code code = KeeperException.Code.get(rc);
                if (code == KeeperException.Code.OK) {
                    Collections.sort(children);
                    future.complete(children);
                } else if (code == KeeperException.Code.NONODE) {
                    // existsFromStore passes this store as the watcher for the missing path.
                    this.existsFromStore(path).thenAccept(exists -> {
                        if (exists) {
                            // The node appeared between the two calls; list it again.
                            this.getChildrenFromStore(path).thenAccept(c -> future.complete(c)).exceptionally(ex -> {
                                future.completeExceptionally(ex);
                                return null;
                            });
                        } else {
                            future.complete(Collections.emptyList());
                        }
                    }).exceptionally(ex -> {
                        future.completeExceptionally(ex);
                        return null;
                    });
                } else if (code == KeeperException.Code.CONNECTIONLOSS) {
                    // Log the retry the same way the other store operations do.
                    log.warn("Zookeeper connection loss, getChildrenFromStore {}, retry after 100ms", path);
                    this.executor.schedule(() -> this.getChildrenFromStoreInternal(path, future), 100L, TimeUnit.MILLISECONDS);
                } else {
                    future.completeExceptionally(ZKMetadataStore.getException(code, path));
                }
            }, future), null);
        } catch (Throwable t) {
            future.completeExceptionally(new MetadataStoreException(t));
        }
    }

    /**
     * Asynchronously checks whether {@code path} exists.
     *
     * @param path node to probe
     * @return a future that is completed by {@code existsFromStoreInternal}
     */
    @Override
    public CompletableFuture<Boolean> existsFromStore(String path) {
        final CompletableFuture<Boolean> result = new CompletableFuture<>();
        existsFromStoreInternal(path, result);
        return result;
    }

    /**
     * Issues an asynchronous ZooKeeper {@code exists} on {@code path}, registering this store as
     * the watcher, and completes {@code future} from the callback: {@code true} for {@code OK},
     * {@code false} for {@code NONODE}, a retry after 100ms for {@code CONNECTIONLOSS}, and an
     * exceptional completion for every other result code.
     *
     * @param path node to probe
     * @param future future to complete with the outcome; reused across retries
     */
    private void existsFromStoreInternal(String path, CompletableFuture<Boolean> future) {
        try {
            this.zkc.exists(path, this, (rc, zkPath, ctx, stat) -> this.execute(() -> {
                final KeeperException.Code outcome = KeeperException.Code.get(rc);
                if (outcome == KeeperException.Code.OK) {
                    future.complete(true);
                    return;
                }
                if (outcome == KeeperException.Code.NONODE) {
                    future.complete(false);
                    return;
                }
                if (outcome != KeeperException.Code.CONNECTIONLOSS) {
                    future.completeExceptionally(ZKMetadataStore.getException(outcome, path));
                    return;
                }
                // Connection loss: try the same request again shortly.
                log.warn("Zookeeper connection loss, existsFromStore {}, retry after 100ms", path);
                this.executor.schedule(() -> this.existsFromStoreInternal(path, future), 100L, TimeUnit.MILLISECONDS);
            }, future), future);
        } catch (Throwable failure) {
            future.completeExceptionally(new MetadataStoreException(failure));
        }
    }

    /**
     * Writes {@code value} at {@code path} without any {@link CreateOption}, delegating to the
     * four-argument {@code put}.
     *
     * @param path node to write
     * @param value payload to store
     * @param optExpectedVersion expected version, passed through unchanged
     * @return the future returned by the delegate
     */
    @Override
    public CompletableFuture<Stat> put(String path, byte[] value, Optional<Long> optExpectedVersion) {
        final EnumSet<CreateOption> noOptions = EnumSet.noneOf(CreateOption.class);
        return put(path, value, optExpectedVersion, noOptions);
    }

    /**
     * Asynchronously writes {@code value} at {@code path}.
     *
     * @param path node to write
     * @param value payload to store
     * @param optExpectedVersion expected version, if any
     * @param options create options forwarded to {@code storePutInternal}
     * @return a future that is completed by {@code storePutInternal}
     */
    @Override
    public CompletableFuture<Stat> storePut(String path, byte[] value, Optional<Long> optExpectedVersion, EnumSet<CreateOption> options) {
        final CompletableFuture<Stat> result = new CompletableFuture<>();
        storePutInternal(path, value, optExpectedVersion, options, result);
        return result;
    }

    /**
     * Performs the ZooKeeper write behind {@code storePut} and completes {@code future} from the
     * callback.
     *
     * <p>Two paths exist:
     * <ul>
     *   <li>An expected version of {@code -1} is given: the node is created (with parents) using
     *       the create mode derived from {@code options}. {@code NODEEXISTS} is reported as a
     *       bad-version failure.</li>
     *   <li>Otherwise: {@code setData} is issued with the expected version, or {@code -1} when
     *       none was given. On {@code NONODE} a versioned write fails as bad-version, while an
     *       unversioned write falls back to creating the node with the same {@code options}.</li>
     * </ul>
     * In both paths {@code CONNECTIONLOSS} reschedules the whole operation after 100ms, and any
     * other result code completes the future exceptionally.
     *
     * @param path node to write
     * @param value payload to store
     * @param optExpectedVersion expected version, if any
     * @param options create options applied whenever the node has to be created
     * @param future future to complete with the outcome; reused across retries
     */
    private void storePutInternal(String path, byte[] value, Optional<Long> optExpectedVersion, EnumSet<CreateOption> options, CompletableFuture<Stat> future) {
        boolean hasVersion = optExpectedVersion.isPresent();
        int expectedVersion = optExpectedVersion.orElse(-1L).intValue();
        try {
            if (hasVersion && expectedVersion == -1) {
                CreateMode createMode = ZKMetadataStore.getCreateMode(options);
                ZkUtils.asyncCreateFullPathOptimistic(this.zkc, path, value, ZooDefs.Ids.OPEN_ACL_UNSAFE, createMode, (rc, path1, ctx, name) -> this.execute(() -> {
                    KeeperException.Code code = KeeperException.Code.get(rc);
                    if (code == KeeperException.Code.OK) {
                        // 'name' is the path reported by the create callback.
                        future.complete(new Stat(name, 0L, 0L, 0L, createMode.isEphemeral(), true));
                    } else if (code == KeeperException.Code.NODEEXISTS) {
                        // Caller asked for "must not exist"; an existing node is a version conflict.
                        future.completeExceptionally(ZKMetadataStore.getException(KeeperException.Code.BADVERSION, path));
                    } else if (code == KeeperException.Code.CONNECTIONLOSS) {
                        log.warn("Zookeeper connection loss, storePut {}, retry after 100ms", path);
                        this.executor.schedule(() -> this.storePutInternal(path, value, optExpectedVersion, options, future), 100L, TimeUnit.MILLISECONDS);
                    } else {
                        future.completeExceptionally(ZKMetadataStore.getException(code, path));
                    }
                }, future), null);
            } else {
                this.zkc.setData(path, value, expectedVersion, (rc, path1, ctx, stat) -> this.execute(() -> {
                    KeeperException.Code code = KeeperException.Code.get(rc);
                    if (code == KeeperException.Code.OK) {
                        future.complete(this.getStat(path1, stat));
                    } else if (code == KeeperException.Code.NONODE) {
                        if (hasVersion) {
                            future.completeExceptionally(ZKMetadataStore.getException(KeeperException.Code.BADVERSION, path));
                        } else {
                            // The node does not exist yet: create it. The caller's options are
                            // forwarded so an Ephemeral/Sequential request is not silently
                            // turned into a plain persistent node.
                            this.put(path, value, Optional.of(-1L), options).thenAccept(s -> future.complete(s)).exceptionally(ex -> {
                                // Unwrap the completion wrapper when there is one; never pass
                                // null to completeExceptionally (it would throw and leave the
                                // future pending).
                                Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
                                future.completeExceptionally(cause);
                                return null;
                            });
                        }
                    } else if (code == KeeperException.Code.CONNECTIONLOSS) {
                        log.warn("Zookeeper connection loss, storePut {}, retry after 100ms", path);
                        this.executor.schedule(() -> this.storePutInternal(path, value, optExpectedVersion, options, future), 100L, TimeUnit.MILLISECONDS);
                    } else {
                        future.completeExceptionally(ZKMetadataStore.getException(code, path));
                    }
                }, future), null);
            }
        } catch (Throwable t) {
            future.completeExceptionally(new MetadataStoreException(t));
        }
    }

    /**
     * Asynchronously deletes the node at {@code path}.
     *
     * @param path node to delete
     * @param optExpectedVersion expected version, if any
     * @return a future that is completed by {@code storeDeleteInternal}
     */
    @Override
    public CompletableFuture<Void> storeDelete(String path, Optional<Long> optExpectedVersion) {
        final CompletableFuture<Void> result = new CompletableFuture<>();
        storeDeleteInternal(path, optExpectedVersion, result);
        return result;
    }

    /**
     * Issues an asynchronous ZooKeeper {@code delete} on {@code path} and completes
     * {@code future} from the callback: normally for {@code OK}, a retry after 100ms for
     * {@code CONNECTIONLOSS}, and an exceptional completion for every other result code.
     *
     * @param path node to delete
     * @param optExpectedVersion expected version; {@code -1} is sent when absent
     * @param future future to complete with the outcome; reused across retries
     */
    private void storeDeleteInternal(String path, Optional<Long> optExpectedVersion, CompletableFuture<Void> future) {
        final int version = optExpectedVersion.orElse(-1L).intValue();
        try {
            this.zkc.delete(path, version, (rc, zkPath, ctx) -> this.execute(() -> {
                final KeeperException.Code outcome = KeeperException.Code.get(rc);
                if (outcome == KeeperException.Code.OK) {
                    future.complete(null);
                    return;
                }
                if (outcome != KeeperException.Code.CONNECTIONLOSS) {
                    future.completeExceptionally(ZKMetadataStore.getException(outcome, path));
                    return;
                }
                // Connection loss: try the same request again shortly.
                log.warn("Zookeeper connection loss, storeDelete {}, retry after 100ms", path);
                this.executor.schedule(() -> this.storeDeleteInternal(path, optExpectedVersion, future), 100L, TimeUnit.MILLISECONDS);
            }, future), null);
        } catch (Throwable failure) {
            future.completeExceptionally(new MetadataStoreException(failure));
        }
    }

    /**
     * Releases the resources held by this store: the ZooKeeper client (only when this store
     * created it), the session watcher, and finally whatever the superclass closes.
     *
     * <p>Each step runs even if an earlier one throws, so a failure while closing the client
     * does not leave the session watcher or the superclass resources open.
     *
     * @throws Exception if any of the close steps fails
     */
    @Override
    public void close() throws Exception {
        try {
            if (this.isZkManaged) {
                this.zkc.close();
            }
        } finally {
            try {
                this.sessionWatcher.close();
            } finally {
                super.close();
            }
        }
    }

    /**
     * Converts a ZooKeeper stat into the metadata-store {@link Stat}.
     *
     * @param path path the stat belongs to
     * @param zkStat stat returned by ZooKeeper
     * @return a stat carrying the version, creation and modification times, whether the node is
     *         ephemeral, and whether it was created by this store's own session
     */
    private Stat getStat(String path, org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.data.Stat zkStat) {
        long ephemeralOwner = zkStat.getEphemeralOwner();
        // ZooKeeper reports an ephemeralOwner of 0 for non-ephemeral znodes, so 0 (not -1)
        // is the value that means "not ephemeral".
        boolean ephemeral = ephemeralOwner != 0L;
        boolean createdBySelf = ephemeralOwner == this.zkc.getSessionId();
        return new Stat(path, zkStat.getVersion(), zkStat.getCtime(), zkStat.getMtime(), ephemeral, createdBySelf);
    }

    /**
     * Maps a ZooKeeper result code to the matching {@link MetadataStoreException} subtype, with
     * the {@link KeeperException} built for {@code code} and {@code path} as its cause.
     *
     * @param code ZooKeeper result code
     * @param path path the failed operation targeted
     * @return a bad-version, not-found or already-exists exception for the corresponding codes,
     *         and a plain {@link MetadataStoreException} otherwise
     */
    private static MetadataStoreException getException(KeeperException.Code code, String path) {
        final KeeperException zkFailure = KeeperException.create(code, path);
        switch (code) {
            case BADVERSION:
                return new MetadataStoreException.BadVersionException(zkFailure);
            case NONODE:
                return new MetadataStoreException.NotFoundException(zkFailure);
            case NODEEXISTS:
                return new MetadataStoreException.AlreadyExistsException(zkFailure);
            default:
                return new MetadataStoreException(zkFailure);
        }
    }

    /**
     * Watcher callback: translates a ZooKeeper node event into a store {@link Notification}.
     *
     * <p>Events without a path, and event types other than created / data changed / children
     * changed / deleted, are ignored.
     *
     * @param event event delivered by ZooKeeper
     */
    @Override
    public void process(WatchedEvent event) {
        if (log.isDebugEnabled()) {
            log.debug("Received ZK watch : {}", event);
        }

        final String eventPath = event.getPath();
        if (eventPath == null) {
            return;
        }

        final NotificationType notificationType;
        switch (event.getType()) {
            case NodeCreated:
                notificationType = NotificationType.Created;
                break;
            case NodeDataChanged:
                notificationType = NotificationType.Modified;
                break;
            case NodeChildrenChanged:
                notificationType = NotificationType.ChildrenChanged;
                break;
            case NodeDeleted:
                notificationType = NotificationType.Deleted;
                break;
            default:
                // Not a node-level change this store reports.
                return;
        }
        this.receivedNotification(new Notification(notificationType, eventPath));
    }

    /**
     * Picks the ZooKeeper {@link CreateMode} matching the given create options.
     *
     * @param options options requested by the caller
     * @return the ephemeral or persistent mode, in its sequential variant when
     *         {@link CreateOption#Sequential} is present
     */
    private static CreateMode getCreateMode(EnumSet<CreateOption> options) {
        final boolean ephemeral = options.contains(CreateOption.Ephemeral);
        final boolean sequential = options.contains(CreateOption.Sequential);
        if (ephemeral) {
            return sequential ? CreateMode.EPHEMERAL_SEQUENTIAL : CreateMode.EPHEMERAL;
        }
        return sequential ? CreateMode.PERSISTENT_SEQUENTIAL : CreateMode.PERSISTENT;
    }

    /**
     * Returns the session id reported by the underlying ZooKeeper handle.
     *
     * @return the current ZooKeeper session id
     */
    public long getZkSessionId() {
        final long sessionId = zkc.getSessionId();
        return sessionId;
    }

    /**
     * Makes sure the chroot path embedded in the metadata URL exists.
     *
     * <p>Everything from the first {@code '/'} of the URL onwards is treated as the chroot path.
     * When such a path is present, a temporary client is opened against the part before it, the
     * path is created (with parents) if it does not exist, and the client is closed again. The
     * work is done synchronously on the calling thread; the returned future is already
     * completed.
     *
     * @return a completed future on success or when the URL has no chroot; a failed future when
     *         the URL or configuration is missing or when the ZooKeeper interaction fails
     */
    @Override
    public CompletableFuture<Void> initializeCluster() {
        if (this.metadataURL == null) {
            return FutureUtil.failedFuture(new MetadataStoreException("metadataURL is not set"));
        }
        if (this.metadataStoreConfig == null) {
            return FutureUtil.failedFuture(new MetadataStoreException("metadataStoreConfig is not set"));
        }
        int chrootIndex = this.metadataURL.indexOf("/");
        if (chrootIndex > 0) {
            String chrootPath = this.metadataURL.substring(chrootIndex);
            // Connect without the chroot suffix, otherwise the path to create would be
            // resolved relative to the very chroot that does not exist yet.
            String zkConnectForChrootCreation = this.metadataURL.substring(0, chrootIndex);
            try (ZooKeeperClient chrootZk = ZooKeeperClient.newBuilder().connectString(zkConnectForChrootCreation).sessionTimeoutMs(this.metadataStoreConfig.getSessionTimeoutMillis()).connectRetryPolicy(new BoundExponentialBackoffRetryPolicy(this.metadataStoreConfig.getSessionTimeoutMillis(), this.metadataStoreConfig.getSessionTimeoutMillis(), 0)).build();){
                if (((ZooKeeper)chrootZk).exists(chrootPath, false) == null) {
                    ZkUtils.createFullPathOptimistic(chrootZk, chrootPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    log.info("Created zookeeper chroot path {} successfully", (Object)chrootPath);
                }
            }
            catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Keep the interruption visible to the caller's thread instead of
                    // swallowing it inside the failed future.
                    Thread.currentThread().interrupt();
                }
                return FutureUtil.failedFuture(e);
            }
        }
        return CompletableFuture.completedFuture(null);
    }
}

