/*
 * Decompiled with CFR 0.152.
 */
package org.apache.distributedlog.auditor;

import dlshade.com.google.common.base.Objects;
import dlshade.com.google.common.base.Preconditions;
import dlshade.com.google.common.collect.Lists;
import dlshade.org.apache.bookkeeper.client.AsyncCallback;
import dlshade.org.apache.bookkeeper.client.BKException;
import dlshade.org.apache.bookkeeper.client.BookKeeper;
import dlshade.org.apache.bookkeeper.client.BookKeeperAccessor;
import dlshade.org.apache.bookkeeper.client.LedgerHandle;
import dlshade.org.apache.bookkeeper.common.concurrent.FutureUtils;
import dlshade.org.apache.bookkeeper.meta.LedgerManager;
import dlshade.org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import dlshade.org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import dlshade.org.apache.commons.lang3.tuple.Pair;
import dlshade.org.apache.zookeeper.AsyncCallback;
import dlshade.org.apache.zookeeper.KeeperException;
import dlshade.org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.distributedlog.BookKeeperClient;
import org.apache.distributedlog.BookKeeperClientBuilder;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.LogSegmentMetadata;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.ZooKeeperClientBuilder;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.distributedlog.exceptions.DLInterruptedException;
import org.apache.distributedlog.exceptions.ZKException;
import org.apache.distributedlog.impl.BKNamespaceDriver;
import org.apache.distributedlog.impl.metadata.BKDLConfig;
import org.apache.distributedlog.namespace.NamespaceDriver;
import org.apache.distributedlog.util.DLUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DLAuditor {
    private static final Logger logger = LoggerFactory.getLogger(DLAuditor.class);
    private final DistributedLogConfiguration conf;

    public DLAuditor(DistributedLogConfiguration conf) {
        this.conf = conf;
    }

    private ZooKeeperClient getZooKeeperClient(Namespace namespace) {
        NamespaceDriver driver = namespace.getNamespaceDriver();
        assert (driver instanceof BKNamespaceDriver);
        return ((BKNamespaceDriver)driver).getWriterZKC();
    }

    private BookKeeperClient getBookKeeperClient(Namespace namespace) {
        NamespaceDriver driver = namespace.getNamespaceDriver();
        assert (driver instanceof BKNamespaceDriver);
        return ((BKNamespaceDriver)driver).getReaderBKC();
    }

    private String validateAndGetZKServers(List<URI> uris) {
        URI firstURI = uris.get(0);
        String zkServers = BKNamespaceDriver.getZKServersFromDLUri(firstURI);
        for (URI uri : uris) {
            if (zkServers.equalsIgnoreCase(BKNamespaceDriver.getZKServersFromDLUri(uri))) continue;
            throw new IllegalArgumentException("Uris don't belong to same zookeeper cluster");
        }
        return zkServers;
    }

    private BKDLConfig resolveBKDLConfig(ZooKeeperClient zkc, List<URI> uris) throws IOException {
        URI firstURI = uris.get(0);
        BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(zkc, firstURI);
        for (URI uri : uris) {
            BKDLConfig anotherConfig = BKDLConfig.resolveDLConfig(zkc, uri);
            if (Objects.equal(bkdlConfig.getBkLedgersPath(), anotherConfig.getBkLedgersPath()) && Objects.equal(bkdlConfig.getBkZkServersForWriter(), anotherConfig.getBkZkServersForWriter())) continue;
            throw new IllegalArgumentException("Uris don't use same bookkeeper cluster");
        }
        return bkdlConfig;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Pair<Set<Long>, Set<Long>> collectLedgers(List<URI> uris, List<List<String>> allocationPaths) throws IOException {
        Preconditions.checkArgument(uris.size() > 0, "No uri provided to audit");
        String zkServers = this.validateAndGetZKServers(uris);
        BoundExponentialBackoffRetryPolicy retryPolicy = new BoundExponentialBackoffRetryPolicy(this.conf.getZKRetryBackoffStartMillis(), this.conf.getZKRetryBackoffMaxMillis(), Integer.MAX_VALUE);
        ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder().name("DLAuditor-ZK").zkServers(zkServers).sessionTimeoutMs(this.conf.getZKSessionTimeoutMilliseconds()).retryPolicy(retryPolicy).zkAclId(this.conf.getZkAclId()).build();
        ExecutorService executorService = Executors.newCachedThreadPool();
        try {
            BKDLConfig bkdlConfig = this.resolveBKDLConfig(zkc, uris);
            logger.info("Resolved bookkeeper config : {}", (Object)bkdlConfig);
            BookKeeperClient bkc = BookKeeperClientBuilder.newBuilder().name("DLAuditor-BK").dlConfig(this.conf).zkServers(bkdlConfig.getBkZkServersForWriter()).ledgersPath(bkdlConfig.getBkLedgersPath()).build();
            try {
                Set<Long> bkLedgers = this.collectLedgersFromBK(bkc, executorService);
                Set<Long> dlLedgers = this.collectLedgersFromDL(uris, allocationPaths);
                Pair<Set<Long>, Set<Long>> pair = Pair.of(bkLedgers, dlLedgers);
                bkc.close();
                return pair;
            }
            catch (Throwable throwable) {
                bkc.close();
                throw throwable;
            }
        }
        finally {
            zkc.close();
            executorService.shutdown();
        }
    }

    private Set<Long> collectLedgersFromBK(BookKeeperClient bkc, final ExecutorService executorService) throws IOException {
        LedgerManager lm = BookKeeperAccessor.getLedgerManager(bkc.get());
        final HashSet<Long> ledgers = new HashSet<Long>();
        final CompletableFuture doneFuture = FutureUtils.createFuture();
        BookkeeperInternalCallbacks.Processor<Long> collector = new BookkeeperInternalCallbacks.Processor<Long>(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void process(Long lid, final AsyncCallback.VoidCallback cb) {
                Set set = ledgers;
                synchronized (set) {
                    ledgers.add(lid);
                    if (0 == ledgers.size() % 1000) {
                        logger.info("Collected {} ledgers", (Object)ledgers.size());
                    }
                }
                executorService.submit(new Runnable(){

                    @Override
                    public void run() {
                        cb.processResult(0, null, null);
                    }
                });
            }
        };
        AsyncCallback.VoidCallback finalCb = new AsyncCallback.VoidCallback(){

            @Override
            public void processResult(int rc, String path, Object ctx) {
                if (0 == rc) {
                    doneFuture.complete(null);
                } else {
                    doneFuture.completeExceptionally(BKException.create(rc));
                }
            }
        };
        lm.asyncProcessLedgers(collector, finalCb, null, 0, -9);
        try {
            doneFuture.get();
            logger.info("Collected total {} ledgers", (Object)ledgers.size());
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new DLInterruptedException("Interrupted on collecting ledgers : ", (Throwable)e);
        }
        catch (ExecutionException e) {
            if (e.getCause() instanceof IOException) {
                throw (IOException)e.getCause();
            }
            throw new IOException("Failed to collect ledgers : ", e.getCause());
        }
        return ledgers;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Set<Long> collectLedgersFromDL(List<URI> uris, List<List<String>> allocationPaths) throws IOException {
        final TreeSet<Long> ledgers = new TreeSet<Long>();
        ArrayList<Namespace> namespaces = new ArrayList<Namespace>(uris.size());
        try {
            for (URI uri : uris) {
                namespaces.add(NamespaceBuilder.newBuilder().conf(this.conf).uri(uri).build());
            }
            final CountDownLatch doneLatch = new CountDownLatch(uris.size());
            final AtomicInteger numFailures = new AtomicInteger(0);
            ExecutorService executor = Executors.newFixedThreadPool(uris.size());
            try {
                int i = 0;
                Iterator iterator = namespaces.iterator();
                while (iterator.hasNext()) {
                    Namespace namespace;
                    Namespace dlNamespace = namespace = (Namespace)iterator.next();
                    final URI uri = uris.get(i);
                    final List<String> aps = allocationPaths.get(i);
                    ++i;
                    executor.submit(new Runnable(){

                        /*
                         * WARNING - Removed try catching itself - possible behaviour change.
                         */
                        @Override
                        public void run() {
                            try {
                                logger.info("Collecting ledgers from {} : {}", (Object)uri, (Object)aps);
                                DLAuditor.this.collectLedgersFromAllocator(uri, namespace, aps, ledgers);
                                Set set = ledgers;
                                synchronized (set) {
                                    logger.info("Collected {} ledgers from allocators for {} : {} ", new Object[]{ledgers.size(), uri, ledgers});
                                }
                                DLAuditor.this.collectLedgersFromDL(uri, namespace, ledgers);
                            }
                            catch (IOException e) {
                                numFailures.incrementAndGet();
                                logger.info("Error to collect ledgers from DL : ", (Throwable)e);
                            }
                            doneLatch.countDown();
                        }
                    });
                }
                try {
                    doneLatch.await();
                    if (numFailures.get() > 0) {
                        throw new IOException(numFailures.get() + " errors to collect ledgers from DL");
                    }
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    logger.warn("Interrupted on collecting ledgers from DL : ", (Throwable)e);
                    throw new DLInterruptedException("Interrupted on collecting ledgers from DL : ", (Throwable)e);
                }
            }
            finally {
                executor.shutdown();
            }
        }
        finally {
            for (Namespace namespace : namespaces) {
                namespace.close();
            }
        }
        return ledgers;
    }

    private void collectLedgersFromAllocator(URI uri, final Namespace namespace, List<String> allocationPaths, final Set<Long> ledgers) throws IOException {
        LinkedBlockingQueue<String> poolQueue = new LinkedBlockingQueue<String>();
        for (String allocationPath : allocationPaths) {
            String rootPath = uri.getPath() + "/" + allocationPath;
            try {
                List<String> pools = this.getZooKeeperClient(namespace).get().getChildren(rootPath, false);
                for (String pool : pools) {
                    poolQueue.add(rootPath + "/" + pool);
                }
            }
            catch (KeeperException e) {
                throw new ZKException("Failed to get list of pools from " + rootPath, e);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new DLInterruptedException("Interrupted on getting list of pools from " + rootPath, (Throwable)e);
            }
        }
        logger.info("Collecting ledgers from allocators for {} : {}", (Object)uri, poolQueue);
        DLAuditor.executeAction(poolQueue, 10, new Action<String>(){

            @Override
            public void execute(String poolPath) throws IOException {
                try {
                    this.collectLedgersFromPool(poolPath);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new DLInterruptedException("Interrupted on collecting ledgers from allocation pool " + poolPath, (Throwable)e);
                }
                catch (KeeperException e) {
                    throw new ZKException("Failed to collect ledgers from allocation pool " + poolPath, e.code());
                }
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            private void collectLedgersFromPool(String poolPath) throws InterruptedException, ZooKeeperClient.ZooKeeperConnectionException, KeeperException {
                List<String> allocators = DLAuditor.this.getZooKeeperClient(namespace).get().getChildren(poolPath, false);
                for (String allocator : allocators) {
                    String allocatorPath = poolPath + "/" + allocator;
                    byte[] data = DLAuditor.this.getZooKeeperClient(namespace).get().getData(allocatorPath, false, new Stat());
                    if (null == data || data.length <= 0) continue;
                    try {
                        long ledgerId = DLUtils.bytes2LogSegmentId(data);
                        Set set = ledgers;
                        synchronized (set) {
                            ledgers.add(ledgerId);
                        }
                    }
                    catch (NumberFormatException nfe) {
                        logger.warn("Invalid ledger found in allocator path {} : ", (Object)allocatorPath, (Object)nfe);
                    }
                }
            }
        });
        logger.info("Collected ledgers from allocators for {}.", (Object)uri);
    }

    private void collectLedgersFromDL(URI uri, final Namespace namespace, final Set<Long> ledgers) throws IOException {
        logger.info("Enumerating {} to collect streams.", (Object)uri);
        Iterator<String> streams = namespace.getLogs();
        LinkedBlockingQueue<String> streamQueue = new LinkedBlockingQueue<String>();
        while (streams.hasNext()) {
            streamQueue.add(streams.next());
        }
        logger.info("Collected {} streams from uri {} : {}", new Object[]{streamQueue.size(), uri, streams});
        DLAuditor.executeAction(streamQueue, 10, new Action<String>(){

            @Override
            public void execute(String stream) throws IOException {
                DLAuditor.this.collectLedgersFromStream(namespace, stream, ledgers);
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private List<Long> collectLedgersFromStream(Namespace namespace, String stream, Set<Long> ledgers) throws IOException {
        try (DistributedLogManager dlm = namespace.openLog(stream);){
            List<LogSegmentMetadata> segments = dlm.getLogSegments();
            ArrayList<Long> sLedgers = new ArrayList<Long>();
            for (LogSegmentMetadata segment : segments) {
                Set<Long> set = ledgers;
                synchronized (set) {
                    ledgers.add(segment.getLogSegmentId());
                }
                sLedgers.add(segment.getLogSegmentId());
            }
            ArrayList<Long> arrayList = sLedgers;
            return arrayList;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Map<String, Long> calculateStreamSpaceUsage(URI uri) throws IOException {
        logger.info("Collecting stream space usage for {}.", (Object)uri);
        try (Namespace namespace = NamespaceBuilder.newBuilder().conf(this.conf).uri(uri).build();){
            Map<String, Long> map = this.calculateStreamSpaceUsage(uri, namespace);
            return map;
        }
    }

    private Map<String, Long> calculateStreamSpaceUsage(final URI uri, final Namespace namespace) throws IOException {
        Iterator<String> streams = namespace.getLogs();
        LinkedBlockingQueue<String> streamQueue = new LinkedBlockingQueue<String>();
        while (streams.hasNext()) {
            streamQueue.add(streams.next());
        }
        final ConcurrentSkipListMap<String, Long> streamSpaceUsageMap = new ConcurrentSkipListMap<String, Long>();
        final AtomicInteger numStreamsCollected = new AtomicInteger(0);
        DLAuditor.executeAction(streamQueue, 10, new Action<String>(){

            @Override
            public void execute(String stream) throws IOException {
                streamSpaceUsageMap.put(stream, DLAuditor.this.calculateStreamSpaceUsage(namespace, stream));
                if (numStreamsCollected.incrementAndGet() % 1000 == 0) {
                    logger.info("Calculated {} streams from uri {}.", (Object)numStreamsCollected.get(), (Object)uri);
                }
            }
        });
        return streamSpaceUsageMap;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    private long calculateStreamSpaceUsage(Namespace namespace, String stream) throws IOException {
        long totalBytes = 0L;
        try (DistributedLogManager dlm = namespace.openLog(stream);){
            List<LogSegmentMetadata> segments = dlm.getLogSegments();
            for (LogSegmentMetadata segment : segments) {
                try {
                    LedgerHandle lh = this.getBookKeeperClient(namespace).get().openLedgerNoRecovery(segment.getLogSegmentId(), BookKeeper.DigestType.CRC32, this.conf.getBKDigestPW().getBytes(StandardCharsets.UTF_8));
                    totalBytes += lh.getLength();
                    lh.close();
                }
                catch (BKException e) {
                    logger.error("Failed to open ledger {} : ", (Object)segment.getLogSegmentId(), (Object)e);
                    throw new IOException("Failed to open ledger " + segment.getLogSegmentId(), e);
                }
                catch (InterruptedException e) {
                    logger.warn("Interrupted on opening ledger {} : ", (Object)segment.getLogSegmentId(), (Object)e);
                    Thread.currentThread().interrupt();
                    throw new DLInterruptedException("Interrupted on opening ledger " + segment.getLogSegmentId(), (Throwable)e);
                    return totalBytes;
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public long calculateLedgerSpaceUsage(URI uri) throws IOException {
        ArrayList<URI> uris = Lists.newArrayList(uri);
        String zkServers = this.validateAndGetZKServers(uris);
        BoundExponentialBackoffRetryPolicy retryPolicy = new BoundExponentialBackoffRetryPolicy(this.conf.getZKRetryBackoffStartMillis(), this.conf.getZKRetryBackoffMaxMillis(), Integer.MAX_VALUE);
        ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder().name("DLAuditor-ZK").zkServers(zkServers).sessionTimeoutMs(this.conf.getZKSessionTimeoutMilliseconds()).retryPolicy(retryPolicy).zkAclId(this.conf.getZkAclId()).build();
        ExecutorService executorService = Executors.newCachedThreadPool();
        try {
            BKDLConfig bkdlConfig = this.resolveBKDLConfig(zkc, uris);
            logger.info("Resolved bookkeeper config : {}", (Object)bkdlConfig);
            BookKeeperClient bkc = BookKeeperClientBuilder.newBuilder().name("DLAuditor-BK").dlConfig(this.conf).zkServers(bkdlConfig.getBkZkServersForWriter()).ledgersPath(bkdlConfig.getBkLedgersPath()).build();
            try {
                long l = this.calculateLedgerSpaceUsage(bkc, executorService);
                bkc.close();
                return l;
            }
            catch (Throwable throwable) {
                bkc.close();
                throw throwable;
            }
        }
        finally {
            zkc.close();
            executorService.shutdown();
        }
    }

    private long calculateLedgerSpaceUsage(BookKeeperClient bkc, final ExecutorService executorService) throws IOException {
        final AtomicLong totalBytes = new AtomicLong(0L);
        final AtomicLong totalEntries = new AtomicLong(0L);
        final AtomicLong numLedgers = new AtomicLong(0L);
        LedgerManager lm = BookKeeperAccessor.getLedgerManager(bkc.get());
        final CompletableFuture doneFuture = FutureUtils.createFuture();
        final BookKeeper bk = bkc.get();
        BookkeeperInternalCallbacks.Processor<Long> collector = new BookkeeperInternalCallbacks.Processor<Long>(){

            @Override
            public void process(final Long lid, final AsyncCallback.VoidCallback cb) {
                numLedgers.incrementAndGet();
                executorService.submit(new Runnable(){

                    @Override
                    public void run() {
                        bk.asyncOpenLedgerNoRecovery(lid, BookKeeper.DigestType.CRC32, DLAuditor.this.conf.getBKDigestPW().getBytes(StandardCharsets.UTF_8), new AsyncCallback.OpenCallback(){

                            @Override
                            public void openComplete(int rc, LedgerHandle lh, Object ctx) {
                                int cbRc;
                                if (0 == rc) {
                                    totalBytes.addAndGet(lh.getLength());
                                    totalEntries.addAndGet(lh.getLastAddConfirmed() + 1L);
                                    cbRc = rc;
                                } else {
                                    cbRc = -9;
                                }
                                executorService.submit(new Runnable(){

                                    @Override
                                    public void run() {
                                        cb.processResult(cbRc, null, null);
                                    }
                                });
                            }
                        }, null);
                    }
                });
            }
        };
        AsyncCallback.VoidCallback finalCb = new AsyncCallback.VoidCallback(){

            @Override
            public void processResult(int rc, String path, Object ctx) {
                if (0 == rc) {
                    doneFuture.complete(null);
                } else {
                    doneFuture.completeExceptionally(BKException.create(rc));
                }
            }
        };
        lm.asyncProcessLedgers(collector, finalCb, null, 0, -9);
        try {
            doneFuture.get();
            logger.info("calculated {} ledgers\n\ttotal bytes = {}\n\ttotal entries = {}", new Object[]{numLedgers.get(), totalBytes.get(), totalEntries.get()});
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new DLInterruptedException("Interrupted on calculating ledger space : ", (Throwable)e);
        }
        catch (ExecutionException e) {
            if (e.getCause() instanceof IOException) {
                throw (IOException)e.getCause();
            }
            throw new IOException("Failed to calculate ledger space : ", e.getCause());
        }
        return totalBytes.get();
    }

    public void close() {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    static <T> void executeAction(final LinkedBlockingQueue<T> queue, final int numThreads, final Action<T> action) throws IOException {
        final CountDownLatch failureLatch = new CountDownLatch(1);
        final CountDownLatch doneLatch = new CountDownLatch(queue.size());
        final AtomicInteger numFailures = new AtomicInteger(0);
        final AtomicInteger completedThreads = new AtomicInteger(0);
        ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
        try {
            for (int i = 0; i < numThreads; ++i) {
                executorService.submit(new Runnable(){

                    @Override
                    public void run() {
                        Object item;
                        while (null != (item = queue.poll())) {
                            try {
                                action.execute(item);
                            }
                            catch (IOException ioe) {
                                logger.error("Failed to execute action on item '{}'", item, (Object)ioe);
                                numFailures.incrementAndGet();
                                failureLatch.countDown();
                                break;
                            }
                            doneLatch.countDown();
                        }
                        if (numFailures.get() == 0 && completedThreads.incrementAndGet() == numThreads) {
                            failureLatch.countDown();
                        }
                    }
                });
            }
            try {
                failureLatch.await();
                if (numFailures.get() > 0) {
                    throw new IOException("Encountered " + numFailures.get() + " failures on executing action.");
                }
                doneLatch.await();
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                logger.warn("Interrupted on executing action", (Throwable)ie);
                throw new DLInterruptedException("Interrupted on executing action", (Throwable)ie);
            }
        }
        finally {
            executorService.shutdown();
        }
    }

    static interface Action<T> {
        public void execute(T var1) throws IOException;
    }
}

