/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.metadata.coordination.impl;

import java.util.EnumSet;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.coordination.ResourceLock;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ResourceLockImpl<T>
implements ResourceLock<T> {
    private static final Logger log = LoggerFactory.getLogger(ResourceLockImpl.class);
    private final MetadataStoreExtended store;
    private final MetadataSerde<T> serde;
    private final String path;
    private volatile T value;
    private long version;
    private final CompletableFuture<Void> expiredFuture;
    private boolean revalidateAfterReconnection = false;
    private CompletableFuture<Void> revalidateFuture;
    private State state;

    public ResourceLockImpl(MetadataStoreExtended store, MetadataSerde<T> serde, String path) {
        this.store = store;
        this.serde = serde;
        this.path = path;
        this.version = -1L;
        this.expiredFuture = new CompletableFuture();
        this.state = State.Init;
    }

    @Override
    public synchronized T getValue() {
        return this.value;
    }

    @Override
    public synchronized CompletableFuture<Void> updateValue(T newValue) {
        return this.acquire(newValue);
    }

    @Override
    public synchronized CompletableFuture<Void> release() {
        if (this.state == State.Released) {
            return CompletableFuture.completedFuture(null);
        }
        this.state = State.Releasing;
        CompletableFuture<Void> result = new CompletableFuture<Void>();
        ((CompletableFuture)this.store.delete(this.path, Optional.of(this.version)).thenRun(() -> {
            ResourceLockImpl resourceLockImpl = this;
            synchronized (resourceLockImpl) {
                this.state = State.Released;
            }
            this.expiredFuture.complete(null);
            result.complete(null);
        })).exceptionally(ex -> {
            if (ex.getCause() instanceof MetadataStoreException.NotFoundException) {
                ResourceLockImpl resourceLockImpl = this;
                synchronized (resourceLockImpl) {
                    this.state = State.Released;
                }
                this.expiredFuture.complete(null);
                result.complete(null);
            } else {
                result.completeExceptionally((Throwable)ex);
            }
            return null;
        });
        return result;
    }

    @Override
    public CompletableFuture<Void> getLockExpiredFuture() {
        return this.expiredFuture;
    }

    @Override
    public String getPath() {
        return this.path;
    }

    public int hashCode() {
        return this.path.hashCode();
    }

    synchronized CompletableFuture<Void> acquire(T newValue) {
        CompletableFuture<Void> result = new CompletableFuture<Void>();
        ((CompletableFuture)this.acquireWithNoRevalidation(newValue).thenRun(() -> result.complete(null))).exceptionally(ex -> {
            if (ex.getCause() instanceof MetadataStoreException.LockBusyException) {
                ((CompletableFuture)this.revalidate(newValue, false).thenAccept(__ -> result.complete(null))).exceptionally(ex1 -> {
                    result.completeExceptionally((Throwable)ex1);
                    return null;
                });
            } else {
                result.completeExceptionally(ex.getCause());
            }
            return null;
        });
        return result;
    }

    private CompletableFuture<Void> acquireWithNoRevalidation(T newValue) {
        byte[] payload;
        if (log.isDebugEnabled()) {
            log.debug("acquireWithNoRevalidation,newValue={},version={}", newValue, (Object)this.version);
        }
        try {
            payload = this.serde.serialize(this.path, newValue);
        }
        catch (Throwable t) {
            return FutureUtils.exception(t);
        }
        CompletableFuture<Void> result = new CompletableFuture<Void>();
        ((CompletableFuture)this.store.put(this.path, payload, Optional.of(this.version), EnumSet.of(CreateOption.Ephemeral)).thenAccept(stat -> {
            ResourceLockImpl resourceLockImpl = this;
            synchronized (resourceLockImpl) {
                this.state = State.Valid;
                this.version = stat.getVersion();
                this.value = newValue;
            }
            log.info("Acquired resource lock on {}", (Object)this.path);
            result.complete(null);
        })).exceptionally(ex -> {
            if (ex.getCause() instanceof MetadataStoreException.BadVersionException) {
                result.completeExceptionally(new MetadataStoreException.LockBusyException("Resource at " + this.path + " is already locked"));
            } else {
                result.completeExceptionally(ex.getCause());
            }
            return null;
        });
        return result;
    }

    synchronized void lockWasInvalidated() {
        if (this.state != State.Valid) {
            return;
        }
        log.info("Lock on resource {} was invalidated", (Object)this.path);
        this.revalidate(this.value, true).thenRun(() -> log.info("Successfully revalidated the lock on {}", (Object)this.path));
    }

    synchronized CompletableFuture<Void> revalidateIfNeededAfterReconnection() {
        if (this.revalidateAfterReconnection) {
            this.revalidateAfterReconnection = false;
            log.warn("Revalidate lock at {} after reconnection", (Object)this.path);
            return this.revalidate(this.value, true);
        }
        return CompletableFuture.completedFuture(null);
    }

    synchronized CompletableFuture<Void> revalidate(T newValue, boolean revalidateAfterReconnection) {
        if (this.revalidateFuture == null || this.revalidateFuture.isDone()) {
            this.revalidateFuture = this.doRevalidate(newValue);
        } else {
            if (log.isDebugEnabled()) {
                log.debug("Previous revalidating is not finished while revalidate newValue={}, value={}, version={}", new Object[]{newValue, this.value, this.version});
            }
            CompletableFuture newFuture = new CompletableFuture();
            this.revalidateFuture.whenComplete((unused, throwable) -> ((CompletableFuture)this.doRevalidate(newValue).thenRun(() -> newFuture.complete(null))).exceptionally(throwable1 -> {
                newFuture.completeExceptionally((Throwable)throwable1);
                return null;
            }));
            this.revalidateFuture = newFuture;
        }
        this.revalidateFuture.exceptionally(ex -> {
            ResourceLockImpl resourceLockImpl = this;
            synchronized (resourceLockImpl) {
                Throwable realCause = FutureUtil.unwrapCompletionException(ex);
                if (!revalidateAfterReconnection || realCause instanceof MetadataStoreException.BadVersionException || realCause instanceof MetadataStoreException.LockBusyException) {
                    log.warn("Failed to revalidate the lock at {}. Marked as expired. {}", (Object)this.path, (Object)realCause.getMessage());
                    this.state = State.Released;
                    this.expiredFuture.complete(null);
                } else {
                    this.revalidateAfterReconnection = true;
                    log.warn("Failed to revalidate the lock at {}. Retrying later on reconnection {}", (Object)this.path, (Object)realCause.getMessage());
                }
            }
            return null;
        });
        return this.revalidateFuture;
    }

    private synchronized CompletableFuture<Void> doRevalidate(T newValue) {
        if (log.isDebugEnabled()) {
            log.debug("doRevalidate with newValue={}, version={}", newValue, (Object)this.version);
        }
        return this.store.get(this.path).thenCompose(optGetResult -> {
            T existingValue;
            if (!optGetResult.isPresent()) {
                this.setVersion(-1L);
                return this.acquireWithNoRevalidation(newValue).thenRun(() -> log.info("Successfully re-acquired missing lock at {}", (Object)this.path));
            }
            GetResult res = (GetResult)optGetResult.get();
            if (!res.getStat().isEphemeral()) {
                return FutureUtils.exception(new MetadataStoreException.LockBusyException("Path " + this.path + " is already created as non-ephemeral"));
            }
            try {
                existingValue = this.serde.deserialize(this.path, res.getValue(), res.getStat());
            }
            catch (Throwable t) {
                return FutureUtils.exception(t);
            }
            ResourceLockImpl resourceLockImpl = this;
            synchronized (resourceLockImpl) {
                if (newValue.equals(existingValue)) {
                    if (res.getStat().isCreatedBySelf()) {
                        this.version = res.getStat().getVersion();
                        this.value = newValue;
                        return CompletableFuture.completedFuture(null);
                    }
                    log.info("Deleting stale lock at {}", (Object)this.path);
                    return ((CompletableFuture)((CompletableFuture)this.store.delete(this.path, Optional.of(res.getStat().getVersion())).thenRun(() -> this.setVersion(-1L))).thenCompose(__ -> this.acquireWithNoRevalidation(newValue))).thenRun(() -> log.info("Successfully re-acquired stale lock at {}", (Object)this.path));
                }
                if (!res.getStat().isCreatedBySelf()) {
                    return FutureUtils.exception(new MetadataStoreException.LockBusyException("Resource at " + this.path + " is already locked"));
                }
                return ((CompletableFuture)((CompletableFuture)this.store.delete(this.path, Optional.of(res.getStat().getVersion())).thenRun(() -> this.setVersion(-1L))).thenCompose(__ -> this.acquireWithNoRevalidation(newValue))).thenRun(() -> log.info("Successfully re-acquired lock at {}", (Object)this.path));
            }
        });
    }

    private synchronized void setVersion(long version) {
        this.version = version;
    }

    private static enum State {
        Init,
        Valid,
        Releasing,
        Released;

    }
}

