package org.apache.activemq.bugs;

import java.io.File;
import java.util.ArrayList;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.usage.SystemUsage;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/bugs/AMQ2801Test.class */
public class AMQ2801Test {
    private static final Logger LOG = LoggerFactory.getLogger(AMQ2801Test.class);
    private static final String TOPICNAME = "InvalidPendingQueueTest";
    private static final String SELECTOR1 = "JMS_ID = 'TEST'";
    private static final String SELECTOR2 = "JMS_ID = 'TEST2'";
    private static final String SUBSCRIPTION1 = "InvalidPendingQueueTest_1";
    private static final String SUBSCRIPTION2 = "InvalidPendingQueueTest_2";
    private static final int MSG_COUNT = 2500;
    private Session session1;
    private Connection conn1;
    private Topic topic1;
    private MessageConsumer consumer1;
    private Session session2;
    private Connection conn2;
    private Topic topic2;
    private MessageConsumer consumer2;
    private BrokerService broker;
    private String connectionUri;

    @Before
    public void setUp() throws Exception {
        this.broker = new BrokerService();
        this.broker.setDataDirectory("target" + File.separator + "activemq-data");
        this.broker.setPersistent(true);
        this.broker.setUseJmx(true);
        this.broker.setAdvisorySupport(false);
        this.broker.setDeleteAllMessagesOnStartup(true);
        this.broker.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT).setName("Default");
        applyMemoryLimitPolicy(this.broker);
        this.broker.start();
        this.connectionUri = ((TransportConnector) this.broker.getTransportConnectors().get(0)).getPublishableConnectString();
    }

    private void applyMemoryLimitPolicy(BrokerService brokerService) {
        SystemUsage systemUsage = new SystemUsage();
        systemUsage.getMemoryUsage().setLimit(5818230784L);
        systemUsage.getStoreUsage().setLimit(6442450944L);
        systemUsage.getTempUsage().setLimit(3221225472L);
        brokerService.setSystemUsage(systemUsage);
        ArrayList arrayList = new ArrayList();
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setQueue(">");
        policyEntry.setProducerFlowControl(false);
        policyEntry.setMemoryLimit(504857608L);
        policyEntry.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy());
        arrayList.add(policyEntry);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setPolicyEntries(arrayList);
        brokerService.setDestinationPolicy(policyMap);
    }

    @After
    public void tearDown() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
        }
        this.conn1.close();
        this.conn2.close();
    }

    private void produceMessages() throws Exception {
        TopicConnection createConnection = createConnection();
        TopicSession createTopicSession = createConnection.createTopicSession(false, 1);
        TopicPublisher createPublisher = createTopicSession.createPublisher(createTopicSession.createTopic(TOPICNAME));
        createConnection.start();
        createPublisher.setDeliveryMode(2);
        long currentTimeMillis = System.currentTimeMillis();
        BytesMessage createBytesMessage = this.session2.createBytesMessage();
        for (int i = 1; i <= MSG_COUNT; i++) {
            createBytesMessage.setStringProperty("JMS_ID", "TEST");
            createBytesMessage.setIntProperty("Type", i);
            createPublisher.publish(createBytesMessage);
            if (i % 100 == 0) {
                LOG.info("sent: " + i + " @ " + ((System.currentTimeMillis() - currentTimeMillis) / 100) + "m/ms");
                currentTimeMillis = System.currentTimeMillis();
            }
        }
    }

    private void activeateSubscribers() throws Exception {
        this.conn1 = createConnection();
        this.conn1.setClientID(SUBSCRIPTION1);
        this.session1 = this.conn1.createSession(true, 0);
        this.topic1 = this.session1.createTopic(TOPICNAME);
        this.consumer1 = this.session1.createDurableSubscriber(this.topic1, SUBSCRIPTION1, SELECTOR1, false);
        this.conn1.start();
        this.conn2 = createConnection();
        this.conn2.setClientID(SUBSCRIPTION2);
        this.session2 = this.conn2.createSession(true, 0);
        this.topic2 = this.session2.createTopic(TOPICNAME);
        this.consumer2 = this.session2.createDurableSubscriber(this.topic2, SUBSCRIPTION2, SELECTOR2, false);
        this.conn2.start();
    }

    @Test
    public void testInvalidPendingQueue() throws Exception {
        activeateSubscribers();
        Assert.assertNotNull(this.consumer1);
        Assert.assertNotNull(this.consumer2);
        produceMessages();
        LOG.debug("Sent messages to a single subscriber");
        Thread.sleep(2000L);
        LOG.debug("Closing durable subscriber connections");
        this.conn1.close();
        this.conn2.close();
        LOG.debug("Closed durable subscriber connections");
        Thread.sleep(2000L);
        LOG.debug("Re-starting durable subscriber connections");
        activeateSubscribers();
        LOG.debug("Started up durable subscriber connections - now view activemq console to see pending queue size on the other subscriber");
        for (ObjectName objectName : this.broker.getAdminView().getDurableTopicSubscribers()) {
            DurableSubscriptionViewMBean durableSubscriptionViewMBean = (DurableSubscriptionViewMBean) this.broker.getManagementContext().newProxyInstance(objectName, DurableSubscriptionViewMBean.class, true);
            LOG.info(durableSubscriptionViewMBean.getSubscriptionName() + ": pending = " + durableSubscriptionViewMBean.getPendingQueueSize());
            if (durableSubscriptionViewMBean.getSubscriptionName().equals(SUBSCRIPTION1)) {
                Assert.assertEquals("Incorrect number of pending messages", 2500L, durableSubscriptionViewMBean.getPendingQueueSize());
            } else {
                Assert.assertEquals("Incorrect number of pending messages", 0L, durableSubscriptionViewMBean.getPendingQueueSize());
            }
        }
    }

    private TopicConnection createConnection() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
        activeMQConnectionFactory.setBrokerURL(this.connectionUri);
        return activeMQConnectionFactory.createTopicConnection();
    }
}
