/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.impl.cluster;

import java.time.Duration;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.discover.RegistrationClient;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.net.BookieId;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.api.cluster.ClusterControllerLeader;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.api.cluster.ClusterMetadataStore;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerController;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.versioning.Versioned;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Cluster controller leader: once elected, it watches the set of writable
 * servers and asks the {@link StorageContainerController} to recompute the
 * storage container assignment whenever that set changes.
 *
 * <p>The leader loop in {@link #processAsLeader()} is gated by two things:
 * a semaphore that signals "there is a server change to process", and a
 * suspension flag toggled through {@link #suspend()} / {@link #resume()}.
 */
public class ClusterControllerLeaderImpl
        implements ClusterControllerLeader, RegistrationClient.RegistrationListener {

    private static final Logger log = LoggerFactory.getLogger(ClusterControllerLeaderImpl.class);

    // the metadata store holding cluster metadata and the assignment data
    private final ClusterMetadataStore clusterMetadataStore;

    // the controller that computes the ideal assignment state
    private final StorageContainerController scController;

    // permits that signal pending server changes; released by the listener callback
    private final Semaphore performServerChangesPermits;

    // registration client used for watching/unwatching the writable servers
    private final RegistrationClient regClient;

    // the most recent set of servers delivered via onBookiesChanged
    private volatile Set<BookieId> availableServers;

    // monitor guarding the suspended flag; the leader loop waits on it while suspended
    private final Object suspensionLock = new Object();
    private volatile boolean suspended = false;

    // wall-clock millis of the last assignment update; negative until the first one
    private long lastSuccessfulAssigmentAt;

    // minimum interval between two assignment updates
    private final Duration scheduleDuration;

    /**
     * Creates a controller leader.
     *
     * @param clusterMetadataStore store to read cluster metadata / assignment data from and write new
     *                             assignment data to
     * @param scController         controller used for computing the ideal assignment state
     * @param regClient            registration client used for watching the writable servers
     * @param scheduleDuration     minimum interval between two assignment updates
     */
    ClusterControllerLeaderImpl(ClusterMetadataStore clusterMetadataStore,
                                StorageContainerController scController,
                                RegistrationClient regClient,
                                Duration scheduleDuration) {
        this.clusterMetadataStore = clusterMetadataStore;
        this.scController = scController;
        this.regClient = regClient;
        this.performServerChangesPermits = new Semaphore(0);
        this.lastSuccessfulAssigmentAt = -1L;
        this.scheduleDuration = scheduleDuration;
    }

    /**
     * Marks the leader as suspended and wakes up any thread waiting on the suspension lock.
     */
    @Override
    public void suspend() {
        synchronized (suspensionLock) {
            suspended = true;
            suspensionLock.notifyAll();
        }
    }

    /**
     * Returns whether the leader is currently marked as suspended.
     *
     * @return {@code true} if {@link #suspend()} was called more recently than {@link #resume()}
     */
    boolean isSuspended() {
        return suspended;
    }

    /**
     * Clears the suspended flag and wakes up the leader loop if it is waiting in
     * {@link #checkSuspension()}.
     */
    @Override
    public void resume() {
        synchronized (suspensionLock) {
            suspended = false;
            suspensionLock.notifyAll();
        }
    }

    /**
     * Runs the leader loop. This method only returns by throwing.
     *
     * <p>It first releases one permit so that an initial assignment pass is attempted, registers
     * this instance as a listener for writable servers, and then loops forever processing server
     * changes. Exceptions raised while the leader is suspended are tolerated and the loop retries;
     * any other failure unregisters the listener and is rethrown.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws Exception            if watching the servers fails, or if processing a server change
     *                              fails while the leader is not suspended
     */
    @Override
    public void processAsLeader() throws Exception {
        log.info("Become controller leader to monitor servers for assigning storage containers.");

        // make sure the first iteration of the loop does not block on the semaphore
        performServerChangesPermits.release();

        // start monitoring the servers
        try {
            regClient.watchWritableBookies(this).get();
        } catch (Exception e) {
            log.warn("Controller leader fails to watch servers : {}, giving up leadership", e.getMessage());
            throw e;
        }

        // the leader loops here to react to server changes
        while (true) {
            try {
                checkSuspension();
                processServerChange();
            } catch (InterruptedException ie) {
                log.warn("Controller leader is interrupted, giving up leadership");
                // stop monitoring the servers
                regClient.unwatchWritableBookies(this);
                throw ie;
            } catch (Exception e) {
                // failures observed while suspended are retried instead of giving up leadership
                if (!suspended) {
                    log.warn("Controller leader encountered exceptions on processing server changes, giving up leadership", e);
                    // stop monitoring the servers
                    regClient.unwatchWritableBookies(this);
                    throw e;
                }
            }
        }
    }

    /**
     * Blocks the calling thread for as long as the leader is suspended.
     *
     * @throws InterruptedException if the thread is interrupted while waiting on the suspension lock
     */
    private void checkSuspension() throws InterruptedException {
        synchronized (suspensionLock) {
            // loop to guard against wake-ups that happen while still suspended
            while (suspended) {
                log.info("Controller leader is suspended, waiting for to be resumed");
                suspensionLock.wait();
                log.info("Controller leader is woke up from suspension");
            }
        }
    }

    /**
     * Processes one batch of server changes.
     *
     * <p>Waits for a permit, then waits until at least {@code scheduleDuration} has passed since
     * the last assignment update, drains any further permits (they are all covered by this pass),
     * and finally recomputes the assignment from the latest server snapshot. The assignment data
     * is only written back when it differs from the current state.
     *
     * @throws InterruptedException if the thread is interrupted while acquiring a permit or sleeping
     */
    private void processServerChange() throws InterruptedException {
        // wait until there is a server change to process
        performServerChangesPermits.acquire();

        long elapsedMs = System.currentTimeMillis() - lastSuccessfulAssigmentAt;
        long remainingMs = scheduleDuration.toMillis() - elapsedMs;
        if (remainingMs > 0L) {
            log.info("Waiting {} milliseconds for controller to assign containers", remainingMs);
            TimeUnit.MILLISECONDS.sleep(remainingMs);
        }

        // changes signalled while waiting are handled by this pass as well
        performServerChangesPermits.drainPermits();

        Set<BookieId> availableServersSnapshot = availableServers;
        if (null == availableServersSnapshot || availableServersSnapshot.isEmpty()) {
            if (lastSuccessfulAssigmentAt < 0L) {
                // no assignment has been made yet: back off and re-arm the semaphore to retry
                log.info("No servers is alive yet. Backoff 200ms and retry.");
                TimeUnit.MILLISECONDS.sleep(200L);
                performServerChangesPermits.release();
            }
            // an assignment was already made but no servers are visible now:
            // leave the existing assignment untouched
            return;
        }

        ClusterMetadata clusterMetadata = clusterMetadataStore.getClusterMetadata();
        ClusterAssignmentData currentState = clusterMetadataStore.getClusterAssignmentData();
        ClusterAssignmentData newState =
                scController.computeIdealState(clusterMetadata, currentState, availableServersSnapshot);

        if (newState.equals(currentState)) {
            if (log.isDebugEnabled()) {
                log.debug("Assignment state is unchanged - {}", newState);
            }
            return;
        }

        // NOTE: the timestamp is recorded before the store is updated; this order is kept as-is
        // because observers of the store update may read the timestamp right afterwards.
        lastSuccessfulAssigmentAt = System.currentTimeMillis();
        clusterMetadataStore.updateClusterAssignmentData(newState);
    }

    /**
     * Listener callback: records the new set of servers and releases a permit so that the leader
     * loop processes the change.
     *
     * @param bookies the versioned set of servers that is currently available
     */
    @Override
    public void onBookiesChanged(Versioned<Set<BookieId>> bookies) {
        log.info("Cluster topology is changed - new cluster : {}", bookies);
        availableServers = bookies.getValue();
        performServerChangesPermits.release();
    }

    /**
     * Returns the semaphore that signals pending server changes.
     *
     * @return the permits semaphore
     */
    Semaphore getPerformServerChangesPermits() {
        return performServerChangesPermits;
    }

    /**
     * Returns the monitor object used for suspension.
     *
     * @return the suspension lock
     */
    Object getSuspensionLock() {
        return suspensionLock;
    }

    /**
     * Returns the wall-clock time in milliseconds recorded for the last assignment update.
     *
     * @return the recorded timestamp, or a negative value if no assignment update happened yet
     */
    long getLastSuccessfulAssigmentAt() {
        return lastSuccessfulAssigmentAt;
    }
}

