package org.apache.activemq.transport.nio;

import jakarta.jms.JMSException;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.management.ObjectName;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.DestinationView;
import org.apache.activemq.broker.jmx.ProducerViewMBean;
import org.apache.activemq.broker.jmx.QueueView;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/transport/nio/NIOAsyncSendWithPFCTest.class */
public class NIOAsyncSendWithPFCTest extends TestCase {
    private static final String DESTINATION_ONE = "testQ1";
    private static final String DESTINATION_TWO = "testQ2";
    private static final int MESSAGES_TO_SEND = 100;
    private static final Logger LOG = LoggerFactory.getLogger(NIOAsyncSendWithPFCTest.class);
    private static String TRANSPORT_URL = "nio://0.0.0.0:0";
    private static int NUMBER_OF_PRODUCERS = 10;

    /* loaded from: input_file:org/apache/activemq/transport/nio/NIOAsyncSendWithPFCTest$ProducerTask.class */
    class ProducerTask implements Runnable {
        boolean sync;

        ProducerTask(NIOAsyncSendWithPFCTest nIOAsyncSendWithPFCTest) {
            this(false);
        }

        ProducerTask(boolean z) {
            this.sync = false;
            this.sync = z;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                NIOAsyncSendWithPFCTest.this.sendMessages(100, NIOAsyncSendWithPFCTest.DESTINATION_ONE, this.sync);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    protected BrokerService createBroker() throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setDeleteAllMessagesOnStartup(true);
        PolicyMap policyMap = new PolicyMap();
        ArrayList arrayList = new ArrayList();
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setMemoryLimit(256000L);
        policyEntry.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
        policyEntry.setQueue(">");
        arrayList.add(policyEntry);
        policyMap.setPolicyEntries(arrayList);
        brokerService.setDestinationPolicy(policyMap);
        brokerService.addConnector(TRANSPORT_URL);
        brokerService.setDestinations(new ActiveMQDestination[]{new ActiveMQQueue(DESTINATION_ONE)});
        brokerService.start();
        TRANSPORT_URL = brokerService.getTransportConnectorByScheme("nio").getPublishableConnectString();
        return brokerService;
    }

    public void testAsyncSendPFCNewConnection() throws Exception {
        BrokerService createBroker = createBroker();
        createBroker.waitUntilStarted();
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(NUMBER_OF_PRODUCERS);
        QueueView queueView = getQueueView(createBroker, DESTINATION_ONE);
        for (int i = 0; i < NUMBER_OF_PRODUCERS; i++) {
            try {
                newFixedThreadPool.submit(new ProducerTask(this));
            } finally {
                createBroker.stop();
                createBroker.waitUntilStopped();
            }
        }
        waitForProducerFlowControl(createBroker, queueView);
        try {
            sendMessages(1, DESTINATION_TWO, false);
        } catch (Exception e) {
            LOG.error("Ex on send  new connection", e);
            fail("*** received the following exception when creating addition producer new connection:" + e);
        }
    }

    public void testAsyncSendPFCExistingConnection() throws Exception {
        BrokerService createBroker = createBroker();
        createBroker.waitUntilStarted();
        ActiveMQConnection createConnection = new ActiveMQConnectionFactory("admin", "admin", TRANSPORT_URL + "?wireFormat.maxInactivityDuration=5000").createConnection();
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(NUMBER_OF_PRODUCERS);
        QueueView queueView = getQueueView(createBroker, DESTINATION_ONE);
        for (int i = 0; i < NUMBER_OF_PRODUCERS; i++) {
            try {
                newFixedThreadPool.submit(new ProducerTask(this));
            } finally {
                createBroker.stop();
                createBroker.waitUntilStopped();
            }
        }
        waitForProducerFlowControl(createBroker, queueView);
        assertTrue("Producer view blocked", getProducerView(createBroker, DESTINATION_ONE).isProducerBlocked());
        try {
            createConnection.createSession(false, 1);
        } catch (Exception e) {
            LOG.error("Ex on create session", e);
            fail("*** received the following exception when creating producer session:" + e);
        }
    }

    public void testSyncSendPFCExistingConnection() throws Exception {
        BrokerService createBroker = createBroker();
        createBroker.waitUntilStarted();
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(NUMBER_OF_PRODUCERS);
        QueueView queueView = getQueueView(createBroker, DESTINATION_ONE);
        for (int i = 0; i < NUMBER_OF_PRODUCERS; i++) {
            try {
                newFixedThreadPool.submit(new ProducerTask(true));
            } catch (Throwable th) {
                createBroker.stop();
                createBroker.waitUntilStopped();
                throw th;
            }
        }
        waitForProducerFlowControl(createBroker, queueView);
        assertTrue("Producer view blocked", getProducerView(createBroker, DESTINATION_ONE).isProducerBlocked());
        createBroker.stop();
        createBroker.waitUntilStopped();
    }

    private void waitForProducerFlowControl(BrokerService brokerService, QueueView queueView) throws Exception {
        boolean z;
        do {
            z = queueView.getBlockedSends() >= 10;
            LOG.info("Blocking all sends:" + queueView.getBlockedSends());
            Thread.sleep(1000L);
        } while (!z);
    }

    private Long sendMessages(int i, String str, boolean z) throws Exception {
        long j = 0;
        ActiveMQConnection createConnection = new ActiveMQConnectionFactory("admin", "admin", TRANSPORT_URL).createConnection();
        if (z) {
            createConnection.setUseAsyncSend(false);
            createConnection.setAlwaysSyncSend(true);
        } else {
            createConnection.setUseAsyncSend(true);
        }
        createConnection.start();
        try {
            try {
                Session createSession = createConnection.createSession(false, 1);
                MessageProducer createProducer = createSession.createProducer(createSession.createQueue(str));
                TextMessage createTextMessage = createTextMessage(createSession);
                for (int i2 = 0; i2 < i; i2++) {
                    createProducer.send(createTextMessage);
                    j++;
                }
                LOG.info(" Finished after producing : " + j);
                Long valueOf = Long.valueOf(j);
                if (createConnection != null) {
                    try {
                        createConnection.close();
                    } catch (JMSException e) {
                    }
                }
                return valueOf;
            } catch (Throwable th) {
                if (createConnection != null) {
                    try {
                        createConnection.close();
                    } catch (JMSException e2) {
                    }
                }
                throw th;
            }
        } catch (JMSException e3) {
            LOG.debug("Exception received producing ", e3);
            if (createConnection != null) {
                try {
                    createConnection.close();
                } catch (JMSException e4) {
                }
            }
            return Long.valueOf(j);
        }
    }

    private TextMessage createTextMessage(Session session) throws JMSException {
        StringBuffer stringBuffer = new StringBuffer();
        for (int i = 0; i < 1000; i++) {
            stringBuffer.append("1234567890");
        }
        return session.createTextMessage(stringBuffer.toString());
    }

    private QueueView getQueueView(BrokerService brokerService, String str) throws Exception {
        Map queueViews = brokerService.getAdminView().getBroker().getQueueViews();
        Iterator it = queueViews.keySet().iterator();
        while (it.hasNext()) {
            QueueView queueView = (DestinationView) queueViews.get((ObjectName) it.next());
            if (queueView instanceof QueueView) {
                QueueView queueView2 = queueView;
                if (queueView2.getName().equals(str)) {
                    return queueView2;
                }
            }
        }
        return null;
    }

    private ProducerViewMBean getProducerView(BrokerService brokerService, String str) throws Exception {
        for (ObjectName objectName : brokerService.getAdminView().getQueueProducers()) {
            ProducerViewMBean producerViewMBean = (ProducerViewMBean) brokerService.getManagementContext().newProxyInstance(objectName, ProducerViewMBean.class, true);
            LOG.info(producerViewMBean.getProducerId() + ", dest: " + producerViewMBean.getDestinationName() + ", blocked: " + producerViewMBean.isProducerBlocked());
            if (producerViewMBean.getDestinationName().contains(str)) {
                return producerViewMBean;
            }
        }
        return null;
    }
}
