package org.apache.activemq.usecases;

import jakarta.jms.Connection;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.util.ArrayList;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/usecases/PriorityRedeliveryOrderTest.class */
/**
 * Verifies that messages on a queue with prioritized-message support enabled are delivered in
 * the order they were sent when the queue is drained by a series of short-lived consumers.
 *
 * <p>The test sends {@link #MESSAGES_TO_SEND} messages, each tagged with a consecutive
 * {@code appID} int property, and then consumes them with several consecutive connections, each
 * taking {@link #MESSAGES_PER_CONSUMER} messages. Every received {@code appID} must be exactly
 * one greater than the previously committed one.
 */
public class PriorityRedeliveryOrderTest {
    private static final Logger LOG = LoggerFactory.getLogger(PriorityRedeliveryOrderTest.class);
    private static final String DESTINATION = "testQ1";
    private static final int MESSAGES_TO_SEND = 1000;
    private static final int MESSAGES_PER_CONSUMER = 200;
    /** Number of consecutive consumers needed to drain everything that was sent. */
    private static final int CONSUMER_COUNT = MESSAGES_TO_SEND / MESSAGES_PER_CONSUMER;
    /** How long a consumer waits for the next message before giving up, in milliseconds. */
    private static final long RECEIVE_TIMEOUT_MS = 4000L;

    /** The {@code appID} of the last message that was consumed and committed; -1 before any. */
    private int consumedAppId = -1;
    /** Running total of messages consumed across all consumers. */
    private int totalConsumed;
    BrokerService broker;

    /**
     * Starts a broker with a TCP connector on an ephemeral port and a destination policy that
     * enables prioritized messages for every queue.
     *
     * @throws Exception if the broker cannot be configured or started
     */
    @Before
    public void createBroker() throws Exception {
        this.broker = new BrokerService();
        this.broker.setDeleteAllMessagesOnStartup(true);

        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setPrioritizedMessages(true);
        // ">" is the wildcard that matches all queue names.
        policyEntry.setQueue(">");

        ArrayList<PolicyEntry> policyEntries = new ArrayList<>();
        policyEntries.add(policyEntry);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setPolicyEntries(policyEntries);
        this.broker.setDestinationPolicy(policyMap);

        // Port 0 lets the OS choose a free port; the real URI is looked up when connecting.
        this.broker.addConnector("tcp://0.0.0.0:0");
        this.broker.start();
        this.broker.waitUntilStarted();
    }

    /**
     * Stops the broker started by {@link #createBroker()}, if there is one.
     *
     * @throws Exception if the broker fails to stop
     */
    @After
    public void stopBroker() throws Exception {
        // createBroker() may have failed before the field was assigned.
        if (this.broker != null) {
            this.broker.stop();
            this.broker.waitUntilStopped();
        }
    }

    /**
     * Sends all messages, then drains the queue with {@link #CONSUMER_COUNT} consecutive
     * consumers and checks that every message was received.
     *
     * @throws Exception on any unexpected JMS or broker failure
     */
    @Test
    public void testMessageDeliveryOrderAfterPrefetch() throws Exception {
        long sent = sendMessages(MESSAGES_TO_SEND);
        // Fail fast on a producer problem instead of waiting for consumer receive timeouts.
        Assert.assertEquals("all messages should have been sent", (long) MESSAGES_TO_SEND, sent);

        for (int consumer = 0; consumer < CONSUMER_COUNT; consumer++) {
            this.totalConsumed += consumeMessages(MESSAGES_PER_CONSUMER);
        }
        Assert.assertEquals("number of messages consumed should be equal to number of messages sent", MESSAGES_TO_SEND, this.totalConsumed);
    }

    /**
     * Sends {@code i} text messages to {@link #DESTINATION}, committing after each one. Message
     * number {@code n} (zero-based) carries the int property {@code appID = n}.
     *
     * <p>A failure while producing is logged rather than rethrown; the returned count tells the
     * caller how many messages were committed before the failure.
     *
     * @param i number of messages to send
     * @return number of messages sent and committed
     * @throws Exception if the connection to the broker cannot be created
     */
    private Long sendMessages(int i) throws Exception {
        long sent = 0;
        Connection connection = createConnection();
        try {
            connection.start();
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            MessageProducer producer = session.createProducer(session.createQueue(DESTINATION));
            TextMessage message = session.createTextMessage("test_message");
            for (int appId = 0; appId < i; appId++) {
                message.setIntProperty("appID", appId);
                producer.send(message);
                session.commit();
                sent++;
            }
            LOG.info(" Finished after producing : " + sent);
        } catch (Exception e) {
            LOG.error("Exception received producing ", e);
            LOG.info("finishing after exception :" + sent);
        } finally {
            closeQuietly(connection);
        }
        return Long.valueOf(sent);
    }

    /**
     * Opens a fresh connection and consumes up to {@code i} messages from {@link #DESTINATION},
     * committing after each one. Fails the test if a message's {@code appID} is not exactly one
     * greater than the last committed {@code appID}.
     *
     * @param i maximum number of messages this consumer should take
     * @return number of messages actually consumed; less than {@code i} if a receive timed out
     * @throws Exception on any unexpected JMS failure
     */
    private int consumeMessages(int i) throws Exception {
        LOG.info("Creating new consumer for:" + i);
        int consumed = 0;
        Connection connection = createConnection();
        try {
            connection.start();
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            MessageConsumer consumer = session.createConsumer(session.createQueue(DESTINATION));
            while (consumed < i) {
                Message message = consumer.receive(RECEIVE_TIMEOUT_MS);
                if (message == null) {
                    // Timed out: nothing more is available for this consumer.
                    LOG.info("Break on:" + consumed);
                    return consumed;
                }
                int newAppId = message.getIntProperty("appID");
                consumed++;
                LOG.debug("Message newAppID" + newAppId);
                if (newAppId != this.consumedAppId + 1) {
                    Assert.fail(" newAppId is " + newAppId + " expected " + (this.consumedAppId + 1));
                }
                this.consumedAppId = newAppId;
                session.commit();
            }
            LOG.info("closing consumer after 200 message, consumedAppID is " + this.consumedAppId);
            return consumed;
        } finally {
            closeQuietly(connection);
        }
    }

    /** Creates an unstarted connection to the test broker's TCP connector. */
    private Connection createConnection() throws Exception {
        String brokerUri = this.broker.getTransportConnectorByScheme("tcp").getPublishableConnectString();
        return new ActiveMQConnectionFactory(brokerUri).createConnection();
    }

    /** Closes the connection, logging instead of propagating any failure to close. */
    private static void closeQuietly(Connection connection) {
        try {
            connection.close();
        } catch (Exception e) {
            // Best effort: a close failure must not mask the real outcome of the test.
            LOG.warn("Failed to close connection", e);
        }
    }
}
