package org.apache.activemq.transport.stomp;

import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/transport/stomp/StompLoadTest.class */
/**
 * Load test for the STOMP transport.
 *
 * <p>Starts a broker from {@link #confUri}, connects {@value #TASK_COUNT} concurrent STOMP
 * receivers that each subscribe to a shared topic, publishes {@value #MSG_COUNT} binary
 * messages to that topic over a separate STOMP connection, and verifies that every receiver
 * got every message ({@code TASK_COUNT * MSG_COUNT} deliveries in total).
 */
public class StompLoadTest {
    private static final Logger LOG = LoggerFactory.getLogger(StompLoadTest.class);

    /** Number of concurrent receiver tasks; also the size of the worker thread pool. */
    private static final int TASK_COUNT = 100;

    /** Number of messages published to the shared topic. */
    private static final int MSG_COUNT = 250;

    /** Total deliveries expected across all receivers. */
    private static final int EXPECTED_TOTAL = TASK_COUNT * MSG_COUNT;

    /**
     * Overall test timeout: 20 minutes. The waits inside the test add up to roughly 17 minutes
     * (5 min start latch + 10 min delivery wait + 2 min executor shutdown), so this is a real
     * upper bound rather than the multi-day value the test previously declared.
     */
    private static final long TEST_TIMEOUT_MILLIS = 20L * 60L * 1000L;

    /** STOMP frame terminator (a single NUL byte). */
    private static final String FRAME_TERMINATOR = "\u0000";

    /** Five-byte binary payload with an embedded NUL; must match the content-length:5 header. */
    private static final String BINARY_BODY = "\u0001\u0002\u0000\u0004\u0005";

    private BrokerService broker;
    protected Connection connection;
    protected Session session;
    protected ActiveMQQueue queue;
    private ExecutorService executor;

    /** Counted down once by each receiver after it has subscribed and is about to receive. */
    private CountDownLatch started;

    /** Released by the test thread to let the subscribed receivers proceed. */
    private CountDownLatch ready;

    /** Total number of frames received by all receiver tasks. */
    private AtomicInteger receiveCount;

    protected String bindAddress = "stomp://localhost:61613";
    protected String confUri = "xbean:org/apache/activemq/transport/stomp/stomp-auth-broker.xml";
    protected String jmsUri = "vm://localhost";

    /** Connection used by the test thread to publish messages. */
    protected StompConnection stompConnection = new StompConnection();

    /**
     * Starts the broker, opens the publisher STOMP connection and a JMS connection, and
     * prepares the worker pool, latches and counter used by the test.
     *
     * @throws Exception if the broker or any connection cannot be started
     */
    @Before
    public void setUp() throws Exception {
        broker = BrokerFactory.createBroker(new URI(confUri));
        broker.setDeleteAllMessagesOnStartup(true);
        broker.start();
        broker.waitUntilStarted();

        stompConnect();

        connection = new ActiveMQConnectionFactory(jmsUri).createConnection("system", "manager");
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        queue = new ActiveMQQueue(getDestinationName());
        connection.start();

        executor = Executors.newFixedThreadPool(TASK_COUNT, new ThreadFactory() {
            // Atomic because the pool may also create replacement threads from worker threads.
            private final AtomicInteger sequence = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable runnable) {
                return new Thread(runnable, "Test Worker " + sequence.incrementAndGet());
            }
        });

        started = new CountDownLatch(TASK_COUNT);
        ready = new CountDownLatch(1);
        receiveCount = new AtomicInteger(0);
    }

    /**
     * Releases client-side resources on a best-effort basis and then always stops the broker.
     *
     * <p>Every resource is null-checked because JUnit runs this method even when
     * {@link #setUp()} failed part-way. Client cleanup failures are logged rather than
     * rethrown so that they can never prevent the broker from being stopped.
     *
     * @throws Exception if stopping the broker fails
     */
    @After
    public void tearDown() throws Exception {
        try {
            if (executor != null) {
                executor.shutdownNow();
            }
            try {
                if (connection != null) {
                    connection.close();
                }
            } catch (Exception e) {
                LOG.warn("Failed to close JMS connection during tearDown", e);
            }
            try {
                stompDisconnect();
            } catch (Exception e) {
                LOG.warn("Failed to close STOMP connection during tearDown", e);
            }
        } finally {
            if (broker != null) {
                broker.stop();
                broker.waitUntilStopped();
            }
        }
    }

    /**
     * Runs the load scenario: start all receivers, publish {@value #MSG_COUNT} messages to the
     * shared topic and wait until all receivers together have counted
     * {@code TASK_COUNT * MSG_COUNT} frames.
     *
     * @throws Exception on connection failures or if the test thread is interrupted
     */
    @Test(timeout = TEST_TIMEOUT_MILLIS)
    public void testStompUnloadLoad() throws Exception {
        for (int task = 0; task < TASK_COUNT; task++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    runReceiver();
                }
            });
        }

        ready.countDown();
        Assert.assertTrue("Timed out waiting for receivers to start.",
                started.await(5L, TimeUnit.MINUTES));

        // Give the receivers a moment to reach their first blocking receive.
        TimeUnit.SECONDS.sleep(5L);

        for (int id = 0; id < MSG_COUNT; id++) {
            stompConnection.sendFrame("SEND\n destination:/topic/" + getDestinationName()
                    + "\nid:" + id
                    + "\ncontent-length:5 \n\n"
                    + BINARY_BODY + FRAME_TERMINATOR);
        }
        LOG.info("All " + MSG_COUNT + " message have been sent, awaiting receipt.");

        // Wait first, then build the failure message, so it reports the count after the wait.
        boolean allReceived = Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return receiveCount.get() == EXPECTED_TOTAL;
            }
        }, TimeUnit.MINUTES.toMillis(10L));
        Assert.assertTrue("Should get [" + EXPECTED_TOTAL + "] message but was: " + receiveCount.get(),
                allReceived);

        LOG.info("Test Completed and all messages received, shutting down.");
        executor.shutdown();
        if (!executor.awaitTermination(2L, TimeUnit.MINUTES)) {
            LOG.warn("Receiver tasks did not terminate within the shutdown wait.");
        }
        stompConnection.sendFrame("DISCONNECT\n\n\n" + FRAME_TERMINATOR);
        LOG.info("Test Finished.");
    }

    /**
     * Body of one receiver task: connect, subscribe, wait for the start signal and then
     * receive up to {@value #MSG_COUNT} frames, incrementing {@link #receiveCount} for each.
     * The task's connection is closed when the task ends, whatever the reason.
     */
    private void runReceiver() {
        LOG.debug("Receive Thread Connecting to Broker.");

        StompConnection receiver = new StompConnection();
        try {
            stompConnect(receiver);
        } catch (Exception e) {
            LOG.error("Caught Exception while connecting: " + e.getMessage(), e);
            closeQuietly(receiver);
            // Nothing can be received without a connection; the start latch will then time out.
            return;
        }

        int received = 0;
        try {
            for (int n = 0; n < 10; n++) {
                receiver.subscribe("/queue/test-" + n, "auto");
                receiver.subscribe("/topic/test-" + n, "auto");
            }

            HashMap<String, String> headers = new HashMap<String, String>();
            headers.put("activemq.prefetchSize", "1");
            receiver.subscribe("/topic/" + getDestinationName(), "auto", headers);

            ready.await();
            // Let the remaining tasks finish subscribing before reporting as started.
            TimeUnit.SECONDS.sleep(3L);
            started.countDown();

            // Generous read timeout: many threads are running and the test box may be slow.
            while (received < MSG_COUNT) {
                StompFrame frame = receiver.receive(TimeUnit.SECONDS.toMillis(60L));
                Assert.assertNotNull(frame);
                received++;
                if ((LOG.isDebugEnabled() && received % 50 == 0) || received == MSG_COUNT) {
                    LOG.debug("Receiver thread got message: {}", frame.getHeaders().get("message-id"));
                }
                receiveCount.incrementAndGet();
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt (e.g. from executor.shutdownNow()) for the pool.
            Thread.currentThread().interrupt();
            LOG.warn("Receive task interrupted after receipt of [" + received + "] messages");
        } catch (Exception e) {
            if (received != MSG_COUNT) {
                LOG.warn("Receive task caught exception after receipt of [" + received
                        + "] messages: " + e.getMessage());
            }
        } finally {
            closeQuietly(receiver);
        }
    }

    /**
     * Closes a receiver connection, logging instead of propagating any failure.
     *
     * @param stomp the connection to close
     */
    private static void closeQuietly(StompConnection stomp) {
        try {
            stomp.close();
        } catch (Exception e) {
            LOG.debug("Ignoring failure while closing receiver connection: " + e.getMessage());
        }
    }

    /**
     * Opens and authenticates the shared publisher connection {@link #stompConnection}.
     *
     * @throws Exception if the socket cannot be opened or the STOMP connect fails
     */
    protected void stompConnect() throws Exception {
        stompConnect(stompConnection);
    }

    /**
     * Opens the given connection on a socket for {@link #bindAddress} and connects as
     * {@code system}/{@code manager}.
     *
     * @param stompConnection the connection to open and authenticate
     * @throws Exception if the address is malformed, the socket cannot be opened or the
     *         STOMP connect fails
     */
    public void stompConnect(StompConnection stompConnection) throws Exception {
        URI uri = new URI(bindAddress);
        LOG.debug("Attempting connection to: {}", bindAddress);
        stompConnection.open(createSocket(uri));
        stompConnection.connect("system", "manager");
    }

    /**
     * Creates the client socket for a STOMP connection. Only the port of {@code uri} is used;
     * the host is always the loopback address.
     *
     * @param uri the transport URI supplying the port
     * @return a connected socket to {@code 127.0.0.1} on the URI's port
     * @throws IOException if the socket cannot be opened
     */
    protected Socket createSocket(URI uri) throws IOException {
        return new Socket("127.0.0.1", uri.getPort());
    }

    /**
     * @return the destination name used by this test: the runtime class name plus
     *         {@code ".Tester"}
     */
    protected String getDestinationName() {
        return getClass().getName() + ".Tester";
    }

    /**
     * Closes the shared publisher connection, if any, and clears the field so that repeated
     * calls are harmless.
     *
     * @throws IOException if closing the connection fails
     */
    protected void stompDisconnect() throws IOException {
        if (stompConnection != null) {
            stompConnection.close();
            stompConnection = null;
        }
    }
}
