package org.apache.distributedlog.impl.federated;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import org.apache.bookkeeper.common.concurrent.FutureEventListener;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.callback.NamespaceListener;
import org.apache.distributedlog.exceptions.LogExistsException;
import org.apache.distributedlog.exceptions.UnexpectedException;
import org.apache.distributedlog.exceptions.ZKException;
import org.apache.distributedlog.impl.ZKNamespaceWatcher;
import org.apache.distributedlog.metadata.LogMetadataStore;
import org.apache.distributedlog.namespace.NamespaceWatcher;
import org.apache.distributedlog.util.Utils;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.Transaction;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/distributedlog/impl/federated/FederatedZKLogMetadataStore.class */
public class FederatedZKLogMetadataStore extends NamespaceWatcher implements LogMetadataStore, Watcher, Runnable, FutureEventListener<Set<URI>> {
    // Class-wide logger.
    private static final Logger logger = LoggerFactory.getLogger(FederatedZKLogMetadataStore.class);
    // Name of the znode (directly under the namespace path) that holds the sub namespaces.
    private static final String ZNODE_SUB_NAMESPACES = ".subnamespaces";
    // Name prefix of the sequential znodes created for new sub namespaces.
    private static final String SUB_NAMESPACE_PREFIX = "NS_";
    // Configuration, root namespace URI, ZooKeeper client and scheduler supplied to the constructor.
    final DistributedLogConfiguration conf;
    final URI namespace;
    final ZooKeeperClient zkc;
    final OrderedScheduler scheduler;
    // namespace path + "/" + ZNODE_SUB_NAMESPACES.
    final String zkSubnamespacesPath;
    // A sub namespace accepts a new log only while it holds fewer logs than this limit.
    final int maxLogsPerSubnamespace;
    // When true, a cache miss in getLogLocation falls back to probing ZooKeeper.
    final boolean forceCheckLogExistence;
    // Set once a log is seen in more than one location; from then on scheduling halts and operations fail.
    final AtomicBoolean duplicatedLogFound = new AtomicBoolean(false);
    // Name of the first duplicated log recorded (null until one is recorded).
    final AtomicReference<String> duplicatedLogName = new AtomicReference<>(null);
    // Last known version of the sub namespaces znode; null until first observed.
    final AtomicReference<Integer> zkSubnamespacesVersion = new AtomicReference<>(null);
    // Watched sub namespaces keyed by URI.
    final ConcurrentSkipListMap<URI, SubNamespace> subNamespaces = new ConcurrentSkipListMap<>();
    // Cache mapping a log name to the URI of the (sub) namespace it was found in.
    final ConcurrentMap<String, URI> log2Locations = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/distributedlog/impl/federated/FederatedZKLogMetadataStore$SubNamespace.class */
    public class SubNamespace implements NamespaceListener {
        final URI uri;
        final ZKNamespaceWatcher watcher;
        CompletableFuture<Set<String>> logsFuture = new CompletableFuture<>();

        SubNamespace(URI uri) {
            this.uri = uri;
            this.watcher = new ZKNamespaceWatcher(FederatedZKLogMetadataStore.this.conf, uri, FederatedZKLogMetadataStore.this.zkc, FederatedZKLogMetadataStore.this.scheduler);
            this.watcher.registerListener(this);
        }

        void watch() {
            this.watcher.watchNamespaceChanges();
        }

        synchronized CompletableFuture<Set<String>> getLogs() {
            return this.logsFuture;
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r0v14, types: [java.util.Set] */
        @Override // org.apache.distributedlog.callback.NamespaceListener
        public void onStreamsChanged(Iterator<String> it) {
            CompletableFuture<Set<String>> completableFuture;
            HashSet<String> newHashSet = Sets.newHashSet(it);
            HashSet newHashSet2 = Sets.newHashSet();
            synchronized (this) {
                if (this.logsFuture.isDone()) {
                    try {
                        newHashSet2 = (Set) FutureUtils.result(this.logsFuture);
                    } catch (Exception e) {
                        FederatedZKLogMetadataStore.logger.error("Unexpected exception when getting logs from a satisified future of {} : ", this.uri, e);
                    }
                    this.logsFuture = new CompletableFuture<>();
                }
                for (String str : newHashSet) {
                    URI putIfAbsent = FederatedZKLogMetadataStore.this.log2Locations.putIfAbsent(str, this.uri);
                    if (null != putIfAbsent && !Objects.equal(this.uri, putIfAbsent)) {
                        FederatedZKLogMetadataStore.logger.error("Log {} is found duplicated in multiple locations : old location = {}, new location = {}", new Object[]{str, putIfAbsent, this.uri});
                        FederatedZKLogMetadataStore.this.duplicatedLogFound.set(true);
                    }
                }
                Iterator it2 = Sets.difference(newHashSet2, newHashSet).iterator();
                while (it2.hasNext()) {
                    FederatedZKLogMetadataStore.this.log2Locations.remove((String) it2.next(), this.uri);
                }
                completableFuture = this.logsFuture;
            }
            completableFuture.complete(newHashSet);
            FederatedZKLogMetadataStore.this.notifyOnNamespaceChanges();
        }
    }

    /**
     * Creates the sub namespaces znode ({@code <namespace path>/.subnamespaces}) with empty data,
     * the client's default ACL and {@link CreateMode#PERSISTENT}.
     *
     * @param uri             URI of the federated namespace
     * @param zooKeeperClient client used to create the path
     * @throws IOException     propagated from the path-creation helper
     * @throws KeeperException propagated from the path-creation helper
     */
    public static void createFederatedNamespace(URI uri, ZooKeeperClient zooKeeperClient) throws IOException, KeeperException {
        final String subNamespacesPath = uri.getPath() + "/" + ZNODE_SUB_NAMESPACES;
        final byte[] noData = new byte[0];
        Utils.zkCreateFullPathOptimistic(
                zooKeeperClient,
                subNamespacesPath,
                noData,
                zooKeeperClient.getDefaultACL(),
                CreateMode.PERSISTENT);
    }

    public FederatedZKLogMetadataStore(DistributedLogConfiguration distributedLogConfiguration, URI uri, ZooKeeperClient zooKeeperClient, OrderedScheduler orderedScheduler) throws IOException {
        this.conf = distributedLogConfiguration;
        this.namespace = uri;
        this.zkc = zooKeeperClient;
        this.scheduler = orderedScheduler;
        this.forceCheckLogExistence = distributedLogConfiguration.getFederatedCheckExistenceWhenCacheMiss();
        this.zkSubnamespacesPath = uri.getPath() + "/" + ZNODE_SUB_NAMESPACES;
        this.maxLogsPerSubnamespace = distributedLogConfiguration.getFederatedMaxLogsPerSubnamespace();
        try {
            for (URI uri2 : (Set) FutureUtils.result(fetchSubNamespaces(this))) {
                SubNamespace subNamespace = new SubNamespace(uri2);
                if (null == this.subNamespaces.putIfAbsent(uri2, subNamespace)) {
                    subNamespace.watch();
                    logger.info("Watched sub namespace {}", uri2);
                }
            }
            logger.info("Federated ZK LogMetadataStore is initialized for {}", uri);
        } catch (Exception e) {
            if (!(e instanceof IOException)) {
                throw new IOException(e);
            }
            throw ((IOException) e);
        }
    }

    /**
     * Schedules {@code runnable} on the scheduler after {@code j} milliseconds, unless a duplicated
     * log has been found, in which case nothing is scheduled. A rejected submission is logged and
     * otherwise ignored.
     *
     * @param runnable task to run
     * @param j        delay in milliseconds
     */
    private void scheduleTask(Runnable runnable, long j) {
        if (!duplicatedLogFound.get()) {
            try {
                scheduler.schedule(runnable, j, TimeUnit.MILLISECONDS);
            } catch (RejectedExecutionException rejected) {
                logger.error("Task {} scheduled in {} ms is rejected : ", new Object[]{runnable, Long.valueOf(j), rejected});
            }
            return;
        }
        logger.error("Scheduler is halted for federated namespace {} as duplicated log found", namespace);
    }

    /**
     * Wraps {@code completableFuture} so that a successful result is only passed on if no duplicated
     * log has been found by the time it completes; otherwise the returned future fails with an
     * {@link UnexpectedException}. Failures are forwarded as-is.
     *
     * @param completableFuture the future to guard
     * @param <T>               result type
     * @return a new future carrying the guarded outcome
     */
    private <T> CompletableFuture<T> postStateCheck(CompletableFuture<T> completableFuture) {
        final CompletableFuture<T> guarded = new CompletableFuture<>();
        completableFuture.whenComplete(new FutureEventListener<T>() {
            @Override
            public void onSuccess(T value) {
                if (!FederatedZKLogMetadataStore.this.duplicatedLogFound.get()) {
                    guarded.complete(value);
                    return;
                }
                guarded.completeExceptionally(new UnexpectedException("Duplicate log found under " + FederatedZKLogMetadataStore.this.namespace));
            }

            @Override
            public void onFailure(Throwable cause) {
                guarded.completeExceptionally(cause);
            }
        });
        return guarded;
    }

    /**
     * @return the key set of the watched sub namespaces map
     */
    @VisibleForTesting
    Set<URI> getSubnamespaces() {
        final Set<URI> watchedUris = subNamespaces.keySet();
        return watchedUris;
    }

    /**
     * Drops the cached location of a log, whatever it is.
     *
     * @param str name of the log to evict from the location cache
     */
    @VisibleForTesting
    void removeLogFromCache(String str) {
        log2Locations.remove(str);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds the URI of a sub namespace by copying every component of the root namespace URI and
     * replacing the path with {@code <root path>/.subnamespaces/<str>}.
     *
     * @param str znode name of the sub namespace
     * @return URI of the sub namespace
     * @throws URISyntaxException if the resulting components do not form a valid URI
     */
    public URI getSubNamespaceURI(String str) throws URISyntaxException {
        final URI root = this.namespace;
        final String subPath = root.getPath() + "/" + ZNODE_SUB_NAMESPACES + "/" + str;
        return new URI(
                root.getScheme(),
                root.getUserInfo(),
                root.getHost(),
                root.getPort(),
                subPath,
                root.getQuery(),
                root.getFragment());
    }

    /**
     * @return an already-completed future holding the key set of the watched sub namespaces map
     */
    CompletableFuture<Set<URI>> getCachedSubNamespaces() {
        final Set<URI> cachedUris = subNamespaces.keySet();
        return FutureUtils.value(cachedUris);
    }

    /**
     * Issues a ZooKeeper {@code sync} on the sub namespaces path and, when it returns OK, lists the
     * children of that path through the two-argument overload. Any other return code fails the
     * returned future with the matching {@link KeeperException}.
     *
     * @param watcher watcher to pass to the children listing (may be null)
     * @return future completed with the set of namespace URIs, or failed
     */
    CompletableFuture<Set<URI>> fetchSubNamespaces(final Watcher watcher) {
        final CompletableFuture<Set<URI>> result = new CompletableFuture<>();
        final AsyncCallback.VoidCallback afterSync = new AsyncCallback.VoidCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx) {
                if (KeeperException.Code.OK.intValue() != rc) {
                    result.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
                    return;
                }
                FederatedZKLogMetadataStore.this.fetchSubNamespaces(watcher, result);
            }
        };
        try {
            this.zkc.get().sync(this.zkSubnamespacesPath, afterSync, (Object) null);
        } catch (ZooKeeperClient.ZooKeeperConnectionException connectionFailure) {
            result.completeExceptionally(connectionFailure);
        } catch (InterruptedException interrupted) {
            // Preserve the interrupt status for the caller's thread.
            Thread.currentThread().interrupt();
            result.completeExceptionally(interrupted);
        }
        return result;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Lists the children of the sub namespaces znode and completes {@code completableFuture} with
     * the root namespace URI plus one URI per child. On success the observed znode version is
     * recorded via {@link #setZkSubnamespacesVersion(int)} before the future is completed.
     *
     * <p>The future is always completed: NONODE and invalid child names fail it with an
     * {@link UnexpectedException}, and every other non-OK return code fails it with the matching
     * {@link KeeperException}. Without that last case a connection loss would leave callers
     * (including the constructor, which blocks on this future) waiting forever.
     *
     * @param watcher           watcher to set on the children listing (may be null)
     * @param completableFuture future to complete with the outcome
     */
    public void fetchSubNamespaces(Watcher watcher, final CompletableFuture<Set<URI>> completableFuture) {
        try {
            this.zkc.get().getChildren(this.zkSubnamespacesPath, watcher, new AsyncCallback.Children2Callback() {
                @Override
                public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
                    if (KeeperException.Code.NONODE.intValue() == rc) {
                        completableFuture.completeExceptionally(new UnexpectedException("The subnamespaces don't exist for the federated namespace " + FederatedZKLogMetadataStore.this.namespace));
                        return;
                    }
                    if (KeeperException.Code.OK.intValue() != rc) {
                        // Any other failure (e.g. connection loss) must still complete the future.
                        completableFuture.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
                        return;
                    }
                    final Set<URI> uris = Sets.newHashSet();
                    // The root namespace itself is always part of the result.
                    uris.add(FederatedZKLogMetadataStore.this.namespace);
                    try {
                        for (String child : children) {
                            uris.add(FederatedZKLogMetadataStore.this.getSubNamespaceURI(child));
                        }
                    } catch (URISyntaxException e) {
                        FederatedZKLogMetadataStore.logger.error("Invalid sub namespace uri found : ", e);
                        completableFuture.completeExceptionally(new UnexpectedException("Invalid sub namespace uri found in " + FederatedZKLogMetadataStore.this.namespace, e));
                        return;
                    }
                    FederatedZKLogMetadataStore.this.setZkSubnamespacesVersion(stat.getVersion());
                    completableFuture.complete(uris);
                }
            }, (Object) null);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            completableFuture.completeExceptionally(e);
        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
            completableFuture.completeExceptionally(e);
        }
    }

    /**
     * Re-fetches the sub namespaces with this object as watcher and routes the outcome to this
     * object's {@code onSuccess}/{@code onFailure}.
     */
    @Override // java.lang.Runnable
    public void run() {
        final CompletableFuture<Set<URI>> refreshed = fetchSubNamespaces(this);
        refreshed.whenComplete(this);
    }

    /**
     * Starts watching every URI in {@code set} that is not yet tracked. For each one newly added to
     * the map, the watch is started and namespace listeners are notified.
     *
     * @param set the freshly fetched namespace URIs
     */
    public void onSuccess(Set<URI> set) {
        for (URI candidate : set) {
            if (subNamespaces.containsKey(candidate)) {
                continue;
            }
            final SubNamespace created = new SubNamespace(candidate);
            if (null != subNamespaces.putIfAbsent(candidate, created)) {
                // Someone else registered it between the check and the insert.
                continue;
            }
            created.watch();
            logger.info("Watched new sub namespace {}.", candidate);
            notifyOnNamespaceChanges();
        }
    }

    /**
     * Reschedules this object (see {@link #run()}) after the configured ZooKeeper session timeout.
     *
     * @param th cause of the failed fetch (not inspected)
     */
    public void onFailure(Throwable th) {
        final long retryDelayMs = conf.getZKSessionTimeoutMilliseconds();
        scheduleTask(this, retryDelayMs);
    }

    /**
     * Watcher callback. On session expiry a delayed re-fetch is scheduled; on a children change the
     * sub namespaces are re-fetched right away. All other events are ignored.
     *
     * @param watchedEvent the event delivered by ZooKeeper
     */
    public void process(WatchedEvent watchedEvent) {
        final Watcher.Event.EventType eventType = watchedEvent.getType();
        final boolean sessionExpired = Watcher.Event.EventType.None == eventType
                && Watcher.Event.KeeperState.Expired == watchedEvent.getState();
        if (sessionExpired) {
            scheduleTask(this, this.conf.getZKSessionTimeoutMilliseconds());
            return;
        }
        if (Watcher.Event.EventType.NodeChildrenChanged == eventType) {
            fetchSubNamespaces(this).whenComplete(this);
        }
    }

    /**
     * @param str name of the duplicated log
     * @param <T> result type of the returned future
     * @return a future already failed with an {@link UnexpectedException} naming the log and the
     *         namespace
     */
    private <T> CompletableFuture<T> duplicatedLogException(String str) {
        final String message = "Duplicated log " + str + " found in namespace " + this.namespace;
        return FutureUtils.exception(new UnexpectedException(message));
    }

    /**
     * Creates a log in one of the sub namespaces. Fails immediately if a duplicated log has already
     * been found; the outcome is also re-checked against that flag on completion.
     *
     * @param str name of the log to create
     * @return future completed with the URI of the namespace the log was created in
     */
    @Override // org.apache.distributedlog.metadata.LogMetadataStore
    public CompletableFuture<URI> createLog(String str) {
        if (!duplicatedLogFound.get()) {
            final CompletableFuture<URI> createPromise = new CompletableFuture<>();
            doCreateLog(str, createPromise);
            return postStateCheck(createPromise);
        }
        return duplicatedLogException(duplicatedLogName.get());
    }

    /**
     * Looks up the log's location first; fails with {@link LogExistsException} if one is found,
     * otherwise proceeds to pick a sub namespace from the cached set and create the log there.
     *
     * @param str               name of the log to create
     * @param completableFuture promise completed with the creation outcome
     */
    void doCreateLog(final String str, final CompletableFuture<URI> completableFuture) {
        final CompletableFuture<Optional<URI>> lookup = getLogLocation(str);
        lookup.whenComplete(new FutureEventListener<Optional<URI>>() {
            @Override
            public void onSuccess(Optional<URI> location) {
                if (!location.isPresent()) {
                    FederatedZKLogMetadataStore.this.getCachedSubNamespacesAndCreateLog(str, completableFuture);
                    return;
                }
                completableFuture.completeExceptionally(new LogExistsException("Log " + str + " already exists in " + location.get()));
            }

            @Override
            public void onFailure(Throwable cause) {
                completableFuture.completeExceptionally(cause);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Uses the locally cached sub namespaces to choose where to create the log.
     *
     * @param str               name of the log to create
     * @param completableFuture promise completed with the creation outcome
     */
    public void getCachedSubNamespacesAndCreateLog(final String str, final CompletableFuture<URI> completableFuture) {
        final CompletableFuture<Set<URI>> cached = getCachedSubNamespaces();
        cached.whenComplete(new FutureEventListener<Set<URI>>() {
            @Override
            public void onSuccess(Set<URI> uris) {
                FederatedZKLogMetadataStore.this.findSubNamespaceToCreateLog(str, uris, completableFuture);
            }

            @Override
            public void onFailure(Throwable cause) {
                completableFuture.completeExceptionally(cause);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Re-reads the sub namespaces from ZooKeeper (without setting a watcher) and then chooses where
     * to create the log.
     *
     * @param str               name of the log to create
     * @param completableFuture promise completed with the creation outcome
     */
    public void fetchSubNamespacesAndCreateLog(final String str, final CompletableFuture<URI> completableFuture) {
        final CompletableFuture<Set<URI>> fetched = fetchSubNamespaces(null);
        fetched.whenComplete(new FutureEventListener<Set<URI>>() {
            @Override
            public void onSuccess(Set<URI> uris) {
                FederatedZKLogMetadataStore.this.findSubNamespaceToCreateLog(str, uris, completableFuture);
            }

            @Override
            public void onFailure(Throwable cause) {
                completableFuture.completeExceptionally(cause);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Picks a namespace for a new log. The candidates are scanned from the last to the first in the
     * iteration order of {@code set}; the first one holding fewer than
     * {@code maxLogsPerSubnamespace} logs is used. If none qualifies, a new sub namespace is created
     * and used. Fails if any URI in {@code set} is not tracked in the sub namespaces map.
     *
     * @param str               name of the log to create
     * @param set               candidate namespace URIs
     * @param completableFuture promise completed with the creation outcome
     */
    public void findSubNamespaceToCreateLog(final String str, Set<URI> set, final CompletableFuture<URI> completableFuture) {
        final int expected = set.size();
        final List<URI> candidateUris = Lists.newArrayListWithExpectedSize(expected);
        final List<CompletableFuture<Set<String>>> logFutures = Lists.newArrayListWithExpectedSize(expected);
        for (URI candidate : set) {
            final SubNamespace tracked = this.subNamespaces.get(candidate);
            if (null != tracked) {
                logFutures.add(tracked.getLogs());
                candidateUris.add(candidate);
                continue;
            }
            completableFuture.completeExceptionally(new UnexpectedException("No sub namespace " + candidate + " found"));
            return;
        }
        FutureUtils.collect(logFutures).whenComplete(new FutureEventListener<List<Set<String>>>() {
            @Override
            public void onSuccess(List<Set<String>> logsPerNamespace) {
                int idx = logsPerNamespace.size();
                while (--idx >= 0) {
                    if (logsPerNamespace.get(idx).size() < FederatedZKLogMetadataStore.this.maxLogsPerSubnamespace) {
                        FederatedZKLogMetadataStore.this.createLogInNamespace(candidateUris.get(idx), str, completableFuture);
                        return;
                    }
                }
                // Every known namespace is at capacity: allocate a fresh one.
                FederatedZKLogMetadataStore.this.createSubNamespace().whenComplete(new FutureEventListener<URI>() {
                    @Override
                    public void onSuccess(URI newNamespace) {
                        FederatedZKLogMetadataStore.this.createLogInNamespace(newNamespace, str, completableFuture);
                    }

                    @Override
                    public void onFailure(Throwable cause) {
                        completableFuture.completeExceptionally(cause);
                    }
                });
            }

            @Override
            public void onFailure(Throwable cause) {
                completableFuture.completeExceptionally(cause);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Extracts the sub namespace znode name (the last {@code NS_...} segment) from a ZooKeeper path.
     *
     * <p>The name is taken from the last occurrence of the prefix. A path that does not contain the
     * prefix, or that has nothing after its last occurrence, is rejected. The previous
     * {@code split}-based check could not detect a missing prefix and returned the prefix glued to
     * the whole path.
     *
     * @param str ZooKeeper path of a created sub namespace znode
     * @return the znode name starting with the sub namespace prefix
     * @throws UnexpectedException if {@code str} carries no sub namespace name
     */
    public String getNamespaceFromZkPath(String str) throws UnexpectedException {
        final int prefixStart = str.lastIndexOf(SUB_NAMESPACE_PREFIX);
        if (prefixStart < 0 || prefixStart + SUB_NAMESPACE_PREFIX.length() >= str.length()) {
            throw new UnexpectedException("Invalid namespace @ " + str);
        }
        return str.substring(prefixStart);
    }

    /**
     * Creates a new persistent-sequential znode with the sub namespace prefix under the sub
     * namespaces path and resolves the URI of the created sub namespace.
     *
     * @return future completed with the new sub namespace URI, or failed with the matching
     *         {@link KeeperException}, an {@link UnexpectedException}, or the connection error
     */
    CompletableFuture<URI> createSubNamespace() {
        final CompletableFuture<URI> created = new CompletableFuture<>();
        final String sequentialPath = this.namespace.getPath() + "/" + ZNODE_SUB_NAMESPACES + "/" + SUB_NAMESPACE_PREFIX;
        final AsyncCallback.StringCallback onCreated = new AsyncCallback.StringCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, String name) {
                if (KeeperException.Code.OK.intValue() != rc) {
                    created.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
                    return;
                }
                try {
                    final String znodeName = FederatedZKLogMetadataStore.this.getNamespaceFromZkPath(name);
                    final URI newUri = FederatedZKLogMetadataStore.this.getSubNamespaceURI(znodeName);
                    FederatedZKLogMetadataStore.logger.info("Created sub namespace {}", newUri);
                    created.complete(newUri);
                } catch (UnexpectedException invalidPath) {
                    created.completeExceptionally(invalidPath);
                } catch (URISyntaxException invalidUri) {
                    created.completeExceptionally(new UnexpectedException("Invalid namespace " + name + " is created."));
                }
            }
        };
        try {
            this.zkc.get().create(sequentialPath, new byte[0], this.zkc.getDefaultACL(), CreateMode.PERSISTENT_SEQUENTIAL, onCreated, (Object) null);
        } catch (ZooKeeperClient.ZooKeeperConnectionException connectionFailure) {
            created.completeExceptionally(connectionFailure);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            created.completeExceptionally(interrupted);
        }
        return created;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Submits the (blocking) creation of a log in {@code uri} to the scheduler and completes
     * {@code completableFuture} with the outcome.
     *
     * <p>A version conflict on the sub namespaces znode triggers a re-fetch of the sub namespaces
     * and a retry. That handler is listed before the general {@link KeeperException} handler;
     * in the opposite order the subclass catch is unreachable and the retry never runs. Unchecked
     * failures are also forwarded so the promise is never left pending.
     *
     * @param uri               namespace to create the log in
     * @param str               name of the log to create
     * @param completableFuture promise completed with {@code uri} on success
     */
    public void createLogInNamespace(final URI uri, final String str, final CompletableFuture<URI> completableFuture) {
        this.scheduler.submit(new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                try {
                    FederatedZKLogMetadataStore.this.createLogInNamespaceSync(uri, str);
                    completableFuture.complete(uri);
                } catch (KeeperException.BadVersionException e) {
                    // Our view of the sub namespaces znode is stale: refresh it and try again.
                    FederatedZKLogMetadataStore.this.fetchSubNamespacesAndCreateLog(str, completableFuture);
                } catch (KeeperException | IOException e) {
                    completableFuture.completeExceptionally(e);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    completableFuture.completeExceptionally(e);
                } catch (RuntimeException e) {
                    completableFuture.completeExceptionally(e);
                }
            }
        });
    }

    /**
     * Creates the log znode under {@code uri} in a single ZooKeeper transaction that also writes the
     * sub namespaces znode conditioned on its last known version (0 if none was observed yet). On
     * success the known version is advanced by one.
     *
     * <p>The per-operation results of a failed transaction are only inspected when present:
     * {@code KeeperException.getResults()} can be null for failures that did not come from the
     * multi-op itself, and such failures are reported as a {@link ZKException}.
     *
     * @param uri namespace to create the log in
     * @param str name of the log to create
     * @throws InterruptedException if obtaining the ZooKeeper handle or committing is interrupted
     * @throws IOException          {@link LogExistsException} if the log znode already exists,
     *                              {@link ZKException} for other ZooKeeper failures, or a
     *                              connection error from the client
     * @throws KeeperException      with code BADVERSION if the sub namespaces znode changed
     */
    void createLogInNamespaceSync(URI uri, String str) throws InterruptedException, IOException, KeeperException {
        final Transaction txn = this.zkc.get().transaction();
        // Read the version once so the check and the increment use the same value.
        final Integer knownVersion = this.zkSubnamespacesVersion.get();
        final int expectedVersion = null == knownVersion ? 0 : knownVersion.intValue();
        txn.setData(this.zkSubnamespacesPath, uri.getPath().getBytes(Charsets.UTF_8), expectedVersion);
        txn.create(uri.getPath() + "/" + str, new byte[0], this.zkc.getDefaultACL(), CreateMode.PERSISTENT);
        try {
            txn.commit();
            setZkSubnamespacesVersion(expectedVersion + 1);
        } catch (KeeperException e) {
            final List<OpResult> results = e.getResults();
            if (null != results && results.size() > 1) {
                // Index 1 is the create op, index 0 the conditional setData op.
                final OpResult createResult = results.get(1);
                if (createResult instanceof OpResult.ErrorResult
                        && KeeperException.Code.NODEEXISTS.intValue() == ((OpResult.ErrorResult) createResult).getErr()) {
                    throw new LogExistsException("Log " + str + " already exists");
                }
                final OpResult setDataResult = results.get(0);
                if (setDataResult instanceof OpResult.ErrorResult
                        && KeeperException.Code.BADVERSION.intValue() == ((OpResult.ErrorResult) setDataResult).getErr()) {
                    throw KeeperException.create(KeeperException.Code.BADVERSION);
                }
            }
            throw new ZKException("ZK exception in creating log " + str + " in " + uri, e);
        }
    }

    /**
     * Raises the recorded version of the sub namespaces znode to {@code i} unless the recorded
     * value is already greater than or equal to it. Retries the compare-and-set until it either
     * succeeds or the recorded value no longer needs raising.
     *
     * @param i version to record
     */
    void setZkSubnamespacesVersion(int i) {
        for (;;) {
            final Integer current = this.zkSubnamespacesVersion.get();
            if (null != current && current.intValue() >= i) {
                return;
            }
            if (this.zkSubnamespacesVersion.compareAndSet(current, Integer.valueOf(i))) {
                return;
            }
        }
    }

    /**
     * Resolves the namespace a log lives in. The location cache is consulted first; on a miss the
     * result is absent unless existence checks are forced, in which case ZooKeeper is probed and a
     * found location is cached.
     *
     * @param str name of the log
     * @return future with the location if known, absent otherwise; failed if a duplicated log has
     *         been found
     */
    @Override // org.apache.distributedlog.metadata.LogMetadataStore
    public CompletableFuture<Optional<URI>> getLogLocation(String str) {
        if (this.duplicatedLogFound.get()) {
            return duplicatedLogException(this.duplicatedLogName.get());
        }
        final URI cachedLocation = this.log2Locations.get(str);
        if (null != cachedLocation) {
            final Optional<URI> hit = Optional.of(cachedLocation);
            return postStateCheck(FutureUtils.value(hit));
        }
        if (!this.forceCheckLogExistence) {
            final Optional<URI> miss = Optional.absent();
            return FutureUtils.value(miss);
        }
        final CompletableFuture<Optional<URI>> probed = fetchLogLocation(str).thenApply(location -> {
            if (location.isPresent()) {
                this.log2Locations.putIfAbsent(str, location.get());
            }
            return location;
        });
        return postStateCheck(probed);
    }

    /**
     * Probes every tracked sub namespace for the log. If exactly one contains it, that location is
     * returned; if none does, the result is absent. Finding it in two places records the duplicated
     * log name, raises the duplicated flag and fails the future.
     *
     * @param str name of the log
     * @return future with the single location found, if any
     */
    private CompletableFuture<Optional<URI>> fetchLogLocation(final String str) {
        final CompletableFuture<Optional<URI>> result = new CompletableFuture<>();
        final NavigableSet<URI> trackedUris = this.subNamespaces.keySet();
        final List<CompletableFuture<Optional<URI>>> probes = Lists.newArrayListWithExpectedSize(trackedUris.size());
        for (URI tracked : trackedUris) {
            probes.add(fetchLogLocation(tracked, str));
        }
        FutureUtils.collect(probes).whenComplete(new FutureEventListener<List<Optional<URI>>>() {
            @Override
            public void onSuccess(List<Optional<URI>> locations) {
                Optional<URI> found = Optional.absent();
                for (Optional<URI> candidate : locations) {
                    if (!found.isPresent()) {
                        found = candidate;
                        continue;
                    }
                    if (!candidate.isPresent()) {
                        continue;
                    }
                    FederatedZKLogMetadataStore.logger.error("Log {} is found in multiple sub namespaces : {} & {}.", new Object[]{str, found.get(), candidate.get()});
                    FederatedZKLogMetadataStore.this.duplicatedLogName.compareAndSet(null, str);
                    FederatedZKLogMetadataStore.this.duplicatedLogFound.set(true);
                    result.completeExceptionally(new UnexpectedException("Log " + str + " is found in multiple sub namespaces : " + found.get() + " & " + candidate.get()));
                    return;
                }
                result.complete(found);
            }

            @Override
            public void onFailure(Throwable cause) {
                result.completeExceptionally(cause);
            }
        });
        return result;
    }

    /**
     * Checks (without a watch) whether {@code <uri path>/<str>} exists in ZooKeeper.
     *
     * @param uri namespace to probe
     * @param str name of the log
     * @return future with {@code uri} if the znode exists, absent on NONODE, failed with the
     *         matching {@link KeeperException} for any other return code
     */
    private CompletableFuture<Optional<URI>> fetchLogLocation(final URI uri, String str) {
        final CompletableFuture<Optional<URI>> result = new CompletableFuture<>();
        final String logPath = uri.getPath() + "/" + str;
        final AsyncCallback.StatCallback onStat = new AsyncCallback.StatCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, Stat stat) {
                if (KeeperException.Code.OK.intValue() == rc) {
                    result.complete(Optional.of(uri));
                    return;
                }
                if (KeeperException.Code.NONODE.intValue() == rc) {
                    result.complete(Optional.absent());
                    return;
                }
                result.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
            }
        };
        try {
            this.zkc.get().exists(logPath, false, onStat, (Object) null);
        } catch (ZooKeeperClient.ZooKeeperConnectionException connectionFailure) {
            result.completeExceptionally(connectionFailure);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            result.completeExceptionally(interrupted);
        }
        return result;
    }

    /**
     * Lists all logs across the tracked sub namespaces. Only the empty prefix is accepted.
     *
     * @param str log name prefix; must be the empty string
     * @return future with an iterator over all log names; failed if a prefix is given or a
     *         duplicated log has been found
     */
    @Override // org.apache.distributedlog.metadata.LogMetadataStore
    public CompletableFuture<Iterator<String>> getLogs(String str) {
        if (!"".equals(str)) {
            return FutureUtils.exception(new UnexpectedException("Get logs by prefix is not supported by federated metadata store"));
        }
        if (this.duplicatedLogFound.get()) {
            return duplicatedLogException(this.duplicatedLogName.get());
        }
        final CompletableFuture<Iterator<String>> allLogs = retrieveLogs().thenApply(this::getIterator);
        return postStateCheck(allLogs);
    }

    /**
     * @return a future that completes with the log-name sets of all tracked sub namespaces, one
     *         entry per sub namespace
     */
    private CompletableFuture<List<Set<String>>> retrieveLogs() {
        final Collection<SubNamespace> tracked = this.subNamespaces.values();
        final List<CompletableFuture<Set<String>>> pending = Lists.newArrayListWithExpectedSize(tracked.size());
        for (SubNamespace subNamespace : tracked) {
            pending.add(subNamespace.getLogs());
        }
        return FutureUtils.collect(pending);
    }

    /**
     * @param list log-name sets, one per sub namespace
     * @return a single iterator that walks the sets one after another, in list order
     */
    private Iterator<String> getIterator(List<Set<String>> list) {
        final List<Iterator<String>> perNamespace = Lists.newArrayListWithExpectedSize(list.size());
        for (Set<String> logs : list) {
            perNamespace.add(logs.iterator());
        }
        return Iterators.concat(perNamespace.iterator());
    }

    /**
     * Registers a namespace listener by delegating to {@code registerListener}.
     *
     * @param namespaceListener listener to register
     */
    @Override // org.apache.distributedlog.metadata.LogMetadataStore
    public void registerNamespaceListener(NamespaceListener namespaceListener) {
        this.registerListener(namespaceListener);
    }

    /**
     * No-op in this class: the body is empty. The per-namespace watches visible in this file are
     * started through {@code SubNamespace.watch()} from the constructor and {@code onSuccess}.
     */
    @Override // org.apache.distributedlog.namespace.NamespaceWatcher
    protected void watchNamespaceChanges() {
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Once the logs of all tracked sub namespaces are available, hands every registered listener
     * its own fresh iterator over the combined log names.
     */
    public void notifyOnNamespaceChanges() {
        retrieveLogs().thenAccept(logsPerNamespace ->
                this.listeners.iterator().forEachRemaining(
                        listener -> listener.onStreamsChanged(getIterator(logsPerNamespace))));
    }
}
