package org.apache.activemq.usecases;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.usecases.DurableSubDelayedUnsubscribeTest;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/usecases/RequestReplyToTopicViaThreeNetworkHopsTest.class */
public class RequestReplyToTopicViaThreeNetworkHopsTest {
    protected static final int CONCURRENT_CLIENT_COUNT = 5;
    protected static final int CONCURRENT_SERVER_COUNT = 5;
    protected static final int TOTAL_CLIENT_ITER = 10;
    protected static int Next_broker_num = 0;
    protected static Log LOG = LogFactory.getLog(RequestReplyToTopicViaThreeNetworkHopsTest.class);
    protected boolean testError = false;
    protected boolean fatalTestError = false;
    protected int echoResponseFill = 0;
    public boolean duplex = true;
    protected EmbeddedTcpBroker edge1 = new EmbeddedTcpBroker("edge", 1);
    protected EmbeddedTcpBroker edge2 = new EmbeddedTcpBroker("edge", 2);
    protected EmbeddedTcpBroker core1 = new EmbeddedTcpBroker("core", 1);
    protected EmbeddedTcpBroker core2 = new EmbeddedTcpBroker("core", 2);

    /* loaded from: input_file:org/apache/activemq/usecases/RequestReplyToTopicViaThreeNetworkHopsTest$EchoRequestProcessor.class */
    protected class EchoRequestProcessor implements Runnable {
        protected Session session;
        protected Destination resp_dest;
        protected MessageProducer msg_prod;
        protected Message request;

        public EchoRequestProcessor(Session session, Message message) throws Exception {
            this.session = session;
            this.request = message;
            this.resp_dest = message.getJMSReplyTo();
            if (this.resp_dest == null) {
                throw new Exception("invalid request: no reply-to destination given");
            }
            this.msg_prod = this.session.createProducer(this.resp_dest);
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                processRequest(this.request);
            } catch (Exception e) {
                RequestReplyToTopicViaThreeNetworkHopsTest.LOG.error("Failed to process request", e);
            }
        }

        protected void processRequest(Message message) throws Exception {
            if (RequestReplyToTopicViaThreeNetworkHopsTest.LOG.isDebugEnabled()) {
                RequestReplyToTopicViaThreeNetworkHopsTest.LOG.debug("ECHO request message " + message.toString());
            }
            this.resp_dest = message.getJMSReplyTo();
            if (this.resp_dest == null) {
                RequestReplyToTopicViaThreeNetworkHopsTest.LOG.warn("invalid request: no reply-to destination given");
                return;
            }
            this.msg_prod = this.session.createProducer(this.resp_dest);
            RequestReplyToTopicViaThreeNetworkHopsTest.LOG.debug("SENDING ECHO RESPONSE to:" + this.resp_dest);
            this.msg_prod.send(message);
            RequestReplyToTopicViaThreeNetworkHopsTest.LOG.debug(this.session.getConnection().getBrokerName() + " SENT ECHO RESPONSE to " + this.resp_dest);
            this.msg_prod.close();
            this.msg_prod = null;
        }
    }

    /* loaded from: input_file:org/apache/activemq/usecases/RequestReplyToTopicViaThreeNetworkHopsTest$EchoService.class */
    protected class EchoService extends Thread {
        protected String destName;
        protected Connection jmsConn;
        protected Session sess;
        protected MessageConsumer msg_cons;
        protected boolean Shutdown_ind;
        protected Destination req_dest;
        protected CountDownLatch waitShutdown;
        protected ThreadPoolExecutor processorPool;

        public EchoService(String str, Connection connection) throws Exception {
            this.destName = str;
            this.jmsConn = connection;
            this.Shutdown_ind = false;
            this.sess = this.jmsConn.createSession(false, 1);
            this.req_dest = this.sess.createQueue(this.destName);
            this.msg_cons = this.sess.createConsumer(this.req_dest);
            this.jmsConn.start();
            this.waitShutdown = new CountDownLatch(1);
            this.processorPool = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue(10000));
        }

        public EchoService(RequestReplyToTopicViaThreeNetworkHopsTest requestReplyToTopicViaThreeNetworkHopsTest, String str, String str2) throws Exception {
            this(str, (Connection) ActiveMQConnection.makeConnection(str2));
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                try {
                    RequestReplyToTopicViaThreeNetworkHopsTest.LOG.info("STARTING ECHO SERVICE");
                    while (!this.Shutdown_ind) {
                        Message receive = this.msg_cons.receive(100L);
                        if (receive != null) {
                            this.processorPool.execute(new EchoRequestProcessor(this.sess, receive));
                        }
                    }
                    RequestReplyToTopicViaThreeNetworkHopsTest.LOG.info("shutting down test echo service");
                    try {
                        this.jmsConn.stop();
                    } catch (JMSException e) {
                        RequestReplyToTopicViaThreeNetworkHopsTest.LOG.warn("error on shutting down JMS connection", e);
                    }
                    synchronized (this) {
                        this.waitShutdown.countDown();
                    }
                } catch (Exception e2) {
                    RequestReplyToTopicViaThreeNetworkHopsTest.LOG.error("error processing echo service requests", e2);
                    RequestReplyToTopicViaThreeNetworkHopsTest.LOG.info("shutting down test echo service");
                    try {
                        this.jmsConn.stop();
                    } catch (JMSException e3) {
                        RequestReplyToTopicViaThreeNetworkHopsTest.LOG.warn("error on shutting down JMS connection", e3);
                    }
                    synchronized (this) {
                        this.waitShutdown.countDown();
                    }
                }
            } catch (Throwable th) {
                RequestReplyToTopicViaThreeNetworkHopsTest.LOG.info("shutting down test echo service");
                try {
                    this.jmsConn.stop();
                } catch (JMSException e4) {
                    RequestReplyToTopicViaThreeNetworkHopsTest.LOG.warn("error on shutting down JMS connection", e4);
                }
                synchronized (this) {
                    this.waitShutdown.countDown();
                    throw th;
                }
            }
        }

        public void shutdown() {
            CountDownLatch countDownLatch;
            synchronized (this) {
                countDownLatch = this.waitShutdown;
            }
            this.Shutdown_ind = true;
            try {
                if (countDownLatch == null) {
                    RequestReplyToTopicViaThreeNetworkHopsTest.LOG.info("echo service shutdown: service does not appear to be active");
                } else if (countDownLatch.await(3000L, TimeUnit.MILLISECONDS)) {
                    RequestReplyToTopicViaThreeNetworkHopsTest.LOG.info("echo service shutdown complete");
                } else {
                    RequestReplyToTopicViaThreeNetworkHopsTest.LOG.warn("timeout waiting for echo service shutdown");
                }
            } catch (InterruptedException e) {
                RequestReplyToTopicViaThreeNetworkHopsTest.LOG.warn("interrupted while waiting for echo service shutdown");
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/activemq/usecases/RequestReplyToTopicViaThreeNetworkHopsTest$EmbeddedTcpBroker.class */
    public class EmbeddedTcpBroker {
        protected BrokerService brokerSvc = new BrokerService();
        protected int brokerNum;
        protected String brokerName;
        protected String brokerId;
        protected int port;
        protected String tcpUrl;
        protected String fullUrl;

        public EmbeddedTcpBroker(String str, int i) throws Exception {
            synchronized (getClass()) {
                this.brokerNum = RequestReplyToTopicViaThreeNetworkHopsTest.Next_broker_num;
                RequestReplyToTopicViaThreeNetworkHopsTest.Next_broker_num++;
            }
            this.brokerName = str + i;
            this.brokerId = this.brokerName;
            this.brokerSvc.setBrokerName(this.brokerName);
            this.brokerSvc.setBrokerId(this.brokerId);
            this.brokerSvc.setPersistent(false);
            this.brokerSvc.setUseJmx(false);
            this.port = DurableSubDelayedUnsubscribeTest.Client.lifetime + (this.brokerNum * 10);
            this.tcpUrl = "tcp://127.0.0.1:" + Integer.toString(this.port);
            this.fullUrl = this.tcpUrl + "?jms.watchTopicAdvisories=false";
            this.brokerSvc.addConnector(this.tcpUrl);
        }

        public Connection createConnection() throws URISyntaxException, JMSException {
            return ActiveMQConnection.makeConnection(this.fullUrl);
        }

        public String getConnectionUrl() {
            return this.fullUrl;
        }

        public void coreConnectTo(EmbeddedTcpBroker embeddedTcpBroker, boolean z) throws Exception {
            makeConnectionTo(embeddedTcpBroker, z, true);
            makeConnectionTo(embeddedTcpBroker, z, false);
            if (z) {
                return;
            }
            embeddedTcpBroker.makeConnectionTo(this, z, true);
            embeddedTcpBroker.makeConnectionTo(this, z, false);
        }

        public void start() throws Exception {
            this.brokerSvc.start();
            this.brokerSvc.waitUntilStarted();
        }

        public void stop() throws Exception {
            this.brokerSvc.stop();
        }

        protected void makeConnectionTo(EmbeddedTcpBroker embeddedTcpBroker, boolean z, boolean z2) throws Exception {
            Object obj;
            ActiveMQDestination createDestination;
            DiscoveryNetworkConnector discoveryNetworkConnector = new DiscoveryNetworkConnector(new URI("static:(" + embeddedTcpBroker.tcpUrl + ")"));
            discoveryNetworkConnector.setDuplex(z);
            if (z2) {
                discoveryNetworkConnector.setConduitSubscriptions(false);
            } else {
                discoveryNetworkConnector.setConduitSubscriptions(true);
            }
            discoveryNetworkConnector.setNetworkTTL(3);
            discoveryNetworkConnector.setSuppressDuplicateQueueSubscriptions(true);
            discoveryNetworkConnector.setDecreaseNetworkConsumerPriority(true);
            discoveryNetworkConnector.setBridgeTempDestinations(z2);
            if (z2) {
                obj = "queue";
                createDestination = ActiveMQDestination.createDestination(">", (byte) 2);
            } else {
                obj = "topic";
                createDestination = ActiveMQDestination.createDestination(">", (byte) 1);
            }
            ArrayList arrayList = new ArrayList();
            arrayList.add(createDestination);
            discoveryNetworkConnector.setExcludedDestinations(arrayList);
            if (z) {
                discoveryNetworkConnector.setName(this.brokerId + "<-" + obj + "->" + embeddedTcpBroker.brokerId);
            } else {
                discoveryNetworkConnector.setName(this.brokerId + "-" + obj + "->" + embeddedTcpBroker.brokerId);
            }
            this.brokerSvc.addNetworkConnector(discoveryNetworkConnector);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/activemq/usecases/RequestReplyToTopicViaThreeNetworkHopsTest$MessageClient.class */
    public class MessageClient extends Thread {
        protected MessageConsumer msgCons;
        protected boolean shutdownInd;
        protected int expectedCount;
        protected boolean haveFirstSeq;
        protected int lastSeq = 0;
        protected int msgCount = 0;
        protected CountDownLatch shutdownLatch = new CountDownLatch(1);

        public MessageClient(MessageConsumer messageConsumer, int i) {
            this.msgCons = messageConsumer;
            this.expectedCount = i * (RequestReplyToTopicViaThreeNetworkHopsTest.this.echoResponseFill + 1);
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            CountDownLatch countDownLatch;
            try {
                synchronized (this) {
                    countDownLatch = this.shutdownLatch;
                }
                this.shutdownInd = false;
                processMessages();
                countDownLatch.countDown();
            } catch (Exception e) {
                RequestReplyToTopicViaThreeNetworkHopsTest.LOG.error("message client error", e);
            }
        }

        public void waitShutdown(long j) {
            CountDownLatch countDownLatch;
            try {
                synchronized (this) {
                    countDownLatch = this.shutdownLatch;
                }
                if (countDownLatch != null) {
                    countDownLatch.await(j, TimeUnit.MILLISECONDS);
                } else {
                    RequestReplyToTopicViaThreeNetworkHopsTest.LOG.info("echo client shutdown: client does not appear to be active");
                }
            } catch (InterruptedException e) {
                RequestReplyToTopicViaThreeNetworkHopsTest.LOG.warn("wait for message client shutdown interrupted", e);
            }
        }

        public boolean shutdown() {
            boolean z;
            if (!this.shutdownInd) {
                this.shutdownInd = true;
            }
            waitShutdown(200L);
            synchronized (this) {
                z = this.shutdownLatch == null || this.shutdownLatch.getCount() == 0;
            }
            return z;
        }

        public int getNumMsgReceived() {
            return this.msgCount;
        }

        protected void processMessages() throws Exception {
            this.haveFirstSeq = false;
            while (!this.shutdownInd && !RequestReplyToTopicViaThreeNetworkHopsTest.this.fatalTestError) {
                Message receive = this.msgCons.receive(100L);
                if (receive != null) {
                    this.msgCount++;
                    checkMessage(receive);
                }
            }
            this.msgCons.close();
        }

        protected void checkMessage(Message message) throws Exception {
            RequestReplyToTopicViaThreeNetworkHopsTest.LOG.debug("received message " + RequestReplyToTopicViaThreeNetworkHopsTest.fmtMsgInfo(message) + " from " + message.getJMSDestination());
            if (message.propertyExists("SEQ")) {
                int intProperty = message.getIntProperty("SEQ");
                if (this.haveFirstSeq && intProperty != this.lastSeq + 1) {
                    RequestReplyToTopicViaThreeNetworkHopsTest.LOG.error("***ERROR*** incorrect sequence number; expected " + Integer.toString(this.lastSeq + 1) + " but have " + Integer.toString(intProperty));
                    RequestReplyToTopicViaThreeNetworkHopsTest.this.testError = true;
                }
                this.lastSeq = intProperty;
                if (this.msgCount > this.expectedCount) {
                    RequestReplyToTopicViaThreeNetworkHopsTest.LOG.error("*** have more messages than expected; have " + this.msgCount + "; expect " + this.expectedCount);
                    RequestReplyToTopicViaThreeNetworkHopsTest.this.testError = true;
                }
            }
            if (message.propertyExists("end-of-response")) {
                RequestReplyToTopicViaThreeNetworkHopsTest.LOG.trace("received end-of-response message");
            }
        }
    }

    /* loaded from: input_file:org/apache/activemq/usecases/RequestReplyToTopicViaThreeNetworkHopsTest$TopicTrafficGenerator.class */
    protected class TopicTrafficGenerator extends Thread {
        protected Connection conn1;
        protected Connection conn2;
        protected Session sess1;
        protected Session sess2;
        protected Destination dest;
        protected MessageProducer prod;
        protected MessageConsumer cons;
        protected boolean Shutdown_ind;
        protected int send_count;

        public TopicTrafficGenerator(String str, String str2) throws Exception {
            this.conn1 = RequestReplyToTopicViaThreeNetworkHopsTest.this.createConnection(str);
            this.conn2 = RequestReplyToTopicViaThreeNetworkHopsTest.this.createConnection(str2);
            this.sess1 = this.conn1.createSession(false, 1);
            this.sess2 = this.conn2.createSession(false, 1);
            this.conn1.start();
            this.conn2.start();
            this.dest = this.sess1.createTopic("traffic");
            this.prod = this.sess1.createProducer(this.dest);
            this.dest = this.sess2.createTopic("traffic");
            this.cons = this.sess2.createConsumer(this.dest);
        }

        public void shutdown() {
            this.Shutdown_ind = true;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                try {
                    RequestReplyToTopicViaThreeNetworkHopsTest.LOG.info("Starting Topic Traffic Generator");
                    while (!this.Shutdown_ind) {
                        this.prod.send(this.sess1.createTextMessage("TRAFFIC"));
                        this.send_count++;
                        this.cons.receive(250L);
                    }
                    RequestReplyToTopicViaThreeNetworkHopsTest.LOG.info("Shutdown of Topic Traffic Generator; send count = " + this.send_count);
                    if (this.conn1 != null) {
                        try {
                            this.conn1.stop();
                        } catch (JMSException e) {
                            RequestReplyToTopicViaThreeNetworkHopsTest.LOG.warn("failed to shutdown connection", e);
                        }
                    }
                    if (this.conn2 != null) {
                        try {
                            this.conn2.stop();
                        } catch (JMSException e2) {
                            RequestReplyToTopicViaThreeNetworkHopsTest.LOG.warn("failed to shutdown connection", e2);
                        }
                    }
                } catch (Throwable th) {
                    RequestReplyToTopicViaThreeNetworkHopsTest.LOG.info("Shutdown of Topic Traffic Generator; send count = " + this.send_count);
                    if (this.conn1 != null) {
                        try {
                            this.conn1.stop();
                        } catch (JMSException e3) {
                            RequestReplyToTopicViaThreeNetworkHopsTest.LOG.warn("failed to shutdown connection", e3);
                        }
                    }
                    if (this.conn2 != null) {
                        try {
                            this.conn2.stop();
                        } catch (JMSException e4) {
                            RequestReplyToTopicViaThreeNetworkHopsTest.LOG.warn("failed to shutdown connection", e4);
                        }
                    }
                    throw th;
                }
            } catch (JMSException e5) {
                RequestReplyToTopicViaThreeNetworkHopsTest.LOG.warn("traffic generator failed on jms exception", e5);
                RequestReplyToTopicViaThreeNetworkHopsTest.LOG.info("Shutdown of Topic Traffic Generator; send count = " + this.send_count);
                if (this.conn1 != null) {
                    try {
                        this.conn1.stop();
                    } catch (JMSException e6) {
                        RequestReplyToTopicViaThreeNetworkHopsTest.LOG.warn("failed to shutdown connection", e6);
                    }
                }
                if (this.conn2 != null) {
                    try {
                        this.conn2.stop();
                    } catch (JMSException e7) {
                        RequestReplyToTopicViaThreeNetworkHopsTest.LOG.warn("failed to shutdown connection", e7);
                    }
                }
            }
        }
    }

    public RequestReplyToTopicViaThreeNetworkHopsTest() throws Exception {
        this.edge1.coreConnectTo(this.core1, this.duplex);
        this.edge2.coreConnectTo(this.core2, this.duplex);
        this.core1.coreConnectTo(this.core2, this.duplex);
    }

    public void logMessage(String str) {
        System.out.println(str);
        System.out.flush();
    }

    public void testMessages(Session session, MessageProducer messageProducer, Destination destination, int i) throws Exception {
        MessageConsumer createConsumer = session.createConsumer(destination);
        MessageClient messageClient = new MessageClient(createConsumer, i);
        messageClient.start();
        for (int i2 = 0; i2 < i && !this.fatalTestError; i2++) {
            TextMessage createTextMessage = session.createTextMessage("MSG AAAA " + i2);
            createTextMessage.setIntProperty("SEQ", 100 + i2);
            createTextMessage.setStringProperty("TEST", "TOPO");
            createTextMessage.setJMSReplyTo(destination);
            if (i2 == i - 1) {
                createTextMessage.setBooleanProperty("end-of-response", true);
            }
            sendWithRetryOnDeletedDest(messageProducer, createTextMessage);
            LOG.debug("Sent:" + createTextMessage);
        }
        messageClient.waitShutdown(5000L);
        if (messageClient.shutdown()) {
            LOG.debug("Consumer client shutdown complete");
        } else {
            LOG.debug("Consumer client shutdown incomplete!!!");
        }
        int i3 = i * (this.echoResponseFill + 1);
        if (messageClient.getNumMsgReceived() == i3) {
            LOG.debug("Have " + i3 + " messages, as-expected");
        } else {
            this.testError = true;
            if (messageClient.getNumMsgReceived() == 0) {
                this.fatalTestError = true;
            }
            LOG.error("Have " + messageClient.getNumMsgReceived() + " messages; expected " + i3 + " on destination " + destination);
        }
        createConsumer.close();
    }

    protected void sendWithRetryOnDeletedDest(MessageProducer messageProducer, Message message) throws JMSException {
        try {
            if (LOG.isDebugEnabled()) {
                LOG.debug("SENDING REQUEST message " + message);
            }
            messageProducer.send(message);
        } catch (JMSException e) {
            System.out.println("AAA: " + e.getMessage());
            throw e;
        }
    }

    public void testOneDest(Connection connection, Session session, Destination destination, int i) throws Exception {
        LOG.trace("Creating echo queue and producer");
        MessageProducer createProducer = session.createProducer(session.createQueue("echo"));
        testMessages(session, createProducer, destination, i);
        createProducer.close();
    }

    public void testTempTopic(String str, String str2) throws Exception {
        LOG.debug("TESTING TEMP TOPICS " + str + " -> " + str2 + " (" + 5 + " messages)");
        Connection createConnection = createConnection(str2);
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        LOG.trace("Creating destination");
        testOneDest(createConnection, createSession, createSession.createTemporaryTopic(), 5);
        createSession.close();
        createConnection.close();
    }

    public void testTopic(String str, String str2) throws Exception {
        LOG.info("TESTING TOPICS " + str + " -> " + str2 + " (" + 5 + " messages)");
        Connection createConnection = createConnection(str2);
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        LOG.trace("Removing existing Topic");
        removeTopic(createConnection, "topotest2.perm.topic");
        LOG.trace("Creating Topic, " + "topotest2.perm.topic");
        testOneDest(createConnection, createSession, createSession.createTopic("topotest2.perm.topic"), 5);
        removeTopic(createConnection, "topotest2.perm.topic");
        createSession.close();
        createConnection.close();
    }

    public void testTempQueue(String str, String str2) throws Exception {
        LOG.info("TESTING TEMP QUEUES " + str + " -> " + str2 + " (" + 5 + " messages)");
        Connection createConnection = createConnection(str2);
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        LOG.trace("Creating destination");
        testOneDest(createConnection, createSession, createSession.createTemporaryQueue(), 5);
        createSession.close();
        createConnection.close();
    }

    public void testQueue(String str, String str2) throws Exception {
        LOG.info("TESTING QUEUES " + str + " -> " + str2 + " (" + 5 + " messages)");
        Connection createConnection = createConnection(str2);
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        LOG.trace("Removing existing Queue");
        removeQueue(createConnection, "topotest2.perm.queue");
        LOG.trace("Creating Queue, " + "topotest2.perm.queue");
        testOneDest(createConnection, createSession, createSession.createQueue("topotest2.perm.queue"), 5);
        removeQueue(createConnection, "topotest2.perm.queue");
        createSession.close();
        createConnection.close();
    }

    @Test
    public void runWithTempTopicReplyTo() throws Exception {
        this.fatalTestError = false;
        this.testError = false;
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue(10000));
        final CountDownLatch countDownLatch = new CountDownLatch(10);
        Thread thread = new Thread() { // from class: org.apache.activemq.usecases.RequestReplyToTopicViaThreeNetworkHopsTest.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    RequestReplyToTopicViaThreeNetworkHopsTest.this.edge1.start();
                } catch (Exception e) {
                    RequestReplyToTopicViaThreeNetworkHopsTest.LOG.error(null, e);
                }
            }
        };
        Thread thread2 = new Thread() { // from class: org.apache.activemq.usecases.RequestReplyToTopicViaThreeNetworkHopsTest.2
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    RequestReplyToTopicViaThreeNetworkHopsTest.this.edge2.start();
                } catch (Exception e) {
                    RequestReplyToTopicViaThreeNetworkHopsTest.LOG.error(null, e);
                }
            }
        };
        Thread thread3 = new Thread() { // from class: org.apache.activemq.usecases.RequestReplyToTopicViaThreeNetworkHopsTest.3
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    RequestReplyToTopicViaThreeNetworkHopsTest.this.core1.start();
                } catch (Exception e) {
                    RequestReplyToTopicViaThreeNetworkHopsTest.LOG.error(null, e);
                }
            }
        };
        Thread thread4 = new Thread() { // from class: org.apache.activemq.usecases.RequestReplyToTopicViaThreeNetworkHopsTest.4
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    RequestReplyToTopicViaThreeNetworkHopsTest.this.core2.start();
                } catch (Exception e) {
                    RequestReplyToTopicViaThreeNetworkHopsTest.LOG.error(null, e);
                }
            }
        };
        thread.start();
        thread2.start();
        thread3.start();
        thread4.start();
        thread.join();
        thread2.join();
        thread3.join();
        thread4.join();
        TopicTrafficGenerator topicTrafficGenerator = new TopicTrafficGenerator(this.edge1.getConnectionUrl(), this.edge2.getConnectionUrl());
        topicTrafficGenerator.start();
        EchoService echoService = new EchoService(this, "echo", this.edge1.getConnectionUrl());
        echoService.start();
        LOG.info("** STARTING TEMP TOPIC TESTS");
        int i = 0;
        while (i < 10 && !this.fatalTestError) {
            threadPoolExecutor.execute(new Runnable() { // from class: org.apache.activemq.usecases.RequestReplyToTopicViaThreeNetworkHopsTest.5
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        RequestReplyToTopicViaThreeNetworkHopsTest.this.testTempTopic(RequestReplyToTopicViaThreeNetworkHopsTest.this.edge1.getConnectionUrl(), RequestReplyToTopicViaThreeNetworkHopsTest.this.edge2.getConnectionUrl());
                    } catch (Exception e) {
                        RequestReplyToTopicViaThreeNetworkHopsTest.LOG.error("test exception", e);
                        RequestReplyToTopicViaThreeNetworkHopsTest.this.fatalTestError = true;
                        RequestReplyToTopicViaThreeNetworkHopsTest.this.testError = true;
                    }
                    countDownLatch.countDown();
                }
            });
            i++;
        }
        boolean await = countDownLatch.await(20L, TimeUnit.MINUTES);
        LOG.info("** FINISHED TEMP TOPIC TESTS AFTER " + i + " ITERATIONS, testError:" + this.testError + ", fatal: " + this.fatalTestError + ", onTime:" + await);
        Thread.sleep(100L);
        echoService.shutdown();
        topicTrafficGenerator.shutdown();
        shutdown();
        Assert.assertTrue("test completed in time", await);
        Assert.assertTrue("no errors", !this.testError);
    }

    public void shutdown() throws Exception {
        this.edge1.stop();
        this.edge2.stop();
        this.core1.stop();
        this.core2.stop();
    }

    protected Connection createConnection(String str) throws Exception {
        return ActiveMQConnection.makeConnection(str);
    }

    protected static void removeQueue(Connection connection, String str) throws Exception {
        if (connection instanceof ActiveMQConnection) {
            ((ActiveMQConnection) connection).destroyDestination(ActiveMQDestination.createDestination(str, (byte) 1));
        }
    }

    protected static void removeTopic(Connection connection, String str) throws Exception {
        if (connection instanceof ActiveMQConnection) {
            ((ActiveMQConnection) connection).destroyDestination(ActiveMQDestination.createDestination(str, (byte) 2));
        }
    }

    public static String fmtMsgInfo(Message message) throws Exception {
        new StringBuilder();
        StringBuilder sb = new StringBuilder();
        if (message instanceof TextMessage) {
            sb.append(((TextMessage) message).getText());
        } else {
            sb.append("[");
            sb.append(message.getClass().getName());
            sb.append("]");
        }
        Enumeration propertyNames = message.getPropertyNames();
        while (propertyNames.hasMoreElements()) {
            String str = (String) propertyNames.nextElement();
            sb.append("; ");
            sb.append(str);
            sb.append("=");
            sb.append(message.getStringProperty(str));
        }
        return sb.toString();
    }
}
