package org.apache.pulsar.broker.systopic;

import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.Optional;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.events.ActionType;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.events.PulsarEvent;
import org.apache.pulsar.common.events.TopicPoliciesEvent;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Broker tests for the namespace-events system topic: schema compatibility of the
 * topic-policies system topic, writing and reading {@link PulsarEvent}s through a
 * {@link TopicPoliciesSystemTopicClient}, and system-topic name detection.
 */
@Test(groups = {"broker"})
public class NamespaceEventsSystemTopicServiceTest extends MockedPulsarServiceBaseTest {
    private static final Logger log = LoggerFactory.getLogger(NamespaceEventsSystemTopicServiceTest.class);

    private static final String TENANT = "system-topic";
    private static final String NAMESPACE1 = "system-topic/namespace-1";
    private static final String NAMESPACE2 = "system-topic/namespace-2";
    private static final String NAMESPACE3 = "system-topic/namespace-3";

    /** Factory used by the tests to obtain topic-policies system topic clients; set in {@link #prepareData()}. */
    private NamespaceEventsSystemTopicFactory systemTopicFactory;

    /**
     * Starts the mocked broker and creates the cluster, tenant and namespaces the tests use.
     *
     * @throws Exception if broker setup or any admin call fails
     */
    @Override
    @BeforeClass
    protected void setup() throws Exception {
        super.internalSetup();
        prepareData();
    }

    /**
     * Tears down the mocked broker.
     *
     * @throws Exception if cleanup fails
     */
    @Override
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Verifies that the topic-policies system topic of {@code NAMESPACE1} reports
     * {@link SchemaCompatibilityStrategy#ALWAYS_COMPATIBLE}.
     *
     * @throws Exception if the reader cannot be created or closed
     */
    @Test
    public void testSchemaCompatibility() throws Exception {
        String topicName = systemTopicFactory
                .createTopicPoliciesSystemTopicClient(NamespaceName.get(NAMESPACE1))
                .getTopicName()
                .toString();

        // The reader is opened before the lookup and closed afterwards; presumably it is what
        // brings the topic into existence so that getTopic(..., false) finds it -- TODO confirm.
        try (Reader<byte[]> reader = pulsarClient.newReader(Schema.BYTES)
                .topic(topicName)
                .startMessageId(MessageId.earliest)
                .create()) {
            PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService()
                    .getTopic(topicName, false)
                    .join()
                    .orElseThrow(() -> new AssertionError("Topic not found: " + topicName));

            // TestNG's contract is assertEquals(actual, expected).
            Assert.assertEquals(topic.getSchemaCompatibilityStrategy(),
                    SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
        }
    }

    /**
     * Writes one topic-policies event, reads it back through two independent readers, and checks
     * that the client's writer/reader lists grow and shrink as writers and readers are created
     * and closed.
     *
     * @throws Exception if writing, reading or closing fails
     */
    @Test
    public void testSendAndReceiveNamespaceEvents() throws Exception {
        TopicPoliciesSystemTopicClient client =
                systemTopicFactory.createTopicPoliciesSystemTopicClient(NamespaceName.get(NAMESPACE1));

        TopicPolicies policies = TopicPolicies.builder()
                .maxProducerPerTopic(10)
                .build();
        TopicPoliciesEvent policiesEvent = TopicPoliciesEvent.builder()
                .domain("persistent")
                .tenant(TENANT)
                .namespace(NamespaceName.get(NAMESPACE1).getLocalName())
                .topic("my-topic")
                .policies(policies)
                .build();
        PulsarEvent event = PulsarEvent.builder()
                .eventType(EventType.TOPIC_POLICY)
                .actionType(ActionType.INSERT)
                .topicPoliciesEvent(policiesEvent)
                .build();

        // Send the event and read it back with a first reader.
        client.newWriter().write(event);
        SystemTopicClient.Reader<PulsarEvent> firstReader = client.newReader();
        Message<PulsarEvent> firstReceived = firstReader.readNext();
        log.info("Receive pulsar event from system topic : {}", firstReceived.getValue());
        Assert.assertEquals(firstReceived.getValue(), event);
        Assert.assertEquals(client.getWriters().size(), 1);
        Assert.assertEquals(client.getReaders().size(), 1);

        // A second, freshly created reader must see the same event.
        Message<PulsarEvent> secondReceived = client.newReader().readNext();
        log.info("Receive pulsar event from system topic : {}", secondReceived.getValue());
        Assert.assertEquals(secondReceived.getValue(), event);
        Assert.assertEquals(client.getReaders().size(), 2);

        // A second writer is tracked by the client as well.
        SystemTopicClient.Writer<PulsarEvent> secondWriter = client.newWriter();
        Assert.assertEquals(client.getWriters().size(), 2);

        // Closing one writer and one reader removes exactly those from the client's lists.
        secondWriter.close();
        firstReader.close();
        Assert.assertEquals(client.getWriters().size(), 1);
        Assert.assertEquals(client.getReaders().size(), 1);

        // Closing the client leaves no writers or readers registered.
        client.close();
        Assert.assertEquals(client.getWriters().size(), 0);
        Assert.assertEquals(client.getReaders().size(), 0);
    }

    /**
     * Checks that {@link BrokerService#isSystemTopic} returns {@code true} for the
     * {@code __change_events} topic and {@code false} for an ordinary partitioned topic.
     *
     * @throws PulsarAdminException if the partitioned topic cannot be created
     */
    @Test(timeOut = 30000)
    public void checkSystemTopic() throws PulsarAdminException {
        final String normalTopic = "persistent://" + NAMESPACE1 + "/normal_topic";
        admin.topics().createPartitionedTopic(normalTopic, 3);

        TopicName systemTopicName = TopicName.get("persistent://" + NAMESPACE1 + "/__change_events");
        TopicName normalTopicName = TopicName.get(normalTopic);

        BrokerService brokerService = pulsar.getBrokerService();
        Assert.assertTrue(brokerService.isSystemTopic(systemTopicName));
        Assert.assertFalse(brokerService.isSystemTopic(normalTopicName));
    }

    /**
     * Creates the {@code test} cluster, the {@code system-topic} tenant and its three namespaces,
     * then builds the system topic factory on top of the shared Pulsar client.
     *
     * @throws PulsarAdminException if any admin call fails
     */
    private void prepareData() throws PulsarAdminException {
        admin.clusters().createCluster("test",
                ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
        admin.tenants().createTenant(TENANT,
                new TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet("test")));
        admin.namespaces().createNamespace(NAMESPACE1);
        admin.namespaces().createNamespace(NAMESPACE2);
        admin.namespaces().createNamespace(NAMESPACE3);
        systemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarClient);
    }
}
