package dk.cloudcreate.essentials.components.distributed.fencedlock.postgresql;

import dk.cloudcreate.essentials.components.distributed.fencedlock.FencedLock;
import dk.cloudcreate.essentials.components.distributed.fencedlock.FencedLockManager;
import dk.cloudcreate.essentials.components.distributed.fencedlock.LockCallback;
import dk.cloudcreate.essentials.components.distributed.fencedlock.LockName;
import dk.cloudcreate.essentials.shared.FailFast;
import dk.cloudcreate.essentials.shared.MessageFormatter;
import dk.cloudcreate.essentials.shared.concurrent.ThreadFactoryBuilder;
import dk.cloudcreate.essentials.shared.network.Network;
import java.time.Clock;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.jdbi.v3.core.Handle;
import org.jdbi.v3.core.Jdbi;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/* loaded from: input_file:dk/cloudcreate/essentials/components/distributed/fencedlock/postgresql/PostgresqlFencedLockManager.class */
public class PostgresqlFencedLockManager implements FencedLockManager {
    private static final Logger log = LoggerFactory.getLogger(PostgresqlFencedLockManager.class);
    public static final long FIRST_TOKEN = 1;
    public static final long UNINITIALIZED_LOCK_TOKEN = -1;
    private final ConcurrentMap<LockName, PostgresqlFencedLock> locksAcquiredByThisLockManager;
    private final ConcurrentMap<LockName, ScheduledFuture<?>> asyncLockAcquirings;
    private final String fencedLocksTableName;
    private final Duration lockTimeOut;
    private final Duration lockConfirmationInterval;
    private final Jdbi jdbi;
    private final String lockManagerInstanceId;
    private volatile boolean started;
    private volatile boolean stopping;
    private volatile boolean paused;
    private ScheduledExecutorService lockConfirmationExecutor;
    private ScheduledExecutorService asyncLockAcquiringExecutor;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:dk/cloudcreate/essentials/components/distributed/fencedlock/postgresql/PostgresqlFencedLockManager$PostgresqlFencedLock.class */
    public class PostgresqlFencedLock implements FencedLock {
        private LockName lockName;
        private long currentToken;
        private String lockedByLockManagerInstanceId;
        private OffsetDateTime lockAcquiredTimestamp;
        private OffsetDateTime lockLastConfirmedTimestamp;
        private List<LockCallback> lockCallbacks = new ArrayList();

        public PostgresqlFencedLock(LockName lockName, long j, String str, OffsetDateTime offsetDateTime, OffsetDateTime offsetDateTime2) {
            this.lockName = (LockName) FailFast.requireNonNull(lockName, "lockName is missing");
            this.currentToken = j;
            this.lockedByLockManagerInstanceId = str;
            this.lockAcquiredTimestamp = offsetDateTime;
            this.lockLastConfirmedTimestamp = offsetDateTime2;
        }

        @Override // dk.cloudcreate.essentials.components.distributed.fencedlock.FencedLock
        public LockName getName() {
            return this.lockName;
        }

        @Override // dk.cloudcreate.essentials.components.distributed.fencedlock.FencedLock
        public long getCurrentToken() {
            return this.currentToken;
        }

        @Override // dk.cloudcreate.essentials.components.distributed.fencedlock.FencedLock
        public String getLockedByLockManagerInstanceId() {
            return this.lockedByLockManagerInstanceId;
        }

        @Override // dk.cloudcreate.essentials.components.distributed.fencedlock.FencedLock
        public OffsetDateTime getLockAcquiredTimestamp() {
            return this.lockAcquiredTimestamp;
        }

        @Override // dk.cloudcreate.essentials.components.distributed.fencedlock.FencedLock
        public OffsetDateTime getLockLastConfirmedTimestamp() {
            return this.lockLastConfirmedTimestamp;
        }

        @Override // dk.cloudcreate.essentials.components.distributed.fencedlock.FencedLock
        public boolean isLocked() {
            return this.lockedByLockManagerInstanceId != null;
        }

        @Override // dk.cloudcreate.essentials.components.distributed.fencedlock.FencedLock
        public boolean isLockedByThisLockManagerInstance() {
            return isLocked() && Objects.equals(this.lockedByLockManagerInstanceId, PostgresqlFencedLockManager.this.lockManagerInstanceId);
        }

        @Override // dk.cloudcreate.essentials.components.distributed.fencedlock.FencedLock
        public void release() {
            if (isLockedByThisLockManagerInstance()) {
                PostgresqlFencedLockManager.this.releaseLock(this);
            }
        }

        @Override // dk.cloudcreate.essentials.components.distributed.fencedlock.FencedLock
        public void registerCallback(LockCallback lockCallback) {
            this.lockCallbacks.add(lockCallback);
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            return this.lockName.equals(((PostgresqlFencedLock) obj).lockName);
        }

        public int hashCode() {
            return Objects.hash(this.lockName);
        }

        public Duration getDurationSinceLastConfirmation() {
            FailFast.requireNonNull(this.lockLastConfirmedTimestamp, MessageFormatter.msg("FencedLock '{}' doesn't have a lockLastConfirmedTimestamp", new Object[]{this.lockName}));
            return Duration.between(this.lockLastConfirmedTimestamp, ZonedDateTime.now()).abs();
        }

        private void markAsUnlocked() {
            this.lockedByLockManagerInstanceId = null;
            this.lockCallbacks.forEach(lockCallback -> {
                lockCallback.lockReleased(this);
            });
        }

        PostgresqlFencedLock markAsConfirmed(OffsetDateTime offsetDateTime) {
            this.lockLastConfirmedTimestamp = offsetDateTime;
            return this;
        }

        public PostgresqlFencedLock markAsLocked(OffsetDateTime offsetDateTime, String str, long j) {
            this.lockAcquiredTimestamp = offsetDateTime;
            this.lockLastConfirmedTimestamp = offsetDateTime;
            this.lockedByLockManagerInstanceId = str;
            this.currentToken = j;
            this.lockCallbacks.forEach(lockCallback -> {
                lockCallback.lockAcquired(this);
            });
            return this;
        }

        public String toString() {
            LockName lockName = this.lockName;
            long j = this.currentToken;
            String str = this.lockedByLockManagerInstanceId;
            OffsetDateTime offsetDateTime = this.lockAcquiredTimestamp;
            OffsetDateTime offsetDateTime2 = this.lockLastConfirmedTimestamp;
            return "PostgresqlFencedLock{lockName=" + lockName + ", currentTokenIssuedToThisLockInstance=" + j + ", lockedByLockManagerInstanceId='" + lockName + "', lockAcquiredTimestamp=" + str + ", lockLastConfirmedTimestamp=" + offsetDateTime + "}";
        }
    }

    public PostgresqlFencedLockManager(Jdbi jdbi, Duration duration, Duration duration2) {
        this(jdbi, Optional.empty(), Optional.empty(), duration, duration2);
    }

    public PostgresqlFencedLockManager(Jdbi jdbi, Optional<String> optional, Optional<String> optional2, Duration duration, Duration duration2) {
        FailFast.requireNonNull(optional, "No lockManagerInstanceId option provided");
        FailFast.requireNonNull(optional2, "No fencedLocksTableName option provided");
        this.jdbi = (Jdbi) FailFast.requireNonNull(jdbi, "You must supply a jdbi instance");
        this.lockManagerInstanceId = optional.orElseGet(Network::hostName);
        this.fencedLocksTableName = optional2.orElse("fenced_locks");
        this.lockTimeOut = (Duration) FailFast.requireNonNull(duration, "No lockTimeOut value provided");
        this.lockConfirmationInterval = (Duration) FailFast.requireNonNull(duration2, "No lockConfirmationInterval value provided");
        if (duration2.compareTo(duration) >= 1) {
            throw new IllegalArgumentException(MessageFormatter.msg("lockConfirmationInterval {} duration MUST not be larger than the lockTimeOut {} duration, because locks will then always timeout", new Object[]{duration2, duration}));
        }
        this.locksAcquiredByThisLockManager = new ConcurrentHashMap();
        this.asyncLockAcquirings = new ConcurrentHashMap();
        jdbi.registerArgument(new LockNameArgumentFactory());
        initializeLockTable();
    }

    protected void initializeLockTable() {
        this.jdbi.useTransaction(handle -> {
            if (handle.execute("CREATE TABLE IF NOT EXISTS " + this.fencedLocksTableName + " (\nlock_name TEXT NOT NULL,\nlast_issued_fence_token bigint,\nlocked_by_lockmanager_instance_id TEXT,\nlock_acquired_ts TIMESTAMP WITH TIME ZONE,\nlock_last_confirmed_ts TIMESTAMP WITH TIME ZONE,\nPRIMARY KEY (lock_name)\n)", new Object[0]) == 1) {
                log.info("[{}] Created the '{}' fenced locks table", this.lockManagerInstanceId, this.fencedLocksTableName);
            }
            String str = this.fencedLocksTableName + "_current_token_index";
            if (handle.execute("CREATE INDEX IF NOT EXISTS " + str + " ON " + this.fencedLocksTableName + " (lock_name, last_issued_fence_token)", new Object[0]) == 1) {
                log.debug("[{}] Created the '{}' index on fenced locks table '{}'", new Object[]{this.lockManagerInstanceId, str, this.fencedLocksTableName});
            }
        });
    }

    public void start() {
        if (this.started) {
            log.debug("[{}] Lock Manager was already started", this.lockManagerInstanceId);
            return;
        }
        log.info("[{}] Starting lock manager", this.lockManagerInstanceId);
        this.stopping = false;
        this.lockConfirmationExecutor = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().nameFormat(this.lockManagerInstanceId + "-FencedLock-Confirmation-%d").daemon(true).build());
        this.asyncLockAcquiringExecutor = Executors.newScheduledThreadPool(2, ThreadFactoryBuilder.builder().nameFormat(this.lockManagerInstanceId + "-Lock-Acquiring-%d").daemon(true).build());
        this.lockConfirmationExecutor.scheduleAtFixedRate(this::confirmAllLocallyAcquiredLocks, this.lockConfirmationInterval.toMillis(), this.lockConfirmationInterval.toMillis(), TimeUnit.MILLISECONDS);
        this.started = true;
        log.info("[{}] Started lock manager", this.lockManagerInstanceId);
    }

    void pause() {
        log.info("[{}] Pausing async lock acquiring and lock confirmation", this.lockManagerInstanceId);
        this.paused = true;
    }

    void resume() {
        log.info("[{}] Resuming async lock acquiring and lock confirmation", this.lockManagerInstanceId);
        this.paused = false;
    }

    private void confirmAllLocallyAcquiredLocks() {
        if (this.stopping) {
            log.debug("[{}] Shutting down, skipping confirmAllLocallyAcquiredLocks", this.lockManagerInstanceId);
            return;
        }
        if (this.locksAcquiredByThisLockManager.size() == 0) {
            log.debug("[{}] No locks to confirm for this Lock Manager instance", this.lockManagerInstanceId);
            return;
        }
        if (this.paused) {
            log.info("[{}] Lock Manager is paused, skipping confirmAllLocallyAcquiredLocks", this.lockManagerInstanceId);
            return;
        }
        if (log.isTraceEnabled()) {
            log.trace("[{}] Confirming {} locks acquired by this Lock Manager Instance: {}", new Object[]{this.lockManagerInstanceId, Integer.valueOf(this.locksAcquiredByThisLockManager.size()), this.locksAcquiredByThisLockManager.keySet()});
        } else {
            log.debug("[{}] Confirming {} locks acquired by this Lock Manager Instance", this.lockManagerInstanceId, Integer.valueOf(this.locksAcquiredByThisLockManager.size()));
        }
        OffsetDateTime now = OffsetDateTime.now(Clock.systemUTC());
        this.jdbi.useTransaction(handle -> {
            this.locksAcquiredByThisLockManager.forEach((lockName, postgresqlFencedLock) -> {
                if (handle.createUpdate("UPDATE " + this.fencedLocksTableName + " SET lock_last_confirmed_ts=:lock_last_confirmed_ts\nWHERE lock_name=:lock_name AND last_issued_fence_token=:last_issued_fence_token AND locked_by_lockmanager_instance_id=:locked_by_lockmanager_instance_id").bind("lock_name", lockName).bind("locked_by_lockmanager_instance_id", (String) FailFast.requireNonNull(postgresqlFencedLock.getLockedByLockManagerInstanceId(), MessageFormatter.msg("[{}] getLockedByLockManagerInstanceId was NULL. Details: {}", new Object[]{this.lockManagerInstanceId, postgresqlFencedLock}))).bind("lock_last_confirmed_ts", now).bind("last_issued_fence_token", postgresqlFencedLock.getCurrentToken()).execute() == 0) {
                    log.info("[{}] Failed to confirm lock '{}', someone has taken over the lock: {}", new Object[]{this.lockManagerInstanceId, postgresqlFencedLock.getName(), postgresqlFencedLock});
                    postgresqlFencedLock.release();
                } else {
                    postgresqlFencedLock.markAsConfirmed(now);
                    log.debug("[{}] Confirmed lock '{}': {}", new Object[]{this.lockManagerInstanceId, postgresqlFencedLock.getName(), postgresqlFencedLock});
                }
            });
        });
        if (log.isTraceEnabled()) {
            log.trace("[{}] Completed confirmation of locks acquired by this Lock Manager Instance. Number of Locks acquired locally after confirmation {}: {}", new Object[]{this.lockManagerInstanceId, Integer.valueOf(this.locksAcquiredByThisLockManager.size()), this.locksAcquiredByThisLockManager.keySet()});
        } else {
            log.debug("[{}] Completed confirmation of locks acquired by this Lock Manager Instance. Number of Locks acquired locally after confirmation {}", this.lockManagerInstanceId, Integer.valueOf(this.locksAcquiredByThisLockManager.size()));
        }
    }

    private void releaseLock(PostgresqlFencedLock postgresqlFencedLock) {
        if (!postgresqlFencedLock.isLockedByThisLockManagerInstance()) {
            throw new IllegalArgumentException(MessageFormatter.msg("[{}] Cannot release Lock '{}' since it isn't locked by the current Lock Manager Node. Details: {}", new Object[]{this.lockManagerInstanceId, postgresqlFencedLock.lockName, postgresqlFencedLock}));
        }
        this.jdbi.useTransaction(handle -> {
            int execute = handle.createUpdate("UPDATE " + this.fencedLocksTableName + " SET locked_by_lockmanager_instance_id=NULL\nWHERE lock_name=:lock_name AND last_issued_fence_token=:lock_last_issued_token").bind("lock_name", postgresqlFencedLock.lockName).bind("lock_last_issued_token", postgresqlFencedLock.currentToken).execute();
            postgresqlFencedLock.markAsUnlocked();
            this.locksAcquiredByThisLockManager.remove(postgresqlFencedLock.lockName);
            if (execute == 0) {
                lookupLockInDB(handle, postgresqlFencedLock.lockName).ifPresent(postgresqlFencedLock2 -> {
                    log.debug("[{}] Couldn't release Lock '{}' as it was already acquired by another JVM Node: {}", new Object[]{this.lockManagerInstanceId, postgresqlFencedLock.lockName, postgresqlFencedLock2.lockedByLockManagerInstanceId});
                });
            } else {
                log.debug("[{}] Released Lock '{}': {}", new Object[]{this.lockManagerInstanceId, postgresqlFencedLock.lockName, postgresqlFencedLock});
            }
        });
    }

    @Override // dk.cloudcreate.essentials.components.distributed.fencedlock.FencedLockManager
    public Optional<FencedLock> lookupLock(LockName lockName) {
        Optional<PostgresqlFencedLock> lookupLockInDB = lookupLockInDB(lockName);
        Class<FencedLock> cls = FencedLock.class;
        Objects.requireNonNull(FencedLock.class);
        return lookupLockInDB.map((v1) -> {
            return r1.cast(v1);
        });
    }

    private Optional<PostgresqlFencedLock> lookupLockInDB(LockName lockName) {
        return (Optional) this.jdbi.inTransaction(handle -> {
            return lookupLockInDB(handle, lockName);
        });
    }

    private Optional<PostgresqlFencedLock> lookupLockInDB(Handle handle, LockName lockName) {
        Optional<PostgresqlFencedLock> findOne = handle.createQuery("SELECT * FROM " + this.fencedLocksTableName + " WHERE lock_name=:lock_name").bind("lock_name", lockName).map(rowView -> {
            return new PostgresqlFencedLock(lockName, ((Long) rowView.getColumn("last_issued_fence_token", Long.class)).longValue(), (String) rowView.getColumn("locked_by_lockmanager_instance_id", String.class), (OffsetDateTime) rowView.getColumn("lock_acquired_ts", OffsetDateTime.class), (OffsetDateTime) rowView.getColumn("lock_last_confirmed_ts", OffsetDateTime.class));
        }).findOne();
        log.trace("[{}] Looking up lock '{}': {}", new Object[]{this.lockManagerInstanceId, lockName, findOne});
        return findOne;
    }

    public void stop() {
        if (!this.started) {
            log.debug("[{}] Lock Manager was already stopped", this.lockManagerInstanceId);
            return;
        }
        log.debug("[{}] Stopping down lock manager", this.lockManagerInstanceId);
        this.stopping = true;
        if (this.asyncLockAcquiringExecutor != null) {
            this.asyncLockAcquiringExecutor.shutdownNow();
            this.asyncLockAcquiringExecutor = null;
        }
        if (this.lockConfirmationExecutor != null) {
            this.lockConfirmationExecutor.shutdownNow();
            this.lockConfirmationExecutor = null;
        }
        this.locksAcquiredByThisLockManager.values().forEach(postgresqlFencedLock -> {
            postgresqlFencedLock.release();
        });
        this.started = false;
        this.stopping = false;
        log.debug("[{}] Stopped lock manager", this.lockManagerInstanceId);
    }

    public boolean isStarted() {
        return this.started;
    }

    @Override // dk.cloudcreate.essentials.components.distributed.fencedlock.FencedLockManager
    public Optional<FencedLock> tryAcquireLock(LockName lockName) {
        return Optional.ofNullable((PostgresqlFencedLock) _tryAcquireLock(lockName).block());
    }

    @Override // dk.cloudcreate.essentials.components.distributed.fencedlock.FencedLockManager
    public Optional<FencedLock> tryAcquireLock(LockName lockName, Duration duration) {
        FailFast.requireNonNull(duration, "No timeout value provided");
        return Optional.ofNullable((FencedLock) _tryAcquireLock(lockName).repeatWhenEmpty(flux -> {
            return flux.doOnNext(l -> {
            });
        }).block(duration));
    }

    private Mono<PostgresqlFencedLock> _tryAcquireLock(LockName lockName) {
        FailFast.requireNonNull(lockName, "No lockName provided");
        log.debug("[{}] Handling request to acquire lock '{}'", this.lockManagerInstanceId, lockName);
        PostgresqlFencedLock postgresqlFencedLock = this.locksAcquiredByThisLockManager.get(lockName);
        if (postgresqlFencedLock == null || !postgresqlFencedLock.isLocked() || isLockTimedOut(postgresqlFencedLock)) {
            return (Mono) this.jdbi.inTransaction(handle -> {
                return resolveLock(handle, lookupLockInDB(lockName).orElseGet(() -> {
                    return new PostgresqlFencedLock(lockName, -1L, null, null, null);
                }));
            });
        }
        log.debug("[{}] Returned cached locally acquired lock '{}", this.lockManagerInstanceId, lockName);
        return Mono.just(postgresqlFencedLock);
    }

    private Mono<PostgresqlFencedLock> resolveLock(Handle handle, PostgresqlFencedLock postgresqlFencedLock) {
        FailFast.requireNonNull(handle, "No handle provided");
        FailFast.requireNonNull(postgresqlFencedLock, "No existingLock provided");
        if (!postgresqlFencedLock.isLocked()) {
            if (postgresqlFencedLock.currentToken == -1) {
                return insertLockIntoDB(handle, postgresqlFencedLock);
            }
            OffsetDateTime now = OffsetDateTime.now(Clock.systemUTC());
            PostgresqlFencedLock postgresqlFencedLock2 = new PostgresqlFencedLock(postgresqlFencedLock.lockName, postgresqlFencedLock.currentToken + 1, this.lockManagerInstanceId, now, now);
            log.debug("[{}] Found un-acquired lock '{}'. Have Acquired lock. Existing lock: {} - New lock: {}", new Object[]{this.lockManagerInstanceId, postgresqlFencedLock.lockName, postgresqlFencedLock, postgresqlFencedLock2});
            return updateLockInDB(handle, postgresqlFencedLock, postgresqlFencedLock2);
        }
        if (postgresqlFencedLock.isLockedByThisLockManagerInstance()) {
            log.debug("[{}] lock '{}' was already acquired by this JVM node: {}", new Object[]{this.lockManagerInstanceId, postgresqlFencedLock.lockName, postgresqlFencedLock});
            this.locksAcquiredByThisLockManager.put(postgresqlFencedLock.lockName, postgresqlFencedLock);
            return Mono.just(postgresqlFencedLock);
        }
        if (!isLockTimedOut(postgresqlFencedLock)) {
            return Mono.empty();
        }
        OffsetDateTime now2 = OffsetDateTime.now(Clock.systemUTC());
        PostgresqlFencedLock postgresqlFencedLock3 = new PostgresqlFencedLock(postgresqlFencedLock.lockName, postgresqlFencedLock.currentToken + 1, this.lockManagerInstanceId, now2, now2);
        log.debug("[{}] Found a TIMED-OUT lock '{}', that was acquired by Lock Manager '{}'. Will attempt to acquire the lock. Timed-out lock: {} - New lock: {}", new Object[]{this.lockManagerInstanceId, postgresqlFencedLock.lockName, postgresqlFencedLock.lockedByLockManagerInstanceId, postgresqlFencedLock, postgresqlFencedLock3});
        return updateLockInDB(handle, postgresqlFencedLock, postgresqlFencedLock3);
    }

    private Mono<PostgresqlFencedLock> insertLockIntoDB(Handle handle, PostgresqlFencedLock postgresqlFencedLock) {
        FailFast.requireNonNull(handle, "No handle provided");
        FailFast.requireNonNull(postgresqlFencedLock, "No initialLock provided");
        OffsetDateTime now = OffsetDateTime.now(Clock.systemUTC());
        if (handle.createUpdate("INSERT INTO " + this.fencedLocksTableName + " (lock_name, last_issued_fence_token, locked_by_lockmanager_instance_id ,\nlock_acquired_ts, lock_last_confirmed_ts)\n VALUES (\n:lock_name, :last_issued_fence_token, :locked_by_lockmanager_instance_id,\n:lock_acquired_ts, :lock_last_confirmed_ts)").bind("lock_name", postgresqlFencedLock.lockName).bind("last_issued_fence_token", 1L).bind("locked_by_lockmanager_instance_id", this.lockManagerInstanceId).bind("lock_acquired_ts", now).bind("lock_last_confirmed_ts", now).execute() == 0) {
            log.debug("[{}] Failed to acquire lock '{}' for the first time (insert)", this.lockManagerInstanceId, postgresqlFencedLock.lockName);
            return Mono.empty();
        }
        postgresqlFencedLock.markAsLocked(now, this.lockManagerInstanceId, 1L);
        log.debug("[{}] Acquired lock '{}' for the first time (insert): {}", new Object[]{this.lockManagerInstanceId, postgresqlFencedLock.lockName, postgresqlFencedLock});
        this.locksAcquiredByThisLockManager.put(postgresqlFencedLock.lockName, postgresqlFencedLock);
        return Mono.just(postgresqlFencedLock);
    }

    private Mono<PostgresqlFencedLock> updateLockInDB(Handle handle, PostgresqlFencedLock postgresqlFencedLock, PostgresqlFencedLock postgresqlFencedLock2) {
        FailFast.requireNonNull(handle, "No handle provided");
        FailFast.requireNonNull(postgresqlFencedLock, "No timedOutLock provided");
        FailFast.requireNonNull(postgresqlFencedLock2, "No newLockReadyToBeAcquiredLocally provided");
        if (handle.createUpdate("UPDATE " + this.fencedLocksTableName + " SET last_issued_fence_token=:last_issued_fence_token, locked_by_lockmanager_instance_id=:locked_by_lockmanager_instance_id,\nlock_acquired_ts=:lock_acquired_ts, lock_last_confirmed_ts=:lock_last_confirmed_ts\nWHERE lock_name=:lock_name AND last_issued_fence_token=:previous_last_issued_fence_token").bind("lock_name", postgresqlFencedLock.lockName).bind("last_issued_fence_token", postgresqlFencedLock2.currentToken).bind("locked_by_lockmanager_instance_id", postgresqlFencedLock2.lockedByLockManagerInstanceId).bind("lock_acquired_ts", postgresqlFencedLock2.lockAcquiredTimestamp).bind("lock_last_confirmed_ts", postgresqlFencedLock2.lockLastConfirmedTimestamp).bind("previous_last_issued_fence_token", postgresqlFencedLock.currentToken).execute() == 0) {
            log.debug("[{}] Didn't acquire timed out lock '{}', someone else acquired it in the mean time(update): {}", new Object[]{this.lockManagerInstanceId, postgresqlFencedLock.lockName, lookupLockInDB(handle, postgresqlFencedLock.lockName)});
            return Mono.empty();
        }
        log.debug("[{}] Acquired lock '{}' (update): {}", new Object[]{this.lockManagerInstanceId, postgresqlFencedLock.lockName, postgresqlFencedLock2});
        this.locksAcquiredByThisLockManager.put(postgresqlFencedLock.lockName, postgresqlFencedLock2);
        postgresqlFencedLock2.markAsLocked(postgresqlFencedLock2.lockAcquiredTimestamp, postgresqlFencedLock2.lockedByLockManagerInstanceId, postgresqlFencedLock2.currentToken);
        return Mono.just(postgresqlFencedLock2);
    }

    private boolean isLockTimedOut(PostgresqlFencedLock postgresqlFencedLock) {
        FailFast.requireNonNull(postgresqlFencedLock, "No lock provided");
        return postgresqlFencedLock.getDurationSinceLastConfirmation().compareTo(this.lockTimeOut) >= 1;
    }

    @Override // dk.cloudcreate.essentials.components.distributed.fencedlock.FencedLockManager
    public FencedLock acquireLock(LockName lockName) {
        return (FencedLock) _tryAcquireLock(lockName).repeatWhenEmpty(flux -> {
            return flux.doOnNext(l -> {
            });
        }).block();
    }

    @Override // dk.cloudcreate.essentials.components.distributed.fencedlock.FencedLockManager
    public boolean isLockAcquired(LockName lockName) {
        Optional<PostgresqlFencedLock> lookupLockInDB = lookupLockInDB(lockName);
        if (lookupLockInDB.isEmpty()) {
            return false;
        }
        return lookupLockInDB.get().isLocked();
    }

    @Override // dk.cloudcreate.essentials.components.distributed.fencedlock.FencedLockManager
    public boolean isLockedByThisLockManagerInstance(LockName lockName) {
        Optional<PostgresqlFencedLock> lookupLockInDB = lookupLockInDB(lockName);
        if (lookupLockInDB.isEmpty()) {
            return false;
        }
        return lookupLockInDB.get().isLockedByThisLockManagerInstance();
    }

    @Override // dk.cloudcreate.essentials.components.distributed.fencedlock.FencedLockManager
    public boolean isLockAcquiredByAnotherLockManagerInstance(LockName lockName) {
        Optional<PostgresqlFencedLock> lookupLockInDB = lookupLockInDB(lockName);
        return (lookupLockInDB.isEmpty() || !lookupLockInDB.get().isLocked() || lookupLockInDB.get().isLockedByThisLockManagerInstance()) ? false : true;
    }

    @Override // dk.cloudcreate.essentials.components.distributed.fencedlock.FencedLockManager
    public void acquireLockAsync(LockName lockName, LockCallback lockCallback) {
        FailFast.requireNonNull(lockName, "You must supply a lockName");
        FailFast.requireNonNull(lockCallback, "You must supply a lockCallback");
        this.asyncLockAcquirings.computeIfAbsent(lockName, lockName2 -> {
            log.debug("[{}] Starting async Lock acquiring for lock '{}'", this.lockManagerInstanceId, lockName);
            return this.asyncLockAcquiringExecutor.scheduleAtFixedRate(() -> {
                PostgresqlFencedLock postgresqlFencedLock = this.locksAcquiredByThisLockManager.get(lockName);
                if (postgresqlFencedLock != null) {
                    if (postgresqlFencedLock.isLockedByThisLockManagerInstance()) {
                        return;
                    }
                    log.debug("[{}] Noticed that lock '{}' isn't locked by this Lock Manager instance anymore. Releasing the lock", this.lockManagerInstanceId, lockName);
                    this.locksAcquiredByThisLockManager.remove(lockName);
                    lockCallback.lockReleased(postgresqlFencedLock);
                    return;
                }
                if (this.paused) {
                    log.info("[{}] Lock Manager is paused, skipping async acquiring for lock '{}'", this.lockManagerInstanceId, lockName);
                    return;
                }
                Optional<FencedLock> tryAcquireLock = tryAcquireLock(lockName);
                if (!tryAcquireLock.isPresent()) {
                    if (log.isTraceEnabled()) {
                        log.trace("[{}] Couldn't async Acquire lock '{}' as it is acquired by another Lock Manager instance: {}", new Object[]{this.lockManagerInstanceId, lockName, lookupLockInDB(lockName)});
                    }
                } else {
                    log.debug("[{}] Async Acquired lock '{}'", this.lockManagerInstanceId, lockName);
                    PostgresqlFencedLock postgresqlFencedLock2 = (PostgresqlFencedLock) tryAcquireLock.get();
                    postgresqlFencedLock2.registerCallback(lockCallback);
                    this.locksAcquiredByThisLockManager.put(lockName, postgresqlFencedLock2);
                    lockCallback.lockAcquired(tryAcquireLock.get());
                }
            }, 0L, this.lockConfirmationInterval.toMillis(), TimeUnit.MILLISECONDS);
        });
    }

    @Override // dk.cloudcreate.essentials.components.distributed.fencedlock.FencedLockManager
    public void cancelAsyncLockAcquiring(LockName lockName) {
        FailFast.requireNonNull(lockName, "You must supply a lockName");
        ScheduledFuture<?> remove = this.asyncLockAcquirings.remove(lockName);
        if (remove != null) {
            log.debug("[{}] Canceling async Lock acquiring for lock '{}'", this.lockManagerInstanceId, lockName);
            remove.cancel(true);
            PostgresqlFencedLock remove2 = this.locksAcquiredByThisLockManager.remove(lockName);
            if (remove2.isLockedByThisLockManagerInstance()) {
                log.debug("[{}] Releasing Lock due to cancelling the lock acquiring '{}'", this.lockManagerInstanceId, lockName);
                remove2.release();
            }
        }
    }

    @Override // dk.cloudcreate.essentials.components.distributed.fencedlock.FencedLockManager
    public String getLockManagerInstanceId() {
        return this.lockManagerInstanceId;
    }

    public String toString() {
        return "PostgresqlFencedLockManager{lockManagerInstanceId='" + this.lockManagerInstanceId + "'}";
    }
}
