/*
 * Decompiled with CFR 0.152.
 */
package dlshade.org.apache.bookkeeper.replication;

import dlshade.com.google.common.base.Stopwatch;
import dlshade.com.google.common.collect.Sets;
import dlshade.org.apache.bookkeeper.client.BKException;
import dlshade.org.apache.bookkeeper.client.BookKeeper;
import dlshade.org.apache.bookkeeper.client.BookKeeperAdmin;
import dlshade.org.apache.bookkeeper.client.LedgerChecker;
import dlshade.org.apache.bookkeeper.client.LedgerFragment;
import dlshade.org.apache.bookkeeper.client.LedgerHandle;
import dlshade.org.apache.bookkeeper.common.concurrent.FutureUtils;
import dlshade.org.apache.bookkeeper.conf.ServerConfiguration;
import dlshade.org.apache.bookkeeper.meta.LedgerManager;
import dlshade.org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import dlshade.org.apache.bookkeeper.net.BookieId;
import dlshade.org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import dlshade.org.apache.bookkeeper.replication.AuditorBookieCheckTask;
import dlshade.org.apache.bookkeeper.replication.AuditorStats;
import dlshade.org.apache.bookkeeper.replication.AuditorTask;
import dlshade.org.apache.bookkeeper.replication.ReplicationException;
import dlshade.org.apache.zookeeper.AsyncCallback;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AuditorCheckAllLedgersTask
extends AuditorTask {
    private static final Logger LOG = LoggerFactory.getLogger(AuditorBookieCheckTask.class);
    private final Semaphore openLedgerNoRecoverySemaphore;
    private final int openLedgerNoRecoverySemaphoreWaitTimeoutMSec;
    private final ExecutorService ledgerCheckerExecutor;

    AuditorCheckAllLedgersTask(ServerConfiguration conf, AuditorStats auditorStats, BookKeeperAdmin admin, LedgerManager ledgerManager, LedgerUnderreplicationManager ledgerUnderreplicationManager, AuditorTask.ShutdownTaskHandler shutdownTaskHandler, BiConsumer<AtomicBoolean, Throwable> hasAuditCheckTask) throws ReplicationException.UnavailableException {
        super(conf, auditorStats, admin, ledgerManager, ledgerUnderreplicationManager, shutdownTaskHandler, hasAuditCheckTask);
        if (conf.getAuditorMaxNumberOfConcurrentOpenLedgerOperations() <= 0) {
            LOG.error("auditorMaxNumberOfConcurrentOpenLedgerOperations should be greater than 0");
            throw new ReplicationException.UnavailableException("auditorMaxNumberOfConcurrentOpenLedgerOperations should be greater than 0");
        }
        this.openLedgerNoRecoverySemaphore = new Semaphore(conf.getAuditorMaxNumberOfConcurrentOpenLedgerOperations());
        if (conf.getAuditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec() < 0) {
            LOG.error("auditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec should be greater than or equal to 0");
            throw new ReplicationException.UnavailableException("auditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec should be greater than or equal to 0");
        }
        this.openLedgerNoRecoverySemaphoreWaitTimeoutMSec = conf.getAuditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec();
        this.ledgerCheckerExecutor = Executors.newSingleThreadExecutor(new ThreadFactory(){

            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, "AuditorCheckAllLedgers-LedgerChecker");
                t.setDaemon(true);
                return t;
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void runTask() {
        if (this.hasBookieCheckTask()) {
            LOG.info("Audit bookie task already scheduled; skipping periodic all ledgers check task");
            return;
        }
        Stopwatch stopwatch = Stopwatch.createStarted();
        boolean checkSuccess = false;
        try {
            if (!this.isLedgerReplicationEnabled()) {
                LOG.info("Ledger replication disabled, skipping checkAllLedgers");
                checkSuccess = true;
                return;
            }
            LOG.info("Starting checkAllLedgers");
            this.checkAllLedgers();
            long checkAllLedgersDuration = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS);
            LOG.info("Completed checkAllLedgers in {} milliSeconds", (Object)checkAllLedgersDuration);
            this.auditorStats.getCheckAllLedgersTime().registerSuccessfulEvent(checkAllLedgersDuration, TimeUnit.MILLISECONDS);
            checkSuccess = true;
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            LOG.error("Interrupted while running periodic check", (Throwable)ie);
        }
        catch (BKException bke) {
            LOG.error("Exception running periodic check", (Throwable)bke);
        }
        catch (IOException ioe) {
            LOG.error("I/O exception running periodic check", (Throwable)ioe);
        }
        catch (ReplicationException.NonRecoverableReplicationException nre) {
            LOG.error("Non Recoverable Exception while reading from ZK", (Throwable)nre);
            this.submitShutdownTask();
        }
        catch (ReplicationException.UnavailableException ue) {
            LOG.error("Underreplication manager unavailable running periodic check", (Throwable)ue);
        }
        finally {
            if (!checkSuccess) {
                long checkAllLedgersDuration = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS);
                this.auditorStats.getCheckAllLedgersTime().registerFailedEvent(checkAllLedgersDuration, TimeUnit.MILLISECONDS);
            }
        }
    }

    @Override
    public void shutdown() {
        LOG.info("Shutting down AuditorCheckAllLedgersTask");
        this.ledgerCheckerExecutor.shutdown();
        try {
            while (!this.ledgerCheckerExecutor.awaitTermination(30L, TimeUnit.SECONDS)) {
                LOG.warn("Executor for ledger checker not shutting down, interrupting");
                this.ledgerCheckerExecutor.shutdownNow();
            }
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted while shutting down AuditorCheckAllLedgersTask", (Throwable)ie);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void checkAllLedgers() throws BKException, IOException, InterruptedException {
        BookKeeper localClient = this.getBookKeeper(this.conf);
        BookKeeperAdmin localAdmin = this.getBookKeeperAdmin(localClient);
        try {
            LedgerChecker checker = new LedgerChecker(localClient, this.conf.getInFlightReadEntryNumInLedgerChecker());
            CompletableFuture processFuture = new CompletableFuture();
            BookkeeperInternalCallbacks.Processor<Long> checkLedgersProcessor = (ledgerId, callback) -> {
                try {
                    if (!this.ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
                        LOG.info("Ledger rereplication has been disabled, aborting periodic check");
                        FutureUtils.complete(processFuture, null);
                        return;
                    }
                }
                catch (ReplicationException.NonRecoverableReplicationException nre) {
                    LOG.error("Non Recoverable Exception while reading from ZK", (Throwable)nre);
                    this.submitShutdownTask();
                    return;
                }
                catch (ReplicationException.UnavailableException ue) {
                    LOG.error("Underreplication manager unavailable running periodic check", (Throwable)ue);
                    FutureUtils.complete(processFuture, null);
                    return;
                }
                try {
                    if (!this.openLedgerNoRecoverySemaphore.tryAcquire(this.openLedgerNoRecoverySemaphoreWaitTimeoutMSec, TimeUnit.MILLISECONDS)) {
                        LOG.warn("Failed to acquire semaphore for {} ms, ledgerId: {}", (Object)this.openLedgerNoRecoverySemaphoreWaitTimeoutMSec, ledgerId);
                        FutureUtils.complete(processFuture, null);
                        return;
                    }
                }
                catch (InterruptedException e) {
                    LOG.error("Unable to acquire open ledger operation semaphore ", (Throwable)e);
                    Thread.currentThread().interrupt();
                    FutureUtils.complete(processFuture, null);
                    return;
                }
                localAdmin.asyncOpenLedgerNoRecovery((long)ledgerId, (rc, lh, ctx) -> {
                    this.openLedgerNoRecoverySemaphore.release();
                    if (0 == rc) {
                        this.ledgerCheckerExecutor.execute(() -> {
                            checker.checkLedger(lh, new ProcessLostFragmentsCb(lh, callback), this.conf.getAuditorLedgerVerificationPercentage());
                            this.auditorStats.getNumFragmentsPerLedger().registerSuccessfulValue(lh.getNumFragments());
                            this.auditorStats.getNumBookiesPerLedger().registerSuccessfulValue(lh.getNumBookies());
                            this.auditorStats.getNumLedgersChecked().inc();
                        });
                    } else if (-25 == rc) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Ledger {} was deleted before we could check it", ledgerId);
                        }
                        callback.processResult(0, null, null);
                    } else {
                        LOG.error("Couldn't open ledger {} to check : {}", ledgerId, (Object)BKException.getMessage(rc));
                        callback.processResult(rc, null, null);
                    }
                }, null);
            };
            this.ledgerManager.asyncProcessLedgers(checkLedgersProcessor, (rc, path, ctx) -> {
                if (0 == rc) {
                    FutureUtils.complete(processFuture, null);
                } else {
                    FutureUtils.completeExceptionally(processFuture, BKException.create(rc));
                }
            }, null, 0, -1);
            FutureUtils.result(processFuture, BKException.HANDLER);
            try {
                this.ledgerUnderreplicationManager.setCheckAllLedgersCTime(System.currentTimeMillis());
            }
            catch (ReplicationException.NonRecoverableReplicationException nre) {
                LOG.error("Non Recoverable Exception while reading from ZK", (Throwable)nre);
                this.submitShutdownTask();
            }
            catch (ReplicationException.UnavailableException ue) {
                LOG.error("Got exception while trying to set checkAllLedgersCTime", (Throwable)ue);
            }
        }
        finally {
            localAdmin.close();
            localClient.close();
        }
    }

    private class ProcessLostFragmentsCb
    implements BookkeeperInternalCallbacks.GenericCallback<Set<LedgerFragment>> {
        final LedgerHandle lh;
        final AsyncCallback.VoidCallback callback;

        ProcessLostFragmentsCb(LedgerHandle lh, AsyncCallback.VoidCallback callback) {
            this.lh = lh;
            this.callback = callback;
        }

        @Override
        public void operationComplete(int rc, Set<LedgerFragment> fragments) {
            if (rc == 0) {
                HashSet<BookieId> bookies = Sets.newHashSet();
                for (LedgerFragment f : fragments) {
                    bookies.addAll(f.getAddresses());
                }
                if (bookies.isEmpty()) {
                    this.callback.processResult(0, null, null);
                } else {
                    AuditorCheckAllLedgersTask.this.publishSuspectedLedgersAsync(bookies.stream().map(BookieId::toString).collect(Collectors.toList()), Sets.newHashSet(this.lh.getId())).whenComplete((result, cause) -> {
                        if (null != cause) {
                            LOG.error("Auditor exception publishing suspected ledger {} with lost bookies {}", new Object[]{this.lh.getId(), bookies, cause});
                            this.callback.processResult(-200, null, null);
                        } else {
                            this.callback.processResult(0, null, null);
                        }
                    });
                }
            } else {
                this.callback.processResult(rc, null, null);
            }
            this.lh.closeAsync().whenComplete((result, cause) -> {
                if (null != cause) {
                    LOG.warn("Error closing ledger {} : {}", (Object)this.lh.getId(), (Object)cause.getMessage());
                }
            });
        }
    }
}

