package org.apache.pulsar.broker.service.nonpersistent;

import java.lang.reflect.Field;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.SubscriptionOption;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.stats.NonPersistentTopicStatsImpl;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.class */
public class NonPersistentTopicTest extends BrokerTestBase {
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod(alwaysRun = true)
    protected void setup() throws Exception {
        super.baseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testAccumulativeStats() throws Exception {
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"non-persistent://prop/ns-abc/aTopic"}).subscriptionType(SubscriptionType.Shared).subscriptionName("shared").subscribe();
        Consumer subscribe2 = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"non-persistent://prop/ns-abc/aTopic"}).subscriptionType(SubscriptionType.Failover).subscriptionName("failOver").subscribe();
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic("non-persistent://prop/ns-abc/aTopic").create();
        NonPersistentTopic nonPersistentTopic = (NonPersistentTopic) this.pulsar.getBrokerService().getTopicReference("non-persistent://prop/ns-abc/aTopic").get();
        NonPersistentTopicStatsImpl stats = nonPersistentTopic.getStats(false, false, false);
        Assert.assertEquals(stats.getBytesInCounter(), 0L);
        Assert.assertEquals(stats.getMsgInCounter(), 0L);
        Assert.assertEquals(stats.getBytesOutCounter(), 0L);
        Assert.assertEquals(stats.getMsgOutCounter(), 0L);
        create.newMessage().value("test").eventTime(5L).send();
        Assert.assertNotNull(subscribe.receive());
        Assert.assertNotNull(subscribe2.receive());
        NonPersistentTopicStatsImpl stats2 = nonPersistentTopic.getStats(false, false, false);
        Assert.assertTrue(stats2.getBytesInCounter() > 0);
        Assert.assertTrue(stats2.getMsgInCounter() > 0);
        Assert.assertTrue(stats2.getBytesOutCounter() > 0);
        Assert.assertTrue(stats2.getMsgOutCounter() > 0);
        subscribe.unsubscribe();
        subscribe2.unsubscribe();
        create.close();
        Collection values = nonPersistentTopic.getProducers().values();
        Objects.requireNonNull(nonPersistentTopic);
        values.forEach(nonPersistentTopic::removeProducer);
        Assert.assertEquals(nonPersistentTopic.getProducers().size(), 0);
        NonPersistentTopicStatsImpl stats3 = nonPersistentTopic.getStats(false, false, false);
        Assert.assertEquals(stats3.getBytesInCounter(), stats2.getBytesInCounter());
        Assert.assertEquals(stats3.getMsgInCounter(), stats2.getMsgInCounter());
        Assert.assertEquals(stats3.getBytesOutCounter(), stats2.getBytesOutCounter());
        Assert.assertEquals(stats3.getMsgOutCounter(), stats2.getMsgOutCounter());
    }

    @Test
    public void testCreateNonExistentPartitions() throws PulsarAdminException, PulsarClientException {
        this.admin.topics().createPartitionedTopic("non-persistent://prop/ns-abc/testCreateNonExistentPartitions", 4);
        try {
            Producer create = this.pulsarClient.newProducer().topic(TopicName.get("non-persistent://prop/ns-abc/testCreateNonExistentPartitions").getPartition(4).toString()).create();
            try {
                Assert.fail("unexpected behaviour");
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
            } catch (Throwable th) {
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
                throw th;
            }
        } catch (PulsarClientException.TopicDoesNotExistException e) {
        }
        Assert.assertEquals(this.admin.topics().getPartitionedTopicMetadata("non-persistent://prop/ns-abc/testCreateNonExistentPartitions").partitions, 4);
    }

    /* JADX WARN: Finally extract failed */
    @Test
    public void testSubscriptionsOnNonPersistentTopic() throws Exception {
        String str = "non-persistent://prop/ns-abc/topic_" + UUID.randomUUID();
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic(str).create();
        try {
            create.send("This is a message");
            NonPersistentTopic nonPersistentTopic = (NonPersistentTopic) Mockito.spy((NonPersistentTopic) this.pulsar.getBrokerService().getTopicReference(str).get());
            this.pulsar.getBrokerService().getTopics().put(str, CompletableFuture.completedFuture(Optional.of(nonPersistentTopic)));
            ((NonPersistentTopic) Mockito.doAnswer(invocationOnMock -> {
                return ((SubscriptionOption) invocationOnMock.getArgument(0)).isDurable() ? CompletableFuture.failedFuture(new IllegalArgumentException("isDurable cannot be true when subscribe on non-persistent topic")) : invocationOnMock.callRealMethod();
            }).when(nonPersistentTopic)).subscribe((SubscriptionOption) Mockito.any());
            Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{str}).subscriptionName("exclusive").subscriptionType(SubscriptionType.Exclusive).subscriptionMode(SubscriptionMode.Durable).subscribe();
            try {
                Consumer subscribe2 = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{str}).subscriptionName("failover").subscriptionType(SubscriptionType.Failover).subscriptionMode(SubscriptionMode.Durable).subscribe();
                try {
                    Consumer subscribe3 = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{str}).subscriptionName("failover").subscriptionType(SubscriptionType.Failover).subscriptionMode(SubscriptionMode.Durable).subscribe();
                    try {
                        Consumer subscribe4 = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{str}).subscriptionName("shared").subscriptionType(SubscriptionType.Shared).subscriptionMode(SubscriptionMode.Durable).subscribe();
                        try {
                            Consumer subscribe5 = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{str}).subscriptionName("shared").subscriptionType(SubscriptionType.Shared).subscriptionMode(SubscriptionMode.Durable).subscribe();
                            try {
                                Consumer subscribe6 = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{str}).subscriptionName("key_shared").subscriptionType(SubscriptionType.Key_Shared).subscriptionMode(SubscriptionMode.Durable).subscribe();
                                try {
                                    Consumer subscribe7 = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{str}).subscriptionName("key_shared").subscriptionType(SubscriptionType.Key_Shared).subscriptionMode(SubscriptionMode.Durable).subscribe();
                                    try {
                                        Map subscriptions = nonPersistentTopic.getSubscriptions();
                                        Assert.assertEquals(subscriptions.size(), 4);
                                        Assert.assertNotNull((NonPersistentSubscription) subscriptions.get("exclusive"));
                                        subscribe.close();
                                        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> {
                                            return Boolean.valueOf(subscriptions.get("exclusive") == null);
                                        });
                                        Assert.assertNotNull((NonPersistentSubscription) subscriptions.get("failover"));
                                        subscribe2.close();
                                        Assert.assertNotNull((NonPersistentSubscription) subscriptions.get("failover"));
                                        subscribe3.close();
                                        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> {
                                            return Boolean.valueOf(subscriptions.get("failover") == null);
                                        });
                                        Assert.assertNotNull((NonPersistentSubscription) subscriptions.get("shared"));
                                        subscribe4.close();
                                        Assert.assertNotNull((NonPersistentSubscription) subscriptions.get("shared"));
                                        subscribe5.close();
                                        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> {
                                            return Boolean.valueOf(subscriptions.get("shared") == null);
                                        });
                                        Assert.assertNotNull((NonPersistentSubscription) subscriptions.get("key_shared"));
                                        subscribe6.close();
                                        Assert.assertNotNull((NonPersistentSubscription) subscriptions.get("key_shared"));
                                        subscribe7.close();
                                        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> {
                                            return Boolean.valueOf(subscriptions.get("key_shared") == null);
                                        });
                                        if (Collections.singletonList(subscribe7).get(0) != null) {
                                            subscribe7.close();
                                        }
                                        if (Collections.singletonList(subscribe6).get(0) != null) {
                                            subscribe6.close();
                                        }
                                        if (Collections.singletonList(subscribe5).get(0) != null) {
                                            subscribe5.close();
                                        }
                                        if (Collections.singletonList(subscribe4).get(0) != null) {
                                            subscribe4.close();
                                        }
                                        if (Collections.singletonList(subscribe3).get(0) != null) {
                                            subscribe3.close();
                                        }
                                        if (Collections.singletonList(subscribe2).get(0) != null) {
                                            subscribe2.close();
                                        }
                                        if (Collections.singletonList(subscribe).get(0) != null) {
                                            subscribe.close();
                                        }
                                    } catch (Throwable th) {
                                        if (Collections.singletonList(subscribe7).get(0) != null) {
                                            subscribe7.close();
                                        }
                                        throw th;
                                    }
                                } catch (Throwable th2) {
                                    if (Collections.singletonList(subscribe6).get(0) != null) {
                                        subscribe6.close();
                                    }
                                    throw th2;
                                }
                            } catch (Throwable th3) {
                                if (Collections.singletonList(subscribe5).get(0) != null) {
                                    subscribe5.close();
                                }
                                throw th3;
                            }
                        } catch (Throwable th4) {
                            if (Collections.singletonList(subscribe4).get(0) != null) {
                                subscribe4.close();
                            }
                            throw th4;
                        }
                    } catch (Throwable th5) {
                        if (Collections.singletonList(subscribe3).get(0) != null) {
                            subscribe3.close();
                        }
                        throw th5;
                    }
                } catch (Throwable th6) {
                    if (Collections.singletonList(subscribe2).get(0) != null) {
                        subscribe2.close();
                    }
                    throw th6;
                }
            } catch (Throwable th7) {
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
                throw th7;
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }

    @Test
    public void testRemoveProducerOnNonPersistentTopic() throws Exception {
        String str = "non-persistent://prop/ns-abc/topic_" + UUID.randomUUID();
        Producer create = this.pulsarClient.newProducer().topic(str).create();
        NonPersistentTopic nonPersistentTopic = (NonPersistentTopic) this.pulsar.getBrokerService().getTopicReference(str).get();
        Field declaredField = AbstractTopic.class.getDeclaredField("userCreatedProducerCount");
        declaredField.setAccessible(true);
        Assert.assertEquals(((Integer) declaredField.get(nonPersistentTopic)).intValue(), 1);
        create.close();
        Assert.assertEquals(((Integer) declaredField.get(nonPersistentTopic)).intValue(), 0);
    }
}
