/*
 * Decompiled with CFR 0.152.
 */
package org.apache.distributedlog.bk;

import dlshade.com.google.common.annotations.VisibleForTesting;
import dlshade.com.google.common.collect.Lists;
import dlshade.org.apache.bookkeeper.client.LedgerHandle;
import dlshade.org.apache.bookkeeper.common.concurrent.FutureEventListener;
import dlshade.org.apache.bookkeeper.common.concurrent.FutureUtils;
import dlshade.org.apache.bookkeeper.util.ZkUtils;
import dlshade.org.apache.bookkeeper.versioning.LongVersion;
import dlshade.org.apache.bookkeeper.versioning.Versioned;
import dlshade.org.apache.zookeeper.AsyncCallback;
import dlshade.org.apache.zookeeper.CreateMode;
import dlshade.org.apache.zookeeper.KeeperException;
import dlshade.org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.distributedlog.BookKeeperClient;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.bk.ImmutableQuorumConfigProvider;
import org.apache.distributedlog.bk.LedgerAllocator;
import org.apache.distributedlog.bk.QuorumConfigProvider;
import org.apache.distributedlog.bk.SimpleLedgerAllocator;
import org.apache.distributedlog.exceptions.DLInterruptedException;
import org.apache.distributedlog.util.Transaction;
import org.apache.distributedlog.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link LedgerAllocator} backed by a pool of {@link SimpleLedgerAllocator}s, one per child
 * znode found under {@code poolPath}.
 *
 * <p>A pooled allocator is taken from {@code pendingList} by {@link #allocate()} and appended to
 * {@code allocatingList}; {@code tryObtain} takes it from {@code allocatingList} and, once a
 * ledger handle is obtained, tracks it in {@code obtainMap}/{@code reverseObtainMap}; committing
 * the obtaining transaction returns it to {@code pendingList}. When a step fails, the allocator's
 * znode is re-read and a fresh allocator built from it is appended to {@code pendingList}.
 */
public class LedgerAllocatorPool
implements LedgerAllocator {
    private static final Logger logger = LoggerFactory.getLogger(LedgerAllocatorPool.class);
    /** Supplies the quorum config and the delay used when scheduling a rescue retry. */
    private final DistributedLogConfiguration conf;
    /** Quorum configuration handed to every {@link SimpleLedgerAllocator} the pool builds. */
    private final QuorumConfigProvider quorumConfigProvider;
    private final BookKeeperClient bkc;
    private final ZooKeeperClient zkc;
    /** Runs delayed rescue attempts; also passed to the list processing done on close/delete. */
    private final ScheduledExecutorService scheduledExecutorService;
    /** ZooKeeper path whose children are the pooled allocators. */
    private final String poolPath;
    /** Minimum number of allocator znodes ensured to exist when the pool is initialized. */
    private final int corePoolSize;
    /** Allocators waiting for {@link #allocate()}; mutated under the pool monitor once constructed. */
    private final LinkedList<SimpleLedgerAllocator> pendingList = new LinkedList<SimpleLedgerAllocator>();
    /** Allocators on which {@code allocate()} has been called and that await {@code tryObtain}. */
    private final LinkedList<SimpleLedgerAllocator> allocatingList = new LinkedList<SimpleLedgerAllocator>();
    /** Allocators with a rescue in flight, keyed by allocate path, so a path is rescued once at a time. */
    private final Map<String, SimpleLedgerAllocator> rescueMap = new HashMap<String, SimpleLedgerAllocator>();
    /** Obtained ledger handle -> the allocator it came from. */
    private final Map<LedgerHandle, SimpleLedgerAllocator> obtainMap = new HashMap<LedgerHandle, SimpleLedgerAllocator>();
    /** Inverse of {@code obtainMap}: allocator -> the ledger handle obtained from it. */
    private final Map<SimpleLedgerAllocator, LedgerHandle> reverseObtainMap = new HashMap<SimpleLedgerAllocator, LedgerHandle>();

    /**
     * Builds a pool rooted at {@code poolPath} and loads its allocators before returning.
     *
     * @param poolPath ZooKeeper path whose children are the pooled allocators
     * @param corePoolSize minimum number of allocator znodes ensured to exist under the path
     * @param conf configuration; its quorum config is captured in an immutable provider
     * @param zkc ZooKeeper client used to read and create the allocator znodes
     * @param bkc BookKeeper client handed to each pooled allocator
     * @param scheduledExecutorService executor used for delayed rescues and close/delete processing
     * @throws IOException if the pool cannot be initialized from ZooKeeper
     */
    public LedgerAllocatorPool(String poolPath, int corePoolSize, DistributedLogConfiguration conf, ZooKeeperClient zkc, BookKeeperClient bkc, ScheduledExecutorService scheduledExecutorService) throws IOException {
        // Collaborators first, then the pool's own settings.
        this.zkc = zkc;
        this.bkc = bkc;
        this.scheduledExecutorService = scheduledExecutorService;
        this.conf = conf;
        this.quorumConfigProvider = new ImmutableQuorumConfigProvider(conf.getQuorumConfig());
        this.poolPath = poolPath;
        this.corePoolSize = corePoolSize;
        // Must run last: it reads the fields assigned above.
        initializePool();
    }

    /**
     * Calls {@code allocate()} on every allocator currently in the pending list. The allocators
     * stay in the pending list; this method does not move them.
     *
     * @throws IOException if any allocator fails to start allocating
     */
    @Override
    public void start() throws IOException {
        // Iterate over a snapshot taken under the pool monitor: the pending list is mutated under
        // that monitor elsewhere, and iterating the live LinkedList while another thread adds or
        // removes an element would throw ConcurrentModificationException.
        final List<SimpleLedgerAllocator> snapshot;
        synchronized (this) {
            snapshot = new ArrayList<SimpleLedgerAllocator>(this.pendingList);
        }
        for (LedgerAllocator ledgerAllocator : snapshot) {
            ledgerAllocator.allocate();
        }
    }

    /** Returns the number of allocators in the pending list, read under the pool monitor. */
    @VisibleForTesting
    int pendingListSize() {
        synchronized (this) {
            return pendingList.size();
        }
    }

    /** Returns the number of allocators in the allocating list, read under the pool monitor. */
    @VisibleForTesting
    int allocatingListSize() {
        synchronized (this) {
            return allocatingList.size();
        }
    }

    /** Returns the number of ledger handles tracked in the obtain map, read under the pool monitor. */
    @VisibleForTesting
    public int obtainMapSize() {
        synchronized (this) {
            return obtainMap.size();
        }
    }

    /** Returns the number of allocators with a rescue in flight, read under the pool monitor. */
    @VisibleForTesting
    int rescueSize() {
        synchronized (this) {
            return rescueMap.size();
        }
    }

    /**
     * Looks up the allocator recorded for an obtained ledger handle.
     *
     * @param lh ledger handle to look up
     * @return the allocator mapped to {@code lh}, or {@code null} if none is recorded
     */
    @VisibleForTesting
    SimpleLedgerAllocator getLedgerAllocator(LedgerHandle lh) {
        synchronized (this) {
            return obtainMap.get(lh);
        }
    }

    /**
     * Loads the pool from ZooKeeper: lists the children of the pool path (creating the path first
     * if it does not exist), creates additional allocator znodes when fewer than
     * {@code corePoolSize} exist, and then initializes an allocator for every child.
     *
     * @throws DLInterruptedException if interrupted; the thread's interrupt flag is restored first
     * @throws IOException if a ZooKeeper operation fails or allocators cannot be created/initialized
     */
    private void initializePool() throws IOException {
        try {
            List<String> children;
            try {
                children = zkc.get().getChildren(poolPath, false);
            } catch (KeeperException.NoNodeException missing) {
                logger.info("Allocator Pool {} doesn't exist. Creating it.", poolPath);
                ZkUtils.createFullPathOptimistic(zkc.get(), poolPath, new byte[0], zkc.getDefaultACL(), CreateMode.PERSISTENT);
                children = zkc.get().getChildren(poolPath, false);
            }
            if (children == null) {
                children = new ArrayList<String>();
            }
            if (children.size() < corePoolSize) {
                // Top the pool up to its core size, then re-list to pick up the new znodes.
                final int shortfall = corePoolSize - children.size();
                createAllocators(shortfall);
                children = zkc.get().getChildren(poolPath, false);
            }
            initializeAllocators(children);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new DLInterruptedException("Interrupted when ensuring " + poolPath + " created : ", interrupted);
        } catch (KeeperException zkFailure) {
            throw new IOException("Encountered zookeeper exception when initializing pool " + poolPath + " : ", zkFailure);
        }
    }

    /**
     * Creates {@code numAllocators} persistent-sequential znodes with the prefix
     * {@code poolPath + "/A"} and blocks until either all creations succeeded or one failed.
     *
     * @param numAllocators number of allocator znodes to create; non-positive values are a no-op
     * @throws InterruptedException if interrupted while issuing or awaiting the creations
     * @throws IOException if at least one creation reported a non-OK result code
     */
    private void createAllocators(int numAllocators) throws InterruptedException, IOException {
        if (numAllocators <= 0) {
            // No create would be issued, so no callback could ever release the latch below.
            return;
        }
        final AtomicInteger numPendings = new AtomicInteger(numAllocators);
        final AtomicInteger numFailures = new AtomicInteger(0);
        // Single-shot latch: released by the first failure or by the last successful creation.
        final CountDownLatch latch = new CountDownLatch(1);
        AsyncCallback.StringCallback createCallback = (rc, path, ctx, name) -> {
            if (KeeperException.Code.OK.intValue() != rc) {
                numFailures.incrementAndGet();
                latch.countDown();
                return;
            }
            if (numPendings.decrementAndGet() == 0 && numFailures.get() == 0) {
                latch.countDown();
            }
        };
        for (int i = 0; i < numAllocators; ++i) {
            this.zkc.get().create(this.poolPath + "/A", new byte[0], this.zkc.getDefaultACL(), CreateMode.PERSISTENT_SEQUENTIAL, createCallback, null);
        }
        latch.await();
        if (numFailures.get() > 0) {
            throw new IOException("Failed to create " + numAllocators + " allocators.");
        }
    }

    /**
     * Reads the data of every named child of the pool path and, for each successful read, builds
     * and starts a {@link SimpleLedgerAllocator} and adds it to the pending list. Blocks until all
     * reads succeeded or one failed.
     *
     * @param allocators child names under the pool path; an empty list returns without waiting
     * @throws IOException if at least one read reported a non-OK result code
     * @throws InterruptedException if interrupted while issuing or awaiting the reads
     */
    private void initializeAllocators(List<String> allocators) throws IOException, InterruptedException {
        final AtomicInteger remaining = new AtomicInteger(allocators.size());
        final AtomicInteger failures = new AtomicInteger(0);
        // Single-shot latch (already open when there is nothing to load): released by the first
        // failed read or by the last successful one.
        final CountDownLatch done = new CountDownLatch(remaining.get() > 0 ? 1 : 0);
        final AsyncCallback.DataCallback onData = (rc, znodePath, ctx, data, stat) -> {
            if (KeeperException.Code.OK.intValue() != rc) {
                failures.incrementAndGet();
                done.countDown();
                return;
            }
            final Versioned<byte[]> versionedData = new Versioned<byte[]>(data, new LongVersion(stat.getVersion()));
            final SimpleLedgerAllocator loaded = new SimpleLedgerAllocator(znodePath, versionedData, quorumConfigProvider, zkc, bkc);
            loaded.start();
            pendingList.add(loaded);
            final boolean wasLast = remaining.decrementAndGet() == 0;
            if (wasLast && failures.get() == 0) {
                done.countDown();
            }
        };
        for (String childName : allocators) {
            final String childPath = poolPath + "/" + childName;
            zkc.get().getData(childPath, false, onData, null);
        }
        done.await();
        if (failures.get() > 0) {
            throw new IOException("Failed to initialize allocators : " + allocators);
        }
    }

    /**
     * Schedules a rescue of {@code ledgerAllocator} after the configured ZooKeeper retry back-off
     * start delay. If the executor rejects the task, the failure is logged and no rescue is
     * scheduled.
     *
     * @param ledgerAllocator allocator to rescue later
     */
    private void scheduleAllocatorRescue(final SimpleLedgerAllocator ledgerAllocator) {
        final Runnable rescueTask = () -> {
            try {
                rescueAllocator(ledgerAllocator);
            } catch (DLInterruptedException interrupted) {
                // Keep the interrupt visible to the executor thread.
                Thread.currentThread().interrupt();
            }
        };
        final long delayMillis = conf.getZKRetryBackoffStartMillis();
        try {
            scheduledExecutorService.schedule(rescueTask, delayMillis, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException rejected) {
            logger.warn("Failed to schedule rescuing ledger allocator {} : ", ledgerAllocator.allocatePath, rejected);
        }
    }

    /**
     * Replaces a failed allocator by re-reading its znode and, on success, appending a freshly
     * built and started allocator for the same path to the pending list.
     *
     * <p>The allocator is registered in the rescue map for the duration of the attempt; if an
     * entry for its path is already present, this call only logs and returns. A missing znode ends
     * the rescue without a replacement; any other non-OK result code, or an {@link IOException}
     * while issuing the read, schedules another attempt.
     *
     * @param ledgerAllocator the allocator to rescue
     * @throws DLInterruptedException if interrupted while issuing the read; the interrupt flag is
     *     restored and the rescue-map entry removed before throwing
     */
    private void rescueAllocator(final SimpleLedgerAllocator ledgerAllocator) throws DLInterruptedException {
        final SimpleLedgerAllocator inFlight;
        synchronized (this) {
            inFlight = rescueMap.put(ledgerAllocator.allocatePath, ledgerAllocator);
        }
        if (inFlight != null) {
            logger.info("ledger allocator {} is being rescued.", ledgerAllocator.allocatePath);
            return;
        }
        final AsyncCallback.DataCallback onData = (rc, path, ctx, data, stat) -> {
            SimpleLedgerAllocator replacement = null;
            boolean retry = false;
            if (KeeperException.Code.OK.intValue() == rc) {
                final Versioned<byte[]> versionedData = new Versioned<byte[]>(data, new LongVersion(stat.getVersion()));
                logger.info("Rescuing ledger allocator {}.", path);
                replacement = new SimpleLedgerAllocator(path, versionedData, quorumConfigProvider, zkc, bkc);
                replacement.start();
                logger.info("Rescued ledger allocator {}.", path);
            } else if (KeeperException.Code.NONODE.intValue() == rc) {
                logger.info("Ledger allocator {} doesn't exist, skip rescuing it.", path);
            } else {
                retry = true;
            }
            // Clear the in-flight marker and publish the replacement in one critical section.
            synchronized (this) {
                rescueMap.remove(ledgerAllocator.allocatePath);
                if (replacement != null) {
                    pendingList.addLast(replacement);
                }
            }
            if (retry) {
                scheduleAllocatorRescue(ledgerAllocator);
            }
        };
        try {
            zkc.get().getData(ledgerAllocator.allocatePath, false, onData, null);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            logger.warn("Interrupted on rescuing ledger allocator {} : ", ledgerAllocator.allocatePath, interrupted);
            synchronized (this) {
                rescueMap.remove(ledgerAllocator.allocatePath);
            }
            throw new DLInterruptedException("Interrupted on rescuing ledger allocator " + ledgerAllocator.allocatePath, interrupted);
        } catch (IOException ioFailure) {
            logger.warn("Failed to rescue ledger allocator {}, retry rescuing it later : ", ledgerAllocator.allocatePath, ioFailure);
            synchronized (this) {
                rescueMap.remove(ledgerAllocator.allocatePath);
            }
            scheduleAllocatorRescue(ledgerAllocator);
        }
    }

    /**
     * Takes the first pending allocator, asks it to allocate, and appends it to the allocating
     * list. If that fails, a rescue of the allocator is started and the failure is propagated.
     *
     * @throws IOException if no allocator is pending, or if the chosen allocator fails to allocate
     */
    @Override
    public void allocate() throws IOException {
        final SimpleLedgerAllocator next;
        synchronized (this) {
            if (pendingList.isEmpty()) {
                throw new IOException("No ledger allocator available under " + poolPath + ".");
            }
            next = pendingList.removeFirst();
        }
        try {
            next.allocate();
            synchronized (this) {
                allocatingList.addLast(next);
            }
        } catch (Throwable failure) {
            // The allocator is in neither list at this point; rescue it, then rethrow the original
            // failure unchanged (an exception thrown by the rescue itself takes precedence).
            rescueAllocator(next);
            throw failure;
        }
    }

    /**
     * Takes the first allocator from the allocating list and tries to obtain a ledger from it as
     * part of {@code txn}.
     *
     * <p>When the obtain succeeds, the handle/allocator pair is recorded in the obtain maps before
     * the returned future completes. When it fails, a rescue of the allocator is started and the
     * returned future fails with the same cause. On transaction commit the allocator is returned
     * to the pending list; on abort it is rescued. In both cases {@code listener} is notified
     * afterwards.
     *
     * @param txn transaction the obtain operation is attached to
     * @param listener notified of the transaction outcome after the pool's own bookkeeping
     * @return a future for the obtained ledger handle; already failed with an {@link IOException}
     *     if no allocator is in the allocating list
     */
    @Override
    public CompletableFuture<LedgerHandle> tryObtain(Transaction<Object> txn, final Transaction.OpListener<LedgerHandle> listener) {
        final SimpleLedgerAllocator allocator;
        synchronized (this) {
            if (this.allocatingList.isEmpty()) {
                return FutureUtils.exception(new IOException("No ledger allocator available under " + this.poolPath + "."));
            }
            allocator = this.allocatingList.removeFirst();
        }
        final CompletableFuture<LedgerHandle> tryObtainPromise = new CompletableFuture<LedgerHandle>();
        FutureEventListener<LedgerHandle> tryObtainListener = new FutureEventListener<LedgerHandle>(){

            @Override
            public void onSuccess(LedgerHandle lh) {
                synchronized (LedgerAllocatorPool.this) {
                    LedgerAllocatorPool.this.obtainMap.put(lh, allocator);
                    LedgerAllocatorPool.this.reverseObtainMap.put(allocator, lh);
                }
                // Complete outside the pool monitor: completing the promise runs any dependent
                // stages synchronously on this thread, and caller-supplied code must not execute
                // while the pool lock is held.
                tryObtainPromise.complete(lh);
            }

            @Override
            public void onFailure(Throwable cause) {
                try {
                    LedgerAllocatorPool.this.rescueAllocator(allocator);
                }
                catch (IOException ioe) {
                    logger.info("Failed to rescue allocator {}", (Object)allocator.allocatePath, (Object)ioe);
                }
                tryObtainPromise.completeExceptionally(cause);
            }
        };
        allocator.tryObtain(txn, new Transaction.OpListener<LedgerHandle>(){

            @Override
            public void onCommit(LedgerHandle lh) {
                LedgerAllocatorPool.this.confirmObtain(allocator);
                listener.onCommit(lh);
            }

            @Override
            public void onAbort(Throwable t) {
                LedgerAllocatorPool.this.abortObtain(allocator);
                listener.onAbort(t);
            }
        }).whenComplete(tryObtainListener);
        return tryObtainPromise;
    }

    /**
     * Records that the ledger obtained from {@code allocator} was committed: drops the allocator's
     * entries from the obtain maps and appends it to the pending list so it can allocate again.
     *
     * @param allocator allocator whose obtained ledger was committed
     */
    void confirmObtain(SimpleLedgerAllocator allocator) {
        // One critical section for both steps, so a concurrent asyncClose()/delete() snapshot never
        // sees the allocator as absent from both the obtain map and the pending list.
        synchronized (this) {
            LedgerHandle lh = this.reverseObtainMap.remove(allocator);
            if (null != lh) {
                this.obtainMap.remove(lh);
            }
            this.pendingList.addLast(allocator);
        }
    }

    /**
     * Records that the transaction obtaining a ledger from {@code allocator} was aborted: drops
     * the allocator's entries from the obtain maps and starts a rescue for it.
     *
     * @param allocator allocator whose obtain was aborted
     */
    void abortObtain(SimpleLedgerAllocator allocator) {
        synchronized (this) {
            final LedgerHandle obtained = reverseObtainMap.remove(allocator);
            if (obtained != null) {
                obtainMap.remove(obtained);
            }
        }
        try {
            rescueAllocator(allocator);
        } catch (DLInterruptedException interrupted) {
            logger.warn("Interrupted on rescuing ledger allocator pool {} : ", poolPath, interrupted);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Closes every allocator currently held in the pending list, the allocating list and the
     * obtain map. The set of allocators is captured under the pool monitor; the closes themselves
     * run outside it.
     *
     * @return a future that completes with {@code null} once the list processing has completed
     */
    @Override
    public CompletableFuture<Void> asyncClose() {
        final List<SimpleLedgerAllocator> toClose;
        synchronized (this) {
            final int expected = pendingList.size() + allocatingList.size() + obtainMap.size();
            toClose = Lists.newArrayListWithExpectedSize(expected);
            toClose.addAll(pendingList);
            toClose.addAll(allocatingList);
            toClose.addAll(obtainMap.values());
        }
        final CompletableFuture<List<Void>> allClosed = FutureUtils.processList(toClose, pooled -> pooled.asyncClose(), scheduledExecutorService);
        return allClosed.thenApply(ignored -> null);
    }

    /**
     * Deletes every allocator currently held in the pending list, the allocating list and the
     * obtain map, and then deletes the pool path itself using version {@code -1}. The set of
     * allocators is captured under the pool monitor; the deletions run outside it.
     *
     * @return a future that completes when the pool path deletion completes
     */
    @Override
    public CompletableFuture<Void> delete() {
        final List<SimpleLedgerAllocator> toDelete;
        synchronized (this) {
            final int expected = pendingList.size() + allocatingList.size() + obtainMap.size();
            toDelete = Lists.newArrayListWithExpectedSize(expected);
            toDelete.addAll(pendingList);
            toDelete.addAll(allocatingList);
            toDelete.addAll(obtainMap.values());
        }
        final CompletableFuture<List<Void>> allDeleted = FutureUtils.processList(toDelete, pooled -> pooled.delete(), scheduledExecutorService);
        // Remove the pool znode only after the allocators have been processed.
        return allDeleted.thenCompose(ignored -> Utils.zkDelete(zkc, poolPath, new LongVersion(-1L)));
    }
}

