package org.apache.pulsar.broker.loadbalance.extensions.scheduler;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Generated;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
import org.apache.pulsar.broker.loadbalance.extensions.manager.SplitManager;
import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
import org.apache.pulsar.broker.loadbalance.extensions.strategy.DefaultNamespaceBundleSplitStrategyImpl;
import org.apache.pulsar.broker.loadbalance.extensions.strategy.NamespaceBundleSplitStrategy;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitScheduler.class */
public class SplitScheduler implements LoadManagerScheduler {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(SplitScheduler.class);
    private final PulsarService pulsar;
    private final ScheduledExecutorService loadManagerExecutor;
    private final LoadManagerContext context;
    private final ServiceConfiguration conf;
    private final ServiceUnitStateChannel serviceUnitStateChannel;
    private final NamespaceBundleSplitStrategy bundleSplitStrategy;
    private final SplitCounter counter;
    private final SplitManager splitManager;
    private final AtomicReference<List<Metrics>> splitMetrics;
    private volatile ScheduledFuture<?> task;
    private long counterLastUpdatedAt;

    public SplitScheduler(PulsarService pulsarService, ServiceUnitStateChannel serviceUnitStateChannel, SplitManager splitManager, SplitCounter splitCounter, AtomicReference<List<Metrics>> atomicReference, LoadManagerContext loadManagerContext, NamespaceBundleSplitStrategy namespaceBundleSplitStrategy) {
        this.counterLastUpdatedAt = 0L;
        this.pulsar = pulsarService;
        this.loadManagerExecutor = pulsarService.getLoadManagerExecutor();
        this.splitManager = splitManager;
        this.counter = splitCounter;
        this.splitMetrics = atomicReference;
        this.context = loadManagerContext;
        this.conf = pulsarService.getConfiguration();
        this.bundleSplitStrategy = namespaceBundleSplitStrategy;
        this.serviceUnitStateChannel = serviceUnitStateChannel;
    }

    public SplitScheduler(PulsarService pulsarService, ServiceUnitStateChannel serviceUnitStateChannel, SplitManager splitManager, SplitCounter splitCounter, AtomicReference<List<Metrics>> atomicReference, LoadManagerContext loadManagerContext) {
        this(pulsarService, serviceUnitStateChannel, splitManager, splitCounter, atomicReference, loadManagerContext, new DefaultNamespaceBundleSplitStrategyImpl(splitCounter));
    }

    @Override // org.apache.pulsar.broker.loadbalance.extensions.scheduler.LoadManagerScheduler
    public void execute() {
        boolean debug = ExtensibleLoadManagerImpl.debug(this.conf, log);
        if (debug) {
            log.info("Load balancer enabled: {}, Split enabled: {}.", Boolean.valueOf(this.conf.isLoadBalancerEnabled()), Boolean.valueOf(this.conf.isLoadBalancerAutoBundleSplitEnabled()));
        }
        if (!isLoadBalancerAutoBundleSplitEnabled()) {
            if (debug) {
                log.info("The load balancer or load balancer split already disabled. Skipping.");
                return;
            }
            return;
        }
        synchronized (this.bundleSplitStrategy) {
            Set<SplitDecision> findBundlesToSplit = this.bundleSplitStrategy.findBundlesToSplit(this.context, this.pulsar);
            if (debug) {
                log.info("Split Decisions:", findBundlesToSplit);
            }
            if (!findBundlesToSplit.isEmpty()) {
                long namespaceBundleUnloadingTimeoutMs = this.conf.getNamespaceBundleUnloadingTimeoutMs();
                ArrayList arrayList = new ArrayList();
                for (SplitDecision splitDecision : findBundlesToSplit) {
                    if (splitDecision.getLabel() == SplitDecision.Label.Success) {
                        Split split = splitDecision.getSplit();
                        arrayList.add(this.splitManager.waitAsync(this.serviceUnitStateChannel.publishSplitEventAsync(split), split.serviceUnit(), splitDecision, namespaceBundleUnloadingTimeoutMs, TimeUnit.MILLISECONDS));
                    }
                }
                try {
                    FutureUtil.waitForAll(arrayList).get(namespaceBundleUnloadingTimeoutMs, TimeUnit.MILLISECONDS);
                } catch (Throwable th) {
                    log.error("Failed to wait for split events to persist.", th);
                }
            } else if (debug) {
                log.info("BundleSplitStrategy returned no bundles to split.");
            }
        }
        if (this.counter.updatedAt() > this.counterLastUpdatedAt) {
            this.splitMetrics.set(this.counter.toMetrics(this.pulsar.getAdvertisedAddress()));
            this.counterLastUpdatedAt = this.counter.updatedAt();
        }
    }

    @Override // org.apache.pulsar.broker.loadbalance.extensions.scheduler.LoadManagerScheduler
    public void start() {
        long millis = TimeUnit.MINUTES.toMillis(this.conf.getLoadBalancerSplitIntervalMinutes());
        this.task = this.loadManagerExecutor.scheduleAtFixedRate(() -> {
            try {
                execute();
                if (ExtensibleLoadManagerImpl.debug(this.conf, log)) {
                    StringJoiner stringJoiner = new StringJoiner("\n");
                    stringJoiner.add("### OwnershipEntrySet start ###");
                    this.serviceUnitStateChannel.getOwnershipEntrySet().forEach(entry -> {
                        stringJoiner.add(entry.toString());
                    });
                    stringJoiner.add("### OwnershipEntrySet end ###");
                    log.info(stringJoiner.toString());
                }
            } catch (Throwable th) {
                log.error("Failed to run the split job.", th);
            }
        }, millis, millis, TimeUnit.MILLISECONDS);
    }

    @Override // org.apache.pulsar.broker.loadbalance.extensions.scheduler.LoadManagerScheduler, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (this.task != null) {
            this.task.cancel(false);
            this.task = null;
        }
    }

    private boolean isLoadBalancerAutoBundleSplitEnabled() {
        return this.conf.isLoadBalancerEnabled() && this.conf.isLoadBalancerAutoBundleSplitEnabled();
    }
}
