package org.apache.activemq.bugs;

import java.util.HashSet;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.kahadb.plist.PListStoreImpl;
import org.apache.activemq.transport.nio.NIOSSLConcurrencyTest;
import org.apache.activemq.usecases.DurableSubDelayedUnsubscribeTest;
import org.apache.activemq.usecases.DurableSubProcessConcurrentCommitActivateNoDuplicateTest;
import org.apache.activemq.usecases.VerifyNetworkConsumersDisconnectTest;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.filter.AbstractFilter;
import org.apache.logging.log4j.core.layout.MessageLayout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/bugs/AMQ4221Test.class */
public class AMQ4221Test extends TestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(AMQ4221Test.class);
    BrokerService brokerService;
    private String brokerUrlString;
    public int PAYLOAD_SIZE_BYTES = NIOSSLConcurrencyTest.MESSAGE_SIZE;
    public int NUM_TO_SEND = DurableSubDelayedUnsubscribeTest.Client.lifetime;
    public int NUM_CONCURRENT_PRODUCERS = 20;
    public int QUEUE_COUNT = 1;
    public int TMP_JOURNAL_MAX_FILE_SIZE = 10485760;
    public int DLQ_PURGE_INTERVAL = VerifyNetworkConsumersDisconnectTest.TIMEOUT;
    public int MESSAGE_TIME_TO_LIVE = AMQ4607Test.TIMEOUT;
    public int EXPIRE_SWEEP_PERIOD = 200;
    public int TMP_JOURNAL_GC_PERIOD = 50;
    public int RECEIVE_POLL_PERIOD = 4000;
    private int RECEIVE_BATCH = 5000;
    final byte[] payload = new byte[this.PAYLOAD_SIZE_BYTES];
    final AtomicInteger counter = new AtomicInteger(0);
    final HashSet<Throwable> exceptions = new HashSet<>();
    ExecutorService executorService = Executors.newCachedThreadPool();
    final AtomicBoolean done = new AtomicBoolean(false);
    final LinkedList<String> errorsInLog = new LinkedList<>();

    public static Test suite() {
        return suite(AMQ4221Test.class);
    }

    public void setUp() throws Exception {
        org.apache.logging.log4j.core.Logger logger = (org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.core.Logger.class.cast(LogManager.getRootLogger());
        Appender appender = new AbstractAppender("testAppender", new AbstractFilter() { // from class: org.apache.activemq.bugs.AMQ4221Test.1
        }, new MessageLayout(), false, new Property[0]) { // from class: org.apache.activemq.bugs.AMQ4221Test.2
            public void append(LogEvent logEvent) {
                if (logEvent.getLevel().isMoreSpecificThan(Level.WARN)) {
                    System.err.println("Fail on error in log: " + logEvent.getMessage());
                    AMQ4221Test.this.done.set(true);
                    AMQ4221Test.this.errorsInLog.add(logEvent.getMessage().getFormattedMessage());
                }
            }
        };
        appender.start();
        logger.get().addAppender(appender, Level.DEBUG, new AbstractFilter() { // from class: org.apache.activemq.bugs.AMQ4221Test.3
        });
        logger.addAppender(appender);
        this.done.set(false);
        this.errorsInLog.clear();
        this.brokerService = new BrokerService();
        this.brokerService.setDeleteAllMessagesOnStartup(true);
        this.brokerService.setDestinations(new ActiveMQDestination[]{new ActiveMQQueue("ActiveMQ.DLQ")});
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy());
        policyEntry.setExpireMessagesPeriod(this.EXPIRE_SWEEP_PERIOD);
        policyEntry.setProducerFlowControl(false);
        policyEntry.setMemoryLimit(52428800L);
        this.brokerService.getSystemUsage().getMemoryUsage().setLimit(52428800L);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(policyEntry);
        this.brokerService.setDestinationPolicy(policyMap);
        PListStoreImpl pListStoreImpl = new PListStoreImpl();
        pListStoreImpl.setDirectory(this.brokerService.getTmpDataDirectory());
        pListStoreImpl.setJournalMaxFileLength(this.TMP_JOURNAL_MAX_FILE_SIZE);
        pListStoreImpl.setCleanupInterval(this.TMP_JOURNAL_GC_PERIOD);
        pListStoreImpl.setIndexPageSize(200);
        pListStoreImpl.setIndexEnablePageCaching(false);
        this.brokerService.setTempDataStore(pListStoreImpl);
        this.brokerService.setAdvisorySupport(false);
        TransportConnector addConnector = this.brokerService.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT);
        this.brokerService.start();
        this.brokerUrlString = addConnector.getPublishableConnectString();
    }

    public void tearDown() throws Exception {
        this.brokerService.stop();
        this.brokerService.waitUntilStopped();
        this.executorService.shutdownNow();
    }

    public void testProduceConsumeExpireHalf() throws Exception {
        final Queue destination = getDestination(this.brokerService, new ActiveMQQueue("ActiveMQ.DLQ"));
        if (this.DLQ_PURGE_INTERVAL > 0) {
            this.executorService.execute(new Runnable() { // from class: org.apache.activemq.bugs.AMQ4221Test.4
                @Override // java.lang.Runnable
                public void run() {
                    while (!AMQ4221Test.this.done.get()) {
                        try {
                            Thread.sleep(AMQ4221Test.this.DLQ_PURGE_INTERVAL);
                            AMQ4221Test.LOG.info("Purge DLQ, current size: " + destination.getDestinationStatistics().getMessages().getCount());
                            destination.purge();
                        } catch (InterruptedException e) {
                        } catch (Throwable th) {
                            th.printStackTrace();
                            AMQ4221Test.this.exceptions.add(th);
                        }
                    }
                }
            });
        }
        final CountDownLatch countDownLatch = new CountDownLatch(this.QUEUE_COUNT);
        for (int i = 0; i < this.QUEUE_COUNT; i++) {
            final int i2 = i;
            this.executorService.execute(new Runnable() { // from class: org.apache.activemq.bugs.AMQ4221Test.5
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        AMQ4221Test.this.doProduceConsumeExpireHalf(i2, countDownLatch);
                    } catch (Throwable th) {
                        th.printStackTrace();
                        AMQ4221Test.this.exceptions.add(th);
                    }
                }
            });
        }
        while (!this.done.get()) {
            this.done.set(countDownLatch.await(5L, TimeUnit.SECONDS));
        }
        this.executorService.shutdown();
        this.executorService.awaitTermination(5L, TimeUnit.MINUTES);
        assertTrue("no exceptions:" + this.exceptions, this.exceptions.isEmpty());
        assertTrue("No ERROR in log:" + this.errorsInLog, this.errorsInLog.isEmpty());
    }

    public void doProduceConsumeExpireHalf(int i, CountDownLatch countDownLatch) throws Exception {
        final ActiveMQQueue activeMQQueue = new ActiveMQQueue("Q" + i);
        final ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(this.brokerUrlString);
        ActiveMQPrefetchPolicy activeMQPrefetchPolicy = new ActiveMQPrefetchPolicy();
        activeMQPrefetchPolicy.setAll(0);
        activeMQConnectionFactory.setPrefetchPolicy(activeMQPrefetchPolicy);
        Connection createConnection = activeMQConnectionFactory.createConnection();
        createConnection.start();
        final MessageConsumer createConsumer = createConnection.createSession(false, 1).createConsumer(activeMQQueue, "on = 'true'");
        this.executorService.execute(new Runnable() { // from class: org.apache.activemq.bugs.AMQ4221Test.6
            @Override // java.lang.Runnable
            public void run() {
                while (!AMQ4221Test.this.done.get()) {
                    try {
                        Thread.sleep(AMQ4221Test.this.RECEIVE_POLL_PERIOD);
                        for (int i2 = 0; i2 < AMQ4221Test.this.RECEIVE_BATCH && !AMQ4221Test.this.done.get(); i2++) {
                            Message receive = createConsumer.receive(1000L);
                            if (receive != null) {
                                AMQ4221Test.this.counter.incrementAndGet();
                                if (AMQ4221Test.this.counter.get() > 0 && AMQ4221Test.this.counter.get() % DurableSubProcessConcurrentCommitActivateNoDuplicateTest.SERVER_SLEEP == 0) {
                                    AMQ4221Test.LOG.info("received: " + AMQ4221Test.this.counter.get() + ", " + receive.getJMSDestination().toString());
                                }
                            }
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        AMQ4221Test.this.exceptions.add(e);
                        return;
                    } catch (JMSException e2) {
                        return;
                    }
                }
            }
        });
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        final CountDownLatch countDownLatch2 = new CountDownLatch(this.NUM_CONCURRENT_PRODUCERS);
        for (int i2 = 0; i2 < this.NUM_CONCURRENT_PRODUCERS; i2++) {
            this.executorService.execute(new Runnable() { // from class: org.apache.activemq.bugs.AMQ4221Test.7
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        Connection createConnection2 = activeMQConnectionFactory.createConnection();
                        createConnection2.start();
                        Session createSession = createConnection2.createSession(false, 1);
                        MessageProducer createProducer = createSession.createProducer(activeMQQueue);
                        createProducer.setTimeToLive(AMQ4221Test.this.MESSAGE_TIME_TO_LIVE);
                        createProducer.setDeliveryMode(1);
                        while (atomicInteger.incrementAndGet() < AMQ4221Test.this.NUM_TO_SEND && !AMQ4221Test.this.done.get()) {
                            BytesMessage createBytesMessage = createSession.createBytesMessage();
                            createBytesMessage.writeBytes(AMQ4221Test.this.payload);
                            createBytesMessage.setStringProperty("on", String.valueOf(atomicInteger.get() % 2 == 0));
                            createProducer.send(createBytesMessage);
                        }
                        countDownLatch2.countDown();
                    } catch (Exception e) {
                        e.printStackTrace();
                        AMQ4221Test.this.exceptions.add(e);
                    }
                }
            });
        }
        countDownLatch2.await(10L, TimeUnit.MINUTES);
        DestinationStatistics destinationStatistics = getDestinationStatistics(this.brokerService, activeMQQueue);
        Logger logger = LOG;
        long count = destinationStatistics.getExpired().getCount();
        activeMQQueue.getQueueName();
        logger.info("total expired so far " + count + ", " + logger);
        countDownLatch.countDown();
    }
}
