/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.d2.discovery.stores.zk;

import com.linkedin.common.callback.Callback;
import com.linkedin.common.callback.Callbacks;
import com.linkedin.common.callback.MultiCallback;
import com.linkedin.common.util.None;
import com.linkedin.d2.discovery.event.PropertyEventBus;
import com.linkedin.d2.discovery.event.PropertyEventBusImpl;
import com.linkedin.d2.discovery.event.PropertyEventBusRequestsThrottler;
import com.linkedin.d2.discovery.event.PropertyEventPublisher;
import com.linkedin.d2.discovery.event.PropertyEventSubscriber;
import com.linkedin.d2.discovery.stores.file.FileStore;
import com.linkedin.d2.discovery.stores.zk.ZKPersistentConnection;
import com.linkedin.d2.discovery.stores.zk.ZooKeeperConnectionAwareStore;
import com.linkedin.d2.discovery.stores.zk.ZooKeeperStore;
import com.linkedin.d2.discovery.stores.zk.builder.ZooKeeperStoreBuilder;
import com.linkedin.r2.transport.http.client.TimeoutCallback;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link PropertyEventPublisher} that answers from a local {@link FileStore} and keeps that
 * store in sync with a ZooKeeper-backed store.
 *
 * <p>Two buses are involved:
 * <ul>
 *   <li>the client bus, injected through {@link #setBus(PropertyEventBus)}, on which values are
 *       published to the consumers of this store;</li>
 *   <li>an internal bus whose publisher is the ZooKeeper store; every event received on it is
 *       written to the file store and then forwarded to the client bus (see
 *       {@link ZkBusUpdater}).</li>
 * </ul>
 *
 * @param <T> type of the property values handled by this store
 */
public class LastSeenZKStore<T> implements PropertyEventPublisher<T> {
    private static final Logger LOG = LoggerFactory.getLogger(LastSeenZKStore.class);

    private final FileStore<T> _fsStore;
    private final ZooKeeperConnectionAwareStore<T, ? extends ZooKeeperStore<T>> _zkAwareStore;
    private final ZkBusUpdater _zkBusUpdaterSubscriber;
    private final ScheduledExecutorService _executorService;
    private final int _warmUpTimeoutSeconds;
    private final int _concurrentRequests;
    /** Internal bus fed by the ZooKeeper store; assigned once in the constructor. */
    private final PropertyEventBus<T> _zkToFsBus;
    /** Bus towards the consumers of this store; set through {@link #setBus(PropertyEventBus)}. */
    private PropertyEventBus<T> _clientBus;

    /**
     * Wires the file store, the ZooKeeper store and the internal bus together. Nothing is
     * started here; see {@link #start(Callback)}.
     *
     * @param fsStore               file store read by {@link #startPublishing(String)} and
     *                              updated on every ZooKeeper event
     * @param zooKeeperStoreBuilder builder handed to the {@link ZooKeeperConnectionAwareStore}
     * @param zkPersistentConnection connection handed to the {@link ZooKeeperConnectionAwareStore}
     * @param executorService       executor used for the internal bus, for file store access in
     *                              start/stopPublishing and for the warm-up timeout
     * @param warmUpTimeoutSeconds  timeout, in seconds, applied to the warm-up phase
     * @param concurrentRequests    value passed to the warm-up requests throttler
     */
    public LastSeenZKStore(FileStore<T> fsStore, ZooKeeperStoreBuilder<? extends ZooKeeperStore<T>> zooKeeperStoreBuilder, ZKPersistentConnection zkPersistentConnection, ScheduledExecutorService executorService, int warmUpTimeoutSeconds, int concurrentRequests) {
        this._executorService = executorService;
        this._warmUpTimeoutSeconds = warmUpTimeoutSeconds;
        this._concurrentRequests = concurrentRequests;
        this._fsStore = fsStore;
        this._zkToFsBus = new PropertyEventBusImpl<>(executorService);
        this._zkBusUpdaterSubscriber = new ZkBusUpdater();
        this._zkAwareStore = new ZooKeeperConnectionAwareStore<>(zooKeeperStoreBuilder, zkPersistentConnection);
        this._zkToFsBus.setPublisher(this._zkAwareStore);
    }

    /**
     * Sets the bus on which values are published to clients. A warning is logged when the bus
     * is not a {@link PropertyEventBusImpl}; the bus is stored regardless.
     *
     * @param bus the client bus
     */
    @Override
    public void setBus(PropertyEventBus<T> bus) {
        if (!(bus instanceof PropertyEventBusImpl)) {
            LOG.warn("The bus used in LastSeenZKStore should be a PropertyEventBusImpl and not a " + bus.getClass().getName());
        }
        this._clientBus = bus;
    }

    /**
     * Submits a task to the executor that publishes the value found in the file store, or, when
     * the file store has no value for {@code prop}, registers the internal subscriber on the
     * ZooKeeper bus for it.
     *
     * @param prop name of the property to publish
     */
    @Override
    public void startPublishing(String prop) {
        this._executorService.submit(() -> {
            T lastSeenValue = this._fsStore.get(prop);
            if (lastSeenValue == null) {
                // Nothing on disk: the value has to come from ZooKeeper.
                this._zkToFsBus.register(Collections.singleton(prop), this._zkBusUpdaterSubscriber);
                return;
            }
            // NOTE(review): no registration happens on this path, presumably because warm-up
            // already issued a request for every key present in the file store - confirm.
            this._clientBus.publishInitialize(prop, lastSeenValue);
        });
    }

    /**
     * Unregisters the internal subscriber from the ZooKeeper bus for {@code prop} and submits a
     * task to the executor that removes the property from the file store.
     *
     * @param prop name of the property to stop publishing
     */
    @Override
    public void stopPublishing(String prop) {
        this._zkToFsBus.unregister(Collections.singleton(prop), this._zkBusUpdaterSubscriber);
        this._executorService.submit(() -> this._fsStore.remove(prop));
    }

    /**
     * Starts the ZooKeeper store and the file store. When the file store start succeeds the
     * warm-up is run and {@code callback} is completed by it; when it fails the error is passed
     * to {@code callback}.
     *
     * @param callback notified once start-up (including warm-up) is over, or on file store
     *                 start failure
     */
    @Override
    public void start(final Callback<None> callback) {
        Callback<None> warmUpCallback = new Callback<None>() {
            @Override
            public void onError(Throwable e) {
                callback.onError(e);
            }

            @Override
            public void onSuccess(None result) {
                LastSeenZKStore.this.warmUp(callback);
            }
        };
        // The outcome of the ZooKeeper store start is deliberately not observed: it gets an
        // empty callback and start-up only depends on the file store.
        this._zkAwareStore.start(Callbacks.empty());
        this._fsStore.start(warmUpCallback);
    }

    /**
     * Sends, through a {@link PropertyEventBusRequestsThrottler}, one request on the ZooKeeper
     * bus for every key currently in the file store. {@code callback} always receives
     * {@code onSuccess}: both outcomes of the timeout-wrapped callback report success.
     *
     * @param callback start-up callback to complete
     */
    private void warmUp(final Callback<None> callback) {
        Callback<None> timeoutCallback = new TimeoutCallback<>(this._executorService, this._warmUpTimeoutSeconds, TimeUnit.SECONDS, new Callback<None>() {
            @Override
            public void onError(Throwable e) {
                // Deliberately reported as success: a slow warm-up must not fail start-up.
                LOG.info("EventBus Throttler didn't send all requests in time, continuing startup. The WarmUp will continue in background");
                callback.onSuccess(None.none());
            }

            @Override
            public void onSuccess(None result) {
                LOG.info("EventBus Throttler sent all requests");
                callback.onSuccess(None.none());
            }
        }, "This message will never be used, even in case of timeout, no exception should be passed up");

        // Snapshot of the keys currently persisted in the file store.
        ArrayList<String> fileListWithoutExtension = new ArrayList<>(this._fsStore.getAll().keySet());
        // NOTE(review): the trailing 'true' presumably selects "register" requests - confirm
        // against PropertyEventBusRequestsThrottler.
        PropertyEventBusRequestsThrottler<T> throttler = new PropertyEventBusRequestsThrottler<>(this._zkToFsBus, this._zkBusUpdaterSubscriber, fileListWithoutExtension, this._concurrentRequests, true);
        throttler.sendRequests(timeoutCallback);
    }

    /**
     * Shuts down the file store and the ZooKeeper store, handing both the same
     * {@link MultiCallback} built from {@code shutdown} with a count of 2.
     *
     * @param shutdown callback wrapped by the {@link MultiCallback}
     */
    @Override
    public void shutdown(Callback<None> shutdown) {
        Callback<None> multiCallback = new MultiCallback(shutdown, 2);
        this._fsStore.shutdown(multiCallback);
        this._zkAwareStore.shutdown(multiCallback);
    }

    /**
     * Subscriber attached to the ZooKeeper bus: every event is first applied to the file store
     * and then republished on the client bus.
     */
    class ZkBusUpdater implements PropertyEventSubscriber<T> {
        ZkBusUpdater() {
        }

        /**
         * Stores {@code propertyValue} under {@code propertyName} in the file store, or removes
         * the entry when the value is {@code null}.
         */
        void updateFsStore(String propertyName, T propertyValue) {
            if (propertyValue == null) {
                LastSeenZKStore.this._fsStore.remove(propertyName);
                return;
            }
            LastSeenZKStore.this._fsStore.put(propertyName, propertyValue);
        }

        /** Updates the file store, then publishes an initialize event on the client bus. */
        @Override
        public void onInitialize(String propertyName, T propertyValue) {
            this.updateFsStore(propertyName, propertyValue);
            LastSeenZKStore.this._clientBus.publishInitialize(propertyName, propertyValue);
        }

        /** Updates the file store, then publishes an add event on the client bus. */
        @Override
        public void onAdd(String propertyName, T propertyValue) {
            this.updateFsStore(propertyName, propertyValue);
            LastSeenZKStore.this._clientBus.publishAdd(propertyName, propertyValue);
        }

        /** Removes the property from the file store, then publishes a remove event on the client bus. */
        @Override
        public void onRemove(String propertyName) {
            LastSeenZKStore.this._fsStore.remove(propertyName);
            LastSeenZKStore.this._clientBus.publishRemove(propertyName);
        }
    }
}

