package org.apache.activemq.bugs;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/bugs/AMQ3732Test.class */
/**
 * Test from the ActiveMQ "bugs" package, named after issue AMQ3732.
 *
 * <p>The test starts a non-persistent broker on an ephemeral TCP port and connects with
 * every prefetch limit set to zero. A single session is then shared by four threads: one
 * consumer polling with {@code receiveNoWait()}, one consumer using {@code receive(50)},
 * one producer sending {@link #NUM_MESSAGES} text messages with short random pauses, and
 * one thread that acknowledges every received message. The test passes when all messages
 * have been acknowledged and no worker thread reported an error.
 */
public class AMQ3732Test {
    private static final Logger LOG = LoggerFactory.getLogger(AMQ3732Test.class);

    /** Number of messages the producer sends and the test expects to see acknowledged. */
    private static final long NUM_MESSAGES = 25000;

    /**
     * Acknowledge mode passed to {@code createSession}. The value 4 is ActiveMQ's
     * {@code ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE}; it is kept as a literal so this class
     * needs no additional imports.
     */
    private static final int ACK_MODE = 4;

    /** How long the acknowledging thread waits for a message before re-checking its exit condition. */
    private static final long ACK_POLL_MILLIS = 100;

    private ActiveMQConnectionFactory connectionFactory;
    private Connection connection;
    private Session session;
    private BrokerService broker;
    private String connectionUri;
    private final Random pause = new Random();
    private final AtomicLong totalConsumed = new AtomicLong();

    /** First error raised by any worker thread; {@code null} while everything is healthy. */
    private final AtomicReference<Throwable> firstFailure = new AtomicReference<Throwable>();

    /**
     * Starts an in-memory broker with JMX disabled and builds a connection factory that
     * points at the broker's first transport connector, with all prefetch limits set to 0.
     *
     * @throws Exception if the broker cannot be started
     */
    @Before
    public void startBroker() throws Exception {
        broker = new BrokerService();
        broker.setDeleteAllMessagesOnStartup(true);
        broker.setPersistent(false);
        broker.setUseJmx(false);
        // Port 0 lets the OS choose a free port; the real URI is read back below.
        broker.addConnector("tcp://0.0.0.0:0");
        broker.start();
        broker.waitUntilStarted();

        TransportConnector connector = (TransportConnector) broker.getTransportConnectors().get(0);
        connectionUri = connector.getPublishableConnectString();
        connectionFactory = new ActiveMQConnectionFactory(connectionUri);
        connectionFactory.getPrefetchPolicy().setAll(0);
    }

    /**
     * Closes the client connection (best effort) and stops the broker.
     *
     * @throws Exception if the broker fails to stop
     */
    @After
    public void stopBroker() throws Exception {
        try {
            // The connection is only created inside the test method, so it may still be null.
            if (connection != null) {
                connection.close();
            }
        } catch (Exception ignored) {
            // A failure to close the client must not prevent the broker from being stopped.
            LOG.debug("Ignoring error while closing the connection during teardown", ignored);
        } finally {
            if (broker != null) {
                broker.stop();
                broker.waitUntilStopped();
            }
        }
    }

    /**
     * Runs two consumers, one producer and one acknowledging thread against the same
     * session and verifies that all {@link #NUM_MESSAGES} messages get acknowledged.
     *
     * @throws Exception if the connection, session, or JMS objects cannot be created, or
     *         if the test thread is interrupted while joining the workers
     */
    @Test(timeout = 1200000)
    public void testInterruptionAffects() throws Exception {
        connection = connectionFactory.createConnection();
        connection.start();
        session = connection.createSession(false, ACK_MODE);
        Queue destination = session.createQueue("AMQ3732Test");

        // Hand-off queue between the two consumers and the acknowledging thread.
        final BlockingQueue<Message> workQueue = new LinkedBlockingQueue<Message>();

        final MessageConsumer noWaitConsumer = session.createConsumer(destination);
        final MessageConsumer timedConsumer = session.createConsumer(destination);
        final MessageProducer producer = session.createProducer(destination);

        Thread noWaitConsumerThread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (keepRunning()) {
                    try {
                        Message message = noWaitConsumer.receiveNoWait();
                        if (message != null) {
                            workQueue.add(message);
                        }
                    } catch (Exception e) {
                        recordFailure(e);
                        return;
                    }
                }
            }
        }, "AMQ3732Test-consumer-noWait");
        noWaitConsumerThread.start();

        Thread timedConsumerThread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (keepRunning()) {
                    try {
                        Message message = timedConsumer.receive(50L);
                        if (message != null) {
                            workQueue.add(message);
                        }
                    } catch (Exception e) {
                        recordFailure(e);
                        return;
                    }
                }
            }
        }, "AMQ3732Test-consumer-timed");
        timedConsumerThread.start();

        Thread producerThread = new Thread(new Runnable() {
            @Override
            public void run() {
                // Stop sending as soon as any worker has failed; the test cannot pass anymore.
                for (long sent = 0; sent < NUM_MESSAGES && firstFailure.get() == null; sent++) {
                    try {
                        producer.send(session.createTextMessage("TEST"));
                        TimeUnit.MILLISECONDS.sleep(pause.nextInt(10));
                    } catch (Exception e) {
                        recordFailure(e);
                        return;
                    }
                }
            }
        }, "AMQ3732Test-producer");
        producerThread.start();

        Thread ackThread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (keepRunning()) {
                    try {
                        // A timed poll (instead of take()) lets this thread notice a failed
                        // worker rather than blocking until the JUnit timeout expires.
                        Message message = workQueue.poll(ACK_POLL_MILLIS, TimeUnit.MILLISECONDS);
                        if (message == null) {
                            continue;
                        }
                        message.acknowledge();
                        long consumed = totalConsumed.incrementAndGet();
                        if (consumed % 100 == 0) {
                            LOG.info("Consumed {} messages so far.", consumed);
                        }
                    } catch (Exception e) {
                        recordFailure(e);
                        return;
                    }
                }
            }
        }, "AMQ3732Test-acker");
        ackThread.start();

        producerThread.join();
        noWaitConsumerThread.join();
        timedConsumerThread.join();
        ackThread.join();

        Throwable failure = firstFailure.get();
        Assert.assertNull("A worker thread failed: " + failure, failure);
        Assert.assertEquals(NUM_MESSAGES, totalConsumed.get());
    }

    /** Returns true while no worker has failed and not all messages have been acknowledged. */
    private boolean keepRunning() {
        return firstFailure.get() == null && totalConsumed.get() < NUM_MESSAGES;
    }

    /** Logs a worker error and remembers the first one so the test can fail on it. */
    private void recordFailure(Exception e) {
        if (e instanceof InterruptedException) {
            // Preserve the interrupt status for whoever owns this thread.
            Thread.currentThread().interrupt();
        }
        LOG.error("Caught an unexpected error: ", e);
        firstFailure.compareAndSet(null, e);
    }
}
