/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.d2.balancer.zkfs;

import com.linkedin.common.callback.Callback;
import com.linkedin.common.util.None;
import com.linkedin.d2.DarkClusterConfigMap;
import com.linkedin.d2.balancer.Directory;
import com.linkedin.d2.balancer.Facilities;
import com.linkedin.d2.balancer.KeyMapper;
import com.linkedin.d2.balancer.LoadBalancer;
import com.linkedin.d2.balancer.LoadBalancerClusterListener;
import com.linkedin.d2.balancer.LoadBalancerWithFacilities;
import com.linkedin.d2.balancer.ServiceUnavailableException;
import com.linkedin.d2.balancer.WarmUpService;
import com.linkedin.d2.balancer.clusterfailout.FailoutConfig;
import com.linkedin.d2.balancer.properties.ServiceProperties;
import com.linkedin.d2.balancer.util.ClientFactoryProvider;
import com.linkedin.d2.balancer.util.ClusterInfoProvider;
import com.linkedin.d2.balancer.util.DirectoryProvider;
import com.linkedin.d2.balancer.util.HostToKeyMapper;
import com.linkedin.d2.balancer.util.KeyMapperProvider;
import com.linkedin.d2.balancer.util.MapKeyResult;
import com.linkedin.d2.balancer.util.TogglingLoadBalancer;
import com.linkedin.d2.balancer.util.hashing.ConsistentHashKeyMapper;
import com.linkedin.d2.balancer.util.hashing.HashFunction;
import com.linkedin.d2.balancer.util.hashing.HashRingProvider;
import com.linkedin.d2.balancer.util.hashing.Ring;
import com.linkedin.d2.balancer.util.partitions.PartitionAccessor;
import com.linkedin.d2.balancer.util.partitions.PartitionInfoProvider;
import com.linkedin.d2.balancer.zkfs.ZKFSDirectory;
import com.linkedin.d2.discovery.event.PropertyEventThread;
import com.linkedin.d2.discovery.stores.zk.ZKConnection;
import com.linkedin.d2.discovery.stores.zk.ZKConnectionBuilder;
import com.linkedin.d2.discovery.stores.zk.ZooKeeper;
import com.linkedin.r2.message.Request;
import com.linkedin.r2.message.RequestContext;
import com.linkedin.r2.transport.common.TransportClientFactory;
import com.linkedin.r2.transport.common.bridge.client.TransportClient;
import com.linkedin.r2.util.NamedThreadFactory;
import java.io.File;
import java.net.URI;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZKFSLoadBalancer
implements LoadBalancerWithFacilities,
DirectoryProvider,
KeyMapperProvider,
HashRingProvider,
PartitionInfoProvider,
ClientFactoryProvider,
WarmUpService,
ClusterInfoProvider {
    private static final Logger LOG = LoggerFactory.getLogger(ZKFSLoadBalancer.class);
    private final String _connectString;
    private final int _sessionTimeout;
    private final int _initialZKTimeout;
    private final AtomicReference<Callback<None>> _startupCallback = new AtomicReference();
    private final TogglingLoadBalancerFactory _loadBalancerFactory;
    private final File _zkFlagFile;
    private final ZKFSDirectory _directory;
    private final ZKConnectionBuilder _zkConnectionBuilder;
    private volatile long _delayedExecution;
    private final ScheduledExecutorService _executor;
    private final KeyMapper _keyMapper;
    private volatile ZKConnection _zkConnection;
    private volatile TogglingLoadBalancer _currentLoadBalancer;

    /**
     * Forwards the cluster-count query to the load balancer that is currently active.
     *
     * @param clusterName name of the cluster being queried
     * @param scheme scheme handed through to the delegate
     * @param partitionId partition handed through to the delegate
     * @return the value reported by the active load balancer
     * @throws ServiceUnavailableException propagated from the delegate
     */
    @Override
    public int getClusterCount(String clusterName, String scheme, int partitionId) throws ServiceUnavailableException {
        final TogglingLoadBalancer delegate = _currentLoadBalancer;
        return delegate.getClusterCount(clusterName, scheme, partitionId);
    }

    /**
     * Looks up the dark cluster configuration through the currently active load balancer.
     *
     * @param clusterName name of the cluster being queried
     * @return the map returned by the active load balancer
     * @throws ServiceUnavailableException propagated from the delegate
     */
    @Override
    public DarkClusterConfigMap getDarkClusterConfigMap(String clusterName) throws ServiceUnavailableException {
        final TogglingLoadBalancer delegate = _currentLoadBalancer;
        return delegate.getDarkClusterConfigMap(clusterName);
    }

    /**
     * Callback flavour of the dark cluster configuration lookup; the request and the
     * callback are both handed to the currently active load balancer.
     *
     * @param clusterName name of the cluster being queried
     * @param callback callback passed through to the delegate
     */
    @Override
    public void getDarkClusterConfigMap(String clusterName, Callback<DarkClusterConfigMap> callback) {
        final TogglingLoadBalancer delegate = _currentLoadBalancer;
        delegate.getDarkClusterConfigMap(clusterName, callback);
    }

    /**
     * Registers the listener with the currently active load balancer.
     *
     * @param clusterListener listener passed through to the delegate
     */
    @Override
    public void registerClusterListener(LoadBalancerClusterListener clusterListener) {
        final TogglingLoadBalancer delegate = _currentLoadBalancer;
        delegate.registerClusterListener(clusterListener);
    }

    /**
     * Unregisters the listener from the currently active load balancer.
     *
     * @param clusterListener listener passed through to the delegate
     */
    @Override
    public void unregisterClusterListener(LoadBalancerClusterListener clusterListener) {
        final TogglingLoadBalancer delegate = _currentLoadBalancer;
        delegate.unregisterClusterListener(clusterListener);
    }

    /**
     * Fetches the failout configuration from the currently active load balancer.
     *
     * @param clusterName name of the cluster being queried
     * @return whatever the active load balancer returns for that cluster
     */
    @Override
    public FailoutConfig getFailoutConfig(String clusterName) {
        final TogglingLoadBalancer delegate = _currentLoadBalancer;
        return delegate.getFailoutConfig(clusterName);
    }

    /**
     * Convenience constructor that fills in the remaining settings of the full constructor:
     * synchronous shutdown, no symlink awareness, a freshly created single-threaded scheduled
     * executor (thread name "D2 PropertyEventExecutor for Tests") and no ZooKeeper decorator.
     *
     * @param zkConnectString ZooKeeper connect string
     * @param sessionTimeout ZooKeeper session timeout
     * @param initialZKTimeout how long startup waits for ZooKeeper before falling back to backup stores
     * @param factory factory producing the {@link TogglingLoadBalancer} for each connection
     * @param zkFlagFile path of the flag file that suppresses ZooKeeper, or {@code null} for none
     * @param basePath base path handed to the {@link ZKFSDirectory}
     */
    public ZKFSLoadBalancer(String zkConnectString, int sessionTimeout, int initialZKTimeout, TogglingLoadBalancerFactory factory, String zkFlagFile, String basePath) {
        this(
            zkConnectString,
            sessionTimeout,
            initialZKTimeout,
            factory,
            zkFlagFile,
            basePath,
            false,
            false,
            Executors.newSingleThreadScheduledExecutor((ThreadFactory)new NamedThreadFactory("D2 PropertyEventExecutor for Tests")),
            null);
    }

    /**
     * Full constructor. Only stores configuration and prepares the connection builder; no
     * ZooKeeper connection is created until {@link #start(Callback)} runs.
     *
     * @param zkConnectString ZooKeeper connect string
     * @param sessionTimeout ZooKeeper session timeout, also applied to the connection builder
     * @param initialZKTimeout how long startup waits for ZooKeeper before falling back to backup stores
     * @param factory factory producing the {@link TogglingLoadBalancer} for each connection
     * @param zkFlagFile path of the flag file that suppresses ZooKeeper, or {@code null} for none
     * @param basePath base path handed to the {@link ZKFSDirectory}
     * @param shutdownAsynchronously forwarded to the connection builder
     * @param isSymlinkAware forwarded to the connection builder
     * @param executor executor on which startup and state-change handling are run
     * @param zooKeeperDecorator forwarded to the connection builder
     */
    public ZKFSLoadBalancer(String zkConnectString, int sessionTimeout, int initialZKTimeout, TogglingLoadBalancerFactory factory, String zkFlagFile, String basePath, boolean shutdownAsynchronously, boolean isSymlinkAware, ScheduledExecutorService executor, Function<ZooKeeper, ZooKeeper> zooKeeperDecorator) {
        _connectString = zkConnectString;
        _sessionTimeout = sessionTimeout;
        _initialZKTimeout = initialZKTimeout;
        _loadBalancerFactory = factory;
        if (zkFlagFile != null) {
            _zkFlagFile = new File(zkFlagFile);
        } else {
            _zkFlagFile = null;
        }
        _directory = new ZKFSDirectory(basePath);
        _executor = executor;
        // This instance is passed for both constructor arguments of the key mapper.
        _keyMapper = new ConsistentHashKeyMapper(this, this);
        // Default delay, in milliseconds; adjustable through setDelayedExecution.
        _delayedExecution = 1000L;
        _zkConnectionBuilder = new ZKConnectionBuilder(zkConnectString)
            .setTimeout(sessionTimeout)
            .setShutdownAsynchronously(shutdownAsynchronously)
            .setIsSymlinkAware(isSymlinkAware)
            .setZooKeeperDecorator(zooKeeperDecorator);
    }

    /**
     * @return the delay, in milliseconds, applied before the previous load balancer is shut
     *         down following a successful reset
     */
    public long getDelayedExecution() {
        return _delayedExecution;
    }

    /**
     * Changes the delay applied before the previous load balancer is shut down following a
     * successful reset.
     *
     * @param milliseconds new delay in milliseconds
     */
    public void setDelayedExecution(long milliseconds) {
        _delayedExecution = milliseconds;
    }

    /**
     * Hands the client lookup to the currently active load balancer.
     *
     * @param request request for which a client is wanted
     * @param requestContext context accompanying the request
     * @param clientCallback callback passed through to the delegate
     */
    @Override
    public void getClient(Request request, RequestContext requestContext, Callback<TransportClient> clientCallback) {
        final TogglingLoadBalancer delegate = _currentLoadBalancer;
        delegate.getClient(request, requestContext, clientCallback);
    }

    /**
     * Shuts down the active load balancer, then the ZooKeeper connection, then the executor,
     * and finally notifies {@code callback}.
     *
     * <p>If {@link #start(Callback)} never completed far enough to install a load balancer
     * (or a connection), the missing pieces are skipped rather than dereferenced, so the
     * executor is still shut down and {@code callback} is still invoked.
     *
     * @param callback invoked once the executor shutdown has been requested
     */
    @Override
    public void shutdown(final PropertyEventThread.PropertyEventShutdownCallback callback) {
        LOG.info("Shutting down");
        final PropertyEventThread.PropertyEventShutdownCallback closeResources = new PropertyEventThread.PropertyEventShutdownCallback(){

            @Override
            public void done() {
                try {
                    // May still be null when shutdown is requested before start().
                    final ZKConnection connection = ZKFSLoadBalancer.this._zkConnection;
                    if (connection != null) {
                        LOG.info("Shutting down ZooKeeper connection");
                        connection.shutdown();
                    }
                }
                catch (InterruptedException e) {
                    LOG.warn("Unexpected exception during shutdown", (Throwable)e);
                    // Preserve the interrupt for whoever owns this thread.
                    Thread.currentThread().interrupt();
                }
                finally {
                    // Always release the executor and notify the caller, even if closing ZK failed.
                    LOG.info("Shutting down PropertyEvent executor");
                    ZKFSLoadBalancer.this._executor.shutdown();
                    callback.done();
                }
            }
        };
        final TogglingLoadBalancer balancer = this._currentLoadBalancer;
        if (balancer == null) {
            // Nothing to drain: go straight to releasing the connection and executor.
            closeResources.done();
            return;
        }
        balancer.shutdown(closeResources);
    }

    /**
     * Looks up service properties through the currently active load balancer. When no load
     * balancer has been installed yet, the callback completes successfully with {@code null}.
     *
     * @param serviceName name of the service being queried
     * @param clientCallback receives the result (or {@code null} when no balancer exists)
     */
    @Override
    public void getLoadBalancedServiceProperties(String serviceName, Callback<ServiceProperties> clientCallback) {
        final TogglingLoadBalancer delegate = _currentLoadBalancer;
        if (delegate != null) {
            delegate.getLoadBalancedServiceProperties(serviceName, clientCallback);
            return;
        }
        clientCallback.onSuccess(null);
    }

    /**
     * Builds a new ZooKeeper connection and load balancer, then starts the connection on the
     * executor.
     *
     * <p>The supplied callback is wrapped and parked in {@code _startupCallback}; whichever path
     * claims it first (a connection state change, a start failure, or the initial timeout task)
     * completes it. On success the newly created balancer becomes the current one.
     *
     * <p>NOTE(review): the connection and balancer are created, and {@code _zkConnection} is
     * overwritten, before the "already in progress" check below. A start that is rejected with
     * {@link IllegalStateException} has therefore already replaced the connection field.
     *
     * @param callback notified when startup succeeds or fails; a null or empty connect string is
     *                 reported through {@code onError} with an {@link IllegalArgumentException}
     * @throws IllegalStateException if another startup callback is still pending
     */
    @Override
    public void start(final Callback<None> callback) {
        Callback<None> wrapped;
        LOG.info("Starting ZKFSLoadBalancer");
        LOG.info("ZK connect string: {}", (Object)this._connectString);
        LOG.info("ZK session timeout: {}ms", (Object)this._sessionTimeout);
        LOG.info("ZK initial connect timeout: {}ms", (Object)this._initialZKTimeout);
        // Misconfiguration is reported through the callback rather than thrown.
        if (this._connectString == null || this._connectString.isEmpty()) {
            callback.onError((Throwable)new IllegalArgumentException("ZooKeeper connection string is null or empty"));
            return;
        }
        if (this._zkFlagFile == null) {
            LOG.info("ZK flag file not specified");
        } else {
            LOG.info("ZK flag file: {}", (Object)this._zkFlagFile.getAbsolutePath());
            LOG.info("ZK currently suppressed by flag file: {}", (Object)this.suppressZK());
        }
        this._zkConnection = this._zkConnectionBuilder.build();
        final TogglingLoadBalancer balancer = this._loadBalancerFactory.createLoadBalancer(this._zkConnection, this._executor);
        // First start: expose the new balancer immediately. On a restart the previous balancer
        // stays current until the wrapped callback's onSuccess swaps it.
        if (this._currentLoadBalancer == null) {
            this._currentLoadBalancer = balancer;
        }
        // Only one startup may be pending at a time; the CAS fails if a callback is already parked.
        if (!this._startupCallback.compareAndSet(null, wrapped = new Callback<None>(){

            public void onSuccess(None none) {
                ZKFSLoadBalancer.this._currentLoadBalancer = balancer;
                callback.onSuccess((Object)none);
            }

            public void onError(Throwable e) {
                callback.onError(e);
            }
        })) {
            throw new IllegalStateException("Startup already in progress");
        }
        this._executor.execute(new PropertyEventThread.PropertyEvent("startup"){

            @Override
            public void innerRun() {
                // Register the listener before starting the connection.
                ZKFSLoadBalancer.this._zkConnection.addStateListener(new ZKListener(balancer));
                try {
                    ZKFSLoadBalancer.this._zkConnection.start();
                }
                catch (Exception e) {
                    LOG.error("Failed to start ZooKeeper (bad configuration?), enabling backup stores", (Throwable)e);
                    // NOTE(review): getAndSet may return null if another path already claimed the
                    // callback; enableBackup is then called with null - confirm it tolerates that.
                    Callback startupCallback = ZKFSLoadBalancer.this._startupCallback.getAndSet(null);
                    balancer.enableBackup((Callback<None>)startupCallback);
                    return;
                }
                LOG.info("Started ZooKeeper");
                // Timeout task: if the startup callback is still unclaimed after the initial
                // timeout, fall back to the backup stores.
                ZKFSLoadBalancer.this._executor.schedule(new Runnable(){

                    @Override
                    public void run() {
                        Callback startupCallback = ZKFSLoadBalancer.this._startupCallback.getAndSet(null);
                        if (startupCallback != null) {
                            LOG.error("No response from ZooKeeper within {}ms, enabling backup stores", (Object)ZKFSLoadBalancer.this._initialZKTimeout);
                            balancer.enableBackup((Callback<None>)startupCallback);
                        }
                    }
                }, (long)ZKFSLoadBalancer.this._initialZKTimeout, TimeUnit.MILLISECONDS);
            }
        });
    }

    /**
     * @return the {@link ZKFSDirectory} created in the constructor
     */
    @Override
    public Directory getDirectory() {
        return _directory;
    }

    /**
     * @return this instance, which itself implements {@link PartitionInfoProvider}
     */
    @Override
    public PartitionInfoProvider getPartitionInfoProvider() {
        return this;
    }

    /**
     * @return this instance, which itself implements {@link HashRingProvider}
     */
    @Override
    public HashRingProvider getHashRingProvider() {
        return this;
    }

    /**
     * @return the {@link ConsistentHashKeyMapper} created in the constructor
     */
    @Override
    public KeyMapper getKeyMapper() {
        return _keyMapper;
    }

    /**
     * Resolves hash rings for the given keys through the currently active load balancer.
     *
     * @param serviceUri URI of the service being queried
     * @param keys keys passed through to the delegate
     * @return the mapping produced by the active load balancer
     * @throws ServiceUnavailableException propagated from the delegate
     * @throws IllegalStateException if no hash-ring-capable load balancer is available
     */
    @Override
    public <K> MapKeyResult<Ring<URI>, K> getRings(URI serviceUri, Iterable<K> keys) throws ServiceUnavailableException {
        checkLoadBalancer();
        final TogglingLoadBalancer delegate = _currentLoadBalancer;
        return delegate.getRings(serviceUri, keys);
    }

    /**
     * Fetches the hash rings of a service through the currently active load balancer.
     *
     * @param serviceUri URI of the service being queried
     * @return the map produced by the active load balancer
     * @throws ServiceUnavailableException propagated from the delegate
     * @throws IllegalStateException if no hash-ring-capable load balancer is available
     */
    @Override
    public Map<Integer, Ring<URI>> getRings(URI serviceUri) throws ServiceUnavailableException {
        checkLoadBalancer();
        final TogglingLoadBalancer delegate = _currentLoadBalancer;
        return delegate.getRings(serviceUri);
    }

    /**
     * Fetches the request hash function of a service through the currently active load balancer.
     *
     * <p>Applies the same {@link #checkLoadBalancer()} guard as the {@code getRings} overloads,
     * so a call made before a load balancer is installed fails with a descriptive
     * {@link IllegalStateException} instead of a bare {@link NullPointerException}.
     *
     * @param serviceName name of the service being queried
     * @return the hash function returned by the active load balancer
     * @throws ServiceUnavailableException propagated from the delegate
     * @throws IllegalStateException if no hash-ring-capable load balancer is available
     */
    @Override
    public HashFunction<Request> getRequestHashFunction(String serviceName) throws ServiceUnavailableException {
        this.checkLoadBalancer();
        return this._currentLoadBalancer.getRequestHashFunction(serviceName);
    }

    /**
     * Obtains partition information through the currently active load balancer.
     *
     * @param serviceUri URI of the service being queried
     * @param keys keys passed through to the delegate
     * @param limitHostPerPartition passed through to the delegate
     * @param hash passed through to the delegate
     * @return the mapping produced by the active load balancer
     * @throws ServiceUnavailableException propagated from the delegate
     * @throws IllegalStateException if no partition-info-capable load balancer is available
     */
    @Override
    public <K> HostToKeyMapper<K> getPartitionInformation(URI serviceUri, Collection<K> keys, int limitHostPerPartition, int hash) throws ServiceUnavailableException {
        checkPartitionInfoProvider();
        final TogglingLoadBalancer delegate = _currentLoadBalancer;
        return delegate.getPartitionInformation(serviceUri, keys, limitHostPerPartition, hash);
    }

    /**
     * Obtains the partition accessor of a service through the currently active load balancer.
     *
     * @param serviceName name of the service being queried
     * @return the accessor returned by the active load balancer
     * @throws ServiceUnavailableException propagated from the delegate
     * @throws IllegalStateException if no partition-info-capable load balancer is available
     */
    @Override
    public PartitionAccessor getPartitionAccessor(String serviceName) throws ServiceUnavailableException {
        checkPartitionInfoProvider();
        final TogglingLoadBalancer delegate = _currentLoadBalancer;
        return delegate.getPartitionAccessor(serviceName);
    }

    /**
     * Verifies that a load balancer is installed and that it is a {@link HashRingProvider}.
     *
     * @throws IllegalStateException if there is no load balancer or it is not a hash ring provider
     */
    public void checkLoadBalancer() {
        final TogglingLoadBalancer current = _currentLoadBalancer;
        // instanceof is false for null, so this also covers the "not yet initialized" case.
        final boolean usable = current instanceof HashRingProvider;
        if (!usable) {
            throw new IllegalStateException("No HashRingProvider available to ZKFSLoadBalancer - this could be because the load balancer is not yet initialized, or because it has been configured with strategies that do not support consistent hashing.");
        }
    }

    /**
     * Verifies that a load balancer is installed and that it is a {@link PartitionInfoProvider}.
     *
     * @throws IllegalStateException if there is no load balancer or it is not a partition info provider
     */
    private void checkPartitionInfoProvider() {
        final TogglingLoadBalancer current = _currentLoadBalancer;
        // instanceof is false for null, so this also covers the "not yet initialized" case.
        final boolean usable = current instanceof PartitionInfoProvider;
        if (!usable) {
            throw new IllegalStateException("No PartitionInfoProvider available to TogglingLoadBalancer - this could be because the load balancer is not yet initialized, or because it has been configured with strategies that do not support consistent hashing.");
        }
    }

    /**
     * Verifies that a load balancer is installed and that it is a {@link ClusterInfoProvider}.
     *
     * @throws IllegalStateException if there is no load balancer or it is not a cluster info provider
     */
    private void checkClusterInfoProvider() {
        final TogglingLoadBalancer current = _currentLoadBalancer;
        // instanceof is false for null, so this also covers the "not yet initialized" case.
        final boolean usable = current instanceof ClusterInfoProvider;
        if (!usable) {
            throw new IllegalStateException("No ClusterInfoProvider available to TogglingLoadBalancer - this could be because the load balancer is not yet initialized or the underlying load balancer doesn't support providing this info.");
        }
    }

    /**
     * Obtains the transport client factory for a scheme from the currently active load balancer.
     *
     * @param scheme scheme passed through to the delegate
     * @return the factory returned by the active load balancer
     * @throws IllegalStateException if there is no load balancer or it is not a
     *         {@link ClientFactoryProvider}
     */
    @Override
    public TransportClientFactory getClientFactory(String scheme) {
        // Read the volatile field once so the check and the call see the same instance.
        final TogglingLoadBalancer current = this._currentLoadBalancer;
        if (current == null || !(current instanceof ClientFactoryProvider)) {
            // Message previously read "does notsupport"; the missing space is restored here.
            throw new IllegalStateException("No ClientFactoryProvider available to ZKFSLoadBalancer - this could be because the load balancer is not yet initialized, or because it has been configured with a LoadBalancer which does not support obtaining client factories");
        }
        return current.getClientFactory(scheme);
    }

    /**
     * Exposes the currently active load balancer as a {@link ClusterInfoProvider}.
     *
     * @return the active load balancer
     * @throws IllegalStateException if there is no load balancer or it is not a cluster info provider
     */
    @Override
    public ClusterInfoProvider getClusterInfoProvider() {
        checkClusterInfoProvider();
        final TogglingLoadBalancer provider = _currentLoadBalancer;
        return provider;
    }

    /**
     * Hands the warm-up request to the currently active load balancer.
     *
     * @param serviceName name of the service to warm up
     * @param callback callback passed through to the delegate
     */
    @Override
    public void warmUpService(String serviceName, Callback<None> callback) {
        final TogglingLoadBalancer delegate = _currentLoadBalancer;
        delegate.warmUpService(serviceName, callback);
    }

    /**
     * @return this instance, viewed as {@link Facilities}
     */
    public Facilities getFacilities() {
        return this;
    }

    /**
     * Tells whether ZooKeeper usage is currently suppressed by the flag file.
     *
     * @return {@code true} only when a flag file was configured and it exists right now
     */
    private boolean suppressZK() {
        if (_zkFlagFile == null) {
            return false;
        }
        return _zkFlagFile.exists();
    }

    /**
     * Claims the pending startup callback, clearing it so it can be handed out only once.
     * When no startup callback is pending, a callback that merely logs the outcome is
     * returned instead.
     *
     * @return the pending startup callback, or a logging-only callback; never {@code null}
     */
    private Callback<None> getStartupOrLoggerCallback() {
        final Callback<None> pending = _startupCallback.getAndSet(null);
        if (pending != null) {
            return pending;
        }
        return new Callback<None>(){

            @Override
            public void onSuccess(None none) {
                LOG.info("Enabled stores");
            }

            @Override
            public void onError(Throwable e) {
                LOG.error("Failed to enable stores", e);
            }
        };
    }

    /**
     * Reacts to a SyncConnected notification. If the flag file suppresses ZooKeeper the
     * backup stores are enabled; otherwise the directory is pointed at the current connection
     * and the primary stores are enabled.
     *
     * @param balancer load balancer whose stores are toggled
     */
    private void processSyncConnected(TogglingLoadBalancer balancer) {
        if (!suppressZK()) {
            LOG.info("Enabling primary ZK stores");
            _directory.setConnection(_zkConnection);
            final Callback<None> onPrimaryEnabled = new Callback<None>(){

                @Override
                public void onSuccess(None result) {
                    // The startup callback is claimed only once the primary stores are up.
                    getStartupOrLoggerCallback().onSuccess(result);
                }

                @Override
                public void onError(Throwable e) {
                    // Deliberately not propagated: the startup callback stays pending.
                    LOG.info("Ignored error enabling primary ZK stores; expecting Disconnected notification", e);
                }
            };
            balancer.enablePrimary(onPrimaryEnabled);
            return;
        }
        final String flagPath = _zkFlagFile.getAbsolutePath();
        LOG.warn("ZooKeeper currently suppressed by flag file {}, enabling backup stores", (Object)flagPath);
        balancer.enableBackup(getStartupOrLoggerCallback());
    }

    /**
     * Restarts this load balancer after a ZooKeeper session expiration.
     *
     * <p>Any startup callback still pending is failed with a
     * {@link KeeperException.SessionExpiredException}. {@link #start(Callback)} is then invoked
     * again; once it succeeds, shutdown of the supplied (old) balancer is scheduled after
     * {@code _delayedExecution} milliseconds.
     *
     * @param balancer the load balancer to shut down once the restart has succeeded
     */
    private void reset(final LoadBalancer balancer) {
        LOG.info("Resetting LoadBalancerState");
        // Fail a startup that was still waiting on the expired session.
        final Callback<None> pendingStartup = this._startupCallback.getAndSet(null);
        if (pendingStartup != null) {
            pendingStartup.onError(new KeeperException.SessionExpiredException());
        }
        final Callback<None> restartCallback = new Callback<None>(){

            @Override
            public void onSuccess(None none) {
                LOG.info("Successfully reset LoadBalancer after ZooKeeper session expiration");
                // The old balancer is shut down only after the configured delay has elapsed.
                ZKFSLoadBalancer.this._executor.schedule(new Runnable(){

                    @Override
                    public void run() {
                        balancer.shutdown(new PropertyEventThread.PropertyEventShutdownCallback(){

                            @Override
                            public void done() {
                                LOG.info("Shut down old LoadBalancer after ZooKeeper session expiration");
                            }
                        });
                    }
                }, ZKFSLoadBalancer.this._delayedExecution, TimeUnit.MILLISECONDS);
            }

            @Override
            public void onError(Throwable e) {
                // Include the cause; it was previously dropped from the log.
                LOG.error("Failed to reset LoadBalancer after ZooKeeper session expiration", e);
            }
        };
        this.start(restartCallback);
    }

    /**
     * Restarts this load balancer, treating the currently active balancer as the one to retire.
     */
    public void reset() {
        final TogglingLoadBalancer retiring = _currentLoadBalancer;
        reset(retiring);
    }

    /**
     * Package-private accessor for the most recently built connection.
     *
     * @return the connection, or {@code null} if {@code start} has not built one yet
     */
    ZKConnection zkConnection() {
        return _zkConnection;
    }

    /**
     * Connection state listener bound to one {@link TogglingLoadBalancer}. Each notification is
     * logged and then handled on the enclosing instance's executor.
     */
    private class ZKListener
    implements ZKConnection.StateListener {
        private final TogglingLoadBalancer _balancer;

        private ZKListener(TogglingLoadBalancer balancer) {
            _balancer = balancer;
        }

        /**
         * Logs the state change and queues its handling on the executor.
         *
         * @param state the keeper state reported by the connection
         */
        @Override
        public void notifyStateChange(final Watcher.Event.KeeperState state) {
            LOG.info("ZooKeeper session {} received KeeperState {}", (Object)_zkConnection.getZooKeeper().getSessionId(), (Object)state);
            _executor.execute(() -> handle(state));
        }

        /**
         * Applies a state change: SyncConnected goes through the primary/backup decision,
         * Disconnected enables the backup stores, Expired triggers a reset, and anything
         * else is only logged.
         */
        private void handle(Watcher.Event.KeeperState state) {
            switch (state) {
                case SyncConnected: {
                    processSyncConnected(_balancer);
                    break;
                }
                case Disconnected: {
                    LOG.info("Enabling backup stores");
                    _balancer.enableBackup(getStartupOrLoggerCallback());
                    break;
                }
                case Expired: {
                    reset(_balancer);
                    break;
                }
                default: {
                    LOG.info("Ignoring unknown state change {}", (Object)state);
                }
            }
        }
    }

    /**
     * Factory used by {@code start} to obtain a fresh {@link TogglingLoadBalancer} for each newly
     * built ZooKeeper connection.
     */
    public interface TogglingLoadBalancerFactory {
        /**
         * Creates a load balancer.
         *
         * @param var1 the ZooKeeper connection that was just built
         * @param var2 the executor owned by the enclosing {@code ZKFSLoadBalancer}
         * @return the new load balancer
         */
        TogglingLoadBalancer createLoadBalancer(ZKConnection var1, ScheduledExecutorService var2);
    }
}

