/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.persistent;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
import org.apache.pulsar.common.util.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DispatchRateLimiter {
    private final PersistentTopic topic;
    private final String topicName;
    private final Type type;
    private final BrokerService brokerService;
    private RateLimiter dispatchRateLimiterOnMessage;
    private RateLimiter dispatchRateLimiterOnByte;
    private static final Logger log = LoggerFactory.getLogger(DispatchRateLimiter.class);

    public DispatchRateLimiter(PersistentTopic topic, Type type) {
        this.topic = topic;
        this.topicName = topic.getName();
        this.brokerService = topic.getBrokerService();
        this.type = type;
        this.updateDispatchRate();
    }

    public long getAvailableDispatchRateLimitOnMsg() {
        return this.dispatchRateLimiterOnMessage == null ? -1L : this.dispatchRateLimiterOnMessage.getAvailablePermits();
    }

    public long getAvailableDispatchRateLimitOnByte() {
        return this.dispatchRateLimiterOnByte == null ? -1L : this.dispatchRateLimiterOnByte.getAvailablePermits();
    }

    public boolean tryDispatchPermit(long msgPermits, long bytePermits) {
        boolean acquiredMsgPermit = msgPermits <= 0L || this.dispatchRateLimiterOnMessage == null || this.dispatchRateLimiterOnMessage.tryAcquire(msgPermits);
        boolean acquiredBytePermit = bytePermits <= 0L || this.dispatchRateLimiterOnByte == null || this.dispatchRateLimiterOnByte.tryAcquire(bytePermits);
        return acquiredMsgPermit && acquiredBytePermit;
    }

    public boolean hasMessageDispatchPermit() {
        return !(this.dispatchRateLimiterOnMessage != null && this.dispatchRateLimiterOnMessage.getAvailablePermits() <= 0L || this.dispatchRateLimiterOnByte != null && this.dispatchRateLimiterOnByte.getAvailablePermits() <= 0L);
    }

    public boolean isDispatchRateLimitingEnabled() {
        return this.dispatchRateLimiterOnMessage != null || this.dispatchRateLimiterOnByte != null;
    }

    private DispatchRate createDispatchRate() {
        long dispatchThrottlingRateInByte;
        int dispatchThrottlingRateInMsg;
        ServiceConfiguration config = this.brokerService.pulsar().getConfiguration();
        switch (this.type) {
            case TOPIC: {
                dispatchThrottlingRateInMsg = config.getDispatchThrottlingRatePerTopicInMsg();
                dispatchThrottlingRateInByte = config.getDispatchThrottlingRatePerTopicInByte();
                break;
            }
            case SUBSCRIPTION: {
                dispatchThrottlingRateInMsg = config.getDispatchThrottlingRatePerSubscriptionInMsg();
                dispatchThrottlingRateInByte = config.getDispatchThrottlingRatePerSubscriptionInByte();
                break;
            }
            case REPLICATOR: {
                dispatchThrottlingRateInMsg = config.getDispatchThrottlingRatePerReplicatorInMsg();
                dispatchThrottlingRateInByte = config.getDispatchThrottlingRatePerReplicatorInByte();
                break;
            }
            default: {
                dispatchThrottlingRateInMsg = -1;
                dispatchThrottlingRateInByte = -1L;
            }
        }
        return DispatchRate.builder().dispatchThrottlingRateInMsg(dispatchThrottlingRateInMsg).dispatchThrottlingRateInByte(dispatchThrottlingRateInByte).ratePeriodInSecond(1).relativeToPublishRate(config.isDispatchThrottlingRateRelativeToPublishRate()).build();
    }

    public void updateDispatchRate() {
        Optional<DispatchRate> dispatchRate = DispatchRateLimiter.getTopicPolicyDispatchRate(this.brokerService, this.topicName, this.type);
        if (!dispatchRate.isPresent()) {
            ((CompletableFuture)this.getPoliciesDispatchRateAsync(this.brokerService).thenAccept(dispatchRateOp -> {
                if (!dispatchRateOp.isPresent()) {
                    dispatchRateOp = Optional.of(this.createDispatchRate());
                }
                this.updateDispatchRate((DispatchRate)dispatchRateOp.get());
                log.info("[{}] configured {} message-dispatch rate at broker {}", new Object[]{this.topicName, this.type, dispatchRateOp.get()});
            })).exceptionally(ex -> {
                log.error("[{}] failed to get the dispatch rate policy from the namespace resource for type {}", new Object[]{this.topicName, this.type, ex});
                return null;
            });
        } else {
            this.updateDispatchRate(dispatchRate.get());
            log.info("[{}] configured {} message-dispatch rate at broker {}", new Object[]{this.topicName, this.type, dispatchRate.get()});
        }
    }

    public static boolean isDispatchRateNeeded(BrokerService brokerService, Optional<Policies> policies, String topicName, Type type) {
        ServiceConfiguration serviceConfig = brokerService.pulsar().getConfiguration();
        Optional<DispatchRate> dispatchRate = DispatchRateLimiter.getTopicPolicyDispatchRate(brokerService, topicName, type);
        if (dispatchRate.isPresent()) {
            return true;
        }
        policies = policies.isPresent() ? policies : DispatchRateLimiter.getPolicies(brokerService, topicName);
        return DispatchRateLimiter.isDispatchRateNeeded(serviceConfig, policies, topicName, type);
    }

    public static Optional<DispatchRate> getTopicPolicyDispatchRate(BrokerService brokerService, String topicName, Type type) {
        Optional<DispatchRate> dispatchRate = Optional.empty();
        ServiceConfiguration serviceConfiguration = brokerService.pulsar().getConfiguration();
        if (serviceConfiguration.isSystemTopicEnabled() && serviceConfiguration.isTopicLevelPoliciesEnabled()) {
            try {
                switch (type) {
                    case TOPIC: {
                        dispatchRate = Optional.ofNullable(brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(TopicName.get((String)topicName))).map(TopicPolicies::getDispatchRate);
                        break;
                    }
                    case SUBSCRIPTION: {
                        dispatchRate = Optional.ofNullable(brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(TopicName.get((String)topicName))).map(TopicPolicies::getSubscriptionDispatchRate);
                        break;
                    }
                    case REPLICATOR: {
                        dispatchRate = Optional.ofNullable(brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(TopicName.get((String)topicName))).map(TopicPolicies::getReplicatorDispatchRate);
                        break;
                    }
                }
            }
            catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
                log.debug("Topic {} policies have not been initialized yet.", (Object)topicName);
            }
            catch (Exception e) {
                log.debug("[{}] Failed to get topic dispatch rate. ", (Object)topicName, (Object)e);
            }
        }
        return dispatchRate;
    }

    public static boolean isDispatchRateNeeded(ServiceConfiguration serviceConfig, Optional<Policies> policies, String topicName, Type type) {
        DispatchRateImpl dispatchRate = DispatchRateLimiter.getPoliciesDispatchRate(serviceConfig.getClusterName(), policies, type);
        if (dispatchRate == null) {
            switch (type) {
                case TOPIC: {
                    return serviceConfig.getDispatchThrottlingRatePerTopicInMsg() > 0 || serviceConfig.getDispatchThrottlingRatePerTopicInByte() > 0L;
                }
                case SUBSCRIPTION: {
                    return serviceConfig.getDispatchThrottlingRatePerSubscriptionInMsg() > 0 || serviceConfig.getDispatchThrottlingRatePerSubscriptionInByte() > 0L;
                }
                case REPLICATOR: {
                    return serviceConfig.getDispatchThrottlingRatePerReplicatorInMsg() > 0 || serviceConfig.getDispatchThrottlingRatePerReplicatorInByte() > 0L;
                }
            }
            log.error("error DispatchRateLimiter type: {} ", (Object)type);
            return false;
        }
        return true;
    }

    public void onPoliciesUpdate(Policies data) {
        DispatchRate dispatchRate;
        String cluster = this.brokerService.pulsar().getConfiguration().getClusterName();
        switch (this.type) {
            case TOPIC: {
                dispatchRate = (DispatchRate)data.topicDispatchRate.get(cluster);
                if (dispatchRate != null) break;
                dispatchRate = (DispatchRate)data.clusterDispatchRate.get(cluster);
                break;
            }
            case SUBSCRIPTION: {
                dispatchRate = (DispatchRate)data.subscriptionDispatchRate.get(cluster);
                break;
            }
            case REPLICATOR: {
                dispatchRate = (DispatchRate)data.replicatorDispatchRate.get(cluster);
                break;
            }
            default: {
                log.error("error DispatchRateLimiter type: {} ", (Object)this.type);
                dispatchRate = null;
            }
        }
        if (dispatchRate != null) {
            DispatchRate newDispatchRate = this.createDispatchRate();
            if (!DispatchRateLimiter.isDispatchRateEnabled(dispatchRate) && DispatchRateLimiter.isDispatchRateEnabled(newDispatchRate)) {
                dispatchRate = newDispatchRate;
            }
            this.updateDispatchRate(dispatchRate);
        }
    }

    public static DispatchRateImpl getPoliciesDispatchRate(String cluster, Optional<Policies> policies, Type type) {
        return policies.map(p -> {
            DispatchRateImpl dispatchRate;
            switch (type) {
                case TOPIC: {
                    dispatchRate = (DispatchRateImpl)p.topicDispatchRate.get(cluster);
                    if (dispatchRate != null) break;
                    dispatchRate = (DispatchRateImpl)p.clusterDispatchRate.get(cluster);
                    break;
                }
                case SUBSCRIPTION: {
                    dispatchRate = (DispatchRateImpl)p.subscriptionDispatchRate.get(cluster);
                    break;
                }
                case REPLICATOR: {
                    dispatchRate = (DispatchRateImpl)p.replicatorDispatchRate.get(cluster);
                    break;
                }
                default: {
                    log.error("error DispatchRateLimiter type: {} ", (Object)type);
                    return null;
                }
            }
            return DispatchRateLimiter.isDispatchRateEnabled((DispatchRate)dispatchRate) ? dispatchRate : null;
        }).orElse(null);
    }

    public CompletableFuture<Optional<DispatchRate>> getPoliciesDispatchRateAsync(BrokerService brokerService) {
        String cluster = brokerService.pulsar().getConfiguration().getClusterName();
        return DispatchRateLimiter.getPoliciesAsync(brokerService, this.topicName).thenApply(policiesOp -> Optional.ofNullable(DispatchRateLimiter.getPoliciesDispatchRate(cluster, policiesOp, this.type)));
    }

    public static CompletableFuture<Optional<Policies>> getPoliciesAsync(BrokerService brokerService, String topicName) {
        NamespaceName namespace = TopicName.get((String)topicName).getNamespaceObject();
        return brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace);
    }

    public static Optional<Policies> getPolicies(BrokerService brokerService, String topicName) {
        NamespaceName namespace = TopicName.get((String)topicName).getNamespaceObject();
        return brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached(namespace);
    }

    public synchronized void updateDispatchRate(DispatchRate dispatchRate) {
        Supplier<Long> permitUpdaterByte;
        Supplier<Long> permitUpdaterMsg;
        log.info("setting message-dispatch-rate {}", (Object)dispatchRate);
        long msgRate = dispatchRate.getDispatchThrottlingRateInMsg();
        long byteRate = dispatchRate.getDispatchThrottlingRateInByte();
        long ratePeriod = dispatchRate.getRatePeriodInSecond();
        Supplier<Long> supplier = permitUpdaterMsg = dispatchRate.isRelativeToPublishRate() ? () -> this.getRelativeDispatchRateInMsg(dispatchRate) : null;
        if (msgRate > 0L) {
            if (this.dispatchRateLimiterOnMessage == null) {
                this.dispatchRateLimiterOnMessage = RateLimiter.builder().scheduledExecutorService(this.brokerService.pulsar().getExecutor()).permits(msgRate).rateTime(ratePeriod).timeUnit(TimeUnit.SECONDS).permitUpdater(permitUpdaterMsg).isDispatchOrPrecisePublishRateLimiter(true).build();
            } else {
                this.dispatchRateLimiterOnMessage.setRate(msgRate, (long)dispatchRate.getRatePeriodInSecond(), TimeUnit.SECONDS, permitUpdaterMsg);
            }
        } else if (this.dispatchRateLimiterOnMessage != null) {
            this.dispatchRateLimiterOnMessage.close();
            this.dispatchRateLimiterOnMessage = null;
        }
        Supplier<Long> supplier2 = permitUpdaterByte = dispatchRate.isRelativeToPublishRate() ? () -> this.getRelativeDispatchRateInByte(dispatchRate) : null;
        if (byteRate > 0L) {
            if (this.dispatchRateLimiterOnByte == null) {
                this.dispatchRateLimiterOnByte = RateLimiter.builder().scheduledExecutorService(this.brokerService.pulsar().getExecutor()).permits(byteRate).rateTime(ratePeriod).timeUnit(TimeUnit.SECONDS).permitUpdater(permitUpdaterByte).isDispatchOrPrecisePublishRateLimiter(true).build();
            } else {
                this.dispatchRateLimiterOnByte.setRate(byteRate, (long)dispatchRate.getRatePeriodInSecond(), TimeUnit.SECONDS, permitUpdaterByte);
            }
        } else if (this.dispatchRateLimiterOnByte != null) {
            this.dispatchRateLimiterOnByte.close();
            this.dispatchRateLimiterOnByte = null;
        }
    }

    private long getRelativeDispatchRateInMsg(DispatchRate dispatchRate) {
        return this.topic != null && dispatchRate != null ? (long)this.topic.getLastUpdatedAvgPublishRateInMsg() + (long)dispatchRate.getDispatchThrottlingRateInMsg() : 0L;
    }

    private long getRelativeDispatchRateInByte(DispatchRate dispatchRate) {
        return this.topic != null && dispatchRate != null ? (long)this.topic.getLastUpdatedAvgPublishRateInByte() + dispatchRate.getDispatchThrottlingRateInByte() : 0L;
    }

    public long getDispatchRateOnMsg() {
        return this.dispatchRateLimiterOnMessage != null ? this.dispatchRateLimiterOnMessage.getRate() : -1L;
    }

    public long getDispatchRateOnByte() {
        return this.dispatchRateLimiterOnByte != null ? this.dispatchRateLimiterOnByte.getRate() : -1L;
    }

    private static boolean isDispatchRateEnabled(DispatchRate dispatchRate) {
        return dispatchRate != null && (dispatchRate.getDispatchThrottlingRateInMsg() > 0 || dispatchRate.getDispatchThrottlingRateInByte() > 0L);
    }

    public void close() {
        if (this.dispatchRateLimiterOnMessage != null) {
            this.dispatchRateLimiterOnMessage.close();
            this.dispatchRateLimiterOnMessage = null;
        }
        if (this.dispatchRateLimiterOnByte != null) {
            this.dispatchRateLimiterOnByte.close();
            this.dispatchRateLimiterOnByte = null;
        }
    }

    public static enum Type {
        TOPIC,
        SUBSCRIPTION,
        REPLICATOR;

    }
}

