package org.apache.pulsar.broker.loadbalance.extensions.reporter;

import com.google.common.annotations.VisibleForTesting;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.BrokerHostUsage;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.manager.StateChangeListener;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
import org.apache.pulsar.broker.loadbalance.impl.GenericBrokerHostUsageImpl;
import org.apache.pulsar.broker.loadbalance.impl.LinuxBrokerHostUsageImpl;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.service.PulsarStats;
import org.apache.pulsar.broker.stats.BrokerStats;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/loadbalance/extensions/reporter/BrokerLoadDataReporter.class */
public class BrokerLoadDataReporter implements LoadDataReporter<BrokerLoadData>, StateChangeListener {
    private static final Logger log = LoggerFactory.getLogger(BrokerLoadDataReporter.class);
    private static final long TOMBSTONE_DELAY_IN_MILLIS = 10000;
    private final PulsarService pulsar;
    private final ServiceConfiguration conf;
    private final LoadDataStore<BrokerLoadData> brokerLoadDataStore;
    private final BrokerHostUsage brokerHostUsage;
    private final String lookupServiceAddress;
    private final BrokerLoadData localData;
    private final BrokerLoadData lastData;
    private volatile long lastTombstonedAt;
    private long tombstoneDelayInMillis;

    public BrokerLoadDataReporter(PulsarService pulsarService, String str, LoadDataStore<BrokerLoadData> loadDataStore) {
        this.brokerLoadDataStore = loadDataStore;
        this.lookupServiceAddress = str;
        this.pulsar = pulsarService;
        this.conf = this.pulsar.getConfiguration();
        if (SystemUtils.IS_OS_LINUX) {
            this.brokerHostUsage = new LinuxBrokerHostUsageImpl(pulsarService);
        } else {
            this.brokerHostUsage = new GenericBrokerHostUsageImpl(pulsarService);
        }
        this.localData = new BrokerLoadData();
        this.lastData = new BrokerLoadData();
        this.tombstoneDelayInMillis = TOMBSTONE_DELAY_IN_MILLIS;
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.apache.pulsar.broker.loadbalance.extensions.reporter.LoadDataReporter
    public BrokerLoadData generateLoadData() {
        SystemResourceUsage systemResourceUsage = LoadManagerShared.getSystemResourceUsage(this.brokerHostUsage);
        PulsarStats pulsarStats = this.pulsar.getBrokerService().getPulsarStats();
        synchronized (pulsarStats) {
            BrokerStats brokerStats = pulsarStats.getBrokerStats();
            this.localData.update(systemResourceUsage, brokerStats.msgThroughputIn, brokerStats.msgThroughputOut, brokerStats.msgRateIn, brokerStats.msgRateOut, brokerStats.bundleCount, brokerStats.topics, this.pulsar.getConfiguration());
        }
        return this.localData;
    }

    @Override // org.apache.pulsar.broker.loadbalance.extensions.reporter.LoadDataReporter
    public CompletableFuture<Void> reportAsync(boolean z) {
        BrokerLoadData generateLoadData = generateLoadData();
        boolean debug = ExtensibleLoadManagerImpl.debug(this.conf, log);
        if (!z && !needBrokerDataUpdate()) {
            if (debug) {
                log.info("skipping load report:{}", this.localData.toString(this.conf));
            }
            return CompletableFuture.completedFuture(null);
        }
        if (debug) {
            log.info("publishing load report:{}", this.localData.toString(this.conf));
        }
        CompletableFuture<Void> pushAsync = this.brokerLoadDataStore.pushAsync(this.lookupServiceAddress, generateLoadData);
        pushAsync.whenComplete((r5, th) -> {
            if (th != null) {
                log.error("Failed to report the broker load data.", th);
            } else {
                this.localData.setReportedAt(System.currentTimeMillis());
                this.lastData.update(this.localData);
            }
        });
        return pushAsync;
    }

    private boolean needBrokerDataUpdate() {
        int loadBalancerReportUpdateMaxIntervalMinutes = this.conf.getLoadBalancerReportUpdateMaxIntervalMinutes();
        int loadBalancerReportUpdateThresholdPercentage = this.conf.getLoadBalancerReportUpdateThresholdPercentage();
        long millis = TimeUnit.MINUTES.toMillis(loadBalancerReportUpdateMaxIntervalMinutes);
        long currentTimeMillis = System.currentTimeMillis() - this.localData.getReportedAt();
        boolean debug = ExtensibleLoadManagerImpl.debug(this.conf, log);
        if (currentTimeMillis > millis) {
            if (!debug) {
                return true;
            }
            log.info("Writing local data to metadata store because time since last update exceeded threshold of {} minutes", Integer.valueOf(loadBalancerReportUpdateMaxIntervalMinutes));
            return true;
        }
        double max = Math.max(100.0d * Math.abs(this.lastData.getMaxResourceUsage() - this.localData.getMaxResourceUsage()), Math.max(percentChange(this.lastData.getMsgRateIn() + this.lastData.getMsgRateOut(), this.localData.getMsgRateIn() + this.localData.getMsgRateOut()), Math.max(percentChange(this.lastData.getMsgThroughputIn() + this.lastData.getMsgThroughputOut(), this.localData.getMsgThroughputIn() + this.localData.getMsgThroughputOut()), percentChange(this.lastData.getBundleCount(), this.localData.getBundleCount()))));
        if (max <= loadBalancerReportUpdateThresholdPercentage) {
            return false;
        }
        if (!debug) {
            return true;
        }
        log.info(String.format("Writing local data to metadata store because maximum change %.2f%% exceeded threshold %d%%. Time since last report written is %.2f%% seconds", Double.valueOf(max), Integer.valueOf(loadBalancerReportUpdateThresholdPercentage), Double.valueOf(currentTimeMillis / 1000.0d)));
        return true;
    }

    protected double percentChange(double d, double d2) {
        return d == 0.0d ? d2 == 0.0d ? 0.0d : Double.POSITIVE_INFINITY : 100.0d * Math.abs((d - d2) / d);
    }

    @VisibleForTesting
    protected void tombstone() {
        long currentTimeMillis = System.currentTimeMillis();
        if (currentTimeMillis - this.lastTombstonedAt < this.tombstoneDelayInMillis) {
            return;
        }
        long j = this.lastTombstonedAt;
        this.lastTombstonedAt = currentTimeMillis;
        this.brokerLoadDataStore.removeAsync(this.lookupServiceAddress).whenComplete((r7, th) -> {
            if (th != null) {
                log.error("Failed to clean broker load data.", th);
                this.lastTombstonedAt = j;
            } else if (ExtensibleLoadManagerImpl.debug(this.conf, log)) {
                log.info("Cleaned broker load data.");
            }
        });
    }

    @Override // org.apache.pulsar.broker.loadbalance.extensions.manager.StateChangeListener
    public void handleEvent(String str, ServiceUnitStateData serviceUnitStateData, Throwable th) {
        if (th != null) {
            return;
        }
        switch (ServiceUnitStateData.state(serviceUnitStateData)) {
            case Releasing:
            case Splitting:
                if (StringUtils.equals(serviceUnitStateData.sourceBroker(), this.lookupServiceAddress)) {
                    this.localData.clear();
                    tombstone();
                    return;
                }
                return;
            case Owned:
                if (StringUtils.equals(serviceUnitStateData.dstBroker(), this.lookupServiceAddress)) {
                    this.localData.clear();
                    tombstone();
                    return;
                }
                return;
            default:
                return;
        }
    }

    public BrokerLoadData getLocalData() {
        return this.localData;
    }
}
