/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.d2.discovery.stores.zk;

import com.linkedin.common.callback.Callback;
import com.linkedin.common.util.None;
import com.linkedin.d2.discovery.PropertySerializationException;
import com.linkedin.d2.discovery.PropertySerializer;
import com.linkedin.d2.discovery.stores.zk.ZKConnection;
import com.linkedin.d2.discovery.stores.zk.ZkUtil;
import com.linkedin.d2.discovery.stores.zk.ZooKeeperStore;
import com.linkedin.d2.discovery.util.LogUtil;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZooKeeperPermanentStore<T>
extends ZooKeeperStore<T> {
    private static final Logger _log = LoggerFactory.getLogger(ZooKeeperPermanentStore.class);
    private final ZKStoreWatcher _zkStoreWatcher = new ZKStoreWatcher();
    private final ScheduledExecutorService _executorService;
    private int _zookeeperReadWindowMs;

    public ZooKeeperPermanentStore(ZKConnection client, PropertySerializer<T> serializer, String path) {
        this(client, serializer, path, null, -1);
    }

    public ZooKeeperPermanentStore(ZKConnection client, PropertySerializer<T> serializer, String path, ScheduledExecutorService executorService, int zookeeperReadWindowMs) {
        super(client, serializer, path);
        this._executorService = executorService;
        this._zookeeperReadWindowMs = zookeeperReadWindowMs;
    }

    @Override
    public void put(String listenTo, T discoveryProperties, Callback<None> callback) {
        this._putStats.inc();
        LogUtil.trace(_log, "put ", listenTo, ": ", discoveryProperties);
        String path = this.getPath(listenTo);
        this._zkConn.ensurePersistentNodeExists(path, this.getExistsCallBack(listenTo, discoveryProperties, callback));
    }

    protected Callback<None> getExistsCallBack(String listenTo, final T discoveryProperties, final Callback<None> callback) {
        final String path = this.getPath(listenTo);
        return new Callback<None>(){

            public void onSuccess(None none) {
                ZooKeeperPermanentStore.this._zkConn.setDataUnsafe(path, ZooKeeperPermanentStore.this._serializer.toBytes(discoveryProperties), (Callback<None>)callback);
            }

            public void onError(Throwable e) {
                callback.onError(e);
            }
        };
    }

    @Override
    public void remove(String listenTo, Callback<None> callback) {
        this._removeStats.inc();
        LogUtil.trace(_log, "remove: ", listenTo);
        String path = this.getPath(listenTo);
        this._zkConn.removeNodeUnsafe(path, callback);
    }

    @Override
    public void get(String listenTo, Callback<T> callback) {
        this._getStats.inc();
        LogUtil.trace(_log, "get: ", listenTo);
        String path = this.getPath(listenTo);
        AsyncCallback.DataCallback dataCallback = ZkUtil.zkDataCallback(callback, this._serializer);
        this._zk.getData(path, false, dataCallback, null);
    }

    @Override
    public void startPublishing(String prop) {
        LogUtil.trace(_log, "register: ", prop);
        if (this._eventBus == null) {
            throw new IllegalStateException("_eventBus must not be null when publishing");
        }
        this._zkStoreWatcher.addWatch(prop);
        this._zk.getData(this.getPath(prop), this._zkStoreWatcher, (AsyncCallback.DataCallback)this._zkStoreWatcher, (Object)true);
    }

    @Override
    public void stopPublishing(String prop) {
        LogUtil.trace(_log, "unregister: ", prop);
        this._zkStoreWatcher.cancelWatch(prop);
    }

    public int getListenerCount() {
        return this._zkStoreWatcher.getWatchCount();
    }

    private class ZKStoreWatcher
    extends ZooKeeperStore.ZKStoreWatcher
    implements AsyncCallback.DataCallback,
    AsyncCallback.StatCallback {
        private ZKStoreWatcher() {
            super(ZooKeeperPermanentStore.this);
        }

        @Override
        protected void processWatch(String propertyName, WatchedEvent watchedEvent) {
            if (ZooKeeperPermanentStore.this._zookeeperReadWindowMs > 0 && ZooKeeperPermanentStore.this._executorService != null) {
                int delay = ThreadLocalRandom.current().nextInt(ZooKeeperPermanentStore.this._zookeeperReadWindowMs);
                ZooKeeperPermanentStore.this._executorService.schedule(() -> ZooKeeperPermanentStore.this._zk.getData(watchedEvent.getPath(), this, (AsyncCallback.DataCallback)this, (Object)false), (long)delay, TimeUnit.MILLISECONDS);
            } else {
                ZooKeeperPermanentStore.this._zk.getData(watchedEvent.getPath(), this, (AsyncCallback.DataCallback)this, (Object)false);
            }
        }

        public void processResult(int rc, String path, Object ctx, byte[] bytes, Stat stat) {
            KeeperException.Code code = KeeperException.Code.get((int)rc);
            _log.debug("{}: getData returned {}", (Object)path, (Object)code);
            String propertyName = ZooKeeperPermanentStore.this.getPropertyForPath(path);
            boolean init = (Boolean)ctx;
            switch (code) {
                case OK: {
                    Object propertyValue;
                    try {
                        propertyValue = ZooKeeperPermanentStore.this._serializer.fromBytes(bytes);
                    }
                    catch (PropertySerializationException e) {
                        _log.error("Failed to deserialize property " + propertyName + ", value in bytes:" + new String(bytes), (Throwable)e);
                        propertyValue = null;
                    }
                    if (init) {
                        ZooKeeperPermanentStore.this._eventBus.publishInitialize(propertyName, propertyValue);
                        _log.debug("{}: published init", (Object)path);
                        break;
                    }
                    ZooKeeperPermanentStore.this._eventBus.publishAdd(propertyName, propertyValue);
                    _log.debug("{}: published add", (Object)path);
                    break;
                }
                case NONODE: {
                    if (init) {
                        ZooKeeperPermanentStore.this._eventBus.publishInitialize(propertyName, null);
                        _log.debug("{}: published init for NONODE event", (Object)path);
                    } else {
                        ZooKeeperPermanentStore.this._eventBus.publishRemove(propertyName);
                        _log.debug("{}: published remove", (Object)path);
                    }
                    _log.debug("{}: node not present, calling exists", (Object)path);
                    ZooKeeperPermanentStore.this._zk.exists(path, this, (AsyncCallback.StatCallback)this, (Object)false);
                    break;
                }
                default: {
                    _log.error("getData: unexpected error: {}: {}", (Object)code, (Object)path);
                }
            }
        }

        public void processResult(int rc, String path, Object ctx, Stat stat) {
            KeeperException.Code code = KeeperException.Code.get((int)rc);
            _log.debug("{}: exists returned {}", (Object)path, (Object)code);
            switch (code) {
                case OK: {
                    _log.debug("{}: node is back, calling getData");
                    ZooKeeperPermanentStore.this._zk.getData(path, this, (AsyncCallback.DataCallback)this, (Object)false);
                    break;
                }
                case NONODE: {
                    _log.debug("{}: set exists watch", (Object)path);
                    break;
                }
                default: {
                    _log.error("exists: unexpected error: {}: {}", (Object)code, (Object)path);
                }
            }
        }
    }
}

