package org.apache.activemq.usecases;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.management.ObjectName;
import junit.framework.Assert;
import junit.framework.Test;
import junit.textui.TestRunner;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.perf.NetworkedSyncTest;
import org.apache.activemq.util.Wait;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.class */
/**
 * Broker integration test checking that messages sent with a time-to-live are
 * counted as expired by the destination, both when no consumer is attached and
 * when a single consumer blocks inside its listener without acknowledging.
 *
 * <p>Each test starts its own embedded {@link BrokerService}, publishes from a
 * background thread and then inspects the destination statistics through a
 * {@link DestinationViewMBean} proxy.
 */
public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
    private static final Log LOG = LogFactory.getLog(ExpiredMessagesWithNoConsumerTest.class);

    /** Number of sends between two progress log lines of the producing thread. */
    private static final int PROGRESS_LOG_INTERVAL = 100;

    BrokerService broker;
    Connection connection;
    Session session;
    MessageProducer producer;
    public ActiveMQDestination destination = new ActiveMQQueue("test");
    // Public on purpose: populated by name via addCombinationValues(...) below.
    public boolean optimizedDispatch = true;

    public static Test suite() {
        return suite(ExpiredMessagesWithNoConsumerTest.class);
    }

    public static void main(String[] strArr) {
        TestRunner.run(suite());
    }

    /** Starts the broker with the memory-limited destination policy. */
    protected void createBrokerWithMemoryLimit() throws Exception {
        doCreateBroker(true);
    }

    /** Starts the broker with the plain destination policy. */
    protected void createBroker() throws Exception {
        doCreateBroker(false);
    }

    /**
     * Creates, configures and starts the embedded broker, blocking until it
     * reports that it has started.
     *
     * @param memoryLimit when {@code true}, the default policy additionally gets a
     *                    {@code null} dead letter strategy and a memory limit of 200000
     */
    private void doCreateBroker(boolean memoryLimit) throws Exception {
        broker = new BrokerService();
        broker.setBrokerName("localhost");
        broker.setUseJmx(true);
        broker.setDeleteAllMessagesOnStartup(true);
        broker.addConnector(NetworkedSyncTest.broker1URL);

        PolicyEntry defaultPolicy = new PolicyEntry();
        defaultPolicy.setOptimizedDispatch(optimizedDispatch);
        defaultPolicy.setExpireMessagesPeriod(800L);
        defaultPolicy.setMaxExpirePageSize(800);
        if (memoryLimit) {
            defaultPolicy.setDeadLetterStrategy((DeadLetterStrategy) null);
            defaultPolicy.setMemoryLimit(200000L);
        }

        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(defaultPolicy);
        broker.setDestinationPolicy(policyMap);

        broker.start();
        broker.waitUntilStarted();
    }

    /** Runs {@link #testExpiredMessagesWithNoConsumer()} with optimized dispatch both on and off. */
    public void initCombosForTestExpiredMessagesWithNoConsumer() {
        addCombinationValues("optimizedDispatch", new Object[]{Boolean.TRUE, Boolean.FALSE});
    }

    /**
     * Sends 2000 messages with a one second time-to-live to a queue without any
     * consumer and expects the destination's expired count to reach 2000.
     */
    public void testExpiredMessagesWithNoConsumer() throws Exception {
        createBrokerWithMemoryLimit();
        connection = new ActiveMQConnectionFactory(NetworkedSyncTest.broker1URL).createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        producer = session.createProducer(destination);
        producer.setTimeToLive(1000L);
        connection.start();

        final int sendCount = 2000;
        Thread producingThread = startProducingThread(sendCount);
        assertTrue("producer completed within time", waitForThreadToFinish(producingThread));

        final DestinationViewMBean view = createView(destination);
        // Result deliberately ignored: the assertEquals below reports the actual count on failure.
        Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return sendCount == view.getExpiredCount();
            }
        });
        logDestinationStats(view);
        assertEquals("All sent have expired", (long) sendCount, view.getExpiredCount());
    }

    /**
     * Sends 1500 messages with a four second time-to-live while the only consumer
     * is blocked inside its listener holding the first message unacknowledged.
     * Expects 1000 messages to be dispatched, all 1500 to be counted as expired,
     * and, once the listener is released, in-flight count and queue size to
     * return to zero with 1500 dequeues.
     */
    public void testExpiredMessagesWithVerySlowConsumer() throws Exception {
        createBroker();
        connection = new ActiveMQConnectionFactory(NetworkedSyncTest.broker1URL).createConnection();
        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        producer = session.createProducer(destination);
        producer.setTimeToLive(4000L);

        final int sendCount = 1500;
        final CountDownLatch firstMessageReceived = new CountDownLatch(1);
        final CountDownLatch releaseListener = new CountDownLatch(1);

        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    LOG.info("Got my message: " + message);
                    firstMessageReceived.countDown();
                    // Hold the message unacknowledged until the test releases us.
                    releaseListener.await(60L, TimeUnit.SECONDS);
                    LOG.info("acking message: " + message);
                    message.acknowledge();
                } catch (Exception e) {
                    LOG.error("listener failed for message: " + message, e);
                    Assert.fail(e.toString());
                }
            }
        });
        connection.start();

        Thread producingThread = startProducingThread(sendCount);
        try {
            assertTrue("got one message", firstMessageReceived.await(20L, TimeUnit.SECONDS));
            assertTrue("producer completed within time ", waitForThreadToFinish(producingThread));

            final DestinationViewMBean view = createView(destination);
            assertTrue("all dispatched up to default prefetch ", Wait.waitFor(new Wait.Condition() {
                @Override
                public boolean isSatisified() throws Exception {
                    return 1000 == view.getDispatchCount();
                }
            }));
            assertTrue("All sent have expired ", Wait.waitFor(new Wait.Condition() {
                @Override
                public boolean isSatisified() throws Exception {
                    return sendCount == view.getExpiredCount();
                }
            }));
            logDestinationStats(view);

            releaseListener.countDown();
            // Result deliberately ignored: the assertEquals calls below report actual values.
            Wait.waitFor(new Wait.Condition() {
                @Override
                public boolean isSatisified() throws Exception {
                    return 0 == view.getInFlightCount();
                }
            });
            logDestinationStats(view);
            assertEquals("prefetch gets back to 0 ", 0L, view.getInFlightCount());
            assertEquals("size gets back to 0 ", 0L, view.getQueueSize());
            assertEquals("dequeues match sent/expired ", (long) sendCount, view.getDequeueCount());
            consumer.close();
            LOG.info("done: " + getName());
        } finally {
            // No-op on the success path. After a failed assertion it frees the
            // listener, which would otherwise stay parked in await() for up to 60s.
            releaseListener.countDown();
        }
    }

    /**
     * Starts a background thread that publishes {@code sendCount} text messages
     * through the shared {@link #producer}, logging progress every
     * {@link #PROGRESS_LOG_INTERVAL} sends.
     *
     * @param sendCount number of messages to send
     * @return the already started producing thread
     */
    private Thread startProducingThread(final int sendCount) {
        Thread producingThread = new Thread("Producing Thread") {
            @Override
            public void run() {
                try {
                    long intervalStart = System.currentTimeMillis();
                    for (int sent = 1; sent <= sendCount; sent++) {
                        ExpiredMessagesWithNoConsumerTest.this.producer.send(
                                ExpiredMessagesWithNoConsumerTest.this.session.createTextMessage("test"));
                        if (sent % PROGRESS_LOG_INTERVAL == 0) {
                            // The figure is elapsed millis divided by messages in the interval,
                            // i.e. average ms per message; the label is kept as it always was.
                            long elapsed = System.currentTimeMillis() - intervalStart;
                            LOG.info("sent: " + sent + " @ " + (elapsed / PROGRESS_LOG_INTERVAL) + "m/ms");
                            intervalStart = System.currentTimeMillis();
                        }
                    }
                } catch (Throwable th) {
                    // Not rethrown: the test notices a short send through the expired-count checks.
                    LOG.error("producing thread failed", th);
                }
            }
        };
        producingThread.start();
        return producingThread;
    }

    /**
     * Waits, using {@link Wait#waitFor(Wait.Condition)}, for the given thread to terminate.
     *
     * @param thread the thread to wait for
     * @return {@code true} if the thread was no longer alive when the wait ended
     */
    private boolean waitForThreadToFinish(final Thread thread) throws Exception {
        return Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                thread.join(1000L);
                return !thread.isAlive();
            }
        });
    }

    /** Logs the enqueue, dequeue, in-flight, expired and size statistics of the given view. */
    private void logDestinationStats(DestinationViewMBean view) throws Exception {
        LOG.info("enqueue=" + view.getEnqueueCount()
                + ", dequeue=" + view.getDequeueCount()
                + ", inflight=" + view.getInFlightCount()
                + ", expired= " + view.getExpiredCount()
                + ", size= " + view.getQueueSize());
    }

    /**
     * Creates a JMX proxy for the destination named {@code test} on broker
     * {@code localhost}.
     *
     * <p>Only the queue/topic kind of the argument is used; the broker and
     * destination names in the object name are fixed.
     *
     * @param activeMQDestination destination whose kind selects the {@code Type} key
     * @return a proxy for the destination's management view
     */
    protected DestinationViewMBean createView(ActiveMQDestination activeMQDestination) throws Exception {
        String type = activeMQDestination.isQueue() ? "Queue" : "Topic";
        ObjectName name = new ObjectName(
                "org.apache.activemq:BrokerName=localhost,Type=" + type + ",Destination=test");
        return (DestinationViewMBean) broker.getManagementContext()
                .newProxyInstance(name, DestinationViewMBean.class, true);
    }

    /**
     * Closes the connection and stops the broker.
     *
     * <p>Each step runs even if the previous one failed or the resource was never
     * created, so a failure during a test's own setup is not hidden by a
     * {@link NullPointerException} and the broker is not left running for the
     * next test.
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.AutoFailTestSupport
    public void tearDown() throws Exception {
        try {
            if (connection != null) {
                connection.close();
            }
        } finally {
            try {
                if (broker != null) {
                    broker.stop();
                    broker.waitUntilStopped();
                }
            } finally {
                super.tearDown();
            }
        }
    }
}
