package org.apache.pulsar.broker.admin;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
 * TestNG tests (group {@code broker-admin}) that create subscriptions through the admin client,
 * the client consumer API and the REST endpoint, and then check the resulting subscription
 * lists, backlogs and subscription properties on non-partitioned and partitioned topics.
 */
@Test(groups = {"broker-admin"})
/* loaded from: input_file:org/apache/pulsar/broker/admin/CreateSubscriptionTest.class */
public class CreateSubscriptionTest extends ProducerConsumerBase {
    /** Logger bound to this test class. */
    private static final Logger log = LoggerFactory.getLogger(CreateSubscriptionTest.class);

    /**
     * Runs before every test method: invokes the inherited {@code internalSetup()} and then
     * {@code producerBaseSetup()}.
     *
     * @throws Exception if either setup step fails
     */
    @BeforeMethod
    @Override
    public void setup() throws Exception {
        internalSetup();
        producerBaseSetup();
    }

    /**
     * Runs after every test method (even when the test failed, because of {@code alwaysRun})
     * and delegates to the inherited {@code internalCleanup()}.
     *
     * @throws Exception if the cleanup step fails
     */
    @AfterMethod(alwaysRun = true)
    @Override
    public void cleanup() throws Exception {
        internalCleanup();
    }

    /**
     * Creates subscriptions on a non-partitioned topic through the admin API and checks that:
     * <ul>
     *   <li>creating an already existing subscription is rejected with a conflict,</li>
     *   <li>a subscription created before publishing sees every published message as backlog,</li>
     *   <li>subscriptions created at {@code latest}, {@code earliest} and at the id of the last
     *       published message report backlogs of 0, 3 and 1 respectively.</li>
     * </ul>
     *
     * @throws Exception on any admin or client failure
     */
    @Test
    public void createSubscriptionSingleTopic() throws Exception {
        final String topic = "persistent://my-property/my-ns/my-topic";

        admin.topics().createSubscription(topic, "sub-1", MessageId.latest);

        // A second creation with the same name has to fail with HTTP 409.
        try {
            admin.topics().createSubscription(topic, "sub-1", MessageId.latest);
            Assert.fail("Should have failed");
        } catch (PulsarAdminException.ConflictException e) {
            // getCause() is a plain Throwable; it must be narrowed before the response can be read.
            WebApplicationException cause = (WebApplicationException) e.getCause();
            Assert.assertEquals(cause.getResponse().getStatus(), Response.Status.CONFLICT.getStatusCode());
        }

        Assert.assertEquals(admin.topics().getSubscriptions(topic), Lists.newArrayList("sub-1"));

        // try-with-resources guarantees the producer is released even when an assertion fails.
        try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create()) {
            producer.send("test-1".getBytes(StandardCharsets.UTF_8));
            producer.send("test-2".getBytes(StandardCharsets.UTF_8));
            MessageId lastMessageId = producer.send("test-3".getBytes(StandardCharsets.UTF_8));

            // sub-1 existed before the three messages were published.
            Assert.assertEquals(
                    admin.topics().getStats(topic).getSubscriptions().get("sub-1").getMsgBacklog(), 3L);

            admin.topics().createSubscription(topic, "sub-2", MessageId.latest);
            Assert.assertEquals(
                    admin.topics().getStats(topic).getSubscriptions().get("sub-2").getMsgBacklog(), 0L);

            admin.topics().createSubscription(topic, "sub-3", MessageId.earliest);
            Assert.assertEquals(
                    admin.topics().getStats(topic).getSubscriptions().get("sub-3").getMsgBacklog(), 3L);

            // Positioned on the last published message: that single message is still pending.
            admin.topics().createSubscription(topic, "sub-5", lastMessageId);
            Assert.assertEquals(
                    admin.topics().getStats(topic).getSubscriptions().get("sub-5").getMsgBacklog(), 1L);
        }
    }

    /**
     * Creates a subscription on a 10-partition topic, checks that creating it a second time
     * throws, and then checks that every partition lists exactly that subscription.
     *
     * @throws Exception on any admin failure
     */
    @Test
    public void createSubscriptionOnPartitionedTopic() throws Exception {
        final String partitionedTopic = "persistent://my-property/my-ns/my-partitioned-topic";
        final int partitionCount = 10;

        admin.topics().createPartitionedTopic(partitionedTopic, partitionCount);
        admin.topics().createSubscription(partitionedTopic, "sub-1", MessageId.latest);

        try {
            admin.topics().createSubscription(partitionedTopic, "sub-1", MessageId.latest);
            Assert.fail("Should have failed");
        } catch (Exception expected) {
            // Any exception is accepted; Assert.fail raises an Error and is therefore not caught here.
        }

        final TopicName baseName = TopicName.get(partitionedTopic);
        for (int partition = 0; partition < partitionCount; partition++) {
            String partitionName = baseName.getPartition(partition).toString();
            Assert.assertEquals(admin.topics().getSubscriptions(partitionName), Lists.newArrayList("sub-1"));
        }
    }

    /**
     * Pre-creates the subscription on partition 0 only, then creates it on the whole
     * partitioned topic (which is expected to succeed), checks that a further creation throws,
     * and finally checks that every partition lists exactly that subscription.
     *
     * @throws Exception on any admin failure
     */
    @Test
    public void createSubscriptionOnPartitionedTopicWithPartialFailure() throws Exception {
        final String partitionedTopic = "persistent://my-property/my-ns/my-partitioned-topic";
        final int partitionCount = 10;

        admin.topics().createPartitionedTopic(partitionedTopic, partitionCount);

        // The subscription already exists on one partition before the topic-wide call.
        admin.topics().createSubscription(partitionedTopic + "-partition-0", "sub-1", MessageId.latest);
        admin.topics().createSubscription(partitionedTopic, "sub-1", MessageId.latest);

        try {
            admin.topics().createSubscription(partitionedTopic, "sub-1", MessageId.latest);
            Assert.fail("Should have failed");
        } catch (Exception expected) {
            // Any exception is accepted; Assert.fail raises an Error and is therefore not caught here.
        }

        final TopicName baseName = TopicName.get(partitionedTopic);
        for (int partition = 0; partition < partitionCount; partition++) {
            String partitionName = baseName.getPartition(partition).toString();
            Assert.assertEquals(admin.topics().getSubscriptions(partitionName), Lists.newArrayList("sub-1"));
        }
    }

    /**
     * TestNG data provider named {@code subscriptionMode}.
     *
     * @return one row per subscription mode: {@link SubscriptionMode#Durable} and
     *         {@link SubscriptionMode#NonDurable}
     */
    @DataProvider(name = "subscriptionMode")
    public Object[][] subscriptionModeProvider() {
        // Must be a two-dimensional array: each inner array is the argument list of one invocation.
        return new Object[][] {
                {SubscriptionMode.Durable},
                {SubscriptionMode.NonDurable}
        };
    }

    /**
     * Subscribes with subscription properties to a non-partitioned and to a 10-partition topic
     * and checks that the topic stats (and both variants of the partitioned stats) report the
     * same properties for the subscription.
     *
     * @param subscriptionMode mode supplied by the {@code subscriptionMode} data provider
     * @throws Exception on any admin or client failure
     */
    @Test(dataProvider = "subscriptionMode")
    public void testSubscriptionPropertiesStats(SubscriptionMode subscriptionMode) throws Exception {
        final String topic = "persistent://my-property/my-ns/topic" + UUID.randomUUID();
        admin.topics().createNonPartitionedTopic(topic);

        final Map<String, String> topicProperties = new HashMap<>();
        topicProperties.put("test-topic", "tag1");

        // The consumer stays open while the stats are read and is closed afterwards in every case.
        try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .subscriptionMode(subscriptionMode)
                .topic(topic)
                .receiverQueueSize(1)
                .subscriptionProperties(topicProperties)
                .subscriptionName("my-sub")
                .subscribe()) {
            Assert.assertEquals(
                    admin.topics().getStats(topic).getSubscriptions().get("my-sub").getSubscriptionProperties(),
                    topicProperties);
        }

        final String partitionedTopic = "persistent://my-property/my-ns/topic" + UUID.randomUUID();
        admin.topics().createPartitionedTopic(partitionedTopic, 10);

        final Map<String, String> partitionedProperties = new HashMap<>();
        partitionedProperties.put("topic1", "tag1");
        partitionedProperties.put("topic2", "tag2");
        partitionedProperties.put("topic3", "tag3");

        try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .subscriptionMode(subscriptionMode)
                .topic(partitionedTopic)
                .receiverQueueSize(1)
                .subscriptionProperties(partitionedProperties)
                .subscriptionName("my-sub-1")
                .subscribe()) {
            Assert.assertEquals(
                    admin.topics().getPartitionedStats(partitionedTopic, false).getSubscriptions()
                            .get("my-sub-1").getSubscriptionProperties(),
                    partitionedProperties);
            Assert.assertEquals(
                    admin.topics().getPartitionedStats(partitionedTopic, true).getSubscriptions()
                            .get("my-sub-1").getSubscriptionProperties(),
                    partitionedProperties);
        }
    }

    /**
     * Exercises how subscription properties behave across re-subscription and broker restarts.
     *
     * <p>For a subscription {@code my-sub} the test subscribes repeatedly with the initial
     * properties, with different properties and with none, restarting the broker in between,
     * and asserts which map the broker-side subscription reports. In {@code Durable} mode the
     * assertions expect the initial properties throughout; in {@code NonDurable} mode they expect
     * whatever the latest subscriber supplied (or an empty map when it supplied nothing).
     * A second, shared subscription checks that later consumers do not replace the properties
     * set by the first one.
     *
     * @param subscriptionMode mode supplied by the {@code subscriptionMode} data provider
     * @throws Exception on any admin or client failure
     */
    @Test(dataProvider = "subscriptionMode")
    public void addSubscriptionPropertiesTest(SubscriptionMode subscriptionMode) throws Exception {
        final String topic = "persistent://my-property/my-ns/topic" + UUID.randomUUID();
        final String subscriptionName = "my-sub";
        admin.topics().createNonPartitionedTopic(topic);

        final Map<String, String> initialProperties = new HashMap<>();
        initialProperties.put("1", "1");
        initialProperties.put("2", "2");

        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .subscriptionMode(subscriptionMode)
                .topic(topic)
                .receiverQueueSize(1)
                .subscriptionProperties(initialProperties)
                .subscriptionName(subscriptionName)
                .subscribe();
        final PersistentSubscription originalSubscription = lookupPersistentSubscription(topic, subscriptionName);
        Assert.assertEquals(originalSubscription.getSubscriptionProperties(), initialProperties);

        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
        for (int i = 0; i < 10; i++) {
            producer.send("msg".getBytes(StandardCharsets.UTF_8));
        }

        Message<byte[]> received = consumer.receive(1, TimeUnit.SECONDS);
        Assert.assertNotNull(received);
        consumer.acknowledge(received);

        // getMessageId() is declared as MessageId; the ledger/entry accessors need the implementation type.
        final MessageIdImpl ackedId = (MessageIdImpl) received.getMessageId();
        // Wait until the cursor's mark-delete position matches the acknowledged message.
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(
                    originalSubscription.getCursor().getMarkDeletedPosition().getLedgerId(), ackedId.getLedgerId());
            Assert.assertEquals(
                    originalSubscription.getCursor().getMarkDeletedPosition().getEntryId(), ackedId.getEntryId());
        });
        Assert.assertEquals(originalSubscription.getSubscriptionProperties(), initialProperties);
        consumer.close();
        producer.close();

        // Restart, then re-subscribe with the same properties.
        restartBroker();
        Consumer<byte[]> sameProperties = pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionMode(subscriptionMode)
                .receiverQueueSize(1)
                .subscriptionProperties(initialProperties)
                .subscriptionName(subscriptionName)
                .subscribe();
        final PersistentSubscription afterRestart = lookupPersistentSubscription(topic, subscriptionName);
        Awaitility.await().untilAsserted(() ->
                Assert.assertEquals(afterRestart.getSubscriptionProperties(), initialProperties));
        sameProperties.close();

        // Re-subscribe with different properties.
        final Map<String, String> replacementProperties = new HashMap<>();
        replacementProperties.put("3", "3");
        replacementProperties.put("4", "4");
        Consumer<byte[]> differentProperties = pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionMode(subscriptionMode)
                .receiverQueueSize(1)
                .subscriptionProperties(replacementProperties)
                .subscriptionName(subscriptionName)
                .subscribe();
        // NOTE(review): this inspects the subscription object captured before the first restart,
        // not a freshly looked-up one; kept unchanged — confirm that this is intended.
        Assert.assertEquals(originalSubscription.getSubscriptionProperties(), initialProperties);
        differentProperties.close();

        // Restart, then subscribe with the different properties again.
        restartBroker();
        Consumer<byte[]> differentAfterRestart = pulsarClient.newConsumer()
                .subscriptionMode(subscriptionMode)
                .topic(topic)
                .receiverQueueSize(1)
                .subscriptionProperties(replacementProperties)
                .subscriptionName(subscriptionName)
                .subscribe();
        Map<String, String> observed =
                lookupPersistentSubscription(topic, subscriptionName).getSubscriptionProperties();
        if (subscriptionMode == SubscriptionMode.Durable) {
            Assert.assertEquals(observed, initialProperties);
        } else {
            Assert.assertEquals(observed, replacementProperties);
        }
        differentAfterRestart.close();

        // Subscribe without any properties.
        Consumer<byte[]> noProperties = pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionMode(subscriptionMode)
                .receiverQueueSize(1)
                .subscriptionName(subscriptionName)
                .subscribe();
        observed = lookupPersistentSubscription(topic, subscriptionName).getSubscriptionProperties();
        if (subscriptionMode == SubscriptionMode.Durable) {
            Assert.assertEquals(observed, initialProperties);
        } else {
            Assert.assertTrue(observed.isEmpty());
        }
        noProperties.close();

        // Restart, then subscribe without properties once more.
        restartBroker();
        Consumer<byte[]> noPropertiesAfterRestart = pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionMode(subscriptionMode)
                .receiverQueueSize(1)
                .subscriptionName(subscriptionName)
                .subscribe();
        observed = lookupPersistentSubscription(topic, subscriptionName).getSubscriptionProperties();
        if (subscriptionMode == SubscriptionMode.Durable) {
            Assert.assertEquals(observed, initialProperties);
        } else {
            Assert.assertTrue(observed.isEmpty());
        }
        noPropertiesAfterRestart.close();

        // Restart, then subscribe with the initial properties: expected in both modes.
        restartBroker();
        Consumer<byte[]> initialAgain = pulsarClient.newConsumer()
                .topic(topic)
                .receiverQueueSize(1)
                .subscriptionMode(subscriptionMode)
                .subscriptionProperties(initialProperties)
                .subscriptionName(subscriptionName)
                .subscribe();
        Assert.assertEquals(
                lookupPersistentSubscription(topic, subscriptionName).getSubscriptionProperties(),
                initialProperties);
        initialAgain.close();

        // Shared subscription: consumers joining later must not replace the first consumer's properties.
        final String sharedSubscriptionName = "my-sub-shared";
        final Map<String, String> firstSharedProperties = new HashMap<>();
        firstSharedProperties.put("6", "7");
        Consumer<byte[]> firstShared = pulsarClient.newConsumer()
                .topic(topic)
                .receiverQueueSize(1)
                .subscriptionMode(subscriptionMode)
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionProperties(firstSharedProperties)
                .subscriptionName(sharedSubscriptionName)
                .subscribe();
        final PersistentSubscription sharedSubscription = lookupPersistentSubscription(topic, sharedSubscriptionName);
        Assert.assertEquals(sharedSubscription.getSubscriptionProperties(), firstSharedProperties);

        final Map<String, String> secondSharedProperties = new HashMap<>();
        secondSharedProperties.put("8", "9");
        Consumer<byte[]> secondShared = pulsarClient.newConsumer()
                .topic(topic)
                .receiverQueueSize(1)
                .subscriptionMode(subscriptionMode)
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionProperties(secondSharedProperties)
                .subscriptionName(sharedSubscriptionName)
                .subscribe();
        Assert.assertEquals(sharedSubscription.getSubscriptionProperties(), firstSharedProperties);

        final Map<String, String> thirdSharedProperties = new HashMap<>();
        thirdSharedProperties.put("10", "11");
        Consumer<byte[]> thirdShared = pulsarClient.newConsumer()
                .topic(topic)
                .receiverQueueSize(1)
                .subscriptionMode(subscriptionMode)
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionProperties(thirdSharedProperties)
                .subscriptionName(sharedSubscriptionName)
                .subscribe();
        Assert.assertEquals(sharedSubscription.getSubscriptionProperties(), firstSharedProperties);

        firstShared.close();
        secondShared.close();
        thirdShared.close();
    }

    /**
     * Looks up the broker-side subscription object of a topic currently loaded in the broker.
     *
     * @param topic            fully qualified topic name
     * @param subscriptionName name of the subscription on that topic
     * @return the subscription, narrowed to {@link PersistentSubscription}
     */
    private PersistentSubscription lookupPersistentSubscription(String topic, String subscriptionName) {
        return (PersistentSubscription) pulsar.getBrokerService()
                .getTopicReference(topic)
                .get()
                .getSubscription(subscriptionName);
    }

    /**
     * Checks that properties given when a durable subscription is first created are reported by
     * the broker and by the topic stats, are still reported after the topic is unloaded, and
     * that a subscription created through the admin API without properties keeps an empty
     * property map even when a consumer later subscribes with properties.
     *
     * @throws Exception on any admin or client failure
     */
    @Test
    public void subscriptionModePersistedTest() throws Exception {
        final String topic = "persistent://my-property/my-ns/topic" + UUID.randomUUID();
        admin.topics().createNonPartitionedTopic(topic);

        final Map<String, String> expected = new HashMap<>();
        expected.put("1", "1");
        expected.put("2", "2");

        // Subscribe and close immediately; only the subscription itself is needed.
        Consumer<byte[]> first = pulsarClient.newConsumer()
                .subscriptionMode(SubscriptionMode.Durable)
                .topic(topic)
                .subscriptionProperties(expected)
                .subscriptionName("my-sub")
                .subscribe();
        first.close();

        Map<String, String> brokerSide = pulsar.getBrokerService()
                .getTopicReference(topic)
                .get()
                .getSubscription("my-sub")
                .getSubscriptionProperties();
        Assert.assertTrue(brokerSide.containsKey("1"));
        Assert.assertTrue(brokerSide.containsKey("2"));
        Assert.assertEquals(brokerSide.get("1"), "1");
        Assert.assertEquals(brokerSide.get("2"), "2");

        Assert.assertEquals(expected,
                admin.topics().getStats(topic).getSubscriptions().get("my-sub").getSubscriptionProperties());

        // The same stats are expected once the topic has been unloaded.
        admin.topics().unload(topic);
        Assert.assertEquals(expected,
                admin.topics().getStats(topic).getSubscriptions().get("my-sub").getSubscriptionProperties());

        // A subscription created via the admin API carries no properties ...
        admin.topics().createSubscription(topic, "my-sub2", MessageId.latest);
        Assert.assertTrue(
                admin.topics().getStats(topic).getSubscriptions().get("my-sub2").getSubscriptionProperties().isEmpty());

        // ... and a consumer subscribing later with properties does not add any.
        Consumer<byte[]> second = pulsarClient.newConsumer()
                .subscriptionMode(SubscriptionMode.Durable)
                .topic(topic)
                .subscriptionProperties(expected)
                .subscriptionName("my-sub2")
                .subscribe();
        second.close();
        Assert.assertTrue(
                admin.topics().getStats(topic).getSubscriptions().get("my-sub2").getSubscriptionProperties().isEmpty());
    }

    /**
     * Creates subscriptions through the REST endpoint using the JSON string positions
     * {@code "latest"} and {@code "earliest"} and checks the HTTP status (204) and the resulting
     * backlog (0 and 5 after five published messages).
     *
     * @throws IOException          if an HTTP request or a client operation fails
     * @throws PulsarAdminException if reading the topic stats fails
     */
    @Test
    public void createSubscriptionBySpecifyingStringPosition() throws IOException, PulsarAdminException {
        final String topic = "persistent://my-property/my-ns/my-topic";
        final RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(30000).build();

        // Both the HTTP client and the producer are closed even when an assertion fails.
        try (CloseableHttpClient httpClient =
                     HttpClientBuilder.create().setDefaultRequestConfig(requestConfig).build();
             Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create()) {
            for (int i = 0; i < 5; i++) {
                producer.send(new byte[10]);
            }

            Assert.assertEquals(putSubscriptionPosition(httpClient, "sub-latest", "\"latest\""), 204);
            Assert.assertEquals(
                    admin.topics().getStats(topic).getSubscriptions().get("sub-latest").getMsgBacklog(), 0L);

            Assert.assertEquals(putSubscriptionPosition(httpClient, "sub-earliest", "\"earliest\""), 204);
            Assert.assertEquals(
                    admin.topics().getStats(topic).getSubscriptions().get("sub-earliest").getMsgBacklog(), 5L);
        }
    }

    /**
     * Sends the subscription-creation PUT request for {@code my-topic} and returns the HTTP status.
     *
     * @param httpClient       client used to execute the request
     * @param subscriptionName name of the subscription to create
     * @param positionJson     request body, sent as {@code application/json}
     * @return the status code of the response
     * @throws IOException if the request cannot be built or executed
     */
    private int putSubscriptionPosition(CloseableHttpClient httpClient, String subscriptionName,
                                        String positionJson) throws IOException {
        HttpPut request = new HttpPut(String.format(
                "%s/admin/v2/persistent/my-property/my-ns/my-topic/subscription/%s",
                admin.getServiceUrl(), subscriptionName));
        request.setHeader("Content-Type", "application/json");
        request.setEntity(new StringEntity(positionJson));
        // Closing the response releases the underlying connection.
        try (CloseableHttpResponse response = httpClient.execute(request)) {
            return response.getStatusLine().getStatusCode();
        }
    }

    /**
     * Opens and closes ten failover consumers, each on its own subscription, and then checks
     * that the topic's managed ledger reports zero waiting cursors.
     *
     * @throws Exception on any client failure
     */
    @Test
    public void testWaitingCurosrCausedMemoryLeak() throws Exception {
        final String topic = "persistent://my-property/my-ns/my-topic";
        final int rounds = 10;

        for (int round = 0; round < rounds; round++) {
            final Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                    .topic(topic)
                    .subscriptionType(SubscriptionType.Failover)
                    .subscriptionName("test" + round)
                    .subscribe();
            // Only close once the consumer reports that it is connected.
            Awaitility.await().untilAsserted(() -> Assert.assertTrue(consumer.isConnected()));
            consumer.close();
        }

        PersistentTopic persistentTopic =
                (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();
        Assert.assertEquals(persistentTopic.getManagedLedger().getWaitingCursorsCount(), 0);
    }
}
