package org.apache.activemq.usecases;

import jakarta.jms.Connection;
import jakarta.jms.Message;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.ObjectName;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/usecases/TopicProducerDurableSubFlowControlTest.class */
/**
 * Exercises topic producer flow control against durable subscriptions.
 *
 * <p>An embedded broker is started with a per-destination memory limit and
 * producer flow control enabled. One durable subscription is created and
 * immediately closed (left inactive), a second one consumes through this
 * class's {@link #onMessage(Message)}. A background thread then publishes
 * {@code numMessagesToSend} messages and the test waits for that thread to
 * finish, finally checking the cursor usage reported over JMX for every
 * durable subscription that still has pending messages.
 */
public class TopicProducerDurableSubFlowControlTest extends TestCase implements MessageListener {
    private static final String brokerName = "testBroker";
    private static final String brokerUrl = "vm://testBroker";
    protected static final int destinationMemLimit = 2097152;
    private static final int numMessagesToSend = 10000;
    /** A progress line is logged every time a counter reaches a multiple of this value. */
    private static final int progressLogInterval = 10000;
    private BrokerService broker;
    private static final Logger LOG = LoggerFactory.getLogger(TopicProducerDurableSubFlowControlTest.class);
    private static final AtomicLong produced = new AtomicLong();
    private static final AtomicLong consumed = new AtomicLong();

    /** Resets the shared counters and starts a fresh broker with all stored messages deleted. */
    protected void setUp() throws Exception {
        // The counters are static, so clear whatever an earlier test in the same JVM left behind.
        produced.set(0);
        consumed.set(0);
        doSetup(true);
    }

    /**
     * Creates, configures and starts the embedded broker.
     *
     * @param deleteAllMessages passed to {@code setDeleteAllMessagesOnStartup}
     * @throws Exception if the broker cannot be configured or started
     */
    private void doSetup(boolean deleteAllMessages) throws Exception {
        this.broker = new BrokerService();
        this.broker.setBrokerName(brokerName);
        this.broker.setSchedulerSupport(false);
        this.broker.setUseJmx(true);
        this.broker.setUseShutdownHook(false);
        this.broker.addConnector(brokerUrl);
        this.broker.setAdvisorySupport(false);
        // Broker-wide limit is ten times the per-destination limit (20971520 bytes).
        this.broker.getSystemUsage().getMemoryUsage().setLimit(10L * destinationMemLimit);
        this.broker.setDeleteAllMessagesOnStartup(deleteAllMessages);

        // One policy entry applied to every topic (">" wildcard).
        PolicyEntry topicPolicy = new PolicyEntry();
        topicPolicy.setTopic(">");
        topicPolicy.setMemoryLimit(destinationMemLimit);
        topicPolicy.setCursorMemoryHighWaterMark(10);
        topicPolicy.setProducerFlowControl(true);
        topicPolicy.setAdvisoryWhenFull(true);
        topicPolicy.setExpireMessagesPeriod(0L);

        PolicyMap policyMap = new PolicyMap();
        policyMap.setPolicyEntries(Arrays.asList(topicPolicy));
        setDestinationPolicy(this.broker, policyMap);

        this.broker.start();
        this.broker.waitUntilStarted();
    }

    /**
     * Installs the destination policy on the broker; protected so that
     * subclasses can substitute their own policy handling.
     *
     * @param brokerService the broker being configured
     * @param policyMap the policy map built by the set-up code
     */
    protected void setDestinationPolicy(BrokerService brokerService, PolicyMap policyMap) {
        brokerService.setDestinationPolicy(policyMap);
    }

    /** Stops the broker, if one was created, and waits for it to shut down. */
    protected void tearDown() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
            this.broker.waitUntilStopped();
        }
    }

    /**
     * Publishes {@code numMessagesToSend} messages from a background thread and
     * verifies that the thread completes and that every durable subscription
     * with a non-empty cursor reports more than 5% cursor usage.
     *
     * @throws Exception on any JMS, JMX or broker failure
     */
    public void testTopicProducerFlowControl() throws Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
        connectionFactory.setAlwaysSyncSend(true);

        Connection consumerConnection = null;
        Connection producerConnection = null;
        try {
            consumerConnection = connectionFactory.createConnection();
            consumerConnection.setClientID("cliId1");
            consumerConnection.start();
            Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // First subscription is registered and closed straight away, leaving it inactive.
            consumerSession.createDurableSubscriber(createDestination(), "DurableSub-0").close();
            // Second subscription stays active and delivers to onMessage().
            consumerSession.createDurableSubscriber(createDestination(), "DurableSub-1").setMessageListener(this);

            producerConnection = connectionFactory.createConnection();
            final Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final MessageProducer producer = producerSession.createProducer(createDestination());

            final Thread producerThread = new Thread("Producing Thread") {
                @Override
                public void run() {
                    // The try block wraps the whole loop so the producer is closed
                    // only once, after the last send (or after a failure).
                    try {
                        for (long i = 0; i < numMessagesToSend; i++) {
                            producer.send(producerSession.createTextMessage("test"));
                            long count = produced.incrementAndGet();
                            if (count % progressLogInterval == 0) {
                                LOG.info("Produced {} messages", count);
                            }
                        }
                    } catch (Throwable th) {
                        LOG.error("Producing thread failed after {} messages", produced.get(), th);
                    } finally {
                        try {
                            producer.close();
                            producerSession.close();
                        } catch (Exception e) {
                            // Best-effort cleanup; the owning connection is closed by the test.
                            LOG.debug("Ignoring failure while closing producer resources", e);
                        }
                    }
                }
            };
            producerThread.start();

            // Collect the JMX names of both inactive and active durable subscribers.
            List<ObjectName> subscriberNames = new ArrayList<>();
            subscriberNames.addAll(Arrays.asList(this.broker.getAdminView().getInactiveDurableTopicSubscribers()));
            subscriberNames.addAll(Arrays.asList(this.broker.getAdminView().getDurableTopicSubscribers()));
            assertFalse("have a sub", subscriberNames.isEmpty());

            final List<DurableSubscriptionViewMBean> subscriptionViews = new ArrayList<>();
            for (ObjectName subscriberName : subscriberNames) {
                subscriptionViews.add((DurableSubscriptionViewMBean) this.broker.getManagementContext()
                        .newProxyInstance(subscriberName, DurableSubscriptionViewMBean.class, true));
            }

            LOG.info("Wait for producer to stop");
            assertTrue("producer thread is done", Wait.waitFor(new Wait.Condition() {
                public boolean isSatisified() throws Exception {
                    // Log cursor state on every poll so progress is visible while waiting.
                    for (DurableSubscriptionViewMBean view : subscriptionViews) {
                        logCursorState(view);
                    }
                    return !producerThread.isAlive();
                }
            }, 600000L));

            for (DurableSubscriptionViewMBean view : subscriptionViews) {
                logCursorState(view);
                if (view.cursorSize() > 0) {
                    assertTrue("Has a decent usage", view.getCursorPercentUsage() > 5);
                }
            }
        } finally {
            // Closing a connection also closes the sessions, producers and consumers created from it.
            closeQuietly(producerConnection);
            closeQuietly(consumerConnection);
        }
    }

    /** Logs the name and cursor statistics of one durable subscription. */
    private static void logCursorState(DurableSubscriptionViewMBean view) {
        LOG.info("name: {}", view.getSubscriptionName());
        LOG.info("cursor size: {}", view.cursorSize());
        LOG.info("mem usage: {}", view.getCursorMemoryUsage());
        LOG.info("mem % usage: {}", view.getCursorPercentUsage());
    }

    /** Closes the connection if it is non-null, logging instead of propagating any failure. */
    private static void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (Exception e) {
            LOG.debug("Ignoring failure while closing connection", e);
        }
    }

    /**
     * Returns the topic used by both the producer and the durable subscribers.
     *
     * @return a new {@code ActiveMQTopic} named "test"
     * @throws Exception declared so that overriding implementations may fail
     */
    protected ActiveMQTopic createDestination() throws Exception {
        return new ActiveMQTopic("test");
    }

    /**
     * Counts each delivered message and logs progress at every multiple of
     * {@code progressLogInterval}.
     *
     * @param message the delivered message (content is not inspected)
     */
    public void onMessage(Message message) {
        long count = consumed.incrementAndGet();
        if (count % progressLogInterval == 0) {
            LOG.info("\tConsumed {} messages", count);
        }
    }
}
