/*
 * Decompiled with CFR 0.152.
 */
package dlshade.org.apache.distributedlog.impl.acl;

import dlshade.com.google.common.collect.ImmutableSet;
import dlshade.com.google.common.collect.Sets;
import dlshade.org.apache.bookkeeper.common.concurrent.FutureEventListener;
import dlshade.org.apache.bookkeeper.common.concurrent.FutureUtils;
import dlshade.org.apache.bookkeeper.util.ZkUtils;
import dlshade.org.apache.distributedlog.DistributedLogConfiguration;
import dlshade.org.apache.distributedlog.ZooKeeperClient;
import dlshade.org.apache.distributedlog.acl.AccessControlManager;
import dlshade.org.apache.distributedlog.exceptions.DLInterruptedException;
import dlshade.org.apache.distributedlog.impl.acl.ZKAccessControl;
import dlshade.org.apache.distributedlog.thrift.AccessControlEntry;
import dlshade.org.apache.zookeeper.AsyncCallback;
import dlshade.org.apache.zookeeper.CreateMode;
import dlshade.org.apache.zookeeper.KeeperException;
import dlshade.org.apache.zookeeper.WatchedEvent;
import dlshade.org.apache.zookeeper.Watcher;
import dlshade.org.apache.zookeeper.ZooKeeper;
import dlshade.org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZKAccessControlManager
implements AccessControlManager,
Watcher {
    private static final Logger logger = LoggerFactory.getLogger(ZKAccessControlManager.class);
    private static final int ZK_RETRY_BACKOFF_MS = 500;
    protected final DistributedLogConfiguration conf;
    protected final ZooKeeperClient zkc;
    protected final String zkRootPath;
    protected final ScheduledExecutorService scheduledExecutorService;
    protected final ConcurrentMap<String, ZKAccessControl> streamEntries;
    protected ZKAccessControl defaultAccessControl;
    protected volatile boolean closed = false;

    public ZKAccessControlManager(DistributedLogConfiguration conf, ZooKeeperClient zkc, String zkRootPath, ScheduledExecutorService scheduledExecutorService) throws IOException {
        this.conf = conf;
        this.zkc = zkc;
        this.zkRootPath = zkRootPath;
        this.scheduledExecutorService = scheduledExecutorService;
        this.streamEntries = new ConcurrentHashMap<String, ZKAccessControl>();
        try {
            FutureUtils.result(this.fetchDefaultAccessControlEntry());
        }
        catch (Throwable t) {
            if (t instanceof InterruptedException) {
                throw new DLInterruptedException("Interrupted on getting default access control entry for " + zkRootPath, t);
            }
            if (t instanceof KeeperException) {
                throw new IOException("Encountered zookeeper exception on getting default access control entry for " + zkRootPath, t);
            }
            if (t instanceof IOException) {
                throw (IOException)t;
            }
            throw new IOException("Encountered unknown exception on getting access control entries for " + zkRootPath, t);
        }
        try {
            FutureUtils.result(this.fetchAccessControlEntries());
        }
        catch (Throwable t) {
            if (t instanceof InterruptedException) {
                throw new DLInterruptedException("Interrupted on getting access control entries for " + zkRootPath, t);
            }
            if (t instanceof KeeperException) {
                throw new IOException("Encountered zookeeper exception on getting access control entries for " + zkRootPath, t);
            }
            if (t instanceof IOException) {
                throw (IOException)t;
            }
            throw new IOException("Encountered unknown exception on getting access control entries for " + zkRootPath, t);
        }
    }

    protected AccessControlEntry getAccessControlEntry(String stream) {
        ZKAccessControl entry = (ZKAccessControl)this.streamEntries.get(stream);
        entry = null == entry ? this.defaultAccessControl : entry;
        return entry.getAccessControlEntry();
    }

    @Override
    public boolean allowWrite(String stream) {
        return !this.getAccessControlEntry(stream).isDenyWrite();
    }

    @Override
    public boolean allowTruncate(String stream) {
        return !this.getAccessControlEntry(stream).isDenyTruncate();
    }

    @Override
    public boolean allowDelete(String stream) {
        return !this.getAccessControlEntry(stream).isDenyDelete();
    }

    @Override
    public boolean allowAcquire(String stream) {
        return !this.getAccessControlEntry(stream).isDenyAcquire();
    }

    @Override
    public boolean allowRelease(String stream) {
        return !this.getAccessControlEntry(stream).isDenyRelease();
    }

    @Override
    public void close() {
        this.closed = true;
    }

    private CompletableFuture<Void> fetchAccessControlEntries() {
        CompletableFuture<Void> promise = new CompletableFuture<Void>();
        this.fetchAccessControlEntries(promise);
        return promise;
    }

    private void fetchAccessControlEntries(final CompletableFuture<Void> promise) {
        try {
            this.zkc.get().getChildren(this.zkRootPath, (Watcher)this, new AsyncCallback.Children2Callback(){

                @Override
                public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
                    if (KeeperException.Code.OK.intValue() != rc) {
                        promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
                        return;
                    }
                    HashSet<String> streamsReceived = new HashSet<String>();
                    streamsReceived.addAll(children);
                    Set streamsCached = ZKAccessControlManager.this.streamEntries.keySet();
                    ImmutableSet streamsRemoved = Sets.difference(streamsCached, streamsReceived).immutableCopy();
                    for (String s : streamsRemoved) {
                        ZKAccessControl accessControl = (ZKAccessControl)ZKAccessControlManager.this.streamEntries.remove(s);
                        if (null == accessControl) continue;
                        logger.info("Removed Access Control Entry for stream {} : {}", (Object)s, (Object)accessControl.getAccessControlEntry());
                    }
                    if (streamsReceived.isEmpty()) {
                        promise.complete(null);
                        return;
                    }
                    final AtomicInteger numPendings = new AtomicInteger(streamsReceived.size());
                    final AtomicInteger numFailures = new AtomicInteger(0);
                    Iterator iterator = streamsReceived.iterator();
                    while (iterator.hasNext()) {
                        String s;
                        final String streamName = s = (String)iterator.next();
                        ZKAccessControl.read(ZKAccessControlManager.this.zkc, ZKAccessControlManager.this.zkRootPath + "/" + streamName, null).whenComplete(new FutureEventListener<ZKAccessControl>(){

                            @Override
                            public void onSuccess(ZKAccessControl accessControl) {
                                ZKAccessControlManager.this.streamEntries.put(streamName, accessControl);
                                logger.info("Added overrided access control for stream {} : {}", (Object)streamName, (Object)accessControl.getAccessControlEntry());
                                this.complete();
                            }

                            @Override
                            public void onFailure(Throwable cause) {
                                if (cause instanceof KeeperException.NoNodeException) {
                                    ZKAccessControlManager.this.streamEntries.remove(streamName);
                                } else if (cause instanceof ZKAccessControl.CorruptedAccessControlException) {
                                    logger.warn("Access control is corrupted for stream {} @ {},skipped it ...", new Object[]{streamName, ZKAccessControlManager.this.zkRootPath, cause});
                                    ZKAccessControlManager.this.streamEntries.remove(streamName);
                                } else if (1 == numFailures.incrementAndGet()) {
                                    promise.completeExceptionally(cause);
                                }
                                this.complete();
                            }

                            private void complete() {
                                if (0 == numPendings.decrementAndGet() && numFailures.get() == 0) {
                                    promise.complete(null);
                                }
                            }
                        });
                    }
                }
            }, null);
        }
        catch (ZooKeeperClient.ZooKeeperConnectionException e) {
            promise.completeExceptionally(e);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            promise.completeExceptionally(e);
        }
    }

    private CompletableFuture<ZKAccessControl> fetchDefaultAccessControlEntry() {
        CompletableFuture<ZKAccessControl> promise = new CompletableFuture<ZKAccessControl>();
        this.fetchDefaultAccessControlEntry(promise);
        return promise;
    }

    private void fetchDefaultAccessControlEntry(final CompletableFuture<ZKAccessControl> promise) {
        ZKAccessControl.read(this.zkc, this.zkRootPath, this).whenComplete(new FutureEventListener<ZKAccessControl>(){

            @Override
            public void onSuccess(ZKAccessControl accessControl) {
                logger.info("Default Access Control will be changed from {} to {}", (Object)ZKAccessControlManager.this.defaultAccessControl, (Object)accessControl);
                ZKAccessControlManager.this.defaultAccessControl = accessControl;
                promise.complete(accessControl);
            }

            @Override
            public void onFailure(Throwable cause) {
                if (cause instanceof KeeperException.NoNodeException) {
                    logger.info("Default Access Control is missing, creating one for {} ...", (Object)ZKAccessControlManager.this.zkRootPath);
                    ZKAccessControlManager.this.createDefaultAccessControlEntryIfNeeded(promise);
                } else {
                    promise.completeExceptionally(cause);
                }
            }
        });
    }

    private void createDefaultAccessControlEntryIfNeeded(final CompletableFuture<ZKAccessControl> promise) {
        ZooKeeper zk;
        try {
            zk = this.zkc.get();
        }
        catch (ZooKeeperClient.ZooKeeperConnectionException e) {
            promise.completeExceptionally(e);
            return;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            promise.completeExceptionally(e);
            return;
        }
        ZkUtils.asyncCreateFullPathOptimistic(zk, this.zkRootPath, new byte[0], this.zkc.getDefaultACL(), CreateMode.PERSISTENT, new AsyncCallback.StringCallback(){

            @Override
            public void processResult(int rc, String path, Object ctx, String name) {
                if (KeeperException.Code.OK.intValue() == rc) {
                    logger.info("Created zk path {} for default ACL.", (Object)ZKAccessControlManager.this.zkRootPath);
                    ZKAccessControlManager.this.fetchDefaultAccessControlEntry(promise);
                } else {
                    promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
                }
            }
        }, null);
    }

    private void refetchDefaultAccessControlEntry(int delayMs) {
        if (this.closed) {
            return;
        }
        this.scheduledExecutorService.schedule(new Runnable(){

            @Override
            public void run() {
                ZKAccessControlManager.this.fetchDefaultAccessControlEntry().whenComplete(new FutureEventListener<ZKAccessControl>(){

                    @Override
                    public void onSuccess(ZKAccessControl value) {
                    }

                    @Override
                    public void onFailure(Throwable cause) {
                        if (cause instanceof ZKAccessControl.CorruptedAccessControlException) {
                            logger.warn("Default access control entry is corrupted, ignore this update : ", cause);
                            return;
                        }
                        logger.warn("Encountered an error on refetching default access control entry, retrying in {} ms : ", (Object)500, (Object)cause);
                        ZKAccessControlManager.this.refetchDefaultAccessControlEntry(500);
                    }
                });
            }
        }, (long)delayMs, TimeUnit.MILLISECONDS);
    }

    private void refetchAccessControlEntries(int delayMs) {
        if (this.closed) {
            return;
        }
        this.scheduledExecutorService.schedule(new Runnable(){

            @Override
            public void run() {
                ZKAccessControlManager.this.fetchAccessControlEntries().whenComplete(new FutureEventListener<Void>(){

                    @Override
                    public void onSuccess(Void value) {
                    }

                    @Override
                    public void onFailure(Throwable cause) {
                        logger.warn("Encountered an error on refetching access control entries, retrying in {} ms : ", (Object)500, (Object)cause);
                        ZKAccessControlManager.this.refetchAccessControlEntries(500);
                    }
                });
            }
        }, (long)delayMs, TimeUnit.MILLISECONDS);
    }

    private void refetchAllAccessControlEntries(int delayMs) {
        if (this.closed) {
            return;
        }
        this.scheduledExecutorService.schedule(new Runnable(){

            @Override
            public void run() {
                ZKAccessControlManager.this.fetchDefaultAccessControlEntry().whenComplete(new FutureEventListener<ZKAccessControl>(){

                    @Override
                    public void onSuccess(ZKAccessControl value) {
                        ZKAccessControlManager.this.fetchAccessControlEntries().whenComplete(new FutureEventListener<Void>(){

                            @Override
                            public void onSuccess(Void value) {
                            }

                            @Override
                            public void onFailure(Throwable cause) {
                                logger.warn("Encountered an error on fetching all access control entries, retrying in {} ms : ", (Object)500, (Object)cause);
                                ZKAccessControlManager.this.refetchAccessControlEntries(500);
                            }
                        });
                    }

                    @Override
                    public void onFailure(Throwable cause) {
                        logger.warn("Encountered an error on refetching all access control entries, retrying in {} ms : ", (Object)500, (Object)cause);
                        ZKAccessControlManager.this.refetchAllAccessControlEntries(500);
                    }
                });
            }
        }, (long)delayMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public void process(WatchedEvent event) {
        if (Watcher.Event.EventType.None.equals((Object)event.getType())) {
            if (event.getState() == Watcher.Event.KeeperState.Expired) {
                this.refetchAllAccessControlEntries(0);
            }
        } else if (Watcher.Event.EventType.NodeDataChanged.equals((Object)event.getType())) {
            logger.info("Default ACL for {} is changed, refetching ...", (Object)this.zkRootPath);
            this.refetchDefaultAccessControlEntry(0);
        } else if (Watcher.Event.EventType.NodeChildrenChanged.equals((Object)event.getType())) {
            logger.info("List of ACLs for {} are changed, refetching ...", (Object)this.zkRootPath);
            this.refetchAccessControlEntries(0);
        }
    }
}

