/*
 * Decompiled with CFR 0.152.
 */
package com.spotify.helios.servicescommon.coordination;

import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.type.MapType;
import com.google.common.base.Charsets;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Service;
import com.spotify.helios.agent.BoundedRandomExponentialBackoff;
import com.spotify.helios.agent.RetryIntervalPolicy;
import com.spotify.helios.agent.RetryScheduler;
import com.spotify.helios.common.Json;
import com.spotify.helios.servicescommon.DefaultReactor;
import com.spotify.helios.servicescommon.PersistentAtomicReference;
import com.spotify.helios.servicescommon.Reactor;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundPathable;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersistentPathChildrenCache<T>
extends AbstractIdleService {
    private static final Logger log = LoggerFactory.getLogger(PersistentPathChildrenCache.class);
    private static final long REFRESH_INTERVAL_MILLIS = 30000L;
    private final PersistentAtomicReference<Map<String, T>> snapshot;
    private final CuratorFramework curator;
    private final String path;
    private final JavaType valueType;
    private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList();
    private final CuratorWatcher childrenWatcher = new ChildrenWatcher();
    private final CuratorWatcher dataWatcher = new DataWatcher();
    private final Set<String> changes = Sets.newConcurrentHashSet();
    private final Reactor reactor;
    private volatile boolean synced;

    public PersistentPathChildrenCache(CuratorFramework curator, String path, Path snapshotFile, JavaType valueType) throws IOException, InterruptedException {
        this.curator = curator;
        this.path = path;
        this.valueType = valueType;
        MapType mapType = Json.typeFactory().constructMapType(HashMap.class, Json.type(String.class), valueType);
        Supplier empty = Suppliers.ofInstance(Collections.emptyMap());
        this.snapshot = PersistentAtomicReference.create(snapshotFile, (JavaType)mapType, empty);
        this.reactor = new DefaultReactor("zk-ppcc:" + path, new Update(), 30000L);
        curator.getConnectionStateListenable().addListener((Object)new ConnectionListener());
    }

    public void addListener(Listener listener) {
        this.listeners.add(listener);
    }

    public void removeListener(Listener listener) {
        this.listeners.remove(listener);
    }

    protected void startUp() throws Exception {
        log.debug("starting cache");
        this.reactor.startAsync().awaitRunning();
        this.reactor.signal();
    }

    protected void shutDown() throws Exception {
        this.reactor.stopAsync().awaitTerminated();
    }

    public Map<String, T> getNodes() {
        return this.snapshot.get();
    }

    private void fireConnectionStateChanged(ConnectionState state) {
        for (Listener listener : this.listeners) {
            try {
                listener.connectionStateChanged(state);
            }
            catch (Exception e) {
                log.error("Listener threw exception", (Throwable)e);
            }
        }
    }

    private boolean isAlive() {
        return this.state().ordinal() < Service.State.STOPPING.ordinal();
    }

    private void update() throws KeeperException, InterruptedException {
        Map<Object, Object> newSnapshot;
        log.debug("updating: {}", (Object)this.path);
        Map<String, T> currentSnapshot = this.snapshot.get();
        if (!this.synced) {
            this.synced = true;
            newSnapshot = this.sync();
        } else {
            newSnapshot = Maps.newHashMap(currentSnapshot);
        }
        Iterator<String> iterator = this.changes.iterator();
        while (iterator.hasNext()) {
            Object value;
            String child = iterator.next();
            iterator.remove();
            String node = ZKPaths.makePath((String)this.path, (String)child);
            log.debug("fetching change: {}", (Object)node);
            try {
                byte[] bytes = (byte[])((BackgroundPathable)this.curator.getData().usingWatcher(this.dataWatcher)).forPath(node);
                value = Json.read((byte[])bytes, (JavaType)this.valueType);
            }
            catch (KeeperException e) {
                throw e;
            }
            catch (Exception e) {
                throw Throwables.propagate((Throwable)e);
            }
            newSnapshot.put(node, value);
        }
        if (!currentSnapshot.equals(newSnapshot)) {
            this.snapshot.setUnchecked(newSnapshot);
            this.fireNodesChanged();
        }
    }

    private void fireNodesChanged() {
        for (Listener listener : this.listeners) {
            try {
                listener.nodesChanged(this);
            }
            catch (Exception e) {
                log.error("Listener threw exception", (Throwable)e);
            }
        }
    }

    private Map<String, T> sync() throws KeeperException {
        log.debug("syncing: {}", (Object)this.path);
        HashMap newSnapshot = Maps.newHashMap();
        try {
            List children = (List)((BackgroundPathable)this.curator.getChildren().usingWatcher(this.childrenWatcher)).forPath(this.path);
            log.debug("children: {}", (Object)children);
            for (String child : children) {
                Object value;
                String node = ZKPaths.makePath((String)this.path, (String)child);
                byte[] bytes = (byte[])((BackgroundPathable)this.curator.getData().usingWatcher(this.dataWatcher)).forPath(node);
                String json = new String(bytes, Charsets.UTF_8);
                log.debug("child: {}={}", (Object)node, (Object)json);
                try {
                    value = Json.read((byte[])bytes, (JavaType)this.valueType);
                }
                catch (IOException e) {
                    log.warn("failed to parse node: {}: {}", new Object[]{node, json, e});
                    continue;
                }
                newSnapshot.put(node, value);
            }
        }
        catch (KeeperException e) {
            throw e;
        }
        catch (Exception e) {
            throw Throwables.propagate((Throwable)e);
        }
        return newSnapshot;
    }

    private class ConnectionListener
    implements ConnectionStateListener {
        private ConnectionListener() {
        }

        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            log.debug("connection state change: {}", (Object)newState);
            if (newState == ConnectionState.RECONNECTED) {
                PersistentPathChildrenCache.this.synced = false;
                PersistentPathChildrenCache.this.reactor.signal();
            }
            PersistentPathChildrenCache.this.fireConnectionStateChanged(newState);
        }
    }

    private class DataWatcher
    implements CuratorWatcher {
        private DataWatcher() {
        }

        public void process(WatchedEvent event) throws Exception {
            log.debug("data event: {}", (Object)event);
            if (event.getType() == Watcher.Event.EventType.NodeDataChanged) {
                String child = ZKPaths.getNodeFromPath((String)event.getPath());
                PersistentPathChildrenCache.this.changes.add(child);
                PersistentPathChildrenCache.this.reactor.signal();
            }
        }
    }

    private class ChildrenWatcher
    implements CuratorWatcher {
        private ChildrenWatcher() {
        }

        public void process(WatchedEvent event) throws Exception {
            log.debug("children event: {}", (Object)event);
            PersistentPathChildrenCache.this.synced = false;
            PersistentPathChildrenCache.this.reactor.signal();
        }
    }

    private class Update
    implements Reactor.Callback {
        final RetryIntervalPolicy retryIntervalPolicy = BoundedRandomExponentialBackoff.newBuilder().setMinInterval(1L, TimeUnit.SECONDS).setMaxInterval(30L, TimeUnit.SECONDS).build();

        private Update() {
        }

        @Override
        public void run(boolean timeout) throws InterruptedException {
            RetryScheduler retryScheduler = this.retryIntervalPolicy.newScheduler();
            while (PersistentPathChildrenCache.this.isAlive()) {
                try {
                    PersistentPathChildrenCache.this.update();
                    return;
                }
                catch (KeeperException e) {
                    PersistentPathChildrenCache.this.synced = false;
                    log.warn("update failed: {}", (Object)e.getMessage());
                    Thread.sleep(retryScheduler.nextMillis());
                }
            }
        }
    }

    public static interface Listener {
        public void nodesChanged(PersistentPathChildrenCache<?> var1);

        public void connectionStateChanged(ConnectionState var1);
    }
}

