package org.apache.activemq.bugs;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import java.io.File;
import java.lang.Thread;
import java.util.Vector;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.transport.nio.NIOSSLConcurrencyTest;
import org.apache.activemq.usecases.DurableSubProcessWithRestartTest;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.BlockJUnit4ClassRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(BlockJUnit4ClassRunner.class)
/* loaded from: input_file:org/apache/activemq/bugs/MemoryUsageBlockResumeTest.class */
public class MemoryUsageBlockResumeTest extends TestSupport implements Thread.UncaughtExceptionHandler {
    private static final Logger LOG = LoggerFactory.getLogger(MemoryUsageBlockResumeTest.class);
    private static byte[] buf = new byte[NIOSSLConcurrencyTest.MESSAGE_SIZE];
    private static byte[] bigBuf = new byte[49152];
    private BrokerService broker;
    private String connectionUri;
    public int deliveryMode = 2;
    AtomicInteger messagesSent = new AtomicInteger(0);
    AtomicInteger messagesConsumed = new AtomicInteger(0);
    protected long messageReceiveTimeout = DurableSubProcessWithRestartTest.BROKER_RESTART;
    Destination destination = new ActiveMQQueue("FooTwo");
    Destination bigDestination = new ActiveMQQueue("FooTwoBig");
    private final Vector<Throwable> exceptions = new Vector<>();

    @Test(timeout = 60000)
    public void testBlockByOtherResumeNoException() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(this.connectionUri);
        activeMQConnectionFactory.setProducerWindowSize(49152);
        ActiveMQPrefetchPolicy activeMQPrefetchPolicy = new ActiveMQPrefetchPolicy();
        activeMQPrefetchPolicy.setTopicPrefetch(10);
        activeMQConnectionFactory.setPrefetchPolicy(activeMQPrefetchPolicy);
        Connection createConnection = activeMQConnectionFactory.createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        MessageConsumer createConsumer = createSession.createConsumer(this.bigDestination);
        final Connection createConnection2 = activeMQConnectionFactory.createConnection();
        createConnection2.start();
        Session createSession2 = createConnection2.createSession(false, 1);
        MessageProducer createProducer = createSession2.createProducer((Destination) null);
        createProducer.setDeliveryMode(this.deliveryMode);
        for (int i = 0; i < 10; i++) {
            createProducer.send(this.bigDestination, createSession2.createTextMessage(new String(bigBuf) + i));
            this.messagesSent.incrementAndGet();
            LOG.info("After big: " + i + ", System Memory Usage " + this.broker.getSystemUsage().getMemoryUsage().getPercentUsage());
        }
        Thread thread = new Thread("Producing thread") { // from class: org.apache.activemq.bugs.MemoryUsageBlockResumeTest.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    Session createSession3 = createConnection2.createSession(false, 1);
                    MessageProducer createProducer2 = createSession3.createProducer(MemoryUsageBlockResumeTest.this.destination);
                    createProducer2.setDeliveryMode(MemoryUsageBlockResumeTest.this.deliveryMode);
                    for (int i2 = 0; i2 < 20; i2++) {
                        createProducer2.send(MemoryUsageBlockResumeTest.this.destination, createSession3.createTextMessage(new String(MemoryUsageBlockResumeTest.buf) + i2));
                        MemoryUsageBlockResumeTest.this.messagesSent.incrementAndGet();
                        MemoryUsageBlockResumeTest.LOG.info("After little:" + i2 + ", System Memory Usage " + MemoryUsageBlockResumeTest.this.broker.getSystemUsage().getMemoryUsage().getPercentUsage());
                    }
                } catch (Throwable th) {
                    th.printStackTrace();
                }
            }
        };
        thread.start();
        Thread thread2 = new Thread("Producing thread") { // from class: org.apache.activemq.bugs.MemoryUsageBlockResumeTest.2
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    Session createSession3 = createConnection2.createSession(false, 1);
                    MessageProducer createProducer2 = createSession3.createProducer(MemoryUsageBlockResumeTest.this.destination);
                    createProducer2.setDeliveryMode(MemoryUsageBlockResumeTest.this.deliveryMode);
                    for (int i2 = 0; i2 < 20; i2++) {
                        createProducer2.send(MemoryUsageBlockResumeTest.this.destination, createSession3.createTextMessage(new String(MemoryUsageBlockResumeTest.buf) + i2));
                        MemoryUsageBlockResumeTest.this.messagesSent.incrementAndGet();
                        MemoryUsageBlockResumeTest.LOG.info("After little:" + i2 + ", System Memory Usage " + MemoryUsageBlockResumeTest.this.broker.getSystemUsage().getMemoryUsage().getPercentUsage());
                    }
                } catch (Throwable th) {
                    th.printStackTrace();
                }
            }
        };
        thread2.start();
        assertTrue("producer has sent x in a reasonable time", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.MemoryUsageBlockResumeTest.3
            public boolean isSatisified() throws Exception {
                MemoryUsageBlockResumeTest.LOG.info("Checking for : X sent, System Memory Usage " + MemoryUsageBlockResumeTest.this.broker.getSystemUsage().getMemoryUsage().getPercentUsage() + ", sent:  " + MemoryUsageBlockResumeTest.this.messagesSent);
                return MemoryUsageBlockResumeTest.this.messagesSent.get() > 20;
            }
        }));
        LOG.info("Consuming from big q to allow delivery to smaller q from pending");
        for (int i2 = 0; i2 < 10; i2++) {
            Message receive = createConsumer.receive(this.messageReceiveTimeout);
            assertTrue(receive != null);
            LOG.info("Recieved Message (" + i2 + "):" + receive + ", System Memory Usage " + this.broker.getSystemUsage().getMemoryUsage().getPercentUsage());
            this.messagesConsumed.incrementAndGet();
        }
        createConsumer.close();
        thread.join();
        thread2.join();
        assertEquals("Incorrect number of Messages Sent: " + this.messagesSent.get(), this.messagesSent.get(), 50);
        MessageConsumer createConsumer2 = createSession.createConsumer(this.destination);
        for (int i3 = 0; i3 < 40; i3++) {
            Message receive2 = createConsumer2.receive(this.messageReceiveTimeout);
            assertTrue(receive2 != null);
            LOG.info("Recieved Message (" + i3 + "):" + receive2 + ", System Memory Usage " + this.broker.getSystemUsage().getMemoryUsage().getPercentUsage());
            this.messagesConsumed.incrementAndGet();
        }
        assertEquals("Incorrect number of Messages consumed: " + this.messagesConsumed.get(), this.messagesSent.get(), this.messagesConsumed.get());
    }

    @Before
    public void setUp() throws Exception {
        Thread.setDefaultUncaughtExceptionHandler(this);
        this.broker = new BrokerService();
        this.broker.setDataDirectory("target" + File.separator + "activemq-data");
        this.broker.setPersistent(true);
        this.broker.setUseJmx(false);
        this.broker.setAdvisorySupport(false);
        this.broker.setDeleteAllMessagesOnStartup(true);
        setDefaultPersistenceAdapter(this.broker);
        this.broker.getSystemUsage().getMemoryUsage().setLimit(491520L);
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setOptimizedDispatch(true);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(policyEntry);
        this.broker.setDestinationPolicy(policyMap);
        this.broker.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT);
        this.broker.start();
        this.connectionUri = ((TransportConnector) this.broker.getTransportConnectors().get(0)).getPublishableConnectString();
    }

    @After
    public void tearDown() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
        }
    }

    @Override // java.lang.Thread.UncaughtExceptionHandler
    public void uncaughtException(Thread thread, Throwable th) {
        LOG.error("Unexpected Unhandeled ex on: " + thread, th);
        this.exceptions.add(th);
    }
}
