package org.apache.distributedlog.impl;

import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.callback.NamespaceListener;
import org.apache.distributedlog.namespace.NamespaceWatcher;
import org.apache.distributedlog.util.DLUtils;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/distributedlog/impl/ZKNamespaceWatcher.class */
public class ZKNamespaceWatcher extends NamespaceWatcher implements Runnable, Watcher, AsyncCallback.Children2Callback {
    private static final Logger logger = LoggerFactory.getLogger(ZKNamespaceWatcher.class);
    private final DistributedLogConfiguration conf;
    private final URI uri;
    private final ZooKeeperClient zkc;
    private final OrderedScheduler scheduler;
    private final AtomicBoolean namespaceWatcherSet = new AtomicBoolean(false);

    public ZKNamespaceWatcher(DistributedLogConfiguration distributedLogConfiguration, URI uri, ZooKeeperClient zooKeeperClient, OrderedScheduler orderedScheduler) {
        this.conf = distributedLogConfiguration;
        this.uri = uri;
        this.zkc = zooKeeperClient;
        this.scheduler = orderedScheduler;
    }

    private void scheduleTask(Runnable runnable, long j) {
        try {
            this.scheduler.schedule(runnable, j, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            logger.error("Task {} scheduled in {} ms is rejected : ", new Object[]{runnable, Long.valueOf(j), e});
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            doWatchNamespaceChanges();
        } catch (Exception e) {
            logger.error("Encountered unknown exception on watching namespace {} ", this.uri, e);
        }
    }

    @Override // org.apache.distributedlog.namespace.NamespaceWatcher
    public void watchNamespaceChanges() {
        if (this.namespaceWatcherSet.compareAndSet(false, true)) {
            doWatchNamespaceChanges();
        }
    }

    private void doWatchNamespaceChanges() {
        try {
            this.zkc.get().getChildren(this.uri.getPath(), this, this, (Object) null);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("Interrupted on watching namespace changes for {} : ", this.uri, e);
            scheduleTask(this, this.conf.getZKSessionTimeoutMilliseconds());
        } catch (ZooKeeperClient.ZooKeeperConnectionException e2) {
            scheduleTask(this, this.conf.getZKSessionTimeoutMilliseconds());
        }
    }

    public void processResult(int i, String str, Object obj, List<String> list, Stat stat) {
        if (KeeperException.Code.OK.intValue() != i) {
            scheduleTask(this, this.conf.getZKSessionTimeoutMilliseconds());
            return;
        }
        logger.info("Received updated logs under {} : {}", this.uri, list);
        ArrayList arrayList = new ArrayList(list.size());
        for (String str2 : list) {
            if (!DLUtils.isReservedStreamName(str2)) {
                arrayList.add(str2);
            }
        }
        Iterator<NamespaceListener> it = this.listeners.iterator();
        while (it.hasNext()) {
            it.next().onStreamsChanged(arrayList.iterator());
        }
    }

    public void process(WatchedEvent watchedEvent) {
        if (watchedEvent.getType() == Watcher.Event.EventType.None) {
            if (watchedEvent.getState() == Watcher.Event.KeeperState.Expired) {
                scheduleTask(this, this.conf.getZKSessionTimeoutMilliseconds());
            }
        } else if (watchedEvent.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
            doWatchNamespaceChanges();
        }
    }
}
