package org.apache.activemq.bugs;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQDestination;

/* loaded from: input_file:org/apache/activemq/bugs/AMQ1917Test.class */
public class AMQ1917Test extends TestCase {
    private static final int NUM_MESSAGES = 4000;
    private static final int NUM_THREADS = 10;
    private static final String REQUEST_QUEUE = "mock.in.queue";
    private static final String REPLY_QUEUE = "mock.out.queue";
    private ThreadPoolExecutor tpe;
    private String connectionUri;
    private Destination requestDestination = ActiveMQDestination.createDestination(REQUEST_QUEUE, (byte) 1);
    private Destination replyDestination = ActiveMQDestination.createDestination(REPLY_QUEUE, (byte) 1);
    private CountDownLatch roundTripLatch = new CountDownLatch(NUM_MESSAGES);
    private CountDownLatch errorLatch = new CountDownLatch(1);
    private final String BROKER_URL = JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT;
    private BrokerService broker = null;
    private boolean working = true;
    final Session[] sessions = new Session[10];
    final MessageProducer[] producers = new MessageProducer[10];

    /* loaded from: input_file:org/apache/activemq/bugs/AMQ1917Test$LimitedThreadFactory.class */
    public class LimitedThreadFactory implements ThreadFactory {
        int threadCount;
        private ThreadFactory factory;

        public LimitedThreadFactory(ThreadFactory threadFactory) {
            this.factory = threadFactory;
        }

        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            int i = this.threadCount + 1;
            this.threadCount = i;
            if (i > 10) {
                AMQ1917Test.this.errorLatch.countDown();
                TestCase.fail("too many threads requested");
            }
            return this.factory.newThread(runnable);
        }
    }

    /* loaded from: input_file:org/apache/activemq/bugs/AMQ1917Test$MessageSenderReceiver.class */
    class MessageSenderReceiver implements Runnable {
        Destination reqDest;
        Destination replyDest;
        String origMsg;

        public MessageSenderReceiver(Destination destination, Destination destination2, String str) throws Exception {
            this.replyDest = destination2;
            this.reqDest = destination;
            this.origMsg = str;
        }

        private int getIndexFromCurrentThread() {
            String name = Thread.currentThread().getName();
            int parseInt = Integer.parseInt(name.substring(name.lastIndexOf(45) + 1)) - 1;
            TestCase.assertTrue("idx is in range: idx=" + parseInt, parseInt < 10);
            return parseInt;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                int indexFromCurrentThread = getIndexFromCurrentThread();
                Session session = AMQ1917Test.this.sessions[indexFromCurrentThread];
                MessageProducer messageProducer = AMQ1917Test.this.producers[indexFromCurrentThread];
                TextMessage createTextMessage = session.createTextMessage(this.origMsg);
                messageProducer.setDeliveryMode(1);
                messageProducer.send(createTextMessage);
                String str = "JMSCorrelationID='" + createTextMessage.getJMSMessageID() + "'";
                MessageConsumer createConsumer = session.createConsumer(this.replyDest, str);
                Message receive = createConsumer.receive(2000L);
                createConsumer.close();
                if (receive == null) {
                    AMQ1917Test.this.errorLatch.countDown();
                    TestCase.fail("Unable to receive response for:" + this.origMsg + ", with selector=" + str);
                } else {
                    AMQ1917Test.this.roundTripLatch.countDown();
                }
            } catch (JMSException e) {
                TestCase.fail("unexpected exception:" + e);
            }
        }
    }

    public void setUp() throws Exception {
        this.broker = new BrokerService();
        this.broker.setPersistent(false);
        this.broker.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT);
        this.broker.start();
        this.connectionUri = ((TransportConnector) this.broker.getTransportConnectors().get(0)).getPublishableConnectString();
        this.tpe = new ThreadPoolExecutor(10, 10, 60000L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(10000));
        this.tpe.setThreadFactory(new LimitedThreadFactory(this.tpe.getThreadFactory()));
    }

    public void tearDown() throws Exception {
        this.broker.stop();
        this.tpe.shutdown();
    }

    public void testLoadedSendRecieveWithCorrelationId() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
        activeMQConnectionFactory.setBrokerURL(this.connectionUri);
        setupReceiver(activeMQConnectionFactory.createConnection());
        Connection createConnection = activeMQConnectionFactory.createConnection();
        createConnection.start();
        for (int i = 0; i < 10; i++) {
            this.sessions[i] = createConnection.createSession(false, 1);
            this.producers[i] = this.sessions[i].createProducer(this.requestDestination);
        }
        for (int i2 = 0; i2 < NUM_MESSAGES; i2++) {
            this.tpe.execute(new MessageSenderReceiver(this.requestDestination, this.replyDestination, "Test Message : " + i2));
        }
        while (true) {
            if (!this.roundTripLatch.await(4000L, TimeUnit.MILLISECONDS)) {
                if (this.errorLatch.await(1000L, TimeUnit.MILLISECONDS)) {
                    fail("there was an error, check the console for thread or thread allocation failure");
                    break;
                }
            } else {
                break;
            }
        }
        this.working = false;
    }

    /* JADX WARN: Type inference failed for: r0v7, types: [org.apache.activemq.bugs.AMQ1917Test$1] */
    private void setupReceiver(Connection connection) throws Exception {
        final Session createSession = connection.createSession(false, 1);
        final MessageConsumer createConsumer = createSession.createConsumer(this.requestDestination);
        final MessageProducer createProducer = createSession.createProducer(this.replyDestination);
        connection.start();
        new Thread() { // from class: org.apache.activemq.bugs.AMQ1917Test.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                while (AMQ1917Test.this.working) {
                    try {
                        TextMessage receive = createConsumer.receive(20000L);
                        if (receive == null) {
                            AMQ1917Test.this.errorLatch.countDown();
                            TestCase.fail("Response timed out. latchCount=" + AMQ1917Test.this.roundTripLatch.getCount());
                        } else {
                            String text = receive.getText();
                            TextMessage createTextMessage = createSession.createTextMessage();
                            createTextMessage.setJMSCorrelationID(receive.getJMSMessageID());
                            createTextMessage.setText(text);
                            createProducer.send(createTextMessage);
                        }
                    } catch (JMSException e) {
                        if (AMQ1917Test.this.working) {
                            AMQ1917Test.this.errorLatch.countDown();
                            TestCase.fail("Unexpected exception:" + e);
                        }
                    }
                }
            }
        }.start();
    }
}
