/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.api;

import java.lang.reflect.Field;
import java.util.Collections;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentSubscription;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups={"broker-api"})
public class NonDurableSubscriptionTest
extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(NonDurableSubscriptionTest.class);

    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        this.conf.setSubscriptionExpirationTimeMinutes(1);
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override
    @AfterMethod(alwaysRun=true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testNonDurableSubscription() throws Exception {
        String topicName = "persistent://my-property/my-ns/nonDurable-topic1";
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
        try {
            Consumer consumer = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{topicName}).readCompacted(true).subscriptionMode(SubscriptionMode.NonDurable).subscriptionType(SubscriptionType.Exclusive).subscriptionName("my-nonDurable-subscriber").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
            try {
                Message message;
                int i;
                int messageNum = 10;
                for (i = 0; i < messageNum; ++i) {
                    producer.send((Object)("message" + i));
                }
                for (i = 0; i < 5; ++i) {
                    message = consumer.receive();
                    Assert.assertNotNull((Object)message);
                    Assert.assertEquals((String)((String)message.getValue()), (String)("message" + i));
                    consumer.acknowledge(message);
                }
                ((ConsumerImpl)consumer).getClientCnx().close();
                for (i = 5; i < messageNum; ++i) {
                    message = consumer.receive();
                    Assert.assertNotNull((Object)message);
                    Assert.assertEquals((String)((String)message.getValue()), (String)("message" + i));
                }
            }
            finally {
                if (Collections.singletonList(consumer).get(0) != null) {
                    consumer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    @Test(timeOut=10000L)
    public void testDeleteInactiveNonPersistentSubscription() throws Exception {
        String topic = "non-persistent://my-property/my-ns/topic-" + UUID.randomUUID();
        String subName = "my-subscriber";
        this.admin.topics().createNonPartitionedTopic(topic);
        Consumer consumer = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{topic}).subscriptionName("my-subscriber").subscribe();
        NonPersistentTopic nonPersistentTopic = (NonPersistentTopic)((Optional)this.pulsar.getBrokerService().getTopicIfExists(topic).get()).get();
        NonPersistentSubscription nonPersistentSubscription = (NonPersistentSubscription)nonPersistentTopic.getSubscription("my-subscriber");
        Assert.assertNotNull((Object)nonPersistentSubscription);
        Assert.assertNotNull((Object)nonPersistentSubscription.getDispatcher());
        AssertJUnit.assertTrue((boolean)nonPersistentSubscription.getDispatcher().isConsumerConnected());
        AssertJUnit.assertFalse((boolean)nonPersistentSubscription.isReplicated());
        nonPersistentTopic.checkInactiveSubscriptions();
        Thread.sleep(500L);
        nonPersistentSubscription = (NonPersistentSubscription)nonPersistentTopic.getSubscription("my-subscriber");
        Assert.assertNotNull((Object)nonPersistentSubscription);
        consumer.close();
        Thread.sleep(500L);
        Field f = NonPersistentSubscription.class.getDeclaredField("lastActive");
        f.setAccessible(true);
        f.set(nonPersistentTopic.getSubscription("my-subscriber"), System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(5L));
        nonPersistentTopic.checkInactiveSubscriptions();
        Thread.sleep(500L);
        nonPersistentSubscription = (NonPersistentSubscription)nonPersistentTopic.getSubscription("my-subscriber");
        AssertJUnit.assertNull((Object)nonPersistentSubscription);
    }

    @DataProvider(name="subscriptionTypes")
    public static Object[][] subscriptionTypes() {
        Object[][] result = new Object[SubscriptionType.values().length][];
        int i = 0;
        for (SubscriptionType type : SubscriptionType.values()) {
            result[i++] = new Object[]{type};
        }
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(dataProvider="subscriptionTypes")
    public void testNonDurableSubscriptionRecovery(SubscriptionType subscriptionType) throws Exception {
        log.info("testing {}", (Object)subscriptionType);
        String topicName = "persistent://my-property/my-ns/nonDurable-sub-recorvery-" + subscriptionType;
        Producer producer = this.pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
        try {
            Consumer consumer = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{topicName}).subscriptionMode(SubscriptionMode.NonDurable).subscriptionType(subscriptionType).subscriptionName("my-nonDurable-subscriber").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
            try {
                Message message;
                int i;
                int messageNum = 15;
                for (i = 0; i < messageNum; ++i) {
                    producer.send((Object)("message" + i));
                }
                for (i = 0; i < 5; ++i) {
                    message = consumer.receive();
                    Assert.assertNotNull((Object)message);
                    Assert.assertEquals((String)((String)message.getValue()), (String)("message" + i));
                    consumer.acknowledge(message);
                }
                ((ConsumerImpl)consumer).getClientCnx().close();
                for (i = 5; i < 10; ++i) {
                    message = consumer.receive();
                    Assert.assertNotNull((Object)message);
                    Assert.assertEquals((String)((String)message.getValue()), (String)("message" + i));
                }
                this.restartBroker();
                for (i = 10; i < messageNum; ++i) {
                    message = consumer.receive();
                    Assert.assertNotNull((Object)message);
                    Assert.assertEquals((String)((String)message.getValue()), (String)("message" + i));
                }
            }
            finally {
                if (Collections.singletonList(consumer).get(0) != null) {
                    consumer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }
}

