/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.spi.cluster.zookeeper;

import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.VertxException;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.json.JsonObject;
import io.vertx.core.shareddata.AsyncMap;
import io.vertx.core.shareddata.Counter;
import io.vertx.core.shareddata.Lock;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.spi.cluster.NodeInfo;
import io.vertx.core.spi.cluster.NodeListener;
import io.vertx.core.spi.cluster.NodeSelector;
import io.vertx.core.spi.cluster.RegistrationInfo;
import io.vertx.spi.cluster.zookeeper.impl.ConfigUtil;
import io.vertx.spi.cluster.zookeeper.impl.RetryPolicyHelper;
import io.vertx.spi.cluster.zookeeper.impl.SubsMapHelper;
import io.vertx.spi.cluster.zookeeper.impl.ZKAsyncMap;
import io.vertx.spi.cluster.zookeeper.impl.ZKCounter;
import io.vertx.spi.cluster.zookeeper.impl.ZKLock;
import io.vertx.spi.cluster.zookeeper.impl.ZKSyncMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.api.ErrorListenerPathAndBytesable;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;

/**
 * Vert.x {@link ClusterManager} that stores cluster state in ZooKeeper through a Curator client.
 *
 * <p>Each node registers itself as an ephemeral znode below {@code /cluster/nodes}; membership is
 * observed with a {@link PathChildrenCache} on that path, for which this class is also the
 * {@link PathChildrenCacheListener}. Locks live below {@code /locks/}.
 *
 * <p>The Curator client is either supplied through a constructor (together with a node id) or built
 * from the JSON configuration when {@link #join(Promise)} is called.
 */
public class ZookeeperClusterManager implements ClusterManager, PathChildrenCacheListener {
    private static final Logger log = LoggerFactory.getLogger(ZookeeperClusterManager.class);

    private static final String ZK_PATH_LOCKS = "/locks/";
    private static final String ZK_PATH_CLUSTER_NODE = "/cluster/nodes/";
    private static final String ZK_PATH_CLUSTER_NODE_WITHOUT_SLASH = "/cluster/nodes";

    private VertxInternal vertx;
    private NodeSelector nodeSelector;
    private NodeListener nodeListener;
    private PathChildrenCache clusterNodes;
    /** Set by {@link #join(Promise)}, cleared by {@link #leave(Promise)}. */
    private volatile boolean active;
    /** True once the local znode has been created; cleared on leave and on CONNECTION_LOST. */
    private volatile boolean joined;
    private String nodeId;
    private NodeInfo nodeInfo;
    private CuratorFramework curator;
    /** True when the Curator client and node id were supplied through a constructor. */
    private boolean customCuratorCluster;
    private RetryPolicy retryPolicy;
    private SubsMapHelper subsMapHelper;
    /** Node info successfully written by this node, replayed after a reconnect. */
    private final Map<String, NodeInfo> localNodeInfo = new ConcurrentHashMap<>();
    /** Locks acquired through this manager, keyed by lock name. */
    private final Map<String, ZKLock> locks = new ConcurrentHashMap<>();
    private final Map<String, AsyncMap<?, ?>> asyncMapCache = new ConcurrentHashMap<>();
    private JsonObject conf = new JsonObject();
    private ExecutorService lockReleaseExec;
    /** Extracts the last path segment (the node id) from a znode path. */
    private final Function<String, String> resolveNodeId = path -> {
        String[] segments = path.split("\\/");
        return segments[segments.length - 1];
    };

    /** Creates a manager configured by {@code ConfigUtil.loadConfig(null)}. */
    public ZookeeperClusterManager() {
        this.conf = ConfigUtil.loadConfig(null);
    }

    /**
     * Creates a manager around an externally supplied Curator client, using a random UUID as node id.
     *
     * @param curator the Curator client to use; must not be null
     */
    public ZookeeperClusterManager(CuratorFramework curator) {
        this(curator, UUID.randomUUID().toString());
    }

    /**
     * Creates a manager configured by {@code ConfigUtil.loadConfig(resourceLocation)}.
     *
     * @param resourceLocation location handed to the configuration loader
     */
    public ZookeeperClusterManager(String resourceLocation) {
        this.conf = ConfigUtil.loadConfig(resourceLocation);
    }

    /**
     * Creates a manager around an externally supplied Curator client and node id. With this
     * constructor {@link #join(Promise)} neither builds nor starts a client.
     *
     * @param curator the Curator client to use
     * @param nodeId the id under which this node registers itself
     * @throws NullPointerException if either argument is null
     */
    public ZookeeperClusterManager(CuratorFramework curator, String nodeId) {
        Objects.requireNonNull(curator, "The Curator instance cannot be null.");
        Objects.requireNonNull(nodeId, "The nodeId cannot be null.");
        this.curator = curator;
        this.nodeId = nodeId;
        this.customCuratorCluster = true;
    }

    /**
     * Creates a manager with an explicit JSON configuration.
     *
     * @param config the configuration read by {@link #join(Promise)}
     */
    public ZookeeperClusterManager(JsonObject config) {
        this.conf = config;
    }

    /**
     * Replaces the configuration read by {@link #join(Promise)}.
     *
     * @param conf the new configuration
     */
    public void setConfig(JsonObject conf) {
        this.conf = conf;
    }

    /** @return the current configuration object */
    public JsonObject getConfig() {
        return this.conf;
    }

    /** @return the Curator client in use, or null if none has been supplied or built yet */
    public CuratorFramework getCuratorFramework() {
        return this.curator;
    }

    /**
     * Stores the Vert.x instance and node selector for later use.
     *
     * @param vertx the owning Vert.x instance; cast to {@link VertxInternal}
     * @param nodeSelector the node selector handed to the subscription helper on join
     */
    public void init(Vertx vertx, NodeSelector nodeSelector) {
        this.vertx = (VertxInternal) vertx;
        this.nodeSelector = nodeSelector;
    }

    /**
     * Completes {@code promise} with the async map of the given name, creating and caching it on
     * first use.
     *
     * @param name the map name, also used as cache key
     * @param promise receives the map
     */
    public <K, V> void getAsyncMap(String name, Promise<AsyncMap<K, V>> promise) {
        this.vertx.executeBlocking(prom -> {
            // The cache is keyed by name only, so the element types cannot be checked here.
            @SuppressWarnings("unchecked")
            AsyncMap<K, V> zkAsyncMap = (AsyncMap<K, V>) this.asyncMapCache.computeIfAbsent(
                name, key -> new ZKAsyncMap<>(this.vertx, this.curator, name));
            prom.complete(zkAsyncMap);
        }, promise);
    }

    /**
     * Returns a new synchronous map view for the given name; instances are not cached.
     *
     * @param name the map name
     * @return a new {@link ZKSyncMap}
     */
    public <K, V> Map<K, V> getSyncMap(String name) {
        return new ZKSyncMap<>(this.curator, name);
    }

    /**
     * Tries to acquire the named lock within {@code timeout} milliseconds on a worker thread.
     *
     * <p>An already registered {@link ZKLock} for the name is reused; otherwise a new one is created
     * on {@code /locks/<name>} and registered once it has been acquired. A timeout or any error
     * during acquisition fails the promise with a {@link VertxException} whose message is
     * "get lock exception" and whose cause carries the detail.
     *
     * @param name the lock name
     * @param timeout maximum wait in milliseconds
     * @param promise receives the acquired lock or the failure
     */
    public void getLockWithTimeout(String name, long timeout, Promise<Lock> promise) {
        this.vertx.executeBlocking(prom -> {
            ZKLock lock = this.locks.get(name);
            if (lock == null) {
                InterProcessSemaphoreMutex mutexLock = new InterProcessSemaphoreMutex(this.curator, ZK_PATH_LOCKS + name);
                lock = new ZKLock(mutexLock, this.lockReleaseExec);
            }
            try {
                if (!lock.getLock().acquire(timeout, TimeUnit.MILLISECONDS)) {
                    throw new VertxException("Timed out waiting to get lock " + name);
                }
                this.locks.putIfAbsent(name, lock);
                prom.complete(lock);
            } catch (Exception e) {
                // Thrown out of the blocking handler; executeBlocking reports it through the promise.
                throw new VertxException("get lock exception", e);
            }
        }, false, promise);
    }

    /**
     * Completes {@code promise} with a new {@link ZKCounter} for the given name.
     *
     * @param name the counter name; a null name fails the promise
     * @param promise receives the counter or a {@link VertxException} wrapping the failure
     */
    public void getCounter(String name, Promise<Counter> promise) {
        this.vertx.executeBlocking(prom -> {
            try {
                Objects.requireNonNull(name);
                prom.complete(new ZKCounter(this.vertx, this.curator, name, this.retryPolicy));
            } catch (Exception e) {
                prom.fail(new VertxException(e));
            }
        }, promise);
    }

    /** @return the id of this node; null until supplied by a constructor or generated on join */
    public String getNodeId() {
        return this.nodeId;
    }

    /** @return the node ids currently present in the {@code /cluster/nodes} children cache */
    public List<String> getNodes() {
        return this.clusterNodes.getCurrentData().stream()
            .map(childData -> this.resolveNodeId.apply(childData.getPath()))
            .collect(Collectors.toList());
    }

    /**
     * Registers the listener notified about nodes joining and leaving.
     *
     * @param listener the listener; replaces any previous one
     */
    public void nodeListener(NodeListener listener) {
        this.nodeListener = listener;
    }

    /**
     * Records the node info locally and writes it asynchronously to this node's ephemeral znode,
     * creating the znode or overwriting its data.
     *
     * <p>The promise is completed on a Vert.x context once ZooKeeper acknowledges the write. It is
     * failed when ZooKeeper reports a non-OK result code, and also when the write cannot be
     * submitted at all; previously that last case was only logged and left the promise pending.
     *
     * @param nodeInfo the node info to publish
     * @param promise completed when the write succeeded, failed otherwise
     */
    public void setNodeInfo(NodeInfo nodeInfo, Promise<Void> promise) {
        synchronized (this) {
            this.nodeInfo = nodeInfo;
        }
        try {
            Buffer buffer = Buffer.buffer();
            nodeInfo.writeToBuffer(buffer);
            this.curator.create().orSetData().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
                .inBackground((client, event) -> {
                    if (event.getType() != CuratorEventType.SET_DATA && event.getType() != CuratorEventType.CREATE) {
                        return;
                    }
                    int resultCode = event.getResultCode();
                    this.vertx.runOnContext(v -> {
                        if (resultCode == KeeperException.Code.OK.intValue()) {
                            // Remember only acknowledged writes; they are replayed after a reconnect.
                            this.localNodeInfo.put(this.nodeId, nodeInfo);
                            promise.complete();
                        } else {
                            promise.fail(new VertxException("set node info failed, zookeeper result code " + resultCode));
                        }
                    });
                })
                .withUnhandledErrorListener(log::error)
                .forPath(ZK_PATH_CLUSTER_NODE + this.nodeId, buffer.getBytes());
        } catch (Exception ex) {
            log.error("create node failed.", ex);
            // Without this the caller would wait forever for a write that was never submitted.
            promise.tryFail(ex);
        }
    }

    /** @return the node info last passed to {@link #setNodeInfo(NodeInfo, Promise)}, or null */
    public synchronized NodeInfo getNodeInfo() {
        return this.nodeInfo;
    }

    /**
     * Looks up the node info of another node in the children cache.
     *
     * @param nodeId the id of the node to look up
     * @param promise receives the decoded node info, or a {@link VertxException} with message
     *     "Not a member of the cluster" when the cache holds no entry for the node
     */
    public void getNodeInfo(String nodeId, Promise<NodeInfo> promise) {
        this.vertx.executeBlocking(prom -> {
            NodeInfo found = Optional.ofNullable(this.clusterNodes.getCurrentData(ZK_PATH_CLUSTER_NODE + nodeId))
                .map(childData -> decodeNodeInfo(childData.getData()))
                .orElseThrow(() -> new VertxException("Not a member of the cluster"));
            prom.complete(found);
        }, false, promise);
    }

    /** Deserializes a {@link NodeInfo} from the raw znode payload. */
    private static NodeInfo decodeNodeInfo(byte[] data) {
        NodeInfo decoded = new NodeInfo();
        decoded.readFromBuffer(0, Buffer.buffer(data));
        return decoded;
    }

    /**
     * Starts the children cache on {@code /cluster/nodes}, creates this node's znode, marks the
     * node as joined and creates the subscription helper.
     *
     * @throws VertxException wrapping any failure while starting the cache or creating the znode
     */
    private void addLocalNodeId() throws VertxException {
        this.clusterNodes = new PathChildrenCache(this.curator, ZK_PATH_CLUSTER_NODE_WITHOUT_SLASH, true);
        this.clusterNodes.getListenable().addListener(this);
        try {
            this.clusterNodes.start(PathChildrenCache.StartMode.NORMAL);
            this.createThisNode();
            this.joined = true;
            this.subsMapHelper = new SubsMapHelper(this.curator, this.vertx, this.nodeSelector, this.nodeId);
        } catch (Exception e) {
            throw new VertxException(e);
        }
    }

    /**
     * Creates the ephemeral znode for this node with the node id as payload. An already existing
     * znode is not treated as an error.
     *
     * @throws Exception any other failure reported by Curator
     */
    private void createThisNode() throws Exception {
        try {
            this.curator.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
                .forPath(ZK_PATH_CLUSTER_NODE + this.nodeId, this.nodeId.getBytes());
        } catch (KeeperException.NodeExistsException e) {
            log.info("node:" + this.nodeId + " have created successful.");
        }
    }

    /**
     * Joins the cluster on a worker thread. Does nothing but complete the promise when the manager
     * is already active.
     *
     * <p>Unless a Curator client was supplied through a constructor, a client is built from the
     * configuration if necessary, started, and a fresh random node id is generated. Afterwards the
     * local node is registered in ZooKeeper.
     *
     * @param promise completed once the node is registered; failed when registration fails or the
     *     thread is interrupted before the client has started
     */
    public void join(Promise<Void> promise) {
        this.vertx.executeBlocking(prom -> {
            if (this.active) {
                prom.complete();
                return;
            }
            this.active = true;
            this.lockReleaseExec = Executors.newCachedThreadPool(r -> new Thread(r, "vertx-zookeeper-service-release-lock-thread"));
            if (!this.customCuratorCluster) {
                if (this.curator == null) {
                    this.curator = this.createCuratorFromConfig();
                }
                this.curator.start();
                if (!this.awaitCuratorStarted()) {
                    // Stop here: continuing would fail or complete the same promise a second time.
                    prom.fail("zookeeper client being interrupted while starting.");
                    return;
                }
                this.nodeId = UUID.randomUUID().toString();
            }
            try {
                this.addLocalNodeId();
                prom.complete();
            } catch (Exception e) {
                prom.fail(e);
            }
        }, promise);
    }

    /**
     * Builds a Curator client from the configuration and remembers the retry policy it uses. The
     * connect string comes from the system property {@code vertx.zookeeper.hosts} when set,
     * otherwise from the {@code zookeeperHosts} configuration entry.
     */
    private CuratorFramework createCuratorFromConfig() {
        this.retryPolicy = RetryPolicyHelper.createRetryPolicy(this.conf.getJsonObject("retry", new JsonObject()));
        String hosts = System.getProperty("vertx.zookeeper.hosts");
        if (hosts == null) {
            hosts = this.conf.getString("zookeeperHosts", "127.0.0.1");
        }
        log.info("Zookeeper hosts set to " + hosts);
        return CuratorFrameworkFactory.builder()
            .connectString(hosts)
            .namespace(this.conf.getString("rootPath", "io.vertx"))
            .sessionTimeoutMs(this.conf.getInteger("sessionTimeout", 20000))
            .connectionTimeoutMs(this.conf.getInteger("connectTimeout", 3000))
            .retryPolicy(this.retryPolicy)
            .build();
    }

    /**
     * Polls every 100 ms until the Curator client reports the STARTED state.
     *
     * @return true once the client has started; false if the thread was interrupted first, in which
     *     case the interrupt flag is restored
     */
    private boolean awaitCuratorStarted() {
        while (this.curator.getState() != CuratorFrameworkState.STARTED) {
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                if (this.curator.getState() == CuratorFrameworkState.STARTED) {
                    return true;
                }
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    /**
     * Leaves the cluster on a worker thread: marks the manager inactive, shuts down the lock
     * release executor and closes the children cache, the subscription helper and the Curator
     * client. Close failures are logged and never fail the promise.
     *
     * @param promise completed when shutdown has been attempted, or immediately if not active
     */
    public void leave(Promise<Void> promise) {
        this.vertx.executeBlocking(prom -> {
            synchronized (this) {
                if (this.active) {
                    this.active = false;
                    this.joined = false;
                    this.lockReleaseExec.shutdown();
                    this.closeClusterResources();
                }
                prom.complete();
            }
        }, promise);
    }

    /**
     * Closes the children cache, the subscription helper and the Curator client independently, so
     * that a failing or never-created resource does not keep the remaining ones open.
     */
    private void closeClusterResources() {
        if (this.clusterNodes != null) {
            try {
                this.clusterNodes.close();
            } catch (Exception e) {
                log.warn("zookeeper close exception.", e);
            }
        }
        if (this.subsMapHelper != null) {
            try {
                this.subsMapHelper.close();
            } catch (Exception e) {
                log.warn("zookeeper close exception.", e);
            }
        }
        if (this.curator != null) {
            try {
                this.curator.close();
            } catch (Exception e) {
                log.warn("zookeeper close exception.", e);
            }
        }
    }

    /** @return true between a call to {@link #join(Promise)} and a call to {@link #leave(Promise)} */
    public boolean isActive() {
        return this.active;
    }

    /**
     * Delegates to the subscription helper to add a registration.
     *
     * @param address the event bus address
     * @param registrationInfo the registration to add
     * @param promise completed by the subscription helper
     */
    public void addRegistration(String address, RegistrationInfo registrationInfo, Promise<Void> promise) {
        this.subsMapHelper.put(address, registrationInfo, promise);
    }

    /**
     * Delegates to the subscription helper to remove a registration.
     *
     * @param address the event bus address
     * @param registrationInfo the registration to remove
     * @param promise completed by the subscription helper
     */
    public void removeRegistration(String address, RegistrationInfo registrationInfo, Promise<Void> promise) {
        this.subsMapHelper.remove(address, registrationInfo, promise);
    }

    /**
     * Fetches the registrations for an address from the subscription helper on a worker thread.
     *
     * @param address the event bus address
     * @param promise receives the registrations
     */
    public void getRegistrations(String address, Promise<List<RegistrationInfo>> promise) {
        this.vertx.executeBlocking(prom -> prom.complete(this.subsMapHelper.get(address)), false, promise);
    }

    /**
     * Handles events of the {@code /cluster/nodes} children cache. Events are ignored while the
     * manager is not active.
     *
     * <ul>
     *   <li>CHILD_ADDED / CHILD_REMOVED: forwarded to the node listener.</li>
     *   <li>CONNECTION_RECONNECTED: if still joined, re-creates the local znode and re-publishes
     *       the stored node info.</li>
     *   <li>CONNECTION_SUSPENDED: releases all registered locks.</li>
     *   <li>CONNECTION_LOST: marks the node as not joined, releases and forgets all locks.</li>
     * </ul>
     *
     * @param client the Curator client that produced the event
     * @param event the cache event
     * @throws Exception if re-creating the local znode after a reconnect fails
     */
    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
        if (!this.active) {
            return;
        }
        switch (event.getType()) {
            case CHILD_ADDED:
                this.notifyNodeAdded(client, event);
                break;
            case CHILD_REMOVED:
                this.notifyNodeLeft(client, event);
                break;
            case CONNECTION_RECONNECTED:
                if (this.joined) {
                    this.restoreLocalNode();
                }
                break;
            case CONNECTION_SUSPENDED:
                this.locks.values().forEach(ZKLock::release);
                break;
            case CONNECTION_LOST:
                this.joined = false;
                this.locks.values().forEach(ZKLock::release);
                this.locks.clear();
                break;
            default:
                // CHILD_UPDATED and any other event type need no action.
                break;
        }
    }

    /** Tells the node listener, if any, that a node appeared; listener failures are only logged. */
    private void notifyNodeAdded(CuratorFramework client, PathChildrenCacheEvent event) {
        try {
            if (this.nodeListener != null && client.getState() != CuratorFrameworkState.STOPPED) {
                this.nodeListener.nodeAdded(this.resolveNodeId.apply(event.getData().getPath()));
            }
        } catch (Throwable t) {
            log.error("Failed to handle memberAdded", t);
        }
    }

    /** Tells the node listener, if any, that a node disappeared; listener failures are only logged. */
    private void notifyNodeLeft(CuratorFramework client, PathChildrenCacheEvent event) {
        try {
            if (this.nodeListener != null && client.getState() != CuratorFrameworkState.STOPPED) {
                this.nodeListener.nodeLeft(this.resolveNodeId.apply(event.getData().getPath()));
            }
        } catch (Throwable t) {
            log.warn("Failed to handle memberRemoved", t);
        }
    }

    /**
     * Re-creates the local znode and re-publishes every stored node info after a reconnect. A
     * failure to re-publish is logged.
     */
    private void restoreLocalNode() throws Exception {
        this.createThisNode();
        // Raw Future element type, as in the original code that hands this list to CompositeFuture.all.
        List<Future> pending = new ArrayList<>();
        for (NodeInfo info : this.localNodeInfo.values()) {
            Promise<Void> republished = Promise.promise();
            this.setNodeInfo(info, republished);
            pending.add(republished.future());
        }
        CompositeFuture.all(pending).onComplete(ar -> {
            if (ar.failed()) {
                log.error("recover node info failed.", ar.cause());
            }
        });
    }
}

