package org.apache.pulsar.broker.loadbalance.extensions.manager;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.prometheus.client.Histogram;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import lombok.Generated;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.class */
public class UnloadManager implements StateChangeListener {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(UnloadManager.class);
    private final UnloadCounter counter;
    private final Map<String, CompletableFuture<Void>> inFlightUnloadRequest = new ConcurrentHashMap();
    private final String brokerId;

    @VisibleForTesting
    /* loaded from: input_file:org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager$LatencyMetric.class */
    public enum LatencyMetric {
        UNLOAD(buildHistogram("brk_lb_unload_latency", "Total time duration of unload operations on source brokers"), true, false),
        ASSIGN(buildHistogram("brk_lb_assign_latency", "Time spent in the load balancing ASSIGN state on destination brokers"), false, true),
        RELEASE(buildHistogram("brk_lb_release_latency", "Time spent in the load balancing RELEASE state on source brokers"), true, false),
        DISCONNECT(buildHistogram("brk_lb_disconnect_latency", "Time spent in the load balancing disconnected state on source brokers"), true, false);

        private static final long OP_TIMEOUT_NS = TimeUnit.HOURS.toNanos(1);
        private final Histogram histogram;
        private final Map<String, CompletableFuture<Void>> futures = new ConcurrentHashMap();
        private final boolean isSourceBrokerMetric;
        private final boolean isDestinationBrokerMetric;

        private static Histogram buildHistogram(String str, String str2) {
            return Histogram.build(str, str2).unit("ms").labelNames(new String[]{"broker", "metric"}).buckets(new double[]{1.0d, 10.0d, 100.0d, 200.0d, 1000.0d}).register();
        }

        LatencyMetric(Histogram histogram, boolean z, boolean z2) {
            this.histogram = histogram;
            this.isSourceBrokerMetric = z;
            this.isDestinationBrokerMetric = z2;
        }

        public void beginMeasurement(String str, String str2, ServiceUnitStateData serviceUnitStateData) {
            if ((this.isSourceBrokerMetric && str2.equals(serviceUnitStateData.sourceBroker())) || (this.isDestinationBrokerMetric && str2.equals(serviceUnitStateData.dstBroker()))) {
                long nanoTime = System.nanoTime();
                this.futures.computeIfAbsent(str, str3 -> {
                    CompletableFuture completableFuture = new CompletableFuture();
                    completableFuture.completeOnTimeout(null, OP_TIMEOUT_NS, TimeUnit.NANOSECONDS).thenAccept(r13 -> {
                        long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime);
                        UnloadManager.log.info("Operation {} for service unit {} took {} ms", new Object[]{this, str, Long.valueOf(millis)});
                        ((Histogram.Child) this.histogram.labels(new String[]{str2, "bundleUnloading"})).observe(millis);
                    }).whenComplete((r7, th) -> {
                        this.futures.remove(str, completableFuture);
                    });
                    return completableFuture;
                });
            }
        }

        public void endMeasurement(String str) {
            CompletableFuture<Void> completableFuture = this.futures.get(str);
            if (completableFuture != null) {
                completableFuture.complete(null);
            }
        }
    }

    public UnloadManager(UnloadCounter unloadCounter, String str) {
        this.counter = unloadCounter;
        this.brokerId = (String) Objects.requireNonNull(str);
    }

    private void complete(String str, Throwable th) {
        LatencyMetric.UNLOAD.endMeasurement(str);
        LatencyMetric.DISCONNECT.endMeasurement(str);
        if (th != null) {
            LatencyMetric.RELEASE.endMeasurement(str);
            LatencyMetric.ASSIGN.endMeasurement(str);
        }
        this.inFlightUnloadRequest.computeIfPresent(str, (str2, completableFuture) -> {
            if (completableFuture.isDone()) {
                return null;
            }
            if (th != null) {
                completableFuture.completeExceptionally(th);
                return null;
            }
            completableFuture.complete(null);
            return null;
        });
    }

    public CompletableFuture<Void> waitAsync(CompletableFuture<Void> completableFuture, String str, UnloadDecision unloadDecision, long j, TimeUnit timeUnit) {
        return completableFuture.thenCompose(r13 -> {
            return this.inFlightUnloadRequest.computeIfAbsent(str, str2 -> {
                if (log.isDebugEnabled()) {
                    log.debug("Handle unload bundle: {}, timeout: {} {}", new Object[]{str, Long.valueOf(j), timeUnit});
                }
                CompletableFuture completableFuture2 = new CompletableFuture();
                completableFuture2.orTimeout(j, timeUnit).whenComplete((r7, th) -> {
                    if (th != null) {
                        this.inFlightUnloadRequest.remove(str);
                        log.warn("Failed to wait unload for serviceUnit: {}", str, th);
                    }
                });
                return completableFuture2;
            });
        }).whenComplete((BiConsumer<? super U, ? super Throwable>) (r8, th) -> {
            if (th != null) {
                this.counter.update(UnloadDecision.Label.Failure, UnloadDecision.Reason.Unknown);
                log.warn("Failed to unload bundle: {}", str, th);
            } else {
                log.info("Complete unload bundle: {}", str);
                this.counter.update(unloadDecision);
            }
        });
    }

    @Override // org.apache.pulsar.broker.loadbalance.extensions.manager.StateChangeListener
    public void beforeEvent(String str, ServiceUnitStateData serviceUnitStateData) {
        if (log.isDebugEnabled()) {
            log.debug("Handling arrival of {} for service unit {}", serviceUnitStateData, str);
        }
        switch (ServiceUnitStateData.state(serviceUnitStateData)) {
            case Free:
            case Owned:
                LatencyMetric.DISCONNECT.beginMeasurement(str, this.brokerId, serviceUnitStateData);
                return;
            case Releasing:
                LatencyMetric.RELEASE.beginMeasurement(str, this.brokerId, serviceUnitStateData);
                LatencyMetric.UNLOAD.beginMeasurement(str, this.brokerId, serviceUnitStateData);
                return;
            case Assigning:
                LatencyMetric.ASSIGN.beginMeasurement(str, this.brokerId, serviceUnitStateData);
                return;
            default:
                return;
        }
    }

    @Override // org.apache.pulsar.broker.loadbalance.extensions.manager.StateChangeListener
    public void handleEvent(String str, ServiceUnitStateData serviceUnitStateData, Throwable th) {
        ServiceUnitState state = ServiceUnitStateData.state(serviceUnitStateData);
        if ((state == ServiceUnitState.Owned || state == ServiceUnitState.Assigning) && StringUtils.isBlank(serviceUnitStateData.sourceBroker())) {
            if (log.isDebugEnabled()) {
                log.debug("Skipping {} for service unit {} from the assignment command.", serviceUnitStateData, str);
                return;
            }
            return;
        }
        if (th != null) {
            if (log.isDebugEnabled()) {
                log.debug("Handling {} for service unit {} with exception.", new Object[]{serviceUnitStateData, str, th});
            }
            complete(str, th);
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("Handling {} for service unit {}", serviceUnitStateData, str);
        }
        switch (state) {
            case Free:
                if (serviceUnitStateData.force()) {
                    return;
                }
                complete(str, th);
                return;
            case Owned:
                complete(str, th);
                return;
            case Releasing:
                LatencyMetric.RELEASE.endMeasurement(str);
                return;
            case Assigning:
                LatencyMetric.ASSIGN.endMeasurement(str);
                return;
            case Init:
                Preconditions.checkArgument(serviceUnitStateData == null, "Init state must be associated with null data");
                complete(str, th);
                return;
            default:
                return;
        }
    }

    public void close() {
        this.inFlightUnloadRequest.forEach((str, completableFuture) -> {
            if (completableFuture.isDone()) {
                return;
            }
            String format = String.format("Unloading bundle: %s, but the unload manager already closed.", str);
            log.warn(format);
            completableFuture.completeExceptionally(new IllegalStateException(format));
        });
        this.inFlightUnloadRequest.clear();
    }
}
