package org.apache.activemq.usecases;

import jakarta.jms.BytesMessage;
import jakarta.jms.Connection;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.QueueBrowser;
import jakarta.jms.Session;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Iterator;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.class */
/**
 * Exercises priority ordering on an embedded broker whose queue policy enables
 * prioritized messages, lazy dispatch and a queue prefetch of zero
 * (see {@link #createBroker()}).
 *
 * <p>Each test produces a batch of priority-4 messages followed by a single
 * priority-5 message and then checks the order in which they are delivered
 * or browsed.
 */
public class QueueZeroPrefetchLazyDispatchPriorityTest {
    private static final Logger LOG = LoggerFactory.getLogger(QueueZeroPrefetchLazyDispatchPriorityTest.class);

    /** Body written into every produced message. */
    private static final byte[] PAYLOAD = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};

    /** Number of times each produce/consume scenario is repeated within one test. */
    private static final int ITERATIONS = 2;

    /** Queue used by every test in this class. */
    private static final String QUEUE_NAME = "TestQ";

    private BrokerService broker;

    /** Starts a fresh embedded broker before each test. */
    @Before
    public void setUp() throws Exception {
        this.broker = createBroker();
        this.broker.start();
        this.broker.waitUntilStarted();
    }

    /** Stops the embedded broker if it was created. */
    @After
    public void tearDown() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
        }
    }

    /**
     * Sends four priority-4 messages then one priority-5 message and verifies
     * that a fresh consumer receives the priority-5 message first.
     */
    @Test(timeout = 120000)
    public void testPriorityMessages() throws Exception {
        for (int iteration = 0; iteration < ITERATIONS; iteration++) {
            produceMessages(4, 4, QUEUE_NAME);
            produceMessages(1, 5, QUEUE_NAME);
            LOG.info("On iteration {}", iteration);
            Thread.sleep(1000L);

            ArrayList<Message> consumed = consumeMessages(QUEUE_NAME);
            LOG.info("Consumed list {}", consumed.size());

            Assert.assertEquals("message 1 should be priority high", 5L, consumed.get(0).getJMSPriority());
            Assert.assertEquals("message 2 should be priority medium", 4L, consumed.get(1).getJMSPriority());
            Assert.assertEquals("message 3 should be priority medium", 4L, consumed.get(2).getJMSPriority());
            Assert.assertEquals("message 4 should be priority medium", 4L, consumed.get(3).getJMSPriority());
            Assert.assertEquals("message 5 should be priority medium", 4L, consumed.get(4).getJMSPriority());
        }
    }

    /**
     * Same scenario as {@link #testPriorityMessages()} but with pauses between
     * the two produce steps and before consuming. Currently ignored.
     */
    @Test(timeout = 120000)
    @Ignore("Flaky test on Jenkins, should be refactored")
    public void testPriorityMessagesMoreThanPageSize() throws Exception {
        for (int iteration = 0; iteration < ITERATIONS; iteration++) {
            produceMessages(4, 4, QUEUE_NAME);
            Thread.sleep(1000L);
            produceMessages(1, 5, QUEUE_NAME);
            Thread.sleep(2000L);
            LOG.info("On iteration {}", iteration);

            ArrayList<Message> consumed = consumeMessages(QUEUE_NAME);
            LOG.info("Consumed list {}", consumed.size());

            Assert.assertFalse("Consumed list should not be empty", consumed.isEmpty());
            Assert.assertEquals("message 1 should be priority high", 5L, consumed.get(0).getJMSPriority());
            for (int index = 1; index < 4; index++) {
                Assert.assertEquals("message " + index + " should be priority medium", 4L, consumed.get(index).getJMSPriority());
            }
        }
    }

    /**
     * Keeps a single consumer open across iterations and verifies that each
     * pull returns the newly produced priority-5 message; whatever is left on
     * the queue afterwards must all be priority 4.
     */
    @Test(timeout = 120000)
    public void testLongLivedPriorityConsumer() throws Exception {
        Connection connection = newConnectionFactory().createConnection();
        try {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(QUEUE_NAME));
            connection.start();

            for (int iteration = 0; iteration < ITERATIONS; iteration++) {
                produceMessages(4, 4, QUEUE_NAME);
                produceMessages(1, 5, QUEUE_NAME);

                Message received = consumer.receive(4000L);
                // receive() returns null on timeout; fail with a clear message instead of an NPE.
                Assert.assertNotNull("expected a message within the receive timeout", received);
                Assert.assertEquals("message should be priority high", 5L, received.getJMSPriority());
            }

            ArrayList<Message> remaining = consumeMessages(QUEUE_NAME);
            LOG.info("Consumed list {}", remaining.size());
            for (Message message : remaining) {
                Assert.assertEquals("should be priority medium", 4L, message.getJMSPriority());
            }
        } finally {
            connection.close();
        }
    }

    /**
     * Browses the queue between producing the priority-4 and priority-5
     * messages, then verifies the priority-5 message is still consumed first
     * and the following messages are priority 4.
     */
    @Test(timeout = 120000)
    public void testPriorityMessagesWithJmsBrowser() throws Exception {
        for (int iteration = 0; iteration < ITERATIONS; iteration++) {
            produceMessages(4, 4, QUEUE_NAME);

            ArrayList<Message> browsed = browseMessages(QUEUE_NAME);
            LOG.info("Browsed: {}", browsed.size());

            produceMessages(1, 5, QUEUE_NAME);
            Thread.sleep(1000L);
            LOG.info("On iteration {}", iteration);

            Message first = consumeOneMessage(QUEUE_NAME);
            Assert.assertNotNull(first);
            Assert.assertEquals(5L, first.getJMSPriority());

            ArrayList<Message> consumed = consumeMessages(QUEUE_NAME);
            LOG.info("Consumed list {}", consumed.size());
            for (int index = 1; index < 4; index++) {
                Assert.assertEquals("Iteration: " + iteration + ", message " + index + " should be priority medium", 4L, consumed.get(index).getJMSPriority());
            }
        }
    }

    /**
     * Verifies what a JMS browser sees before and after a client-acknowledge
     * consumer pulls (but does not acknowledge) one message: nothing before the
     * pull, exactly one message after it, and all five messages are still
     * consumable afterwards.
     */
    @Test(timeout = 120000)
    public void testJmsBrowserGetsPagedIn() throws Exception {
        final int numToSend = 5;
        for (int iteration = 0; iteration < ITERATIONS; iteration++) {
            produceMessages(numToSend, 4, QUEUE_NAME);

            ArrayList<Message> browsed = browseMessages(QUEUE_NAME);
            LOG.info("Browsed: {}", browsed.size());
            Assert.assertEquals(0L, browsed.size());

            // The pulled message is never acknowledged, so it remains on the queue.
            Message pulled = consumeOneMessage(QUEUE_NAME, Session.CLIENT_ACKNOWLEDGE);
            Assert.assertNotNull(pulled);

            browsed = browseMessages(QUEUE_NAME);
            LOG.info("Browsed: {}", browsed.size());
            Assert.assertEquals("see only the paged in for pull", 1L, browsed.size());

            ArrayList<Message> consumed = consumeMessages(QUEUE_NAME);
            LOG.info("Consumed list {}", consumed.size());
            Assert.assertEquals(numToSend, consumed.size());
        }
    }

    /**
     * Builds a connection factory pointing at the embedded broker's TCP connector.
     *
     * @return a new factory for the broker started in {@link #setUp()}
     */
    private ActiveMQConnectionFactory newConnectionFactory() throws Exception {
        return new ActiveMQConnectionFactory(this.broker.getTransportConnectorByScheme("tcp").getPublishableConnectString());
    }

    /**
     * Sends {@code count} bytes messages with the given JMS priority to a queue
     * over a dedicated connection that is closed before returning.
     *
     * @param count     number of messages to send
     * @param priority  JMS priority applied to every message
     * @param queueName destination queue name
     */
    private void produceMessages(int count, int priority, String queueName) throws Exception {
        ActiveMQConnectionFactory factory = newConnectionFactory();
        factory.setConnectionIDPrefix("pri-" + priority);
        Connection connection = factory.createConnection();
        try {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(new ActiveMQQueue(queueName));
            connection.start();
            for (int sent = 0; sent < count; sent++) {
                BytesMessage message = session.createBytesMessage();
                message.writeBytes(PAYLOAD);
                message.setJMSPriority(priority);
                // Delivery mode 2, explicit priority, no expiry (time-to-live 0).
                producer.send(message, 2, message.getJMSPriority(), 0L);
            }
        } finally {
            connection.close();
        }
    }

    /**
     * Drains a queue with a fresh auto-acknowledge consumer. Waits up to five
     * seconds for the first message and one second for each subsequent one;
     * stops at the first timeout.
     *
     * @param queueName queue to drain
     * @return the messages received, in delivery order (possibly empty)
     */
    private ArrayList<Message> consumeMessages(String queueName) throws Exception {
        ArrayList<Message> messages = new ArrayList<>();
        Connection connection = newConnectionFactory().createConnection();
        try {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(queueName));
            connection.start();

            Message received;
            while ((received = consumer.receive(messages.isEmpty() ? 5000L : 1000L)) != null) {
                messages.add(received);
            }
            consumer.close();
            return messages;
        } finally {
            connection.close();
        }
    }

    /**
     * Consumes a single message using auto-acknowledge.
     *
     * @param queueName queue to read from
     * @return the received message, or {@code null} if none arrived in time
     */
    private Message consumeOneMessage(String queueName) throws Exception {
        return consumeOneMessage(queueName, Session.AUTO_ACKNOWLEDGE);
    }

    /**
     * Receives a single message over a dedicated connection that is closed
     * before returning. Waits up to four seconds, then retries once for up to
     * two more seconds.
     *
     * @param queueName queue to read from
     * @param ackMode   JMS session acknowledge mode
     * @return the received message, or {@code null} if both attempts timed out
     */
    private Message consumeOneMessage(String queueName, int ackMode) throws Exception {
        Connection connection = newConnectionFactory().createConnection();
        try {
            Session session = connection.createSession(false, ackMode);
            MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(queueName));
            connection.start();

            Message received = consumer.receive(4000L);
            if (received == null) {
                received = consumer.receive(2000L);
            }
            return received;
        } finally {
            connection.close();
        }
    }

    /**
     * Collects every message a JMS {@link QueueBrowser} enumerates for the queue.
     *
     * @param queueName queue to browse
     * @return the browsed messages in enumeration order (possibly empty)
     */
    private ArrayList<Message> browseMessages(String queueName) throws Exception {
        ArrayList<Message> messages = new ArrayList<>();
        Connection connection = newConnectionFactory().createConnection();
        try {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            QueueBrowser browser = session.createBrowser(new ActiveMQQueue(queueName));
            connection.start();

            Enumeration<?> browsed = browser.getEnumeration();
            while (browsed.hasMoreElements()) {
                messages.add((Message) browsed.nextElement());
            }
            return messages;
        } finally {
            connection.close();
        }
    }

    /**
     * Creates (but does not start) the broker under test. A single policy entry
     * matching every queue ({@code ">"}) enables prioritized messages, lazy and
     * optimized dispatch, sets queue prefetch to zero, disables the cache, and
     * sets the browse and expire page sizes to zero. The broker listens on an
     * ephemeral TCP port.
     *
     * @return the configured, not yet started broker
     */
    private BrokerService createBroker() throws Exception {
        PolicyEntry queuePolicy = new PolicyEntry();
        queuePolicy.setPrioritizedMessages(true);
        queuePolicy.setExpireMessagesPeriod(500L);
        queuePolicy.setMaxPageSize(100);
        queuePolicy.setMaxExpirePageSize(0);
        queuePolicy.setMaxBrowsePageSize(0);
        queuePolicy.setQueuePrefetch(0);
        queuePolicy.setLazyDispatch(true);
        queuePolicy.setOptimizedDispatch(true);
        queuePolicy.setUseCache(false);
        queuePolicy.setQueue(">");

        ArrayList<PolicyEntry> entries = new ArrayList<>();
        entries.add(queuePolicy);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setPolicyEntries(entries);

        BrokerService brokerService = new BrokerService();
        brokerService.setDeleteAllMessagesOnStartup(true);
        brokerService.setDestinationPolicy(policyMap);
        brokerService.addConnector("tcp://0.0.0.0:0");
        return brokerService;
    }
}
