/*
 * Decompiled with CFR 0.152.
 */
package dlshade.org.apache.distributedlog.lock;

import dlshade.com.google.common.annotations.VisibleForTesting;
import dlshade.com.google.common.base.Stopwatch;
import dlshade.org.apache.bookkeeper.common.concurrent.FutureEventListener;
import dlshade.org.apache.bookkeeper.common.concurrent.FutureUtils;
import dlshade.org.apache.bookkeeper.common.util.OrderedScheduler;
import dlshade.org.apache.bookkeeper.stats.Counter;
import dlshade.org.apache.bookkeeper.stats.OpStatsLogger;
import dlshade.org.apache.bookkeeper.stats.StatsLogger;
import dlshade.org.apache.distributedlog.common.concurrent.AsyncSemaphore;
import dlshade.org.apache.distributedlog.exceptions.LockingException;
import dlshade.org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
import dlshade.org.apache.distributedlog.exceptions.UnexpectedException;
import dlshade.org.apache.distributedlog.lock.DistributedLock;
import dlshade.org.apache.distributedlog.lock.DistributedLockContext;
import dlshade.org.apache.distributedlog.lock.LockClosedException;
import dlshade.org.apache.distributedlog.lock.LockListener;
import dlshade.org.apache.distributedlog.lock.LockWaiter;
import dlshade.org.apache.distributedlog.lock.SessionLock;
import dlshade.org.apache.distributedlog.lock.SessionLockFactory;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link DistributedLock} that delegates the actual locking to {@link SessionLock}
 * instances obtained from a {@link SessionLockFactory}.
 *
 * <p>Acquire, re-acquire and close steps are submitted to {@code lockStateExecutor}
 * keyed by {@code lockPath}. The class also registers itself as the
 * {@link LockListener} of the session lock it creates, and reacts to
 * {@link #onExpired()} by starting a re-acquire.
 */
public class ZKDistributedLock
implements LockListener,
DistributedLock {
    static final Logger LOG = LoggerFactory.getLogger(ZKDistributedLock.class);
    // Creates the SessionLock used for each acquire attempt.
    private final SessionLockFactory lockFactory;
    // Executor on which lock state transitions are scheduled (keyed by lockPath).
    private final OrderedScheduler lockStateExecutor;
    // Path identifying this lock; used as executor key and in log/exception messages.
    private final String lockPath;
    // Timeout handed to SessionLock.asyncTryLock (in milliseconds) for the initial acquire.
    private final long lockTimeout;
    // Context object passed to the factory on every createLock call.
    private final DistributedLockContext lockContext = new DistributedLockContext();
    // Single-permit semaphore through which every acquire attempt is run.
    private final AsyncSemaphore lockSemaphore = new AsyncSemaphore(1, Optional.empty());
    // Set by asyncAcquire(); non-null means an acquire has already been requested.
    private CompletableFuture<ZKDistributedLock> lockAcquireFuture = null;
    // In-flight re-acquire, reset to null when it succeeds.
    private CompletableFuture<ZKDistributedLock> lockReacquireFuture = null;
    // Session lock most recently created by doAsyncAcquire.
    private SessionLock internalLock = null;
    // Pending result of SessionLock.asyncTryLock; cleared once a waiter is obtained.
    private CompletableFuture<LockWaiter> tryLockFuture = null;
    // Waiter returned by the last successful tryLock.
    private LockWaiter lockWaiter = null;
    // Recorded when a re-acquire fails; rethrown by checkLockState and reacquireLock.
    private LockingException lockReacquireException = null;
    // Set to true by asyncClose(); checked before and during acquire steps.
    private volatile boolean closed = false;
    // Future returned by the first asyncClose() call and by every later call.
    private CompletableFuture<Void> closeFuture = null;
    // Atomic accessor for reacquireCount.
    private static final AtomicIntegerFieldUpdater<ZKDistributedLock> reacquireCountUpdater = AtomicIntegerFieldUpdater.newUpdater(ZKDistributedLock.class, "reacquireCount");
    // Number of re-acquire rounds started so far (incremented in reacquireLock).
    private volatile int reacquireCount = 0;
    // Stats logger scoped to "lock" and the metrics derived from it.
    private final StatsLogger lockStatsLogger;
    private final OpStatsLogger acquireStats;
    private final OpStatsLogger reacquireStats;
    private final Counter internalTryRetries;

    /**
     * Creates a lock for {@code lockPath}.
     *
     * @param lockStateExecutor executor on which lock state transitions are scheduled
     * @param lockFactory factory used to create the underlying session lock
     * @param lockPath path identifying the lock
     * @param lockTimeout timeout passed to the session lock on the initial acquire
     * @param statsLogger parent stats logger; metrics are registered under its "lock" scope
     */
    public ZKDistributedLock(OrderedScheduler lockStateExecutor, SessionLockFactory lockFactory, String lockPath, long lockTimeout, StatsLogger statsLogger) {
        // Resolve the scoped logger once and derive every metric from it.
        StatsLogger scopedLogger = statsLogger.scope("lock");
        this.lockStatsLogger = scopedLogger;
        this.acquireStats = scopedLogger.getOpStatsLogger("acquire");
        this.reacquireStats = scopedLogger.getOpStatsLogger("reacquire");
        this.internalTryRetries = scopedLogger.getCounter("internalTryRetries");

        this.lockFactory = lockFactory;
        this.lockStateExecutor = lockStateExecutor;
        this.lockPath = lockPath;
        this.lockTimeout = lockTimeout;
    }

    /**
     * Builds the exception used whenever an operation is attempted on a closed lock.
     *
     * @return a new {@link LockClosedException} for this lock path
     */
    private LockClosedException newLockClosedException() {
        final String reason = "Lock is already closed";
        return new LockClosedException(lockPath, reason);
    }

    /**
     * Verifies that the lock is still usable.
     *
     * @throws LockingException a {@link LockClosedException} if the lock was closed,
     *         or the exception recorded by a previously failed re-acquire
     */
    private synchronized void checkLockState() throws LockingException {
        if (closed) {
            throw newLockClosedException();
        }
        final LockingException reacquireFailure = lockReacquireException;
        if (reacquireFailure != null) {
            throw reacquireFailure;
        }
    }

    /**
     * Starts acquiring the lock asynchronously. Only one acquire may ever be requested
     * per instance; a second call fails with an {@link UnexpectedException}.
     *
     * <p>If the returned future is cancelled, or completes with any failure, the lock
     * is closed.
     *
     * @return a future completed with this lock once it is acquired
     */
    public synchronized CompletableFuture<ZKDistributedLock> asyncAcquire() {
        if (lockAcquireFuture != null) {
            String message = "Someone is already acquiring/acquired lock " + lockPath;
            return FutureUtils.exception(new UnexpectedException(message));
        }

        final CompletableFuture<ZKDistributedLock> acquireFuture = FutureUtils.createFuture();

        // A cancellation by the caller schedules a close on the ordered executor.
        acquireFuture.whenComplete((ignoredLock, failure) -> {
            if (failure instanceof CancellationException) {
                lockStateExecutor.executeOrdered(lockPath, () -> asyncClose());
            }
        });

        final Stopwatch timer = Stopwatch.createStarted();
        acquireFuture.whenComplete(new FutureEventListener<ZKDistributedLock>() {

            @Override
            public void onSuccess(ZKDistributedLock lock) {
                acquireStats.registerSuccessfulEvent(timer.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
            }

            @Override
            public void onFailure(Throwable cause) {
                acquireStats.registerFailedEvent(timer.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
                // Release whatever was obtained so far when the acquire fails.
                asyncClose();
            }
        });

        lockAcquireFuture = acquireFuture;
        lockStateExecutor.executeOrdered(lockPath, () -> doAsyncAcquireWithSemaphore(acquireFuture, lockTimeout));
        return acquireFuture;
    }

    /**
     * Runs one acquire attempt through {@code lockSemaphore}. The semaphore task
     * returns {@code acquirePromise} itself as the future for the attempt.
     *
     * @param acquirePromise promise completed with the outcome of the attempt
     * @param lockTimeout timeout forwarded to {@link #doAsyncAcquire}
     */
    void doAsyncAcquireWithSemaphore(CompletableFuture<ZKDistributedLock> acquirePromise, long lockTimeout) {
        lockSemaphore.acquireAndRun(() -> {
            doAsyncAcquire(acquirePromise, lockTimeout);
            return acquirePromise;
        });
    }

    /**
     * Performs a single acquire attempt: validates the lock state, short-circuits if
     * the lock is already held, otherwise creates a session lock and tries to lock it.
     *
     * @param acquirePromise promise completed with this lock on success, or failed with
     *        the state/creation/locking error
     * @param lockTimeout timeout forwarded to {@link #asyncTryLock}
     */
    void doAsyncAcquire(final CompletableFuture<ZKDistributedLock> acquirePromise, final long lockTimeout) {
        LOG.trace("Async Lock Acquire {}", lockPath);
        try {
            checkLockState();
        } catch (IOException stateError) {
            FutureUtils.completeExceptionally(acquirePromise, stateError);
            return;
        }
        if (haveLock()) {
            // Nothing to do: the current session lock is still held.
            FutureUtils.complete(acquirePromise, this);
            return;
        }

        final FutureEventListener<SessionLock> onLockCreated = new FutureEventListener<SessionLock>() {

            @Override
            public void onSuccess(SessionLock lock) {
                synchronized (ZKDistributedLock.this) {
                    if (closed) {
                        LOG.info("Skipping tryLocking lock {} since it is already closed", lockPath);
                        FutureUtils.completeExceptionally(acquirePromise, newLockClosedException());
                        return;
                    }
                }
                synchronized (ZKDistributedLock.this) {
                    internalLock = lock;
                    internalLock.setLockListener(ZKDistributedLock.this);
                }
                asyncTryLock(lock, acquirePromise, lockTimeout);
            }

            @Override
            public void onFailure(Throwable cause) {
                FutureUtils.completeExceptionally(acquirePromise, cause);
            }
        };

        lockFactory.createLock(lockPath, lockContext)
                .whenCompleteAsync(onLockCreated, lockStateExecutor.chooseThread(lockPath));
    }

    /**
     * Issues a try-lock on {@code lock}, cancelling any try-lock still recorded in
     * {@code tryLockFuture}, and waits for the resulting waiter to acquire.
     *
     * @param lock session lock to try
     * @param acquirePromise promise failed here if the try-lock fails or the lock was closed
     * @param lockTimeout try-lock timeout in milliseconds
     */
    void asyncTryLock(SessionLock lock, final CompletableFuture<ZKDistributedLock> acquirePromise, long lockTimeout) {
        if (tryLockFuture != null) {
            tryLockFuture.cancel(true);
        }

        final CompletableFuture<LockWaiter> attempt = lock.asyncTryLock(lockTimeout, TimeUnit.MILLISECONDS);
        tryLockFuture = attempt;

        attempt.whenCompleteAsync(new FutureEventListener<LockWaiter>() {

            @Override
            public void onSuccess(LockWaiter waiter) {
                synchronized (ZKDistributedLock.this) {
                    if (closed) {
                        LOG.info("Skipping acquiring lock {} since it is already closed", lockPath);
                        // Fail the waiter as well as the caller's promise.
                        waiter.getAcquireFuture().completeExceptionally(new LockingException(lockPath, "lock is already closed."));
                        FutureUtils.completeExceptionally(acquirePromise, newLockClosedException());
                        return;
                    }
                }
                tryLockFuture = null;
                lockWaiter = waiter;
                waitForAcquire(waiter, acquirePromise);
            }

            @Override
            public void onFailure(Throwable cause) {
                FutureUtils.completeExceptionally(acquirePromise, cause);
            }
        }, lockStateExecutor.chooseThread(lockPath));
    }

    /**
     * Waits for {@code waiter} to resolve and translates the outcome onto
     * {@code acquirePromise}.
     *
     * <p>The promise is completed with this lock when the waiter reports {@code true},
     * failed with an {@link OwnershipAcquireFailedException} when it reports anything
     * else, and failed with the original cause when the waiter's future fails.
     *
     * @param waiter waiter returned by the session lock's try-lock
     * @param acquirePromise promise to complete with the acquire outcome
     */
    void waitForAcquire(final LockWaiter waiter, final CompletableFuture<ZKDistributedLock> acquirePromise) {
        waiter.getAcquireFuture().whenCompleteAsync(new FutureEventListener<Boolean>() {

            @Override
            public void onSuccess(Boolean acquired) {
                // Boolean.TRUE.equals tolerates a null result: an exception thrown from this
                // callback would otherwise leave acquirePromise uncompleted.
                if (Boolean.TRUE.equals(acquired)) {
                    // Only report "acquired" when the waiter actually obtained the lock.
                    LOG.info("{} acquired lock {}", waiter, lockPath);
                    FutureUtils.complete(acquirePromise, ZKDistributedLock.this);
                } else {
                    LOG.info("{} failed to acquire lock {}", waiter, lockPath);
                    FutureUtils.completeExceptionally(acquirePromise, new OwnershipAcquireFailedException(lockPath, waiter.getCurrentOwner()));
                }
            }

            @Override
            public void onFailure(Throwable cause) {
                FutureUtils.completeExceptionally(acquirePromise, cause);
            }
        }, lockStateExecutor.chooseThread(lockPath));
    }

    /**
     * {@link LockListener} callback: starts a re-acquire. A {@link LockingException}
     * raised while starting it is logged and not propagated.
     */
    @Override
    public void onExpired() {
        try {
            reacquireLock(false);
        } catch (LockingException reacquireError) {
            LOG.error("Locking exception on re-acquiring lock {} : ", lockPath, reacquireError);
        }
    }

    /**
     * Checks that the lock is held and, if it is not, starts a re-acquire.
     *
     * @throws LockingException if the initial acquire has not completed yet, or if
     *         starting the re-acquire fails (closed lock or earlier re-acquire failure)
     */
    @Override
    public synchronized void checkOwnershipAndReacquire() throws LockingException {
        final boolean acquireCompleted = lockAcquireFuture != null && lockAcquireFuture.isDone();
        if (!acquireCompleted) {
            throw new LockingException(lockPath, "check ownership before acquiring");
        }
        if (!haveLock()) {
            reacquireLock(true);
        }
    }

    /**
     * Checks that the lock is currently held, without attempting to re-acquire it.
     *
     * @throws LockingException if the initial acquire has not completed yet, or if the
     *         lock is no longer held
     */
    @Override
    public synchronized void checkOwnership() throws LockingException {
        final boolean acquireCompleted = lockAcquireFuture != null && lockAcquireFuture.isDone();
        if (!acquireCompleted) {
            throw new LockingException(lockPath, "check ownership before acquiring");
        }
        if (haveLock()) {
            return;
        }
        throw new LockingException(lockPath, "Lost lock ownership");
    }

    /**
     * @return the number of re-acquire rounds started so far
     */
    @VisibleForTesting
    int getReacquireCount() {
        final int rounds = reacquireCountUpdater.get(this);
        return rounds;
    }

    /**
     * @return the re-acquire future currently recorded, or {@code null} if there is none
     */
    @VisibleForTesting
    synchronized CompletableFuture<ZKDistributedLock> getLockReacquireFuture() {
        final CompletableFuture<ZKDistributedLock> current = lockReacquireFuture;
        return current;
    }

    /**
     * @return the future created by {@link #asyncAcquire()}, or {@code null} if acquire
     *         was never requested
     */
    @VisibleForTesting
    synchronized CompletableFuture<ZKDistributedLock> getLockAcquireFuture() {
        final CompletableFuture<ZKDistributedLock> current = lockAcquireFuture;
        return current;
    }

    /**
     * @return the session lock currently recorded, or {@code null} if none was created yet
     */
    @VisibleForTesting
    synchronized SessionLock getInternalLock() {
        final SessionLock current = internalLock;
        return current;
    }

    /**
     * @return the waiter recorded by the last successful try-lock, or {@code null}
     */
    @VisibleForTesting
    LockWaiter getLockWaiter() {
        final LockWaiter current = lockWaiter;
        return current;
    }

    /**
     * @return {@code true} only if the lock is not closed, a session lock exists, and
     *         that session lock reports it is held
     */
    synchronized boolean haveLock() {
        if (closed) {
            return false;
        }
        final SessionLock current = internalLock;
        return current != null && current.isLockHeld();
    }

    /**
     * Close step for the waiter stage. With no waiter, falls back to interrupting the
     * pending try-lock; otherwise cancels the waiter's acquire future and unlocks the
     * session lock once that future resolves, whatever its outcome.
     *
     * @param waiter waiter to cancel, or {@code null} if none was obtained
     * @param closePromise promise handed on to the unlock step
     */
    void closeWaiter(LockWaiter waiter, final CompletableFuture<Void> closePromise) {
        if (waiter == null) {
            interruptTryLock(tryLockFuture, closePromise);
            return;
        }

        // Register the continuation first, then cancel so it fires on the cancellation.
        waiter.getAcquireFuture().whenCompleteAsync(new FutureEventListener<Boolean>() {

            @Override
            public void onSuccess(Boolean value) {
                unlockInternalLock(closePromise);
            }

            @Override
            public void onFailure(Throwable cause) {
                unlockInternalLock(closePromise);
            }
        }, lockStateExecutor.chooseThread(lockPath));
        waiter.getAcquireFuture().cancel(true);
    }

    /**
     * Close step for the try-lock stage. With no pending try-lock, unlocks directly;
     * otherwise cancels it and continues with the waiter (if the try-lock had already
     * produced one) or with the unlock step (if it failed or was cancelled).
     *
     * @param tryLockFuture pending try-lock, or {@code null}
     * @param closePromise promise handed on to the next close step
     */
    void interruptTryLock(CompletableFuture<LockWaiter> tryLockFuture, final CompletableFuture<Void> closePromise) {
        if (tryLockFuture == null) {
            unlockInternalLock(closePromise);
            return;
        }

        // Register the continuation first, then cancel so it fires on the cancellation.
        tryLockFuture.whenCompleteAsync(new FutureEventListener<LockWaiter>() {

            @Override
            public void onSuccess(LockWaiter waiter) {
                closeWaiter(waiter, closePromise);
            }

            @Override
            public void onFailure(Throwable cause) {
                unlockInternalLock(closePromise);
            }
        }, lockStateExecutor.chooseThread(lockPath));
        tryLockFuture.cancel(true);
    }

    /**
     * Final close step: unlocks the session lock, if any, and completes
     * {@code closePromise} with {@code null} regardless of the unlock outcome.
     *
     * @param closePromise promise completed when unlocking has finished
     */
    synchronized void unlockInternalLock(CompletableFuture<Void> closePromise) {
        final SessionLock held = internalLock;
        if (held != null) {
            held.asyncUnlock().whenComplete((ignoredValue, ignoredCause) -> closePromise.complete(null));
            return;
        }
        FutureUtils.complete(closePromise, null);
    }

    /**
     * Closes the lock. The first call marks the lock closed and schedules the close
     * steps on the ordered executor; every later call returns the future created by
     * the first call.
     *
     * @return a future completed with {@code null} once the close steps have finished,
     *         whether they succeeded or failed
     */
    @Override
    public CompletableFuture<Void> asyncClose() {
        final CompletableFuture<Void> closePromise;
        synchronized (this) {
            if (closed) {
                return closeFuture;
            }
            closed = true;
            closePromise = new CompletableFuture<>();
            closeFuture = closePromise;
        }

        // Typed as CompletableFuture<Void> (was a raw type) so the listener registration
        // and the hand-off to closeWaiter are checked by the compiler.
        final CompletableFuture<Void> closeWaiterFuture = new CompletableFuture<>();
        closeWaiterFuture.whenCompleteAsync(new FutureEventListener<Void>() {

            @Override
            public void onSuccess(Void value) {
                complete();
            }

            @Override
            public void onFailure(Throwable cause) {
                complete();
            }

            // Close is best-effort: the caller's promise succeeds on either outcome.
            private void complete() {
                FutureUtils.complete(closePromise, null);
            }
        }, lockStateExecutor.chooseThread(lockPath));

        lockStateExecutor.executeOrdered(lockPath, () -> closeWaiter(lockWaiter, closeWaiterFuture));
        return closePromise;
    }

    /**
     * Schedules one re-acquire attempt on the ordered executor.
     *
     * @param numRetries remaining retry budget, shared across attempts
     * @param lockTimeout timeout forwarded to {@link #doInternalReacquireLock}
     * @param reacquirePromise promise completed with the final re-acquire outcome
     */
    void internalReacquireLock(AtomicInteger numRetries, long lockTimeout, CompletableFuture<ZKDistributedLock> reacquirePromise) {
        lockStateExecutor.executeOrdered(
                lockPath,
                () -> doInternalReacquireLock(numRetries, lockTimeout, reacquirePromise));
    }

    /**
     * Runs one re-acquire attempt with a zero timeout and decides what to do with its
     * outcome: success completes {@code reacquirePromise}; an
     * {@link OwnershipAcquireFailedException} fails it immediately; any other failure is
     * retried while the retry budget lasts and the lock is not closed.
     *
     * @param numRetries remaining retry budget; decremented on each retryable failure
     * @param lockTimeout timeout passed along to the next scheduled retry
     * @param reacquirePromise promise completed with the final re-acquire outcome
     */
    void doInternalReacquireLock(final AtomicInteger numRetries, final long lockTimeout, final CompletableFuture<ZKDistributedLock> reacquirePromise) {
        internalTryRetries.inc();

        final CompletableFuture<ZKDistributedLock> attempt = new CompletableFuture<>();
        attempt.whenComplete(new FutureEventListener<ZKDistributedLock>() {

            @Override
            public void onSuccess(ZKDistributedLock lock) {
                FutureUtils.complete(reacquirePromise, lock);
            }

            @Override
            public void onFailure(Throwable cause) {
                // Short-circuit order matters: the budget is only consumed for failures
                // other than a lost ownership race.
                final boolean retry = !(cause instanceof OwnershipAcquireFailedException)
                        && numRetries.getAndDecrement() > 0
                        && !closed;
                if (retry) {
                    internalReacquireLock(numRetries, lockTimeout, reacquirePromise);
                    return;
                }
                FutureUtils.completeExceptionally(reacquirePromise, cause);
            }
        });

        doAsyncAcquireWithSemaphore(attempt, 0L);
    }

    /**
     * Starts a re-acquire, or joins the one already in flight.
     *
     * @param throwLockAcquireException whether a previously recorded re-acquire failure
     *        is rethrown ({@code true}) or reported by returning {@code null} ({@code false})
     * @return the re-acquire future, or {@code null} when an earlier re-acquire failed
     *         and {@code throwLockAcquireException} is {@code false}
     * @throws LockingException if the lock is closed, or if an earlier re-acquire failed
     *         and {@code throwLockAcquireException} is {@code true}
     */
    private CompletableFuture<ZKDistributedLock> reacquireLock(boolean throwLockAcquireException) throws LockingException {
        final Stopwatch stopwatch = Stopwatch.createStarted();
        final CompletableFuture<ZKDistributedLock> lockPromise;
        synchronized (this) {
            if (closed) {
                throw newLockClosedException();
            }
            if (lockReacquireException != null) {
                if (!throwLockAcquireException) {
                    return null;
                }
                throw lockReacquireException;
            }
            if (lockReacquireFuture != null) {
                // A re-acquire is already running; share its future.
                return lockReacquireFuture;
            }

            LOG.info("reacquiring lock at {}", lockPath);
            lockPromise = new CompletableFuture<>();
            lockReacquireFuture = lockPromise;
            lockPromise.whenComplete(new FutureEventListener<ZKDistributedLock>() {

                @Override
                public void onSuccess(ZKDistributedLock lock) {
                    // Clear the in-flight marker so a later expiry can start a new round.
                    synchronized (ZKDistributedLock.this) {
                        lockReacquireFuture = null;
                    }
                    reacquireStats.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
                }

                @Override
                public void onFailure(Throwable cause) {
                    // Remember the failure; it is surfaced by later state checks.
                    synchronized (ZKDistributedLock.this) {
                        lockReacquireException = cause instanceof LockingException
                                ? (LockingException) cause
                                : new LockingException(lockPath, "Exception on re-acquiring lock", cause);
                    }
                    reacquireStats.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
                }
            });
        }

        reacquireCountUpdater.incrementAndGet(this);
        internalReacquireLock(new AtomicInteger(Integer.MAX_VALUE), 0L, lockPromise);
        return lockPromise;
    }
}

