package org.apache.activemq.usecases;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.class */
/**
 * Tests topic subscriptions (regular and durable) that are created with
 * {@code consumer.prefetchSize=0} against an embedded broker reached over
 * {@code vm://localhost}.
 *
 * <p>Every test uses a topic named after the running test method, so tests do
 * not share destinations. The shared producer lives on an auto-acknowledge
 * session; consumers are created per test, some on dedicated sessions with a
 * different acknowledgement mode.
 */
public class TopicSubscriptionZeroPrefetchTest {
    private static final Logger LOG = LoggerFactory.getLogger(TopicSubscriptionZeroPrefetchTest.class);

    /** Destination options for a retroactive consumer with prefetch disabled. */
    private static final String RETROACTIVE_ZERO_PREFETCH_OPTIONS = "?consumer.retroactive=true&consumer.prefetchSize=0";

    /** Destination options for a consumer with prefetch disabled. */
    private static final String ZERO_PREFETCH_OPTIONS = "?consumer.prefetchSize=0";

    /** Number of messages used by the sequential send-then-receive loop tests. */
    private static final int LOOP_MESSAGE_COUNT = 10;

    /** Number of messages used by the concurrent produce/consume tests. */
    private static final int CONCURRENT_MESSAGE_COUNT = 2000;

    /** Number of received messages between commits when committing in batches. */
    private static final int COMMIT_BATCH_SIZE = 50;

    @Rule
    public TestName name = new TestName();
    private Connection connection;
    private Session session;
    private ActiveMQTopic destination;
    private MessageProducer producer;
    private MessageConsumer consumer;
    private BrokerService brokerService;

    /**
     * Returns the topic name for the current test.
     *
     * @return the name of the currently executing test method
     */
    public String getTopicName() {
        return this.name.getMethodName();
    }

    /**
     * Starts the embedded broker, then opens a started connection with an
     * auto-acknowledge session and a producer on the per-test topic.
     */
    @Before
    public void setUp() throws Exception {
        this.brokerService = createBroker();
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
        connectionFactory.setWatchTopicAdvisories(true);
        this.connection = connectionFactory.createConnection();
        this.connection.setClientID("ClientID-1");
        this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        this.destination = new ActiveMQTopic(getTopicName());
        this.producer = this.session.createProducer(this.destination);
        this.connection.start();
    }

    /** A single published message must be available to a zero-prefetch consumer via {@code receiveNoWait()}. */
    @Test(timeout = 60000)
    public void testTopicConsumerPrefetchZero() throws Exception {
        this.consumer = this.session.createConsumer(new ActiveMQTopic(getTopicName() + RETROACTIVE_ZERO_PREFETCH_OPTIONS));
        this.producer.send(this.session.createTextMessage("M"));
        Assert.assertNotNull("should have received the published message", this.consumer.receiveNoWait());
    }

    /** Client-ack consumer drains all published messages using blocking {@code receive()}. */
    @Test(timeout = 60000)
    public void testTopicConsumerPrefetchZeroClientAckLoopReceive() throws Exception {
        this.consumer = createClientAckZeroPrefetchConsumer();
        sendMessages(LOOP_MESSAGE_COUNT);
        for (int index = 0; index < LOOP_MESSAGE_COUNT; index++) {
            Assert.assertNotNull("should have received message[" + index + "]", this.consumer.receive());
        }
    }

    /** Client-ack consumer drains all published messages using {@code receive(timeout)}. */
    @Test(timeout = 60000)
    public void testTopicConsumerPrefetchZeroClientAckLoopTimedReceive() throws Exception {
        this.consumer = createClientAckZeroPrefetchConsumer();
        sendMessages(LOOP_MESSAGE_COUNT);
        for (int index = 0; index < LOOP_MESSAGE_COUNT; index++) {
            Assert.assertNotNull("should have received message[" + index + "]", this.consumer.receive(2000L));
        }
    }

    /** Client-ack consumer drains all published messages using {@code receiveNoWait()}. */
    @Test(timeout = 60000)
    public void testTopicConsumerPrefetchZeroClientAckLoopReceiveNoWait() throws Exception {
        this.consumer = createClientAckZeroPrefetchConsumer();
        sendMessages(LOOP_MESSAGE_COUNT);
        for (int index = 0; index < LOOP_MESSAGE_COUNT; index++) {
            Assert.assertNotNull("should have received message[" + index + "]", this.consumer.receiveNoWait());
        }
    }

    @Test(timeout = 60000)
    public void testTopicConsumerPrefetchZeroConcurrentProduceConsumeAutoAck() throws Exception {
        doTestTopicConsumerPrefetchZeroConcurrentProduceConsume(Session.AUTO_ACKNOWLEDGE);
    }

    @Test(timeout = 60000)
    public void testTopicConsumerPrefetchZeroConcurrentProduceConsumeClientAck() throws Exception {
        doTestTopicConsumerPrefetchZeroConcurrentProduceConsume(Session.CLIENT_ACKNOWLEDGE);
    }

    @Test(timeout = 60000)
    public void testTopicConsumerPrefetchZeroConcurrentProduceConsumeDupsOk() throws Exception {
        doTestTopicConsumerPrefetchZeroConcurrentProduceConsume(Session.DUPS_OK_ACKNOWLEDGE);
    }

    @Test(timeout = 60000)
    public void testTopicConsumerPrefetchZeroConcurrentProduceConsumeTransacted() throws Exception {
        doTestTopicConsumerPrefetchZeroConcurrentProduceConsume(Session.SESSION_TRANSACTED);
    }

    @Test(timeout = 60000)
    public void testTopicConsumerPrefetchZeroConcurrentProduceConsumeTransactedComitInBatches() throws Exception {
        // Must request batch commits explicitly; the one-argument overload commits
        // only once at the end, which would duplicate the plain transacted test.
        doTestTopicConsumerPrefetchZeroConcurrentProduceConsume(Session.SESSION_TRANSACTED, true);
    }

    @Test(timeout = 60000)
    public void testTopicConsumerPrefetchZeroConcurrentProduceConsumeIndividual() throws Exception {
        doTestTopicConsumerPrefetchZeroConcurrentProduceConsume(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
    }

    /**
     * Runs the concurrent produce/consume scenario without batch commits.
     *
     * @param ackMode acknowledgement mode for the consuming session
     */
    private void doTestTopicConsumerPrefetchZeroConcurrentProduceConsume(int ackMode) throws Exception {
        doTestTopicConsumerPrefetchZeroConcurrentProduceConsume(ackMode, false);
    }

    /**
     * Consumes on a background thread from a zero-prefetch retroactive
     * subscription while the test thread publishes
     * {@link #CONCURRENT_MESSAGE_COUNT} messages, then verifies that every
     * message was read and that the consumer hit no errors.
     *
     * @param ackMode acknowledgement mode for the consuming session; the session
     *        is transacted when this equals {@link Session#SESSION_TRANSACTED}
     * @param commitInBatches when the session is transacted, commit after every
     *        {@link #COMMIT_BATCH_SIZE} received messages instead of only once
     *        when the consumer loop ends
     */
    private void doTestTopicConsumerPrefetchZeroConcurrentProduceConsume(final int ackMode, final boolean commitInBatches) throws Exception {
        ActiveMQTopic consumerDestination = new ActiveMQTopic(getTopicName() + RETROACTIVE_ZERO_PREFETCH_OPTIONS);
        final boolean transacted = ackMode == Session.SESSION_TRANSACTED;
        final Session consumerSession = this.connection.createSession(transacted, ackMode);
        final MessageConsumer subscriber = consumerSession.createConsumer(consumerDestination);
        // Keep the field pointing at the consumer as before so it stays reachable from the test instance.
        this.consumer = subscriber;
        final AtomicBoolean errors = new AtomicBoolean();
        final CountDownLatch allReceived = new CountDownLatch(CONCURRENT_MESSAGE_COUNT);

        ExecutorService consumerExecutor = Executors.newSingleThreadExecutor();
        try {
            consumerExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        for (int received = 0; received < CONCURRENT_MESSAGE_COUNT; received++) {
                            Message message = subscriber.receive();
                            if (message == null) {
                                continue;
                            }
                            allReceived.countDown();
                            message.acknowledge();
                            if (transacted && commitInBatches && (received + 1) % COMMIT_BATCH_SIZE == 0) {
                                consumerSession.commit();
                            }
                        }
                    } catch (Exception e) {
                        // Any failure ends the consumer loop; the flag is checked by the test thread.
                        LOG.error("Caught exception during receive", e);
                        errors.set(true);
                    } finally {
                        if (transacted) {
                            try {
                                consumerSession.commit();
                            } catch (JMSException e) {
                                LOG.error("Caught exception on commit", e);
                                errors.set(true);
                            }
                        }
                    }
                }
            });
            // No further tasks are submitted; lets awaitTermination observe the task's completion.
            consumerExecutor.shutdown();

            sendMessages(CONCURRENT_MESSAGE_COUNT);

            // Wait for the consumer before inspecting the error flag, otherwise failures
            // that happen after the last send (including the final commit) go unnoticed.
            boolean allRead = allReceived.await(10L, TimeUnit.SECONDS);
            boolean consumerFinished = allRead && consumerExecutor.awaitTermination(10L, TimeUnit.SECONDS);

            Assert.assertFalse("Should not have gotten any errors", errors.get());
            Assert.assertTrue("Should have read all messages", allRead);
            Assert.assertTrue("Consumer task should have completed", consumerFinished);
        } finally {
            // Interrupts the consumer thread if it is still blocked so it is not leaked.
            consumerExecutor.shutdownNow();
        }
    }

    /** A durable subscriber with prefetch disabled must receive a message published after it subscribed. */
    @Test(timeout = 60000)
    public void testDurableTopicConsumerPrefetchZero() throws Exception {
        this.consumer = this.session.createDurableSubscriber(new ActiveMQTopic(getTopicName() + ZERO_PREFETCH_OPTIONS), "mysub1");
        this.producer.send(this.session.createTextMessage("M"));
        Assert.assertNotNull("should have received the published message", this.consumer.receive(100L));
    }

    /**
     * Creates 500 inactive durable subscriptions, publishes 1000 persistent
     * messages with a 5 second time-to-live, and then checks that each
     * {@code receive(1000)} call on a new durable subscriber returns in under
     * 1500 ms.
     */
    @Test(timeout = 420000)
    public void testReceiveTimeoutRespectedWithExpiryProcessing() throws Exception {
        ActiveMQTopic zeroPrefetchTopic = new ActiveMQTopic(getTopicName() + ZERO_PREFETCH_OPTIONS);
        for (int index = 0; index < 500; index++) {
            this.consumer = this.session.createDurableSubscriber(zeroPrefetchTopic, "mysub-" + index);
            this.consumer.close();
        }
        for (int index = 0; index < 1000; index++) {
            this.producer.send(this.session.createTextMessage("RTR"), DeliveryMode.PERSISTENT, 0, 5000L);
        }
        this.consumer = this.session.createDurableSubscriber(zeroPrefetchTopic, "mysub3");
        for (int attempt = 0; attempt < 10; attempt++) {
            // nanoTime is monotonic, so the measurement is immune to wall-clock adjustments.
            long startNanos = System.nanoTime();
            this.consumer.receive(1000L);
            long durationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
            LOG.info("Duration: {} : {}", attempt, durationMillis);
            Assert.assertTrue("receive(1000) took " + durationMillis + "ms on attempt " + attempt + ", expected < 1500ms",
                    durationMillis < 1500);
        }
    }

    /**
     * Closes the connection and stops the broker.
     *
     * <p>JUnit runs this even when {@link #setUp()} failed part-way, so each
     * resource is null-checked and the broker is stopped regardless of how the
     * connection close went.
     */
    @After
    public void tearDown() throws Exception {
        try {
            if (this.connection != null) {
                // Per the JMS contract, closing a connection also closes its sessions, producers and consumers.
                this.connection.close();
            }
        } catch (Exception e) {
            // Best effort: a failed close must not prevent the broker from being stopped.
            LOG.warn("Failed to close connection during tear down", e);
        } finally {
            if (this.brokerService != null) {
                this.brokerService.stop();
                this.brokerService.waitUntilStopped();
            }
        }
    }

    /** Creates a zero-prefetch retroactive consumer on a new client-acknowledge session. */
    private MessageConsumer createClientAckZeroPrefetchConsumer() throws JMSException {
        Session clientAckSession = this.connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        return clientAckSession.createConsumer(new ActiveMQTopic(getTopicName() + RETROACTIVE_ZERO_PREFETCH_OPTIONS));
    }

    /** Publishes {@code count} text messages with bodies {@code "M:0"} .. {@code "M:(count-1)"} via the shared producer. */
    private void sendMessages(int count) throws JMSException {
        for (int index = 0; index < count; index++) {
            this.producer.send(this.session.createTextMessage("M:" + index));
        }
    }

    /**
     * Builds and starts the embedded broker used by every test.
     *
     * <p>JMX is disabled, stored messages are deleted on startup, and the
     * default destination policy sets a 5 second expire-messages period, a max
     * expire page size of 2000 and disables the cache.
     *
     * @return the started broker
     */
    private BrokerService createBroker() throws Exception {
        BrokerService broker = new BrokerService();
        // Broker name comes from a constant declared in a sibling test class; its value is not visible here.
        broker.setBrokerName(MaxUncommittedCountExceededTest.DEFAULT_JMX_BROKER_NAME);
        broker.setUseJmx(false);
        broker.setDeleteAllMessagesOnStartup(true);
        broker.addConnector("vm://localhost");

        PolicyEntry defaultPolicy = new PolicyEntry();
        defaultPolicy.setExpireMessagesPeriod(5000L);
        defaultPolicy.setMaxExpirePageSize(2000);
        defaultPolicy.setUseCache(false);

        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(defaultPolicy);
        broker.setDestinationPolicy(policyMap);

        broker.start();
        broker.waitUntilStarted();
        return broker;
    }
}
