package org.apache.activemq.bugs;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.virtual.MirroredQueue;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.transport.nio.NIOSSLLoadTest;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/bugs/AMQ3324Test.class */
public class AMQ3324Test {
    private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3324Test.class);
    private static final String bindAddress = "tcp://0.0.0.0:0";
    private BrokerService broker;
    private ActiveMQConnectionFactory cf;
    private static final int MESSAGE_COUNT = 100;

    @Before
    public void setUp() throws Exception {
        this.broker = createBroker();
        String publishableConnectString = ((TransportConnector) this.broker.getTransportConnectors().get(0)).getPublishableConnectString();
        this.broker.start();
        this.broker.waitUntilStarted();
        this.cf = new ActiveMQConnectionFactory(publishableConnectString);
    }

    @After
    public void tearDown() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
            this.broker.waitUntilStopped();
        }
    }

    @Test
    public void testTempMessageConsumedAdvisoryConnectionClose() throws Exception {
        Connection createConnection = this.cf.createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        ActiveMQDestination createTemporaryQueue = createSession.createTemporaryQueue();
        MessageConsumer createConsumer = createSession.createConsumer(createTemporaryQueue);
        ActiveMQTopic messageConsumedAdvisoryTopic = AdvisorySupport.getMessageConsumedAdvisoryTopic(createTemporaryQueue);
        MessageConsumer createConsumer2 = createSession.createConsumer(messageConsumedAdvisoryTopic);
        MessageProducer createProducer = createSession.createProducer(createTemporaryQueue);
        for (int i = 0; i < 100; i++) {
            BytesMessage createBytesMessage = createSession.createBytesMessage();
            createBytesMessage.writeBytes(new byte[1024]);
            createProducer.send(createBytesMessage);
        }
        Assert.assertNotNull(createConsumer.receive(5000L));
        Assert.assertNotNull(createConsumer2.receive(5000L));
        createConnection.close();
        LOG.debug("Connection closed, destinations should now become inactive.");
        Assert.assertTrue("The destination " + messageConsumedAdvisoryTopic + "was not removed. ", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.AMQ3324Test.1
            @Override // org.apache.activemq.util.Wait.Condition
            public boolean isSatisified() throws Exception {
                return AMQ3324Test.this.broker.getAdminView().getTopics().length == 0;
            }
        }));
        Assert.assertTrue("The destination " + createTemporaryQueue + " was not removed. ", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.AMQ3324Test.2
            @Override // org.apache.activemq.util.Wait.Condition
            public boolean isSatisified() throws Exception {
                return AMQ3324Test.this.broker.getAdminView().getTemporaryQueues().length == 0;
            }
        }));
    }

    protected BrokerService createBroker() throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setUseMirroredQueues(true);
        brokerService.setPersistent(false);
        brokerService.setSchedulePeriodForDestinationPurge(NIOSSLLoadTest.MESSAGE_COUNT);
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setGcInactiveDestinations(true);
        policyEntry.setInactiveTimoutBeforeGC(2000L);
        policyEntry.setProducerFlowControl(true);
        policyEntry.setAdvisoryForConsumed(true);
        policyEntry.setAdvisdoryForFastProducers(true);
        policyEntry.setAdvisoryForDelivery(true);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(policyEntry);
        DestinationInterceptor mirroredQueue = new MirroredQueue();
        mirroredQueue.setCopyMessage(true);
        brokerService.setDestinationInterceptors(new DestinationInterceptor[]{mirroredQueue});
        brokerService.setDestinationPolicy(policyMap);
        brokerService.addConnector(bindAddress);
        return brokerService;
    }
}
