/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.cache.Cache;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.cache.CacheBuilder;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.cache.RemovalListener;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.cache.RemovalNotification;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.BKException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.BookieWatcher;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.util.MathUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.discover.RegistrationClient;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.net.BookieId;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.proto.BookieAddressResolver;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stats.Counter;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stats.StatsLogger;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stats.annotations.StatsDoc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link BookieWatcher} implementation backed by a {@link RegistrationClient}.
 *
 * <p>It keeps the most recently reported sets of writable and read-only bookies,
 * forwards every change to the {@link EnsemblePlacementPolicy}, and maintains a
 * time-limited quarantine cache whose members are excluded (when possible) from
 * new ensembles and bookie replacements.
 */
@StatsDoc(name="bookie_watcher", help="Bookie watcher related stats")
class BookieWatcherImpl implements BookieWatcher {
    private static final Logger log = LoggerFactory.getLogger(BookieWatcherImpl.class);

    /**
     * Maps the failure cause of a registration-client future to a {@link BKException}.
     * A {@code BKException} is logged and passed through unchanged, an
     * {@code InterruptedException} is logged and becomes a {@code BKInterruptedException}
     * (the original cause is not attached), and anything else is wrapped in a
     * {@code MetaStoreException} without being logged here.
     */
    private static final Function<Throwable, BKException> EXCEPTION_FUNC = cause -> {
        if (cause instanceof BKException) {
            log.error("Failed to get bookie list : ", cause);
            return (BKException) cause;
        }
        if (cause instanceof InterruptedException) {
            log.error("Interrupted reading bookie list : ", cause);
            return new BKException.BKInterruptedException();
        }
        return new BKException.MetaStoreException(cause);
    };

    private final ClientConfiguration conf;
    private final RegistrationClient registrationClient;
    private final EnsemblePlacementPolicy placementPolicy;
    @StatsDoc(name="NEW_ENSEMBLE_TIME", help="operation stats of new ensembles", parent="LEDGER_CREATE")
    private final OpStatsLogger newEnsembleTimer;
    @StatsDoc(name="REPLACE_BOOKIE_TIME", help="operation stats of replacing bookie in an ensemble")
    private final OpStatsLogger replaceBookieTimer;
    @StatsDoc(name="ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER", help="total number of newEnsemble/replaceBookie operations failed to adhere EnsemblePlacementPolicy")
    private final Counter ensembleNotAdheringToPlacementPolicy;
    // Bookies currently quarantined; entries expire automatically after the configured time.
    final Cache<BookieId, Boolean> quarantinedBookies;
    // Latest snapshots delivered by the registration-client watchers (empty until the first callback).
    private volatile Set<BookieId> writableBookies = Collections.emptySet();
    private volatile Set<BookieId> readOnlyBookies = Collections.emptySet();
    // Futures of the one-time watcher registration; guarded by the monitor of this instance.
    private CompletableFuture<?> initialWritableBookiesFuture = null;
    private CompletableFuture<?> initialReadonlyBookiesFuture = null;
    private final BookieAddressResolver bookieAddressResolver;

    /**
     * Creates the watcher and its quarantine cache.
     *
     * @param conf client configuration; supplies the quarantine duration in seconds
     * @param placementPolicy policy notified of cluster changes and used to pick bookies
     * @param registrationClient source of bookie membership information
     * @param bookieAddressResolver resolver exposed through {@link #getBookieAddressResolver()}
     * @param statsLogger provider of the timers and counter used by this class
     */
    public BookieWatcherImpl(ClientConfiguration conf, EnsemblePlacementPolicy placementPolicy, RegistrationClient registrationClient, BookieAddressResolver bookieAddressResolver, StatsLogger statsLogger) {
        this.conf = conf;
        this.bookieAddressResolver = bookieAddressResolver;
        this.placementPolicy = placementPolicy;
        this.registrationClient = registrationClient;
        this.quarantinedBookies = CacheBuilder.newBuilder()
                .expireAfterWrite(conf.getBookieQuarantineTimeSeconds(), TimeUnit.SECONDS)
                .removalListener(new RemovalListener<BookieId, Boolean>() {
                    @Override
                    public void onRemoval(RemovalNotification<BookieId, Boolean> notification) {
                        log.info("Bookie {} is no longer quarantined", notification.getKey());
                    }
                })
                .build();
        this.newEnsembleTimer = statsLogger.getOpStatsLogger("NEW_ENSEMBLE_TIME");
        this.replaceBookieTimer = statsLogger.getOpStatsLogger("REPLACE_BOOKIE_TIME");
        this.ensembleNotAdheringToPlacementPolicy = statsLogger.getCounter("ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER");
    }

    /**
     * Blocks on {@code future}, translating failures through {@link #EXCEPTION_FUNC}.
     * If the wait surfaces as a {@code BKInterruptedException}, the current thread's
     * interrupt flag is set again before the exception is rethrown.
     */
    private static <T> T awaitResult(CompletableFuture<T> future) throws BKException {
        try {
            return FutureUtils.result(future, EXCEPTION_FUNC);
        } catch (BKException.BKInterruptedException ie) {
            Thread.currentThread().interrupt();
            throw ie;
        }
    }

    /**
     * Fetches the writable bookies from the registration client, blocking until available.
     *
     * @return the writable bookie set reported by the registration client
     * @throws BKException if the lookup fails or the calling thread is interrupted
     */
    @Override
    public Set<BookieId> getBookies() throws BKException {
        return awaitResult(this.registrationClient.getWritableBookies()).getValue();
    }

    /**
     * Fetches all bookies from the registration client, blocking until available.
     *
     * @return the full bookie set reported by the registration client
     * @throws BKException if the lookup fails or the calling thread is interrupted
     */
    @Override
    public Set<BookieId> getAllBookies() throws BKException {
        return awaitResult(this.registrationClient.getAllBookies()).getValue();
    }

    /** Returns the resolver supplied at construction time. */
    @Override
    public BookieAddressResolver getBookieAddressResolver() {
        return this.bookieAddressResolver;
    }

    /**
     * Fetches the read-only bookies from the registration client, blocking until available.
     *
     * @return the read-only bookie set reported by the registration client
     * @throws BKException if the lookup fails or the calling thread is interrupted
     */
    @Override
    public Set<BookieId> getReadOnlyBookies() throws BKException {
        return awaitResult(this.registrationClient.getReadOnlyBookies()).getValue();
    }

    /**
     * Reports whether {@code id} is absent from both locally tracked snapshots
     * (writable and read-only). No registration-client call is made.
     */
    @Override
    public boolean isBookieUnavailable(BookieId id) {
        return !this.readOnlyBookies.contains(id) && !this.writableBookies.contains(id);
    }

    /** Watcher callback: stores the new writable set and notifies the placement policy. */
    private synchronized void processWritableBookiesChanged(Set<BookieId> newBookieAddrs) {
        this.writableBookies = newBookieAddrs;
        this.placementPolicy.onClusterChanged(newBookieAddrs, this.readOnlyBookies);
    }

    /** Watcher callback: stores the new read-only set and notifies the placement policy. */
    private synchronized void processReadOnlyBookiesChanged(Set<BookieId> readOnlyBookies) {
        this.readOnlyBookies = readOnlyBookies;
        this.placementPolicy.onClusterChanged(this.writableBookies, readOnlyBookies);
    }

    /**
     * Registers the writable and read-only bookie watchers (only on the first call;
     * later calls reuse the same futures) and blocks until both registrations complete.
     *
     * <p>A failure of the writable watch is propagated. A failure of the read-only
     * watch is only logged, except for interruption, which is always propagated.
     *
     * @throws BKException if the writable watch fails or the calling thread is interrupted
     */
    public void initialBlockingBookieRead() throws BKException {
        CompletableFuture<?> writable;
        CompletableFuture<?> readonly;
        synchronized (this) {
            if (this.initialReadonlyBookiesFuture == null) {
                assert this.initialWritableBookiesFuture == null;
                writable = this.registrationClient.watchWritableBookies(
                        bookies -> this.processWritableBookiesChanged(bookies.getValue()));
                readonly = this.registrationClient.watchReadOnlyBookies(
                        bookies -> this.processReadOnlyBookiesChanged(bookies.getValue()));
                this.initialWritableBookiesFuture = writable;
                this.initialReadonlyBookiesFuture = readonly;
            } else {
                writable = this.initialWritableBookiesFuture;
                readonly = this.initialReadonlyBookiesFuture;
            }
        }
        // Wait outside the monitor so watcher callbacks (which synchronize on this) are not blocked.
        awaitResult(writable);
        try {
            awaitResult(readonly);
        } catch (BKException.BKInterruptedException ie) {
            // Interrupt flag was already restored by awaitResult.
            throw ie;
        } catch (Exception e) {
            // Deliberately best-effort: a failed read-only watch does not fail initialization.
            log.error("Failed getReadOnlyBookies: ", e);
        }
    }

    /**
     * Asks the placement policy for a new ensemble, first excluding quarantined bookies.
     * If that attempt reports not enough bookies, it retries with no exclusions; the
     * retry path is recorded as a failed event on the new-ensemble timer.
     *
     * @param ensembleSize number of bookies in the ensemble
     * @param writeQuorumSize write quorum size passed to the policy
     * @param ackQuorumSize ack quorum size passed to the policy
     * @param customMetadata custom metadata passed to the policy
     * @return the bookies chosen by the placement policy
     * @throws BKException.BKNotEnoughBookiesException if the retry without exclusions also fails
     */
    @Override
    public List<BookieId> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize, Map<String, byte[]> customMetadata) throws BKException.BKNotEnoughBookiesException {
        List<BookieId> socketAddresses;
        long startTime = MathUtils.nowInNano();
        try {
            Set<BookieId> quarantinedBookiesSet = this.quarantinedBookies.asMap().keySet();
            // Copy the key view so the policy works on a stable snapshot of the quarantine list.
            EnsemblePlacementPolicy.PlacementResult<List<BookieId>> newEnsembleResponse = this.placementPolicy.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, new HashSet<BookieId>(quarantinedBookiesSet));
            socketAddresses = newEnsembleResponse.getResult();
            if (newEnsembleResponse.isAdheringToPolicy() == EnsemblePlacementPolicy.PlacementPolicyAdherence.FAIL) {
                this.ensembleNotAdheringToPlacementPolicy.inc();
                if (ensembleSize > 1) {
                    log.warn("New ensemble: {} is not adhering to Placement Policy. quarantinedBookies: {}", socketAddresses, quarantinedBookiesSet);
                }
            }
            this.newEnsembleTimer.registerSuccessfulEvent(MathUtils.nowInNano() - startTime, TimeUnit.NANOSECONDS);
        } catch (BKException.BKNotEnoughBookiesException e) {
            if (log.isDebugEnabled()) {
                log.debug("Not enough healthy bookies available, using quarantined bookies");
            }
            EnsemblePlacementPolicy.PlacementResult<List<BookieId>> newEnsembleResponse = this.placementPolicy.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, new HashSet<BookieId>());
            socketAddresses = newEnsembleResponse.getResult();
            if (newEnsembleResponse.isAdheringToPolicy() == EnsemblePlacementPolicy.PlacementPolicyAdherence.FAIL) {
                this.ensembleNotAdheringToPlacementPolicy.inc();
                log.warn("New ensemble: {} is not adhering to Placement Policy", socketAddresses);
            }
            this.newEnsembleTimer.registerFailedEvent(MathUtils.nowInNano() - startTime, TimeUnit.NANOSECONDS);
        }
        return socketAddresses;
    }

    /**
     * Asks the placement policy for a replacement of the bookie at {@code bookieIdx},
     * first excluding both {@code excludeBookies} and the quarantined bookies. If that
     * attempt reports not enough bookies, it retries excluding only {@code excludeBookies};
     * the retry path is recorded as a failed event on the replace-bookie timer.
     *
     * @param ensembleSize number of bookies in the ensemble
     * @param writeQuorumSize write quorum size passed to the policy
     * @param ackQuorumSize ack quorum size passed to the policy
     * @param customMetadata custom metadata passed to the policy
     * @param existingBookies current ensemble
     * @param bookieIdx index in {@code existingBookies} of the bookie to replace
     * @param excludeBookies bookies that must not be chosen
     * @return the replacement bookie chosen by the placement policy
     * @throws BKException.BKNotEnoughBookiesException if the retry also fails
     */
    @Override
    public BookieId replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, Map<String, byte[]> customMetadata, List<BookieId> existingBookies, int bookieIdx, Set<BookieId> excludeBookies) throws BKException.BKNotEnoughBookiesException {
        BookieId socketAddress;
        long startTime = MathUtils.nowInNano();
        BookieId addr = existingBookies.get(bookieIdx);
        try {
            Set<BookieId> excludedBookiesAndQuarantinedBookies = new HashSet<BookieId>(excludeBookies);
            Set<BookieId> quarantinedBookiesSet = this.quarantinedBookies.asMap().keySet();
            excludedBookiesAndQuarantinedBookies.addAll(quarantinedBookiesSet);
            EnsemblePlacementPolicy.PlacementResult<BookieId> replaceBookieResponse = this.placementPolicy.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, existingBookies, addr, excludedBookiesAndQuarantinedBookies);
            socketAddress = replaceBookieResponse.getResult();
            if (replaceBookieResponse.isAdheringToPolicy() == EnsemblePlacementPolicy.PlacementPolicyAdherence.FAIL) {
                this.ensembleNotAdheringToPlacementPolicy.inc();
                log.warn("replaceBookie for bookie: {} in ensemble: {} is not adhering to placement policy and chose {}. excludedBookies {} and quarantinedBookies {}", addr, existingBookies, socketAddress, excludeBookies, quarantinedBookiesSet);
            }
            this.replaceBookieTimer.registerSuccessfulEvent(MathUtils.nowInNano() - startTime, TimeUnit.NANOSECONDS);
        } catch (BKException.BKNotEnoughBookiesException e) {
            if (log.isDebugEnabled()) {
                log.debug("Not enough healthy bookies available, using quarantined bookies");
            }
            EnsemblePlacementPolicy.PlacementResult<BookieId> replaceBookieResponse = this.placementPolicy.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, existingBookies, addr, excludeBookies);
            socketAddress = replaceBookieResponse.getResult();
            if (replaceBookieResponse.isAdheringToPolicy() == EnsemblePlacementPolicy.PlacementPolicyAdherence.FAIL) {
                this.ensembleNotAdheringToPlacementPolicy.inc();
                log.warn("replaceBookie for bookie: {} in ensemble: {} is not adhering to placement policy and chose {}. excludedBookies {}", addr, existingBookies, socketAddress, excludeBookies);
            }
            this.replaceBookieTimer.registerFailedEvent(MathUtils.nowInNano() - startTime, TimeUnit.NANOSECONDS);
        }
        return socketAddress;
    }

    /**
     * Adds {@code bookie} to the quarantine cache unless it is already quarantined.
     *
     * <p>The insertion uses an atomic put-if-absent on the cache's map view, so concurrent
     * callers reporting the same bookie produce a single entry and a single warning. A
     * plain check followed by {@code put} could replace an entry written by a racing
     * thread, which restarts its expiry and notifies the removal listener.
     */
    @Override
    public void quarantineBookie(BookieId bookie) {
        if (this.quarantinedBookies.asMap().putIfAbsent(bookie, Boolean.TRUE) == null) {
            log.warn("Bookie {} has been quarantined because of read/write errors.", bookie);
        }
    }
}

