package org.apache.pulsar.broker.service;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Optional;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.admin.ZkAdminPaths;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/BacklogQuotaManager.class */
public class BacklogQuotaManager {
    private static final Logger log = LoggerFactory.getLogger(BacklogQuotaManager.class);
    private final BacklogQuota defaultQuota;
    private final ZooKeeperDataCache<Policies> zkCache;
    private final PulsarService pulsar;
    private final boolean isTopicLevelPoliciesEnable;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.pulsar.broker.service.BacklogQuotaManager$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/pulsar/broker/service/BacklogQuotaManager$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$pulsar$common$policies$data$BacklogQuota$BacklogQuotaType;
        static final /* synthetic */ int[] $SwitchMap$org$apache$pulsar$common$policies$data$BacklogQuota$RetentionPolicy = new int[BacklogQuota.RetentionPolicy.values().length];

        static {
            try {
                $SwitchMap$org$apache$pulsar$common$policies$data$BacklogQuota$RetentionPolicy[BacklogQuota.RetentionPolicy.consumer_backlog_eviction.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$policies$data$BacklogQuota$RetentionPolicy[BacklogQuota.RetentionPolicy.producer_exception.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$policies$data$BacklogQuota$RetentionPolicy[BacklogQuota.RetentionPolicy.producer_request_hold.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            $SwitchMap$org$apache$pulsar$common$policies$data$BacklogQuota$BacklogQuotaType = new int[BacklogQuota.BacklogQuotaType.values().length];
            try {
                $SwitchMap$org$apache$pulsar$common$policies$data$BacklogQuota$BacklogQuotaType[BacklogQuota.BacklogQuotaType.destination_storage.ordinal()] = 1;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$policies$data$BacklogQuota$BacklogQuotaType[BacklogQuota.BacklogQuotaType.message_age.ordinal()] = 2;
            } catch (NoSuchFieldError e5) {
            }
        }
    }

    public BacklogQuotaManager(PulsarService pulsarService) {
        this.isTopicLevelPoliciesEnable = pulsarService.getConfiguration().isTopicLevelPoliciesEnabled();
        this.defaultQuota = new BacklogQuota(pulsarService.getConfiguration().getBacklogQuotaDefaultLimitGB() * 1024 * 1024 * 1024, pulsarService.getConfiguration().getBacklogQuotaDefaultLimitSecond(), pulsarService.getConfiguration().getBacklogQuotaDefaultRetentionPolicy());
        this.zkCache = pulsarService.getConfigurationCache().policiesCache();
        this.pulsar = pulsarService;
    }

    public BacklogQuota getDefaultQuota() {
        return this.defaultQuota;
    }

    public BacklogQuota getBacklogQuota(String str, String str2) {
        try {
            return (BacklogQuota) this.zkCache.get(str2).map(policies -> {
                return (BacklogQuota) policies.backlog_quota_map.getOrDefault(BacklogQuota.BacklogQuotaType.destination_storage, this.defaultQuota);
            }).orElse(this.defaultQuota);
        } catch (Exception e) {
            log.warn("Failed to read policies data, will apply the default backlog quota: namespace={}", str, e);
            return this.defaultQuota;
        }
    }

    public BacklogQuota getBacklogQuota(TopicName topicName) {
        String path = AdminResource.path(ZkAdminPaths.POLICIES, topicName.getNamespace());
        if (!this.isTopicLevelPoliciesEnable) {
            return getBacklogQuota(topicName.getNamespace(), path);
        }
        try {
            return (BacklogQuota) Optional.ofNullable(this.pulsar.getTopicPoliciesService().getTopicPolicies(topicName)).map((v0) -> {
                return v0.getBackLogQuotaMap();
            }).map(map -> {
                return (BacklogQuota) map.get(BacklogQuota.BacklogQuotaType.destination_storage.name());
            }).orElseGet(() -> {
                return getBacklogQuota(topicName.getNamespace(), path);
            });
        } catch (Exception e) {
            log.warn("Failed to read topic policies data, will apply the namespace backlog quota: topicName={}", topicName, e);
            return getBacklogQuota(topicName.getNamespace(), path);
        }
    }

    public long getBacklogQuotaLimitInSize(TopicName topicName) {
        return getBacklogQuota(topicName).getLimitSize();
    }

    public int getBacklogQuotaLimitInTime(TopicName topicName) {
        return getBacklogQuota(topicName).getLimitTime();
    }

    public void handleExceededBacklogQuota(PersistentTopic persistentTopic, BacklogQuota.BacklogQuotaType backlogQuotaType, boolean z) {
        BacklogQuota backlogQuota = getBacklogQuota(TopicName.get(persistentTopic.getName()));
        log.info("Backlog quota type {} exceeded for topic [{}]. Applying [{}] policy", new Object[]{backlogQuotaType, persistentTopic.getName(), backlogQuota.getPolicy()});
        switch (AnonymousClass1.$SwitchMap$org$apache$pulsar$common$policies$data$BacklogQuota$RetentionPolicy[backlogQuota.getPolicy().ordinal()]) {
            case 1:
                switch (AnonymousClass1.$SwitchMap$org$apache$pulsar$common$policies$data$BacklogQuota$BacklogQuotaType[backlogQuotaType.ordinal()]) {
                    case 1:
                        dropBacklogForSizeLimit(persistentTopic, backlogQuota);
                        return;
                    case 2:
                        dropBacklogForTimeLimit(persistentTopic, backlogQuota, z);
                        return;
                    default:
                        return;
                }
            case 2:
            case 3:
                disconnectProducers(persistentTopic);
                return;
            default:
                return;
        }
    }

    /* JADX WARN: Removed duplicated region for block: B:22:0x0172 A[SYNTHETIC] */
    /* JADX WARN: Removed duplicated region for block: B:26:0x005a A[SYNTHETIC] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private void dropBacklogForSizeLimit(org.apache.pulsar.broker.service.persistent.PersistentTopic r11, org.apache.pulsar.common.policies.data.BacklogQuota r12) {
        /*
            Method dump skipped, instructions count: 411
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.pulsar.broker.service.BacklogQuotaManager.dropBacklogForSizeLimit(org.apache.pulsar.broker.service.persistent.PersistentTopic, org.apache.pulsar.common.policies.data.BacklogQuota):void");
    }

    private void dropBacklogForTimeLimit(PersistentTopic persistentTopic, BacklogQuota backlogQuota, boolean z) {
        if (z) {
            int limitTime = (int) (0.9d * backlogQuota.getLimitTime());
            if (log.isDebugEnabled()) {
                log.debug("[{}] target backlog expire time is [{}]", persistentTopic.getName(), Integer.valueOf(limitTime));
            }
            persistentTopic.getSubscriptions().forEach((str, persistentSubscription) -> {
                persistentSubscription.getExpiryMonitor().expireMessages(limitTime);
            });
            return;
        }
        Long valueOf = Long.valueOf(persistentTopic.getManagedLedger().getClock().millis());
        ManagedLedgerImpl managedLedger = persistentTopic.getManagedLedger();
        try {
            for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = (MLDataFormats.ManagedLedgerInfo.LedgerInfo) managedLedger.getLedgerInfo(Long.valueOf(managedLedger.getCursors().getSlowestReaderPosition().getLedgerId()).longValue()).get(); ledgerInfo.getTimestamp() > 0 && valueOf.longValue() - ledgerInfo.getTimestamp() > backlogQuota.getLimitTime(); ledgerInfo = (MLDataFormats.ManagedLedgerInfo.LedgerInfo) managedLedger.getLedgerInfo(Long.valueOf(managedLedger.getCursors().getSlowestReaderPosition().getLedgerId()).longValue()).get()) {
                managedLedger.getSlowestConsumer().resetCursor(managedLedger.getNextValidPosition(PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1)));
            }
        } catch (Exception e) {
            log.error("Error resetting cursor for slowest consumer [{}]: {}", managedLedger.getSlowestConsumer().getName(), e);
        }
    }

    private void disconnectProducers(PersistentTopic persistentTopic) {
        ArrayList newArrayList = Lists.newArrayList();
        persistentTopic.getProducers().values().forEach(producer -> {
            log.info("Producer [{}] has exceeded backlog quota on topic [{}]. Disconnecting producer", producer.getProducerName(), persistentTopic.getName());
            newArrayList.add(producer.disconnect());
        });
        FutureUtil.waitForAll(newArrayList).thenRun(() -> {
            log.info("All producers on topic [{}] are disconnected", persistentTopic.getName());
        }).exceptionally(th -> {
            log.error("Error in disconnecting producers on topic [{}] [{}]", persistentTopic.getName(), th);
            return null;
        });
    }
}
