/*
 * Decompiled with CFR 0.152.
 */
package com.networknt.zookeeper;

import com.networknt.exception.FrameworkException;
import com.networknt.registry.URL;
import com.networknt.registry.URLImpl;
import com.networknt.registry.support.command.CommandFailbackRegistry;
import com.networknt.registry.support.command.CommandListener;
import com.networknt.registry.support.command.ServiceListener;
import com.networknt.status.Status;
import com.networknt.utility.ConcurrentHashSet;
import com.networknt.zookeeper.ZkNodeType;
import com.networknt.zookeeper.ZkUtils;
import com.networknt.zookeeper.client.ZooKeeperClient;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZooKeeperRegistry
extends CommandFailbackRegistry {
    private static final Logger logger = LoggerFactory.getLogger(ZooKeeperRegistry.class);
    private static final String SUBSCRIBE_ZOOKEEPER_SERVICE_ERROR = "ERR10027";
    private static final String SUBSCRIBE_ZOOKEEPER_COMMAND_ERROR = "ERR10028";
    private static final String UNSUBSCRIBE_ZOOKEEPER_SERVICE_ERROR = "ERR10029";
    private static final String UNSUBSCRIBE_ZOOKEEPER_COMMAND_ERROR = "ERR10030";
    private static final String DISCOVER_ZOOKEEPER_SERVICE_ERROR = "ERR10031";
    private static final String DISCOVER_ZOOKEEPER_COMMAND_ERROR = "ERR10032";
    private static final String REGISTER_ZOOKEEPER_ERROR = "ERR10033";
    private static final String UNREGISTER_ZOOKEEPER_ERROR = "ERR10034";
    private ZooKeeperClient client;
    private Set<URL> availableServices = new ConcurrentHashSet<URL>();
    private ConcurrentHashMap<URL, ConcurrentHashMap<ServiceListener, IZkChildListener>> serviceListeners = new ConcurrentHashMap();
    private ConcurrentHashMap<URL, ConcurrentHashMap<CommandListener, IZkDataListener>> commandListeners = new ConcurrentHashMap();
    private final ReentrantLock clientLock = new ReentrantLock();
    private final ReentrantLock serverLock = new ReentrantLock();

    public ZooKeeperRegistry(URL url, ZooKeeperClient client) {
        super(url);
        this.client = client;
        IZkStateListener zkStateListener = new IZkStateListener(){

            @Override
            public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
            }

            @Override
            public void handleNewSession() throws Exception {
                if (logger.isInfoEnabled()) {
                    logger.info("zkRegistry get new session notify.");
                }
                ZooKeeperRegistry.this.reconnectService();
                ZooKeeperRegistry.this.reconnectClient();
            }
        };
        client.subscribeStateChanges(zkStateListener);
    }

    public ConcurrentHashMap<URL, ConcurrentHashMap<ServiceListener, IZkChildListener>> getServiceListeners() {
        return this.serviceListeners;
    }

    public ConcurrentHashMap<URL, ConcurrentHashMap<CommandListener, IZkDataListener>> getCommandListeners() {
        return this.commandListeners;
    }

    @Override
    protected void subscribeService(final URL url, final ServiceListener serviceListener) {
        try {
            IZkChildListener zkChildListener;
            this.clientLock.lock();
            ConcurrentHashMap<ServiceListener, IZkChildListener> childChangeListeners = this.serviceListeners.get(url);
            if (childChangeListeners == null) {
                this.serviceListeners.putIfAbsent(url, new ConcurrentHashMap());
                childChangeListeners = this.serviceListeners.get(url);
            }
            if ((zkChildListener = childChangeListeners.get(serviceListener)) == null) {
                childChangeListeners.putIfAbsent(serviceListener, new IZkChildListener(){

                    @Override
                    public void handleChildChange(String parentPath, List<String> currentChilds) {
                        serviceListener.notifyService(url, ZooKeeperRegistry.this.getUrl(), ZooKeeperRegistry.this.nodeChildsToUrls(parentPath, currentChilds));
                        if (logger.isInfoEnabled()) {
                            logger.info(String.format("[ZooKeeperRegistry] service list change: path=%s, currentChilds=%s", parentPath, currentChilds.toString()));
                        }
                    }
                });
                zkChildListener = childChangeListeners.get(serviceListener);
            }
            this.removeNode(url, ZkNodeType.CLIENT);
            this.createNode(url, ZkNodeType.CLIENT);
            String serverTypePath = ZkUtils.toNodeTypePath(url, ZkNodeType.AVAILABLE_SERVER);
            this.client.subscribeChildChanges(serverTypePath, zkChildListener);
            if (logger.isInfoEnabled()) {
                logger.info(String.format("[ZooKeeperRegistry] subscribe service: path=%s, info=%s", ZkUtils.toNodePath(url, ZkNodeType.AVAILABLE_SERVER), url.toFullStr()));
            }
        }
        catch (Throwable e) {
            throw new FrameworkException(new Status(SUBSCRIBE_ZOOKEEPER_SERVICE_ERROR, url, this.getUrl(), e.getMessage()), e);
        }
        finally {
            this.clientLock.unlock();
        }
    }

    @Override
    protected void subscribeCommand(final URL url, final CommandListener commandListener) {
        try {
            IZkDataListener zkDataListener;
            this.clientLock.lock();
            ConcurrentHashMap<CommandListener, IZkDataListener> dataChangeListeners = this.commandListeners.get(url);
            if (dataChangeListeners == null) {
                this.commandListeners.putIfAbsent(url, new ConcurrentHashMap());
                dataChangeListeners = this.commandListeners.get(url);
            }
            if ((zkDataListener = dataChangeListeners.get(commandListener)) == null) {
                dataChangeListeners.putIfAbsent(commandListener, new IZkDataListener(){

                    @Override
                    public void handleDataChange(String dataPath, Object data) throws Exception {
                        commandListener.notifyCommand(url, (String)data);
                        if (logger.isInfoEnabled()) {
                            logger.info(String.format("[ZooKeeperRegistry] command data change: path=%s, command=%s", dataPath, (String)data));
                        }
                    }

                    @Override
                    public void handleDataDeleted(String dataPath) throws Exception {
                        commandListener.notifyCommand(url, null);
                        if (logger.isInfoEnabled()) {
                            logger.info(String.format("[ZooKeeperRegistry] command deleted: path=%s", dataPath));
                        }
                    }
                });
                zkDataListener = dataChangeListeners.get(commandListener);
            }
            String commandPath = ZkUtils.toCommandPath(url);
            this.client.subscribeDataChanges(commandPath, zkDataListener);
            if (logger.isInfoEnabled()) {
                logger.info(String.format("[ZooKeeperRegistry] subscribe command: path=%s, info=%s", commandPath, url.toFullStr()));
            }
        }
        catch (Throwable e) {
            throw new FrameworkException(new Status(SUBSCRIBE_ZOOKEEPER_COMMAND_ERROR, url, this.getUrl(), e.getMessage()), e);
        }
        finally {
            this.clientLock.unlock();
        }
    }

    @Override
    protected void unsubscribeService(URL url, ServiceListener serviceListener) {
        try {
            IZkChildListener zkChildListener;
            this.clientLock.lock();
            Map childChangeListeners = this.serviceListeners.get(url);
            if (childChangeListeners != null && (zkChildListener = (IZkChildListener)childChangeListeners.get(serviceListener)) != null) {
                this.client.unsubscribeChildChanges(ZkUtils.toNodeTypePath(url, ZkNodeType.CLIENT), zkChildListener);
                childChangeListeners.remove(serviceListener);
            }
        }
        catch (Throwable e) {
            throw new FrameworkException(new Status(UNSUBSCRIBE_ZOOKEEPER_SERVICE_ERROR, url, this.getUrl(), e.getMessage()), e);
        }
        finally {
            this.clientLock.unlock();
        }
    }

    @Override
    protected void unsubscribeCommand(URL url, CommandListener commandListener) {
        try {
            IZkDataListener zkDataListener;
            this.clientLock.lock();
            Map dataChangeListeners = this.commandListeners.get(url);
            if (dataChangeListeners != null && (zkDataListener = (IZkDataListener)dataChangeListeners.get(commandListener)) != null) {
                this.client.unsubscribeDataChanges(ZkUtils.toCommandPath(url), zkDataListener);
                dataChangeListeners.remove(commandListener);
            }
        }
        catch (Throwable e) {
            throw new FrameworkException(new Status(UNSUBSCRIBE_ZOOKEEPER_COMMAND_ERROR, url, this.getUrl(), e.getMessage()), e);
        }
        finally {
            this.clientLock.unlock();
        }
    }

    @Override
    protected List<URL> discoverService(URL url) {
        try {
            String parentPath = ZkUtils.toNodeTypePath(url, ZkNodeType.AVAILABLE_SERVER);
            List<String> currentChilds = new ArrayList<String>();
            if (this.client.exists(parentPath)) {
                currentChilds = this.client.getChildren(parentPath);
            }
            return this.nodeChildsToUrls(parentPath, currentChilds);
        }
        catch (Throwable e) {
            throw new FrameworkException(new Status(DISCOVER_ZOOKEEPER_SERVICE_ERROR, url, this.getUrl(), e.getMessage()), e);
        }
    }

    @Override
    protected String discoverCommand(URL url) {
        try {
            String commandPath = ZkUtils.toCommandPath(url);
            String command = "";
            if (this.client.exists(commandPath)) {
                command = (String)this.client.readData(commandPath);
            }
            return command;
        }
        catch (Throwable e) {
            throw new FrameworkException(new Status(DISCOVER_ZOOKEEPER_COMMAND_ERROR, url, this.getUrl(), e.getMessage()));
        }
    }

    @Override
    protected void doRegister(URL url) {
        try {
            this.serverLock.lock();
            this.removeNode(url, ZkNodeType.AVAILABLE_SERVER);
            this.removeNode(url, ZkNodeType.UNAVAILABLE_SERVER);
            this.createNode(url, ZkNodeType.UNAVAILABLE_SERVER);
        }
        catch (Throwable e) {
            throw new FrameworkException(new Status(REGISTER_ZOOKEEPER_ERROR, url, this.getUrl(), e.getMessage()), e);
        }
        finally {
            this.serverLock.unlock();
        }
    }

    @Override
    protected void doUnregister(URL url) {
        try {
            this.serverLock.lock();
            this.removeNode(url, ZkNodeType.AVAILABLE_SERVER);
            this.removeNode(url, ZkNodeType.UNAVAILABLE_SERVER);
        }
        catch (Throwable e) {
            throw new FrameworkException(new Status(UNREGISTER_ZOOKEEPER_ERROR, url, this.getUrl(), e.getMessage()), e);
        }
        finally {
            this.serverLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void doAvailable(URL url) {
        try {
            this.serverLock.lock();
            if (url == null) {
                this.availableServices.addAll(this.getRegisteredServiceUrls());
                for (URL u : this.getRegisteredServiceUrls()) {
                    this.removeNode(u, ZkNodeType.AVAILABLE_SERVER);
                    this.removeNode(u, ZkNodeType.UNAVAILABLE_SERVER);
                    this.createNode(u, ZkNodeType.AVAILABLE_SERVER);
                }
            } else {
                this.availableServices.add(url);
                this.removeNode(url, ZkNodeType.AVAILABLE_SERVER);
                this.removeNode(url, ZkNodeType.UNAVAILABLE_SERVER);
                this.createNode(url, ZkNodeType.AVAILABLE_SERVER);
            }
        }
        finally {
            this.serverLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void doUnavailable(URL url) {
        try {
            this.serverLock.lock();
            if (url == null) {
                this.availableServices.removeAll(this.getRegisteredServiceUrls());
                for (URL u : this.getRegisteredServiceUrls()) {
                    this.removeNode(u, ZkNodeType.AVAILABLE_SERVER);
                    this.removeNode(u, ZkNodeType.UNAVAILABLE_SERVER);
                    this.createNode(u, ZkNodeType.UNAVAILABLE_SERVER);
                }
            } else {
                this.availableServices.remove(url);
                this.removeNode(url, ZkNodeType.AVAILABLE_SERVER);
                this.removeNode(url, ZkNodeType.UNAVAILABLE_SERVER);
                this.createNode(url, ZkNodeType.UNAVAILABLE_SERVER);
            }
        }
        finally {
            this.serverLock.unlock();
        }
    }

    private List<URL> nodeChildsToUrls(String parentPath, List<String> currentChilds) {
        ArrayList<URL> urls = new ArrayList<URL>();
        if (currentChilds != null) {
            for (String node : currentChilds) {
                String nodePath = parentPath + "/" + node;
                String data = (String)this.client.readData(nodePath, true);
                try {
                    URL url = URLImpl.valueOf(data);
                    urls.add(url);
                }
                catch (Exception e) {
                    if (!logger.isInfoEnabled()) continue;
                    logger.warn(String.format("Found malformed urls from ZooKeeperRegistry, path=%s", nodePath), e);
                }
            }
        }
        return urls;
    }

    private void createNode(URL url, ZkNodeType nodeType) {
        String nodeTypePath = ZkUtils.toNodeTypePath(url, nodeType);
        if (!this.client.exists(nodeTypePath)) {
            this.client.createPersistent(nodeTypePath, true);
        }
        this.client.createEphemeral(ZkUtils.toNodePath(url, nodeType), url.toFullStr());
    }

    private void removeNode(URL url, ZkNodeType nodeType) {
        String nodePath = ZkUtils.toNodePath(url, nodeType);
        if (this.client.exists(nodePath)) {
            this.client.delete(nodePath);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void reconnectService() {
        Collection allRegisteredServices = this.getRegisteredServiceUrls();
        if (allRegisteredServices != null && !allRegisteredServices.isEmpty()) {
            try {
                this.serverLock.lock();
                for (URL url : this.getRegisteredServiceUrls()) {
                    this.doRegister(url);
                }
                if (logger.isInfoEnabled()) {
                    logger.info("[{}] reconnect: register services {}", (Object)this.registryClassName, (Object)allRegisteredServices);
                }
                for (URL url : this.availableServices) {
                    if (!this.getRegisteredServiceUrls().contains(url)) {
                        if (!logger.isWarnEnabled()) continue;
                        logger.warn("reconnect url not register. url:{}", (Object)url);
                        continue;
                    }
                    this.doAvailable(url);
                }
                if (logger.isInfoEnabled()) {
                    logger.info("[{}] reconnect: available services {}", (Object)this.registryClassName, (Object)this.availableServices);
                }
            }
            finally {
                this.serverLock.unlock();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void reconnectClient() {
        if (this.serviceListeners != null && !this.serviceListeners.isEmpty()) {
            try {
                URL url;
                this.clientLock.lock();
                for (Map.Entry<URL, ConcurrentHashMap<ServiceListener, IZkChildListener>> entry : this.serviceListeners.entrySet()) {
                    url = entry.getKey();
                    ConcurrentHashMap<ServiceListener, IZkChildListener> childChangeListeners = this.serviceListeners.get(url);
                    if (childChangeListeners == null) continue;
                    for (Map.Entry<ServiceListener, IZkChildListener> entry2 : childChangeListeners.entrySet()) {
                        this.subscribeService(url, entry2.getKey());
                    }
                }
                for (Map.Entry<URL, ConcurrentHashMap<Object, Object>> entry : this.commandListeners.entrySet()) {
                    url = entry.getKey();
                    ConcurrentHashMap<CommandListener, IZkDataListener> dataChangeListeners = this.commandListeners.get(url);
                    if (dataChangeListeners == null) continue;
                    for (Map.Entry<Object, Object> entry3 : dataChangeListeners.entrySet()) {
                        this.subscribeCommand(url, (CommandListener)entry3.getKey());
                    }
                }
                if (logger.isInfoEnabled()) {
                    logger.info("[{}] reconnect all clients", (Object)this.registryClassName);
                }
            }
            finally {
                this.clientLock.unlock();
            }
        }
    }
}

