/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.zookeeper;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.pulsar.shade.org.apache.zookeeper.WatchedEvent;
import org.apache.pulsar.shade.org.apache.zookeeper.Watcher;
import org.apache.pulsar.shade.org.apache.zookeeper.ZooKeeper;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GlobalZooKeeperCache
extends ZooKeeperCache
implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(GlobalZooKeeperCache.class);
    private final ZooKeeperClientFactory zlClientFactory;
    private final int zkSessionTimeoutMillis;
    private final String globalZkConnect;
    private final ScheduledExecutorService scheduledExecutor;

    public GlobalZooKeeperCache(ZooKeeperClientFactory zkClientFactory, int zkSessionTimeoutMillis, int zkOperationTimeoutSeconds, String globalZkConnect, OrderedExecutor orderedExecutor, ScheduledExecutorService scheduledExecutor, int cacheExpirySeconds) {
        super("global-zk", null, zkOperationTimeoutSeconds, orderedExecutor, cacheExpirySeconds);
        this.zlClientFactory = zkClientFactory;
        this.zkSessionTimeoutMillis = zkSessionTimeoutMillis;
        this.globalZkConnect = globalZkConnect;
        this.scheduledExecutor = scheduledExecutor;
    }

    public void start() throws IOException {
        CompletableFuture<ZooKeeper> zkFuture = this.zlClientFactory.create(this.globalZkConnect, ZooKeeperClientFactory.SessionType.AllowReadOnly, this.zkSessionTimeoutMillis);
        try {
            ZooKeeper newSession = zkFuture.get(this.zkSessionTimeoutMillis, TimeUnit.MILLISECONDS);
            newSession.register(this);
            this.zkSession.set(newSession);
        }
        catch (InterruptedException | ExecutionException | TimeoutException e) {
            LOG.error("Failed to establish global zookeeper session: {}", (Object)e.getMessage(), (Object)e);
            throw new IOException(e);
        }
    }

    @Override
    public void close() throws IOException {
        ZooKeeper currentSession = this.zkSession.getAndSet(null);
        if (currentSession != null) {
            try {
                currentSession.close();
            }
            catch (InterruptedException e) {
                throw new IOException(e);
            }
        }
        super.stop();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public <T> void process(WatchedEvent event, ZooKeeperCache.CacheUpdater<T> updater) {
        GlobalZooKeeperCache globalZooKeeperCache = this;
        synchronized (globalZooKeeperCache) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("[{}] Got Global ZooKeeper WatchdEvent: EventType: {}, KeeperState: {}, path: {}", new Object[]{this.hashCode(), event.getType(), event.getState(), event.getPath()});
            }
            if (event.getType() == Watcher.Event.EventType.None) {
                switch (event.getState()) {
                    case Expired: {
                        ZooKeeper oldSession = this.zkSession.getAndSet(null);
                        LOG.warn("Global ZK session lost. Triggering reconnection {}", (Object)oldSession);
                        this.safeCloseZkSession(oldSession);
                        this.asyncRestartZooKeeperSession();
                        return;
                    }
                    case SyncConnected: 
                    case ConnectedReadOnly: {
                        Preconditions.checkNotNull((ZooKeeper)this.zkSession.get());
                        LOG.info("Global ZK session {} restored connection.", this.zkSession.get());
                        this.dataCache.synchronous().invalidateAll();
                        this.childrenCache.synchronous().invalidateAll();
                        return;
                    }
                }
            }
        }
        super.process(event, updater);
    }

    protected void asyncRestartZooKeeperSession() {
        LOG.info("Re-starting global ZK session in the background...");
        CompletableFuture<ZooKeeper> zkFuture = this.zlClientFactory.create(this.globalZkConnect, ZooKeeperClientFactory.SessionType.AllowReadOnly, this.zkSessionTimeoutMillis);
        ((CompletableFuture)zkFuture.thenAccept(zk -> {
            if (this.zkSession.compareAndSet(null, zk)) {
                LOG.info("Successfully re-created global ZK session: {}", zk);
            } else {
                this.safeCloseZkSession((ZooKeeper)zk);
            }
        })).exceptionally(ex -> {
            LOG.warn("Failed to re-create global ZK session: {}", (Object)ex.getMessage());
            this.scheduledExecutor.schedule(() -> this.asyncRestartZooKeeperSession(), 10L, TimeUnit.SECONDS);
            return null;
        });
    }

    private void safeCloseZkSession(ZooKeeper zkSession) {
        if (zkSession != null) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Closing global zkSession:{}", (Object)zkSession.getSessionId());
            }
            try {
                zkSession.close();
            }
            catch (Exception e) {
                LOG.info("Closing Global ZK Session encountered an exception: {}. Disposed anyway.", (Object)e.getMessage());
            }
        }
    }
}

