package org.apache.activemq.network;

import jakarta.jms.Connection;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import java.net.URI;
import java.util.ArrayList;
import java.util.Date;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usecases.DurableSubDelayedUnsubscribeTest;
import org.apache.activemq.usecases.DurableSubProcessConcurrentCommitActivateNoDuplicateTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/network/NetworkLoadTest.class */
public class NetworkLoadTest extends TestCase {
    /** Logger shared by the test and its nested helper classes. */
    private static final transient Logger LOG = LoggerFactory.getLogger(NetworkLoadTest.class);
    /** Number of sampling rounds; read from the {@code SAMPLES} system property, defaulting to 12. */
    private static final long SAMPLES = Integer.parseInt(System.getProperty("SAMPLES", "12"));
    /**
     * Length of one sampling round as passed to {@code Thread.sleep} (i.e. milliseconds); read from the
     * {@code SAMPLES_DURATION} system property (note: plural, unlike this constant's name), defaulting to 5000.
     */
    private static final long SAMPLE_DURATION = Integer.parseInt(System.getProperty("SAMPLES_DURATION", "5000"));
    /** Number of brokers started by {@code setUp()}. */
    protected static final int BROKER_COUNT = 4;
    /** Intended size of a generated message text (presumably in characters - confirm against the producer code). */
    protected static final int MESSAGE_SIZE = 2000;
    /** Multicast discovery group name; assigned in {@code setUp()} and read by {@code createBroker(int)}. */
    String groupId;
    /** Brokers started in {@code setUp()} and stopped in {@code tearDown()}. */
    private BrokerService[] brokers;
    /** Forwarding clients created in {@code setUp()} and closed in {@code tearDown()}. */
    private ForwardingClient[] forwardingClients;

    /* loaded from: input_file:org/apache/activemq/network/NetworkLoadTest$ForwardingClient.class */
    class ForwardingClient {
        private final AtomicLong forwardCounter = new AtomicLong();
        private final Connection toConnection;
        private final Connection fromConnection;

        public ForwardingClient(int i, int i2) throws JMSException {
            this.toConnection = NetworkLoadTest.this.createConnection(i);
            final MessageProducer createProducer = this.toConnection.createSession(false, 1).createProducer(new ActiveMQQueue("Q" + i2));
            createProducer.setDeliveryMode(1);
            createProducer.setDisableMessageID(true);
            this.fromConnection = NetworkLoadTest.this.createConnection(i);
            this.fromConnection.createSession(false, 1).createConsumer(new ActiveMQQueue("Q" + i)).setMessageListener(new MessageListener() { // from class: org.apache.activemq.network.NetworkLoadTest.ForwardingClient.1
                public void onMessage(Message message) {
                    try {
                        createProducer.send(message);
                        ForwardingClient.this.forwardCounter.incrementAndGet();
                    } catch (JMSException e) {
                    }
                }
            });
        }

        public void start() throws JMSException {
            this.toConnection.start();
            this.fromConnection.start();
        }

        public void stop() throws JMSException {
            this.toConnection.stop();
            this.fromConnection.stop();
        }

        public void close() throws JMSException {
            this.toConnection.close();
            this.fromConnection.close();
        }
    }

    /**
     * Starts {@link #BROKER_COUNT} brokers that share a freshly named discovery group, then
     * starts one forwarding client per adjacent broker pair.
     *
     * @throws Exception if a broker or a forwarding client fails to start
     */
    @Override
    protected void setUp() throws Exception {
        // A time-stamped group name keeps this run's group distinct from earlier runs.
        this.groupId = "network-load-test-" + System.currentTimeMillis();

        this.brokers = new BrokerService[BROKER_COUNT];
        for (int i = 0; i < this.brokers.length; i++) {
            LOG.info("Starting broker: " + i);
            this.brokers[i] = createBroker(i);
            this.brokers[i].start();
        }

        // Pause after broker start-up (presumably so the network bridges can form - confirm).
        Thread.sleep(800L);

        // Client k forwards from queue k to queue k + 1, so there is one client fewer than
        // brokers. Derive the count from the broker array rather than hard-coding it.
        this.forwardingClients = new ForwardingClient[this.brokers.length - 1];
        for (int i = 0; i < this.forwardingClients.length; i++) {
            LOG.info("Starting fowarding client " + i);
            this.forwardingClients[i] = new ForwardingClient(i, i + 1);
            this.forwardingClients[i].start();
        }
    }

    /**
     * Closes every forwarding client and stops every broker.
     *
     * <p>Cleanup continues past individual failures so that one failing resource does not leave
     * the others (and their ports) open. The first failure is rethrown at the end with any later
     * failures attached as suppressed exceptions.
     *
     * @throws Exception the first failure encountered while closing or stopping a resource
     */
    @Override
    protected void tearDown() throws Exception {
        Exception failure = null;

        if (this.forwardingClients != null) {
            for (int i = 0; i < this.forwardingClients.length; i++) {
                if (this.forwardingClients[i] == null) {
                    continue;
                }
                LOG.info("Stoping fowarding client " + i);
                try {
                    this.forwardingClients[i].close();
                } catch (Exception e) {
                    failure = chainFailure(failure, e);
                }
            }
        }

        if (this.brokers != null) {
            for (int i = 0; i < this.brokers.length; i++) {
                if (this.brokers[i] == null) {
                    continue;
                }
                LOG.info("Stoping broker " + i);
                try {
                    this.brokers[i].stop();
                } catch (Exception e) {
                    failure = chainFailure(failure, e);
                }
            }
        }

        if (failure != null) {
            throw failure;
        }
    }

    /** Returns {@code first} with {@code next} added as suppressed, or {@code next} if there is no first. */
    private static Exception chainFailure(Exception first, Exception next) {
        if (first == null) {
            return next;
        }
        first.addSuppressed(next);
        return first;
    }

    /**
     * Creates (but does not start) a connection to the broker listening on
     * {@code tcp://localhost:<base port + i>}.
     *
     * @param i broker index added to the base port
     * @return a new, unstarted connection
     * @throws JMSException if the connection cannot be created
     */
    protected Connection createConnection(int i) throws JMSException {
        // NOTE(review): the base port is taken from an unrelated test's constant; its value is
        // not visible here - confirm the intended port and consider a constant local to this test.
        String brokerUrl = "tcp://localhost:" + (DurableSubDelayedUnsubscribeTest.Client.lifetime + i);

        ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
        prefetch.setQueuePrefetch(100);
        prefetch.setTopicPrefetch(1000);

        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
        factory.setOptimizedMessageDispatch(true);
        factory.setCopyMessageOnSend(false);
        factory.setUseCompression(false);
        factory.setDispatchAsync(true);
        factory.setUseAsyncSend(false);
        factory.setOptimizeAcknowledge(false);
        factory.setWatchTopicAdvisories(false);
        factory.setPrefetchPolicy(prefetch);
        factory.setAlwaysSyncSend(true);

        return factory.createConnection();
    }

    /**
     * Builds (but does not start) a non-persistent broker named {@code "broker-" + brokerId}
     * with a TCP transport connector and a discovery-based network connector that both use
     * the multicast group {@link #groupId}.
     *
     * @param brokerId broker index; used in the broker name and added to the base port
     * @return the configured, unstarted broker
     * @throws Exception if a URI is malformed or a connector cannot be added
     */
    protected BrokerService createBroker(int brokerId) throws Exception {
        BrokerService broker = new BrokerService();
        broker.setBrokerName("broker-" + brokerId);
        broker.setPersistent(false);
        broker.setUseJmx(true);
        broker.getManagementContext().setCreateConnector(false);

        SystemUsage systemUsage = new SystemUsage();
        systemUsage.getMemoryUsage().setLimit(50L * 1024 * 1024); // 50 MB for the whole broker
        broker.setSystemUsage(systemUsage);

        ArrayList<PolicyEntry> policyEntries = new ArrayList<>();

        PolicyEntry queuePolicy = new PolicyEntry();
        queuePolicy.setQueue(">");
        queuePolicy.setMemoryLimit(1024L * 1024); // 1 MB per queue
        queuePolicy.setPendingSubscriberPolicy(new VMPendingSubscriberMessageStoragePolicy());
        queuePolicy.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
        policyEntries.add(queuePolicy);

        PolicyEntry topicPolicy = new PolicyEntry();
        topicPolicy.setTopic(">");
        topicPolicy.setSubscriptionRecoveryPolicy(new NoSubscriptionRecoveryPolicy());
        // Previously this entry was configured but never registered, so it had no effect.
        policyEntries.add(topicPolicy);

        PolicyMap policyMap = new PolicyMap();
        policyMap.setPolicyEntries(policyEntries);
        broker.setDestinationPolicy(policyMap);

        // The transport connector and the network connector share the same discovery group.
        URI discoveryUri = new URI("multicast://default?group=" + this.groupId);

        TransportConnector transportConnector = new TransportConnector();
        // NOTE(review): the base port is taken from an unrelated test's constant; its value is
        // not visible here - confirm the intended port and consider a constant local to this test.
        transportConnector.setUri(
                new URI("tcp://localhost:" + (DurableSubDelayedUnsubscribeTest.Client.lifetime + brokerId)));
        transportConnector.setDiscoveryUri(discoveryUri);
        broker.addConnector(transportConnector);

        DiscoveryNetworkConnector networkConnector = new DiscoveryNetworkConnector();
        networkConnector.setUri(discoveryUri);
        networkConnector.setBridgeTempDestinations(true);
        networkConnector.setPrefetchSize(1);
        broker.addNetworkConnector(networkConnector);

        return broker;
    }

    /**
     * Publishes messages to {@code Q0} as fast as possible, consumes them from the queue of the
     * last broker, and asserts over {@link #SAMPLES} sampling rounds that messages keep being
     * both produced and received.
     *
     * <p>The producer thread is stopped and both connections are closed even when a sample
     * assertion fails.
     *
     * @throws Exception if a JMS operation fails or the test thread is interrupted
     */
    public void testRequestReply() throws Exception {
        final int lastBroker = this.brokers.length - 1;
        final AtomicReference<ActiveMQTextMessage> lastReceived = new AtomicReference<>();
        final AtomicLong produced = new AtomicLong();
        final AtomicLong received = new AtomicLong();
        final AtomicBoolean done = new AtomicBoolean();

        LOG.info("Staring Final Consumer");
        // Nested try-with-resources closes the producer connection first, then the consumer's.
        try (Connection toConnection = createConnection(lastBroker)) {
            toConnection.start();
            MessageConsumer consumer = toConnection.createSession(false, Session.AUTO_ACKNOWLEDGE)
                    .createConsumer(new ActiveMQQueue("Q" + lastBroker));
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    ActiveMQTextMessage current = (ActiveMQTextMessage) message;
                    ActiveMQTextMessage previous = lastReceived.get();
                    // Out-of-order delivery is only reported, not treated as a failure.
                    if (previous != null && previous.getMessageId().getProducerSequenceId()
                            > current.getMessageId().getProducerSequenceId()) {
                        LOG.warn("Received an out of order message. Got " + current.getMessageId()
                                + ", expected something after " + previous.getMessageId());
                    }
                    lastReceived.set(current);
                    received.incrementAndGet();
                }
            });

            LOG.info("Staring Initial Producer");
            try (Connection fromConnection = createConnection(0)) {
                Thread producerThread = new Thread("Producer") {
                    @Override
                    public void run() {
                        try {
                            fromConnection.start();
                            Session session = fromConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                            MessageProducer producer = session.createProducer(new ActiveMQQueue("Q0"));
                            // Delivery mode 1 == DeliveryMode.NON_PERSISTENT.
                            producer.setDeliveryMode(1);
                            producer.setDisableMessageID(true);
                            for (int i = 0; !done.get(); i++) {
                                producer.send(session.createTextMessage(createMessageText(i)));
                                produced.incrementAndGet();
                            }
                        } catch (JMSException e) {
                            LOG.error("Producer thread failed", e);
                        }
                    }
                };
                producerThread.start();
                try {
                    Thread.sleep(800L);
                    sampleThroughput(produced, received);
                    LOG.info("Sample done.");
                } finally {
                    // Always stop the producer before its connection is closed.
                    done.set(true);
                    producerThread.join(5000L);
                }
            }
        }
    }

    /**
     * Runs {@link #SAMPLES} sampling rounds of {@link #SAMPLE_DURATION} each: resets all counters,
     * sleeps, logs the published/consumed counts with their rates and the forwarding counters,
     * and asserts that at least one message was received and produced during the round.
     */
    private void sampleThroughput(AtomicLong produced, AtomicLong received) throws InterruptedException {
        for (int sample = 0; sample < SAMPLES; sample++) {
            long start = System.currentTimeMillis();
            produced.set(0L);
            received.set(0L);
            for (ForwardingClient client : this.forwardingClients) {
                client.forwardCounter.set(0L);
            }

            Thread.sleep(SAMPLE_DURATION);

            long elapsedMillis = System.currentTimeMillis() - start;
            long receivedCount = received.get();
            long producedCount = produced.get();
            LOG.info("published: " + producedCount + " msgs at " + (producedCount * 1000.0f / elapsedMillis)
                    + " msgs/sec, consumed: " + receivedCount + " msgs at "
                    + (receivedCount * 1000.0f / elapsedMillis) + " msgs/sec");

            StringBuilder counters = new StringBuilder("  forwarding counters: ");
            for (int i = 0; i < this.forwardingClients.length; i++) {
                if (i != 0) {
                    counters.append(", ");
                }
                counters.append(this.forwardingClients[i].forwardCounter.get());
            }
            LOG.info(counters.toString());

            assertTrue("Recieved some messages since last sample", receivedCount > 0);
            assertTrue("Produced some messages since last sample", producedCount > 0);
        }
    }

    /**
     * Builds a message text of exactly {@link #MESSAGE_SIZE} characters: the index and current
     * date, truncated or right-padded with spaces as needed.
     */
    private static String createMessageText(int index) {
        StringBuilder text = new StringBuilder(MESSAGE_SIZE);
        text.append(index).append(" on ").append(new Date()).append(" ...");
        if (text.length() > MESSAGE_SIZE) {
            return text.substring(0, MESSAGE_SIZE);
        }
        while (text.length() < MESSAGE_SIZE) {
            text.append(' ');
        }
        return text.toString();
    }
}
