package org.apache.activemq.bugs;

import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.QueueConnection;
import jakarta.jms.QueueReceiver;
import jakarta.jms.QueueSession;
import jakarta.jms.TextMessage;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.util.TimeUtils;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

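/**
 * Multi-broker test: producers send to queue {@code IN}, transacted fanout consumers forward each
 * message to a composite {@code GW.0,...,GW.n-1} queue, and one slow consumer per {@code GW.n}
 * queue tallies what it receives. The test passes when every tally reaches the expected count,
 * no producer failed and the DLQ is empty.
 */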
public class AMQ4485LowLimitTest extends JmsMultipleBrokersTestSupport {
    /** Padding appended to every produced message body: a string built from 10240 zero bytes. */
    static final String payload = new String(new byte[10240]);
    private static final Logger LOG = LoggerFactory.getLogger(AMQ4485LowLimitTest.class);
    /** Base TCP port; the methods in this class address broker {@code Bn} at 61600 + n. */
    final int portBase = 61600;
    /** Number of brokers created by {@code testBrokers}. */
    int numBrokers = 8;
    /** Number of producer threads (matches the pool size used in {@code produce}). */
    final int numProducers = 30;
    /** Total number of messages sent to queue {@code IN}. */
    final int numMessages = 1000;
    /** Milliseconds each GW consumer sleeps per message. */
    final int consumerSleepTime = 40;
    /** Comma-separated list of broker URLs, filled in by {@code buildUrlList}. */
    StringBuilder brokersUrl = new StringBuilder();
    /** Received-message counters keyed by GW queue, populated by {@code startAllGWConsumers}. */
    HashMap<ActiveMQQueue, AtomicInteger> accumulators = new HashMap<>();
    /**
     * Failures raised by producer threads. Several producers may append concurrently while the
     * test thread reads, so the list is wrapped in a synchronized view.
     */
    private final List<Throwable> exceptions = Collections.synchronizedList(new ArrayList<>());

    /**
     * Bookkeeping for a single GW queue consumer: where it is connected, what it listens on,
     * how many messages it has counted and which sequence numbers are still outstanding.
     */
    public class ConsumerState {
        /** Name of the broker the consumer's connection is attached to. */
        String brokerName;
        /** Destination the receiver consumes from. */
        ActiveMQDestination destination;
        /** The JMS receiver whose listener updates this state. */
        QueueReceiver receiver;
        /** Running count of messages delivered to the listener. */
        AtomicInteger accumulator;
        /** Sequence numbers not yet seen; entries are removed as messages arrive. */
        ConcurrentLinkedQueue<Integer> expected = new ConcurrentLinkedQueue<>();

        ConsumerState() {
        }
    }

    /**
     * Appends one {@code tcp://localhost:<port>} URL per broker to {@code brokersUrl},
     * separated by commas, with ports counting up from 61600.
     *
     * @throws Exception declared for subclasses; nothing in this implementation throws
     */
    protected void buildUrlList() throws Exception {
        int brokerIndex = 0;
        while (brokerIndex < this.numBrokers) {
            this.brokersUrl.append("tcp://localhost:").append(61600 + brokerIndex);
            // No trailing comma after the last broker.
            if (brokerIndex != this.numBrokers - 1) {
                this.brokersUrl.append(',');
            }
            brokerIndex++;
        }
    }

    /**
     * Creates broker {@code B<i>} with network connectors enabled.
     *
     * @param i broker index, used for the broker name and the port offset
     * @return the configured (not yet started) broker
     * @throws Exception if broker configuration fails
     */
    protected BrokerService createBroker(int i) throws Exception {
        final boolean withNetworkConnectors = true;
        return createBroker(i, withNetworkConnectors);
    }

    /**
     * Creates and registers broker {@code B<i>} listening on {@code tcp://localhost:(61600 + i)}.
     * The broker is persistent, wipes its store on startup, has JMX enabled without a JMX
     * connector, and carries separate destination policies for {@code GW.>} and {@code IN}.
     *
     * @param i broker index, used for the broker name and the port offset
     * @param z when true, network connectors to the configured broker list are added
     * @return the configured (not yet started) broker, also stored in {@code brokers}
     * @throws Exception if broker configuration fails
     */
    protected BrokerService createBroker(int i, boolean z) throws Exception {
        final BrokerService broker = new BrokerService();
        broker.setPersistent(true);
        broker.setDeleteAllMessagesOnStartup(true);
        broker.getManagementContext().setCreateConnector(false);
        broker.setUseJmx(true);
        broker.setBrokerName("B" + i);
        broker.addConnector(new URI("tcp://localhost:" + (61600 + i)));
        if (z) {
            addNetworkConnector(broker);
        }
        broker.setSchedulePeriodForDestinationPurge(0);
        // 256 MiB broker-wide memory limit.
        broker.getSystemUsage().getMemoryUsage().setLimit(256L * 1024 * 1024);

        final PolicyMap policies = new PolicyMap();
        // Gateway queues: 2 MiB limit, no producer flow control.
        policies.put(new ActiveMQQueue("GW.>"), lowLimitQueuePolicy(2L * 1024 * 1024, false));
        // Inbound queue: 5 MiB limit, producer flow control enabled.
        policies.put(new ActiveMQQueue("IN"), lowLimitQueuePolicy(5L * 1024 * 1024, true));
        broker.setDestinationPolicy(policies);

        broker.getPersistenceAdapter().setConcurrentStoreAndDispatchQueues(true);
        this.brokers.put(broker.getBrokerName(), new JmsMultipleBrokersTestSupport.BrokerItem(broker));
        return broker;
    }

    /** Builds the queue policy shared by both destinations, varying only limit and flow control. */
    private static PolicyEntry lowLimitQueuePolicy(long memoryLimitBytes, boolean producerFlowControl) {
        final PolicyEntry entry = new PolicyEntry();
        entry.setExpireMessagesPeriod(0L);
        entry.setQueuePrefetch(1000);
        entry.setMemoryLimit(memoryLimitBytes);
        entry.setProducerFlowControl(producerFlowControl);
        entry.setEnableAudit(true);
        entry.setUseCache(true);
        return entry;
    }

    /**
     * Adds two identically configured network connectors ({@code Bridge-0}, {@code Bridge-1})
     * to the broker. Both use a {@code static:(...)} discovery URI built from
     * {@code brokersUrl} and dynamically include only {@code GW.*} queues.
     *
     * @param brokerService broker to attach the connectors to
     * @throws Exception if the discovery URI is invalid or the connector cannot be added
     */
    private void addNetworkConnector(BrokerService brokerService) throws Exception {
        final String discoveryUri = "static:(" + this.brokersUrl.toString() + ")";
        for (int bridge = 0; bridge < 2; bridge++) {
            DiscoveryNetworkConnector connector = new DiscoveryNetworkConnector(new URI(discoveryUri));
            connector.setName("Bridge-" + bridge);
            connector.setNetworkTTL(1);
            connector.setDecreaseNetworkConsumerPriority(true);
            connector.setDynamicOnly(true);
            connector.setPrefetchSize(100);
            connector.setDynamicallyIncludedDestinations(Arrays.asList(new ActiveMQQueue("GW.*")));
            brokerService.addNetworkConnector(connector);
        }
    }

    /**
     * Manual scenario (the {@code x_} prefix keeps it out of the normal test run): sends one
     * message in a transacted session, then commits that transaction on one thread while a
     * second thread sends a non-transacted message to the same queue {@code IN}.
     *
     * <p>All three connections are closed when the method exits, including on failure; the
     * original left them open.
     *
     * @throws Exception if broker start-up or any JMS setup step fails
     */
    public void x_testInterleavedSend() throws Exception {
        createBroker(0, false).start();
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61600");
        factory.setWatchTopicAdvisories(false);
        try (QueueConnection txConnection = factory.createQueueConnection();
                QueueConnection nonTxConnection = factory.createQueueConnection();
                QueueConnection spareConnection = factory.createQueueConnection()) {
            txConnection.start();
            nonTxConnection.start();
            // The third connection is started but otherwise unused, as in the original scenario.
            spareConnection.start();

            ActiveMQQueue inQueue = new ActiveMQQueue("IN");
            final QueueSession txSession = txConnection.createQueueSession(true, 0);
            TextMessage txMessage = txSession.createTextMessage("TX");
            final TextMessage nonTxMessage = txSession.createTextMessage("NO_TX");
            MessageProducer txProducer = txSession.createProducer(inQueue);
            final MessageProducer nonTxProducer = nonTxConnection.createQueueSession(false, 1).createProducer(inQueue);

            // Stage the transacted message; the commit itself races with the plain send below.
            txProducer.send(txMessage);

            ExecutorService pool = Executors.newFixedThreadPool(2);
            pool.execute(() -> {
                try {
                    txSession.commit();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            });
            pool.execute(() -> {
                try {
                    nonTxProducer.send(nonTxMessage);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            });
            pool.shutdown();
            pool.awaitTermination(10L, TimeUnit.MINUTES);
        }
    }

    /**
     * Main scenario: builds and starts the broker mesh, waits for bridges and peer infos,
     * starts the GW consumers and the IN-to-GW fanout consumers, produces 1000 messages and
     * then polls until every GW consumer has counted the expected number of messages.
     * Finally asserts that no producer failed and that the DLQ queues are empty.
     *
     * @throws Exception if any setup step, wait condition or JMX lookup fails
     */
    public void testBrokers() throws Exception {
        buildUrlList();
        int brokerIndex = 0;
        while (brokerIndex < this.numBrokers) {
            createBroker(brokerIndex);
            brokerIndex++;
        }
        startAllBrokers();

        // Every broker is expected to bridge to all of the others.
        waitForBridgeFormation(this.numBrokers - 1);
        verifyPeerBrokerInfos(this.numBrokers - 1);

        final List<ConsumerState> gwConsumers = startAllGWConsumers(this.numBrokers);
        startAllGWFanoutConsumers(this.numBrokers);

        LOG.info("Waiting for percolation of consumers..");
        TimeUnit.SECONDS.sleep(5L);

        LOG.info("Produce mesages..");
        final long startMillis = System.currentTimeMillis();
        produce(1000);

        Wait.Condition allTalliesReached = new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                for (ConsumerState state : gwConsumers) {
                    int fanout = 1;
                    if (state.destination.isComposite()) {
                        fanout = state.destination.getCompositeDestinations().length;
                    }
                    final int target = 1000 * fanout;

                    AMQ4485LowLimitTest.LOG.info("Tally for: " + state.brokerName + ", dest: " + state.destination + " - " + state.accumulator.get());
                    if (state.accumulator.get() == target) {
                        AMQ4485LowLimitTest.LOG.info("got tally on " + state.brokerName);
                        continue;
                    }

                    // First consumer that is short ends this poll; report what is missing.
                    AMQ4485LowLimitTest.LOG.info("Tally for: " + state.brokerName + ", dest: " + state.destination + " - " + state.accumulator.get() + " != " + target + ", " + state.expected);
                    if (state.accumulator.get() > target - 50) {
                        AMQ4485LowLimitTest.this.dumpQueueStat(null);
                    }
                    // Exactly one message outstanding: attach and drop a consumer on that broker.
                    if (state.expected.size() == 1) {
                        AMQ4485LowLimitTest.this.startConsumer(state.brokerName, state.destination);
                    }
                    return false;
                }
                return true;
            }
        };
        assertTrue("Got all sent", Wait.waitFor(allTalliesReached, 60000000L, 20000L));

        assertTrue("No exceptions:" + this.exceptions, this.exceptions.isEmpty());
        LOG.info("done");
        LOG.info("Duration:" + TimeUtils.printDuration(System.currentTimeMillis() - startMillis));
        assertEquals("nothing in the dlq's", 0L, dumpQueueStat(new ActiveMQQueue("ActiveMQ.DLQ")));
    }

    /**
     * Opens a short-lived connection to the named broker, creates a consumer on the given
     * destination and immediately closes the connection again.
     *
     * <p>The connection is closed even when starting it or creating the consumer fails; the
     * original leaked it in that case.
     *
     * @param str broker name of the form {@code B<n>}; {@code n} selects port 61600 + n
     * @param activeMQDestination destination to attach the temporary consumer to
     * @throws Exception if the broker name cannot be parsed or a JMS call fails
     */
    private void startConsumer(String str, ActiveMQDestination activeMQDestination) throws Exception {
        int brokerIndex = Integer.parseInt(str.substring(1));
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:" + (61600 + brokerIndex));
        factory.setWatchTopicAdvisories(false);
        try (QueueConnection connection = factory.createQueueConnection()) {
            connection.start();
            connection.createQueueSession(false, 1).createConsumer(activeMQDestination);
        }
    }

    /**
     * Logs enqueue count and size for every queue, on every broker, whose JMX object name
     * contains the destination's physical name, and returns the sum of those queue sizes.
     *
     * <p>The log line now reports the actual queue size; previously the logger object itself
     * was concatenated into the message.
     *
     * @param activeMQDestination destination to match; when {@code null} no queue matches,
     *        nothing is logged and 0 is returned
     * @return total size of all matching queues across all brokers
     * @throws Exception if a JMX lookup fails
     */
    private long dumpQueueStat(ActiveMQDestination activeMQDestination) throws Exception {
        long totalSize = 0;
        for (JmsMultipleBrokersTestSupport.BrokerItem item : this.brokers.values()) {
            BrokerService brokerService = item.broker;
            for (ObjectName objectName : brokerService.getAdminView().getQueues()) {
                if (activeMQDestination == null || !objectName.toString().contains(activeMQDestination.getPhysicalName())) {
                    continue;
                }
                QueueViewMBean queueView = (QueueViewMBean) brokerService.getManagementContext().newProxyInstance(objectName, QueueViewMBean.class, false);
                // Read the size once so the logged value and the summed value agree.
                long queueSize = queueView.getQueueSize();
                LOG.info(brokerService.getBrokerName() + ", " + queueView.getName() + ", Enqueue:" + queueView.getEnqueueCount() + ", Size: " + queueSize);
                totalSize += queueSize;
            }
        }
        return totalSize;
    }

    /**
     * On each of the first {@code i} brokers, starts a transacted consumer on queue {@code IN}
     * that re-sends every received message to the composite queue {@code GW.0,GW.1,...} and
     * commits. Failures are logged and otherwise ignored.
     *
     * @param i number of brokers, which is also the number of GW queues in the composite
     * @throws Exception if a connection, session, producer or receiver cannot be created
     */
    private void startAllGWFanoutConsumers(int i) throws Exception {
        StringBuilder compositeName = new StringBuilder();
        for (int gw = 0; gw < i; gw++) {
            if (gw > 0) {
                compositeName.append(',');
            }
            compositeName.append("GW.").append(gw);
        }
        ActiveMQQueue fanoutTarget = new ActiveMQQueue(compositeName.toString());

        int brokerIndex = 0;
        while (brokerIndex < i) {
            String brokerUrl = "failover:(tcp://localhost:" + (61600 + brokerIndex) + ")";
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
            factory.setWatchTopicAdvisories(false);
            QueueConnection connection = factory.createQueueConnection();
            connection.start();

            final QueueSession session = connection.createQueueSession(true, 0);
            final MessageProducer forwarder = session.createProducer(fanoutTarget);
            MessageListener fanout = message -> {
                try {
                    // Forward and commit in the same transaction as the receive.
                    forwarder.send(message);
                    session.commit();
                } catch (Exception e) {
                    AMQ4485LowLimitTest.LOG.error("Failed to fanout to GW: " + message, e);
                }
            };
            session.createReceiver(new ActiveMQQueue("IN")).setMessageListener(fanout);
            brokerIndex++;
        }
    }

    /**
     * Starts one slow consumer per broker on queue {@code GW.<n>} and returns the tracking
     * state for each. Every listener sleeps 40 ms per message, increments the queue's shared
     * counter and removes the message's {@code NUM} property from the outstanding set.
     *
     * <p>Changes from the original: the connection is cast explicitly to
     * {@link ActiveMQConnection} (needed for {@code getBrokerName()}), the result list is
     * properly typed, and listener failures go to the logger instead of {@code System.err}.
     *
     * @param i number of brokers to attach a consumer to
     * @return one {@link ConsumerState} per started consumer, in broker order
     * @throws Exception if a connection, session or receiver cannot be created
     */
    private List<ConsumerState> startAllGWConsumers(int i) throws Exception {
        List<ConsumerState> states = new LinkedList<>();
        for (int brokerIndex = 0; brokerIndex < i; brokerIndex++) {
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (61600 + brokerIndex) + ")");
            factory.setWatchTopicAdvisories(false);
            ActiveMQConnection connection = (ActiveMQConnection) factory.createQueueConnection();
            connection.start();
            QueueSession session = connection.createQueueSession(false, 1);
            ActiveMQQueue gwQueue = new ActiveMQQueue("GW." + brokerIndex);
            QueueReceiver receiver = session.createReceiver(gwQueue);

            final ConsumerState state = new ConsumerState();
            state.brokerName = connection.getBrokerName();
            state.receiver = receiver;
            state.destination = gwQueue;

            // Seed the outstanding set with every sequence number this consumer should see.
            int expectedCount = 1000 * (state.destination.isComposite() ? state.destination.getCompositeDestinations().length : 1);
            for (int sequence = 0; sequence < expectedCount; sequence++) {
                state.expected.add(Integer.valueOf(sequence));
            }

            // Consumers of the same queue share one counter.
            state.accumulator = this.accumulators.computeIfAbsent(gwQueue, key -> new AtomicInteger(0));

            receiver.setMessageListener(message -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(40L);
                } catch (InterruptedException e) {
                    AMQ4485LowLimitTest.LOG.error("Interrupted while delaying receipt of " + message, e);
                }
                try {
                    state.accumulator.incrementAndGet();
                    try {
                        state.expected.remove(((ActiveMQMessage) message).getProperty("NUM"));
                    } catch (IOException e) {
                        AMQ4485LowLimitTest.LOG.error("Failed to read NUM property of " + message, e);
                    }
                } catch (Exception e) {
                    AMQ4485LowLimitTest.LOG.error("Failed to commit slow receipt of " + message, e);
                }
            });
            states.add(state);
        }
        return states;
    }

    /**
     * Sends {@code i} text messages to queue {@code IN} using 30 producer threads spread
     * round-robin over the brokers. Each message carries an int property {@code NUM} holding
     * its sequence number. The method returns once the tasks are submitted; it does not wait
     * for them to finish.
     *
     * <p>Changes from the original: the connection is cast explicitly to
     * {@link ActiveMQConnection}, each connection is closed even when sending fails, the
     * executor is shut down after submission so its threads end when the work is done, and
     * appends to the shared failure list are synchronized.
     *
     * @param i total number of messages to send across all producers
     * @throws Exception declared for callers; failures inside producer threads are recorded
     *         in {@code exceptions} instead of being thrown
     */
    private void produce(final int i) throws Exception {
        ExecutorService producerPool = Executors.newFixedThreadPool(30);
        final AtomicInteger remaining = new AtomicInteger(i);
        for (int producerId = 1; producerId <= 30; producerId++) {
            final int brokerIndex = producerId % this.numBrokers;
            producerPool.execute(() -> {
                try {
                    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (61600 + brokerIndex) + ")");
                    factory.setWatchTopicAdvisories(false);
                    try (ActiveMQConnection connection = (ActiveMQConnection) factory.createQueueConnection()) {
                        connection.start();
                        QueueSession session = connection.createQueueSession(false, 1);
                        MessageProducer producer = session.createProducer((Destination) null);
                        ActiveMQQueue inQueue = new ActiveMQQueue("IN");
                        int left;
                        // Producers claim messages from the shared countdown until it is exhausted.
                        while ((left = remaining.decrementAndGet()) >= 0) {
                            int sequence = (i - left) - 1;
                            TextMessage message = session.createTextMessage(connection.getBrokerName() + "->" + sequence + " payload:" + AMQ4485LowLimitTest.payload);
                            message.setIntProperty("NUM", sequence);
                            producer.send(inQueue, message);
                        }
                    }
                } catch (Throwable th) {
                    th.printStackTrace();
                    // Several producers may fail at once; guard the shared list.
                    synchronized (AMQ4485LowLimitTest.this.exceptions) {
                        AMQ4485LowLimitTest.this.exceptions.add(th);
                    }
                }
            });
        }
        // Already-submitted tasks still run; this only lets the pool threads exit afterwards.
        producerPool.shutdown();
    }

    /**
     * Waits until the broker reports {@code i} peer broker infos and asserts that count.
     * If the count is still wrong after waiting, logs each peer seen and the candidate
     * names {@code B0..B(i-1)} that were not among them.
     *
     * <p>Changes from the original: the region broker is cast explicitly to
     * {@link RegionBroker} (needed for {@code getPeerBrokerInfos()}), and the diagnostic
     * list is properly typed.
     *
     * @param brokerItem broker to inspect
     * @param i expected number of peer brokers
     * @throws Exception if the wait condition throws
     */
    private void verifyPeerBrokerInfo(JmsMultipleBrokersTestSupport.BrokerItem brokerItem, final int i) throws Exception {
        final BrokerService brokerService = brokerItem.broker;
        final RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
        // The wait result is not checked here; the assertion at the end reports a shortfall.
        Wait.waitFor(new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                AMQ4485LowLimitTest.LOG.info("verify infos " + brokerService.getBrokerName() + ", len: " + regionBroker.getPeerBrokerInfos().length);
                return i == regionBroker.getPeerBrokerInfos().length;
            }
        });
        LOG.info("verify infos " + brokerService.getBrokerName() + ", len: " + regionBroker.getPeerBrokerInfos().length);

        List<String> unseenBrokers = new ArrayList<>();
        for (int n = 0; n < i; n++) {
            unseenBrokers.add("B" + n);
        }
        if (i != regionBroker.getPeerBrokerInfos().length) {
            for (BrokerInfo peer : regionBroker.getPeerBrokerInfos()) {
                LOG.info(peer.getBrokerName());
                unseenBrokers.remove(peer.getBrokerName());
            }
            LOG.info("Broker infos off.." + unseenBrokers);
        }
        assertEquals(brokerService.getBrokerName(), i, regionBroker.getPeerBrokerInfos().length);
    }

    /**
     * Runs {@code verifyPeerBrokerInfo} against every registered broker.
     *
     * @param i expected number of peer brokers for each broker
     * @throws Exception if verification of any broker fails
     */
    private void verifyPeerBrokerInfos(int i) throws Exception {
        for (JmsMultipleBrokersTestSupport.BrokerItem item : this.brokers.values()) {
            verifyPeerBrokerInfo(item, i);
        }
    }

    /**
     * Delegates to the superclass teardown; this class adds no cleanup of its own.
     *
     * @throws Exception if the superclass teardown fails
     */
    @Override
    public void tearDown() throws Exception {
        super.tearDown();
    }
}
