package org.apache.activemq.bugs;

import java.lang.Thread;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.transport.nio.NIOSSLLoadTest;
import org.apache.activemq.usecases.DurableSubProcessConcurrentCommitActivateNoDuplicateTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/bugs/AMQ2021Test.class */
public class AMQ2021Test extends TestCase implements ExceptionListener, Thread.UncaughtExceptionHandler {
    private static final Logger log = LoggerFactory.getLogger(AMQ2021Test.class);
    BrokerService brokerService;
    Vector<Throwable> exceptions;
    AMQ2021Test testCase;
    private String PRODUCER_BROKER_URL;
    private CountDownLatch receivedLatch;
    private ActiveMQTopic destination;
    private CountDownLatch started;
    ArrayList<Thread> threads = new ArrayList<>();
    private final String ACTIVEMQ_BROKER_BIND = JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT;
    private String CONSUMER_BROKER_URL = "?jms.redeliveryPolicy.maximumRedeliveries=1&jms.redeliveryPolicy.initialRedeliveryDelay=0";
    private final int numMessages = NIOSSLLoadTest.MESSAGE_COUNT;
    private final int numConsumers = 2;
    private final int dlqMessages = DurableSubProcessConcurrentCommitActivateNoDuplicateTest.SERVER_SLEEP;

    /* loaded from: input_file:org/apache/activemq/bugs/AMQ2021Test$ConsumerThread.class */
    public class ConsumerThread extends Thread implements MessageListener {
        public long counter;
        public long recoveries;
        private Session session;

        public ConsumerThread(String str) {
            super(str);
            this.counter = 0L;
            this.recoveries = 0L;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                Connection createConnection = new ActiveMQConnectionFactory(AMQ2021Test.this.CONSUMER_BROKER_URL).createConnection();
                createConnection.setExceptionListener(AMQ2021Test.this.testCase);
                createConnection.setClientID(getName());
                this.session = createConnection.createSession(false, 2);
                this.session.createDurableSubscriber(AMQ2021Test.this.destination, getName()).setMessageListener(this);
                createConnection.start();
                AMQ2021Test.this.started.countDown();
            } catch (JMSException e) {
                AMQ2021Test.log.error("unexpected ex in consumer run", e);
                AMQ2021Test.this.exceptions.add(e);
            }
        }

        public void onMessage(Message message) {
            try {
                this.counter++;
                if (message.getIntProperty("MsgNumber") % 2 == 0) {
                    this.session.recover();
                    this.recoveries++;
                } else {
                    message.acknowledge();
                }
                if (this.counter % 200 == 0) {
                    AMQ2021Test.log.info("recoveries:" + this.recoveries + ", Received " + this.counter + ", counter'th " + message);
                }
                AMQ2021Test.this.receivedLatch.countDown();
            } catch (Exception e) {
                AMQ2021Test.log.error("unexpected ex on onMessage", e);
                AMQ2021Test.this.exceptions.add(e);
            }
        }
    }

    protected void setUp() throws Exception {
        Thread.setDefaultUncaughtExceptionHandler(this);
        this.testCase = this;
        this.brokerService = new BrokerService();
        this.brokerService.setDeleteAllMessagesOnStartup(true);
        this.brokerService.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT);
        this.brokerService.start();
        this.destination = new ActiveMQTopic(getName());
        this.exceptions = new Vector<>();
        this.CONSUMER_BROKER_URL = ((TransportConnector) this.brokerService.getTransportConnectors().get(0)).getPublishableConnectString() + this.CONSUMER_BROKER_URL;
        this.PRODUCER_BROKER_URL = ((TransportConnector) this.brokerService.getTransportConnectors().get(0)).getPublishableConnectString();
        this.receivedLatch = new CountDownLatch(3000);
        this.started = new CountDownLatch(1);
    }

    protected void tearDown() throws Exception {
        Iterator<Thread> it = this.threads.iterator();
        while (it.hasNext()) {
            Thread next = it.next();
            next.interrupt();
            next.join();
        }
        this.brokerService.stop();
    }

    public void testConcurrentTopicResendToDLQ() throws Exception {
        for (int i = 0; i < 2; i++) {
            ConsumerThread consumerThread = new ConsumerThread("Consumer-" + i);
            this.threads.add(consumerThread);
            consumerThread.start();
        }
        assertTrue(this.started.await(10L, TimeUnit.SECONDS));
        Thread thread = new Thread() { // from class: org.apache.activemq.bugs.AMQ2021Test.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    AMQ2021Test.this.produce(NIOSSLLoadTest.MESSAGE_COUNT);
                } catch (Exception e) {
                }
            }
        };
        this.threads.add(thread);
        thread.start();
        boolean await = this.receivedLatch.await(90L, TimeUnit.SECONDS);
        Iterator<Throwable> it = this.exceptions.iterator();
        while (it.hasNext()) {
            Throwable next = it.next();
            log.error("failing test with first exception", next);
            fail("exception during test : " + next);
        }
        assertTrue("excepted messages received within time limit", await);
        assertEquals(0, this.exceptions.size());
        for (int i2 = 0; i2 < 2; i2++) {
            assertEquals(1000L, ((ConsumerThread) this.threads.get(i2)).recoveries);
            assertEquals(1500L, ((ConsumerThread) this.threads.get(i2)).counter);
        }
        consumeFromDLQ(DurableSubProcessConcurrentCommitActivateNoDuplicateTest.SERVER_SLEEP);
    }

    private void consumeFromDLQ(int i) throws Exception {
        Connection createConnection = new ActiveMQConnectionFactory(this.CONSUMER_BROKER_URL).createConnection();
        createConnection.start();
        MessageConsumer createConsumer = createConnection.createSession(false, 1).createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
        int i2 = 0;
        for (int i3 = 0; i3 < i && createConsumer.receive(1000L) != null; i3++) {
            i2++;
        }
        assertEquals(i, i2);
    }

    public void produce(int i) throws Exception {
        Connection connection = null;
        try {
            try {
                connection = new ActiveMQConnectionFactory(this.PRODUCER_BROKER_URL).createConnection();
                Session createSession = connection.createSession(false, 1);
                MessageProducer createProducer = createSession.createProducer(this.destination);
                createProducer.setTimeToLive(0L);
                connection.start();
                for (int i2 = 0; i2 < i; i2++) {
                    int i3 = i2 + 1;
                    TextMessage createTextMessage = createSession.createTextMessage(getName() + " Message " + i3);
                    createTextMessage.setIntProperty("MsgNumber", i3);
                    createProducer.send(createTextMessage);
                    if (i3 % DurableSubProcessConcurrentCommitActivateNoDuplicateTest.SERVER_SLEEP == 0) {
                        log.info("sent " + i3 + ", ith " + createTextMessage);
                    }
                }
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (Throwable th) {
                    }
                }
            } catch (JMSException e) {
                log.error("unexpected ex on produce", e);
                this.exceptions.add(e);
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (Throwable th2) {
                    }
                }
            }
        } catch (Throwable th3) {
            if (connection != null) {
                try {
                    connection.close();
                } catch (Throwable th4) {
                    throw th3;
                }
            }
            throw th3;
        }
    }

    public void onException(JMSException jMSException) {
        log.info("Unexpected JMSException", jMSException);
        this.exceptions.add(jMSException);
    }

    @Override // java.lang.Thread.UncaughtExceptionHandler
    public void uncaughtException(Thread thread, Throwable th) {
        log.info("Unexpected exception from thread " + thread + ", ex: " + th);
        this.exceptions.add(th);
    }
}
