package org.apache.pulsar.broker.service;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.systopic.TopicPoliciesSystemTopicClient;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.events.ActionType;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.events.PulsarEvent;
import org.apache.pulsar.common.events.TopicPoliciesEvent;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.class */
public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesService {
    private final PulsarService pulsarService;
    private volatile NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;

    @VisibleForTesting
    final Map<TopicName, TopicPolicies> policiesCache = new ConcurrentHashMap();
    private final Map<NamespaceName, AtomicInteger> ownedBundlesCountPerNamespace = new ConcurrentHashMap();
    private final Map<NamespaceName, CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>> readerCaches = new ConcurrentHashMap();

    @VisibleForTesting
    final Map<NamespaceName, Boolean> policyCacheInitMap = new ConcurrentHashMap();
    private final Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listeners = new ConcurrentHashMap();
    private static final Logger log = LoggerFactory.getLogger(SystemTopicBasedTopicPoliciesService.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService$2.class */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$pulsar$common$events$ActionType = new int[ActionType.values().length];

        static {
            try {
                $SwitchMap$org$apache$pulsar$common$events$ActionType[ActionType.INSERT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$events$ActionType[ActionType.UPDATE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$events$ActionType[ActionType.DELETE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$events$ActionType[ActionType.NONE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    public SystemTopicBasedTopicPoliciesService(PulsarService pulsarService) {
        this.pulsarService = pulsarService;
    }

    @Override // org.apache.pulsar.broker.service.TopicPoliciesService
    public CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName topicName) {
        return sendTopicPolicyEvent(topicName, ActionType.DELETE, null);
    }

    @Override // org.apache.pulsar.broker.service.TopicPoliciesService
    public CompletableFuture<Void> updateTopicPoliciesAsync(TopicName topicName, TopicPolicies topicPolicies) {
        return sendTopicPolicyEvent(topicName, ActionType.UPDATE, topicPolicies);
    }

    private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName, ActionType actionType, TopicPolicies topicPolicies) {
        createSystemTopicFactoryIfNeeded();
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        this.namespaceEventsSystemTopicFactory.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject()).newWriterAsync().whenComplete((writer, th) -> {
            if (th != null) {
                completableFuture.completeExceptionally(th);
            } else {
                writer.writeAsync(PulsarEvent.builder().actionType(actionType).eventType(EventType.TOPIC_POLICY).topicPoliciesEvent(TopicPoliciesEvent.builder().domain(topicName.getDomain().toString()).tenant(topicName.getTenant()).namespace(topicName.getNamespaceObject().getLocalName()).topic(TopicName.get(topicName.getPartitionedTopicName()).getLocalName()).policies(topicPolicies).build()).build()).whenComplete((messageId, th) -> {
                    if (th != null) {
                        completableFuture.completeExceptionally(th);
                    } else if (messageId != null) {
                        completableFuture.complete(null);
                    } else {
                        completableFuture.completeExceptionally(new RuntimeException("Got message id is null."));
                    }
                    writer.closeAsync().whenComplete((r6, th) -> {
                        if (th != null) {
                            log.error("[{}] Close writer error.", topicName, th);
                        } else if (log.isDebugEnabled()) {
                            log.debug("[{}] Close writer success.", topicName);
                        }
                    });
                });
            }
        });
        return completableFuture;
    }

    private void notifyListener(Message<PulsarEvent> message) {
        if (EventType.TOPIC_POLICY.equals(((PulsarEvent) message.getValue()).getEventType())) {
            TopicPoliciesEvent topicPoliciesEvent = ((PulsarEvent) message.getValue()).getTopicPoliciesEvent();
            TopicName topicName = TopicName.get(topicPoliciesEvent.getDomain(), topicPoliciesEvent.getTenant(), topicPoliciesEvent.getNamespace(), topicPoliciesEvent.getTopic());
            if (this.listeners.get(topicName) != null) {
                TopicPolicies policies = topicPoliciesEvent.getPolicies();
                Iterator<TopicPolicyListener<TopicPolicies>> it = this.listeners.get(topicName).iterator();
                while (it.hasNext()) {
                    it.next().onUpdate(policies);
                }
            }
        }
    }

    @Override // org.apache.pulsar.broker.service.TopicPoliciesService
    public TopicPolicies getTopicPolicies(TopicName topicName) throws BrokerServiceException.TopicPoliciesCacheNotInitException {
        if (!this.policyCacheInitMap.containsKey(topicName.getNamespaceObject()) || this.policyCacheInitMap.get(topicName.getNamespaceObject()).booleanValue()) {
            return this.policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
        }
        throw new BrokerServiceException.TopicPoliciesCacheNotInitException();
    }

    @Override // org.apache.pulsar.broker.service.TopicPoliciesService
    public CompletableFuture<TopicPolicies> getTopicPoliciesBypassCacheAsync(TopicName topicName) {
        CompletableFuture<TopicPolicies> completableFuture = new CompletableFuture<>();
        createSystemTopicFactoryIfNeeded();
        if (this.namespaceEventsSystemTopicFactory == null) {
            completableFuture.complete(null);
            return completableFuture;
        }
        this.namespaceEventsSystemTopicFactory.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject()).newReaderAsync().thenAccept(reader -> {
            fetchTopicPoliciesAsyncAndCloseReader(reader, topicName, null, completableFuture);
        });
        return completableFuture;
    }

    @Override // org.apache.pulsar.broker.service.TopicPoliciesService
    public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        NamespaceName namespaceObject = namespaceBundle.getNamespaceObject();
        if (NamespaceService.checkHeartbeatNamespace(namespaceObject) != null) {
            completableFuture.complete(null);
            return completableFuture;
        }
        createSystemTopicFactoryIfNeeded();
        synchronized (this) {
            if (this.readerCaches.get(namespaceObject) != null) {
                this.ownedBundlesCountPerNamespace.get(namespaceObject).incrementAndGet();
                completableFuture.complete(null);
            } else {
                TopicPoliciesSystemTopicClient createTopicPoliciesSystemTopicClient = this.namespaceEventsSystemTopicFactory.createTopicPoliciesSystemTopicClient(namespaceObject);
                this.ownedBundlesCountPerNamespace.putIfAbsent(namespaceObject, new AtomicInteger(1));
                this.policyCacheInitMap.put(namespaceObject, false);
                CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> newReaderAsync = createTopicPoliciesSystemTopicClient.newReaderAsync();
                this.readerCaches.put(namespaceObject, newReaderAsync);
                newReaderAsync.whenComplete((reader, th) -> {
                    if (th != null) {
                        log.error("[{}] Failed to create reader on __change_events topic", namespaceObject, th);
                        completableFuture.completeExceptionally(th);
                    } else {
                        initPolicesCache(reader, completableFuture);
                        completableFuture.thenRun(() -> {
                            readMorePolicies(reader);
                        });
                    }
                });
            }
        }
        return completableFuture;
    }

    @Override // org.apache.pulsar.broker.service.TopicPoliciesService
    public CompletableFuture<Void> removeOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
        CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> remove;
        NamespaceName namespaceObject = namespaceBundle.getNamespaceObject();
        if (NamespaceService.checkHeartbeatNamespace(namespaceObject) != null) {
            return CompletableFuture.completedFuture(null);
        }
        AtomicInteger atomicInteger = this.ownedBundlesCountPerNamespace.get(namespaceObject);
        if ((atomicInteger == null || atomicInteger.decrementAndGet() <= 0) && (remove = this.readerCaches.remove(namespaceObject)) != null) {
            remove.thenAccept((v0) -> {
                v0.closeAsync();
            });
            this.ownedBundlesCountPerNamespace.remove(namespaceObject);
            this.policyCacheInitMap.remove(namespaceObject);
            this.policiesCache.entrySet().removeIf(entry -> {
                return ((TopicName) entry.getKey()).getNamespaceObject().equals(namespaceObject);
            });
        }
        return CompletableFuture.completedFuture(null);
    }

    @Override // org.apache.pulsar.broker.service.TopicPoliciesService
    public void start() {
        this.pulsarService.getNamespaceService().addNamespaceBundleOwnershipListener(new NamespaceBundleOwnershipListener() { // from class: org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.1
            @Override // org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener
            public void onLoad(NamespaceBundle namespaceBundle) {
                SystemTopicBasedTopicPoliciesService.this.addOwnedNamespaceBundleAsync(namespaceBundle);
            }

            @Override // org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener
            public void unLoad(NamespaceBundle namespaceBundle) {
                SystemTopicBasedTopicPoliciesService.this.removeOwnedNamespaceBundleAsync(namespaceBundle);
            }

            @Override // java.util.function.Predicate
            public boolean test(NamespaceBundle namespaceBundle) {
                return true;
            }
        });
    }

    private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> reader, CompletableFuture<Void> completableFuture) {
        reader.hasMoreEventsAsync().whenComplete((bool, th) -> {
            if (th != null) {
                log.error("[{}] Failed to check the move events for the system topic", reader.getSystemTopic().getTopicName(), th);
                completableFuture.completeExceptionally(th);
                this.readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
            }
            if (bool.booleanValue()) {
                reader.readNextAsync().whenComplete((message, th) -> {
                    if (th != null) {
                        log.error("[{}] Failed to read event from the system topic.", reader.getSystemTopic().getTopicName(), th);
                        completableFuture.completeExceptionally(th);
                        this.readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
                    }
                    refreshTopicPoliciesCache(message);
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Loop next event reading for system topic.", reader.getSystemTopic().getTopicName().getNamespaceObject());
                    }
                    initPolicesCache(reader, completableFuture);
                });
                return;
            }
            if (log.isDebugEnabled()) {
                log.debug("[{}] Reach the end of the system topic.", reader.getSystemTopic().getTopicName());
            }
            this.policyCacheInitMap.computeIfPresent(reader.getSystemTopic().getTopicName().getNamespaceObject(), (namespaceName, bool) -> {
                return true;
            });
            this.policiesCache.forEach((topicName, topicPolicies) -> {
                if (this.listeners.get(topicName) != null) {
                    Iterator<TopicPolicyListener<TopicPolicies>> it = this.listeners.get(topicName).iterator();
                    while (it.hasNext()) {
                        it.next().onUpdate(topicPolicies);
                    }
                }
            });
            completableFuture.complete(null);
        });
    }

    private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent> reader) {
        reader.readNextAsync().whenComplete((message, th) -> {
            if (th == null) {
                refreshTopicPoliciesCache(message);
                notifyListener(message);
                readMorePolicies(reader);
            } else {
                if (!(th instanceof PulsarClientException.AlreadyClosedException)) {
                    readMorePolicies(reader);
                    return;
                }
                log.error("Read more topic policies exception, close the read now!", th);
                NamespaceName namespaceObject = reader.getSystemTopic().getTopicName().getNamespaceObject();
                this.ownedBundlesCountPerNamespace.remove(namespaceObject);
                this.readerCaches.remove(namespaceObject);
            }
        });
    }

    private void refreshTopicPoliciesCache(Message<PulsarEvent> message) {
        if (EventType.TOPIC_POLICY.equals(((PulsarEvent) message.getValue()).getEventType())) {
            TopicPoliciesEvent topicPoliciesEvent = ((PulsarEvent) message.getValue()).getTopicPoliciesEvent();
            TopicName topicName = TopicName.get(topicPoliciesEvent.getDomain(), topicPoliciesEvent.getTenant(), topicPoliciesEvent.getNamespace(), topicPoliciesEvent.getTopic());
            switch (AnonymousClass2.$SwitchMap$org$apache$pulsar$common$events$ActionType[((PulsarEvent) message.getValue()).getActionType().ordinal()]) {
                case 1:
                    if (this.policiesCache.putIfAbsent(topicName, topicPoliciesEvent.getPolicies()) != null) {
                        log.warn("Policy insert failed, the topic: {}' policy already exist", topicName);
                        return;
                    }
                    return;
                case 2:
                    this.policiesCache.put(topicName, topicPoliciesEvent.getPolicies());
                    return;
                case 3:
                    this.policiesCache.remove(topicName);
                    return;
                case 4:
                    return;
                default:
                    log.warn("Unknown event action type: {}", ((PulsarEvent) message.getValue()).getActionType());
                    return;
            }
        }
    }

    private void createSystemTopicFactoryIfNeeded() {
        if (this.namespaceEventsSystemTopicFactory == null) {
            synchronized (this) {
                if (this.namespaceEventsSystemTopicFactory == null) {
                    try {
                        this.namespaceEventsSystemTopicFactory = new NamespaceEventsSystemTopicFactory(this.pulsarService.getClient());
                    } catch (PulsarServerException e) {
                        log.error("Create namespace event system topic factory error.", e);
                    }
                }
            }
        }
    }

    private void fetchTopicPoliciesAsyncAndCloseReader(SystemTopicClient.Reader<PulsarEvent> reader, TopicName topicName, TopicPolicies topicPolicies, CompletableFuture<TopicPolicies> completableFuture) {
        reader.hasMoreEventsAsync().whenComplete((bool, th) -> {
            if (th != null) {
                completableFuture.completeExceptionally(th);
            }
            if (bool.booleanValue()) {
                reader.readNextAsync().whenComplete((message, th) -> {
                    if (th != null) {
                        completableFuture.completeExceptionally(th);
                    }
                    if (EventType.TOPIC_POLICY.equals(((PulsarEvent) message.getValue()).getEventType())) {
                        TopicPoliciesEvent topicPoliciesEvent = ((PulsarEvent) message.getValue()).getTopicPoliciesEvent();
                        if (topicName.equals(TopicName.get(topicPoliciesEvent.getDomain(), topicPoliciesEvent.getTenant(), topicPoliciesEvent.getNamespace(), topicPoliciesEvent.getTopic()))) {
                            fetchTopicPoliciesAsyncAndCloseReader(reader, topicName, topicPoliciesEvent.getPolicies(), completableFuture);
                        } else {
                            fetchTopicPoliciesAsyncAndCloseReader(reader, topicName, topicPolicies, completableFuture);
                        }
                    }
                });
            } else {
                completableFuture.complete(topicPolicies);
                reader.closeAsync().whenComplete((r6, th2) -> {
                    if (th2 != null) {
                        log.error("[{}] Close reader error.", topicName, th2);
                    }
                });
            }
        });
    }

    @VisibleForTesting
    long getPoliciesCacheSize() {
        return this.policiesCache.size();
    }

    @VisibleForTesting
    long getReaderCacheCount() {
        return this.readerCaches.size();
    }

    @VisibleForTesting
    boolean checkReaderIsCached(NamespaceName namespaceName) {
        return this.readerCaches.get(namespaceName) != null;
    }

    @VisibleForTesting
    Boolean getPoliciesCacheInit(NamespaceName namespaceName) {
        return this.policyCacheInitMap.get(namespaceName);
    }

    @Override // org.apache.pulsar.broker.service.TopicPoliciesService
    public void registerListener(TopicName topicName, TopicPolicyListener<TopicPolicies> topicPolicyListener) {
        this.listeners.computeIfAbsent(topicName, topicName2 -> {
            return Lists.newCopyOnWriteArrayList();
        }).add(topicPolicyListener);
    }

    @Override // org.apache.pulsar.broker.service.TopicPoliciesService
    public void unregisterListener(TopicName topicName, TopicPolicyListener<TopicPolicies> topicPolicyListener) {
        this.listeners.computeIfAbsent(topicName, topicName2 -> {
            return Lists.newCopyOnWriteArrayList();
        }).remove(topicPolicyListener);
    }

    @Override // org.apache.pulsar.broker.service.TopicPoliciesService
    public void clean(TopicName topicName) {
        TopicName topicName2 = topicName;
        if (topicName.isPartitioned()) {
            topicName2 = TopicName.get(topicName.getPartitionedTopicName());
        }
        this.listeners.remove(topicName2);
    }

    @VisibleForTesting
    protected Map<TopicName, TopicPolicies> getPoliciesCache() {
        return this.policiesCache;
    }

    @VisibleForTesting
    protected Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> getListeners() {
        return this.listeners;
    }
}
