package io.strimzi.kafka.quotas;

import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
import io.strimzi.kafka.quotas.throttle.AvailableBytesThrottleFactorPolicy;
import io.strimzi.kafka.quotas.throttle.AvailableRatioThrottleFactorPolicy;
import io.strimzi.kafka.quotas.throttle.FixedDurationExpiryPolicy;
import io.strimzi.kafka.quotas.throttle.PolicyBasedThrottle;
import io.strimzi.kafka.quotas.throttle.ThrottleFactorPolicy;
import io.strimzi.kafka.quotas.throttle.ThrottleFactorSource;
import io.strimzi.kafka.quotas.throttle.UnlimitedThrottleFactorSource;
import java.time.Clock;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.metrics.Quota;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.server.quota.ClientQuotaCallback;
import org.apache.kafka.server.quota.ClientQuotaEntity;
import org.apache.kafka.server.quota.ClientQuotaType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/strimzi/kafka/quotas/StaticQuotaCallback.class */
public class StaticQuotaCallback implements ClientQuotaCallback {
    public static final String REMOTE_BROKER_TAG = "remoteBrokerId";
    public static final String LOG_DIR_TAG = "logDir";
    private static final Logger log = LoggerFactory.getLogger(StaticQuotaCallback.class);
    private static final String EXCLUDED_PRINCIPAL_QUOTA_KEY = "excluded-principal-quota-key";
    private final Clock clock;
    private volatile Map<ClientQuotaType, Quota> quotaMap;
    private volatile Set<String> excludedPrincipalNameList;
    private final Set<ClientQuotaType> resetQuota;
    private volatile ThrottleFactorSource throttleFactorSource;
    private final VolumeSourceBuilder volumeSourceBuilder;
    private static final String SCOPE = "io.strimzi.kafka.quotas.StaticQuotaCallback";
    private final ScheduledExecutorService backgroundScheduler;
    public static final String HOST_BROKER_TAG = "observingBrokerId";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/strimzi/kafka/quotas/StaticQuotaCallback$ClientQuotaGauge.class */
    public static class ClientQuotaGauge extends Gauge<Double> {
        private final Quota quota;

        public ClientQuotaGauge(Quota quota) {
            this.quota = quota;
        }

        /* renamed from: value, reason: merged with bridge method [inline-methods] */
        public Double m2value() {
            return Double.valueOf(this.quota.bound());
        }
    }

    public StaticQuotaCallback() {
        this(new VolumeSourceBuilder(), Executors.newScheduledThreadPool(2, runnable -> {
            Thread thread = new Thread(runnable, StaticQuotaCallback.class.getSimpleName() + "-taskExecutor");
            thread.setDaemon(true);
            return thread;
        }), Clock.systemUTC());
    }

    StaticQuotaCallback(VolumeSourceBuilder volumeSourceBuilder, ScheduledExecutorService scheduledExecutorService, Clock clock) {
        this.quotaMap = new HashMap();
        this.excludedPrincipalNameList = Set.of();
        this.resetQuota = Collections.newSetFromMap(new ConcurrentHashMap());
        this.throttleFactorSource = UnlimitedThrottleFactorSource.UNLIMITED_THROTTLE_FACTOR_SOURCE;
        this.volumeSourceBuilder = volumeSourceBuilder;
        this.backgroundScheduler = scheduledExecutorService;
        Collections.addAll(this.resetQuota, ClientQuotaType.values());
        this.clock = clock;
    }

    public Map<String, String> quotaMetricTags(ClientQuotaType clientQuotaType, KafkaPrincipal kafkaPrincipal, String str) {
        HashMap hashMap = new HashMap();
        hashMap.put("quota.type", clientQuotaType.name());
        if (!this.excludedPrincipalNameList.isEmpty() && kafkaPrincipal != null && this.excludedPrincipalNameList.contains(kafkaPrincipal.getName())) {
            hashMap.put(EXCLUDED_PRINCIPAL_QUOTA_KEY, Boolean.TRUE.toString());
        }
        return hashMap;
    }

    public Double quotaLimit(ClientQuotaType clientQuotaType, Map<String, String> map) {
        if (Boolean.TRUE.toString().equals(map.get(EXCLUDED_PRINCIPAL_QUOTA_KEY))) {
            return Double.valueOf(Quota.upperBound(Double.MAX_VALUE).bound());
        }
        double bound = this.quotaMap.getOrDefault(clientQuotaType, Quota.upperBound(Double.MAX_VALUE)).bound();
        if (ClientQuotaType.PRODUCE.equals(clientQuotaType)) {
            bound *= this.throttleFactorSource.currentThrottleFactor().getThrottleFactor();
        }
        return Double.valueOf(Math.max(bound, 1.0d));
    }

    public void updateQuota(ClientQuotaType clientQuotaType, ClientQuotaEntity clientQuotaEntity, double d) {
    }

    public void removeQuota(ClientQuotaType clientQuotaType, ClientQuotaEntity clientQuotaEntity) {
    }

    public boolean quotaResetRequired(ClientQuotaType clientQuotaType) {
        return this.resetQuota.remove(clientQuotaType);
    }

    public boolean updateClusterMetadata(Cluster cluster) {
        return false;
    }

    public void close() {
        try {
            closeExecutorService();
            this.volumeSourceBuilder.close();
            Stream filter = Metrics.defaultRegistry().allMetrics().keySet().stream().filter(metricName -> {
                return SCOPE.equals(metricName.getScope());
            });
            MetricsRegistry defaultRegistry = Metrics.defaultRegistry();
            Objects.requireNonNull(defaultRegistry);
            filter.forEach(defaultRegistry::removeMetric);
        } catch (Throwable th) {
            Stream filter2 = Metrics.defaultRegistry().allMetrics().keySet().stream().filter(metricName2 -> {
                return SCOPE.equals(metricName2.getScope());
            });
            MetricsRegistry defaultRegistry2 = Metrics.defaultRegistry();
            Objects.requireNonNull(defaultRegistry2);
            filter2.forEach(defaultRegistry2::removeMetric);
            throw th;
        }
    }

    public void configure(Map<String, ?> map) {
        StaticQuotaConfig staticQuotaConfig = new StaticQuotaConfig(map, true);
        this.quotaMap = staticQuotaConfig.getQuotaMap();
        long storageCheckInterval = staticQuotaConfig.getStorageCheckInterval();
        if (storageCheckInterval > 0) {
            Optional<Long> availableBytesLimit = staticQuotaConfig.getAvailableBytesLimit();
            Optional<Double> availableRatioLimit = staticQuotaConfig.getAvailableRatioLimit();
            if (availableBytesLimit.isPresent() || availableRatioLimit.isPresent()) {
                scheduleStorageCheck(staticQuotaConfig, availableBytesLimit, availableRatioLimit, storageCheckInterval);
            } else {
                log.info("No volume size limits configured, storage check disabled");
            }
        } else {
            log.info("Static quota callback configured to never check usage: set {} to a positive value to enable", "client.quota.callback.static.storage.check-interval");
        }
        this.excludedPrincipalNameList = staticQuotaConfig.getSetOfExcludedPrincipals();
        if (!this.excludedPrincipalNameList.isEmpty()) {
            log.info("Excluded principals {}", this.excludedPrincipalNameList);
        }
        this.quotaMap.forEach((clientQuotaType, quota) -> {
            Metrics.newGauge(metricName(StaticQuotaCallback.class, clientQuotaType.name().toUpperCase(Locale.ENGLISH).charAt(0) + clientQuotaType.name().toLowerCase(Locale.ENGLISH).substring(1)), new ClientQuotaGauge(quota));
        });
    }

    private void scheduleStorageCheck(StaticQuotaConfig staticQuotaConfig, Optional<Long> optional, Optional<Double> optional2, long j) {
        ThrottleFactorPolicy availableRatioThrottleFactorPolicy;
        LinkedHashMap<String, String> linkedHashMap = new LinkedHashMap<>();
        linkedHashMap.put(HOST_BROKER_TAG, staticQuotaConfig.getBrokerId());
        if (optional.isPresent() && optional2.isPresent()) {
            throw new IllegalStateException("Both limit types configured, only configure one of [" + String.join(",", "client.quota.callback.static.storage.per.volume.limit.min.available.bytes", "client.quota.callback.static.storage.per.volume.limit.min.available.ratio") + "]");
        }
        if (optional.isPresent()) {
            availableRatioThrottleFactorPolicy = new AvailableBytesThrottleFactorPolicy(optional.get().longValue());
            log.info("Available bytes limit {}", optional.get());
        } else {
            availableRatioThrottleFactorPolicy = new AvailableRatioThrottleFactorPolicy(optional2.get().doubleValue());
            log.info("Available ratio limit {}", optional2.get());
        }
        PolicyBasedThrottle policyBasedThrottle = new PolicyBasedThrottle(availableRatioThrottleFactorPolicy, () -> {
            this.resetQuota.add(ClientQuotaType.PRODUCE);
        }, new FixedDurationExpiryPolicy(this.clock, staticQuotaConfig.getThrottleFactorValidityDuration()), staticQuotaConfig.getFallbackThrottleFactor(), linkedHashMap);
        this.throttleFactorSource = policyBasedThrottle;
        this.backgroundScheduler.scheduleWithFixedDelay(this.volumeSourceBuilder.withConfig(staticQuotaConfig).withVolumeObserver(new CachingVolumeObserver(policyBasedThrottle, this.clock, staticQuotaConfig.getThrottleFactorValidityDuration(), linkedHashMap)).withDefaultTags(linkedHashMap).build(), 0L, j, TimeUnit.SECONDS);
        ScheduledExecutorService scheduledExecutorService = this.backgroundScheduler;
        Objects.requireNonNull(policyBasedThrottle);
        scheduledExecutorService.scheduleWithFixedDelay(policyBasedThrottle::checkThrottleFactorValidity, 0L, 10L, TimeUnit.SECONDS);
        log.info("Configured quota callback with {}. Storage check interval: {}s", this.quotaMap, Long.valueOf(j));
    }

    private void closeExecutorService() {
        try {
            this.backgroundScheduler.shutdownNow();
        } catch (Exception e) {
            log.warn("Encountered problem shutting down background executor: {}", e.getMessage(), e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static MetricName metricName(Class<?> cls, String str, LinkedHashMap<String, String> linkedHashMap) {
        return metricName(str, cls.getSimpleName(), cls.getPackageName(), linkedHashMap);
    }

    static MetricName metricName(Class<?> cls, String str) {
        return metricName(str, cls.getSimpleName(), cls.getPackageName());
    }

    public static MetricName metricName(String str, String str2, String str3) {
        String sanitise = sanitise(str3);
        String sanitise2 = sanitise(str2);
        String sanitise3 = sanitise(str);
        return new MetricName(sanitise, sanitise2, sanitise3, SCOPE, String.format("%s:type=%s,name=%s", sanitise, sanitise2, sanitise3));
    }

    public static MetricName metricName(String str, String str2, String str3, LinkedHashMap<String, String> linkedHashMap) {
        String str4 = (String) linkedHashMap.entrySet().stream().map(entry -> {
            return String.format("%s=%s", sanitise((String) entry.getKey()), sanitise((String) entry.getValue()));
        }).collect(Collectors.joining(","));
        String sanitise = sanitise(str3);
        String sanitise2 = sanitise(str2);
        String sanitise3 = sanitise(str);
        return new MetricName(sanitise, sanitise2, sanitise3, SCOPE, !str4.isBlank() ? String.format("%s:type=%s,name=%s,%s", sanitise, sanitise2, sanitise3, str4) : String.format("%s:type=%s,name=%s", sanitise, sanitise2, sanitise3));
    }

    private static String sanitise(String str) {
        return str.replaceAll("[:?*=,]", "").replaceAll("//", "").replaceAll("\\$$", "");
    }
}
