package org.apache.activemq.bugs;

import jakarta.jms.Connection;
import jakarta.jms.ExceptionListener;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/bugs/OptimizeAcknowledgeWithExpiredMsgsTest.class */
public class OptimizeAcknowledgeWithExpiredMsgsTest {
    private static final Logger LOG = LoggerFactory.getLogger(OptimizeAcknowledgeWithExpiredMsgsTest.class);
    private BrokerService broker = null;
    private String connectionUri;

    /* loaded from: input_file:org/apache/activemq/bugs/OptimizeAcknowledgeWithExpiredMsgsTest$MyMessageListener.class */
    private class MyMessageListener implements MessageListener, ExceptionListener {
        private AtomicInteger counter = new AtomicInteger(0);

        private MyMessageListener() {
        }

        public void onMessage(Message message) {
            try {
                OptimizeAcknowledgeWithExpiredMsgsTest.LOG.trace("Got Message " + message.getJMSMessageID());
                OptimizeAcknowledgeWithExpiredMsgsTest.LOG.info("counter at " + this.counter.incrementAndGet());
            } catch (Exception e) {
            }
        }

        public int getCounter() {
            return this.counter.get();
        }

        public synchronized void onException(JMSException jMSException) {
            OptimizeAcknowledgeWithExpiredMsgsTest.LOG.error("JMS Exception occured.  Shutting down client.");
        }
    }

    protected BrokerService createBroker() throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setPersistent(false);
        brokerService.setDeleteAllMessagesOnStartup(true);
        brokerService.setUseJmx(false);
        this.connectionUri = brokerService.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT).getPublishableConnectString();
        return brokerService;
    }

    @Before
    public void setUp() throws Exception {
        this.broker = createBroker();
        this.broker.start();
        this.broker.waitUntilStarted();
    }

    @After
    public void tearDown() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
            this.broker.waitUntilStopped();
            this.broker = null;
        }
    }

    @Test
    public void testOptimizedAckWithExpiredMsgs() throws Exception {
        Connection createConnection = new ActiveMQConnectionFactory(this.connectionUri + "?jms.optimizeAcknowledge=true&jms.prefetchPolicy.all=100").createConnection();
        Session createSession = createConnection.createSession(false, 1);
        Queue createQueue = createSession.createQueue("TEST.FOO");
        MessageConsumer createConsumer = createSession.createConsumer(createQueue);
        final MyMessageListener myMessageListener = new MyMessageListener();
        createConnection.setExceptionListener(myMessageListener);
        MessageProducer createProducer = createSession.createProducer(createQueue);
        createProducer.setDeliveryMode(1);
        String str = "Hello world! From: " + Thread.currentThread().getName() + " : " + hashCode();
        for (int i = 0; i < 45; i++) {
            TextMessage createTextMessage = createSession.createTextMessage(str);
            createProducer.send(createTextMessage, 1, 1, 100L);
            LOG.trace("Sent message: " + createTextMessage.getJMSMessageID() + " with expiry 10 msec");
        }
        for (int i2 = 0; i2 < 60; i2++) {
            TextMessage createTextMessage2 = createSession.createTextMessage(str);
            createProducer.send(createTextMessage2, 1, 1, 60000L);
            LOG.trace("Sent message: " + createTextMessage2.getJMSMessageID() + " with expiry 30 sec");
        }
        createConsumer.setMessageListener(myMessageListener);
        sleep(1000);
        createConnection.start();
        Assert.assertTrue("Should receive all expected messages, counter at " + myMessageListener.getCounter(), Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.OptimizeAcknowledgeWithExpiredMsgsTest.1
            public boolean isSatisified() throws Exception {
                return myMessageListener.getCounter() == 60;
            }
        }));
        LOG.info("Received all expected messages with counter at: " + myMessageListener.getCounter());
        createProducer.close();
        createConsumer.close();
        createSession.close();
        createConnection.close();
    }

    @Test
    public void testOptimizedAckWithExpiredMsgsSync() throws Exception {
        Connection createConnection = new ActiveMQConnectionFactory(this.connectionUri + "?jms.optimizeAcknowledge=true&jms.prefetchPolicy.all=100").createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        Queue createQueue = createSession.createQueue("TEST.FOO");
        MessageConsumer createConsumer = createSession.createConsumer(createQueue);
        MessageProducer createProducer = createSession.createProducer(createQueue);
        createProducer.setDeliveryMode(1);
        String str = "Hello world! From: " + Thread.currentThread().getName() + " : " + hashCode();
        for (int i = 0; i < 45; i++) {
            TextMessage createTextMessage = createSession.createTextMessage(str);
            createProducer.send(createTextMessage, 1, 1, 10L);
            LOG.trace("Sent message: " + createTextMessage.getJMSMessageID() + " with expiry 10 msec");
        }
        for (int i2 = 0; i2 < 60; i2++) {
            TextMessage createTextMessage2 = createSession.createTextMessage(str);
            createProducer.send(createTextMessage2, 1, 1, 30000L);
            LOG.trace("Sent message: " + createTextMessage2.getJMSMessageID() + " with expiry 30 sec");
        }
        sleep(200);
        int i3 = 1;
        while (i3 <= 60) {
            Assert.assertNotNull(createConsumer.receive(2000L));
            LOG.info("counter at " + i3);
            i3++;
        }
        LOG.info("Received all expected messages with counter at: " + i3);
        createProducer.close();
        createConsumer.close();
        createSession.close();
        createConnection.close();
    }

    @Test
    public void testOptimizedAckWithExpiredMsgsSync2() throws Exception {
        Connection createConnection = new ActiveMQConnectionFactory(this.connectionUri + "?jms.optimizeAcknowledge=true&jms.prefetchPolicy.all=100").createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        Queue createQueue = createSession.createQueue("TEST.FOO");
        MessageConsumer createConsumer = createSession.createConsumer(createQueue);
        MessageProducer createProducer = createSession.createProducer(createQueue);
        createProducer.setDeliveryMode(1);
        String str = "Hello world! From: " + Thread.currentThread().getName() + " : " + hashCode();
        for (int i = 0; i < 56; i++) {
            TextMessage createTextMessage = createSession.createTextMessage(str);
            createProducer.send(createTextMessage, 1, 1, 30000L);
            LOG.trace("Sent message: " + createTextMessage.getJMSMessageID() + " with expiry 30 sec");
        }
        for (int i2 = 0; i2 < 44; i2++) {
            TextMessage createTextMessage2 = createSession.createTextMessage(str);
            createProducer.send(createTextMessage2, 1, 1, 10L);
            LOG.trace("Sent message: " + createTextMessage2.getJMSMessageID() + " with expiry 10 msec");
        }
        for (int i3 = 0; i3 < 4; i3++) {
            TextMessage createTextMessage3 = createSession.createTextMessage(str);
            createProducer.send(createTextMessage3, 1, 1, 30000L);
            LOG.trace("Sent message: " + createTextMessage3.getJMSMessageID() + " with expiry 30 sec");
        }
        sleep(200);
        int i4 = 1;
        while (i4 <= 60) {
            Assert.assertNotNull(createConsumer.receive(2000L));
            LOG.info("counter at " + i4);
            i4++;
        }
        LOG.info("Received all expected messages with counter at: " + i4);
        createProducer.close();
        createConsumer.close();
        createSession.close();
        createConnection.close();
    }

    private void sleep(int i) {
        try {
            Thread.sleep(i);
        } catch (InterruptedException e) {
        }
    }
}
