package org.apache.activemq.bugs;

import jakarta.jms.BytesMessage;
import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TopicSubscriber;
import java.util.Properties;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.test.JmsTopicSendReceiveTest;
import org.apache.activemq.usecases.DurableSubProcessWithRestartTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.class */
public class JmsDurableTopicSlowReceiveTest extends JmsTopicSendReceiveTest {
    static final int NMSG = 200;
    static final int MSIZE = 256000;
    private static final transient Logger LOG = LoggerFactory.getLogger(JmsDurableTopicSlowReceiveTest.class);
    private static final String COUNT_PROPERY_NAME = "count";
    protected Connection connection2;
    protected Session session2;
    protected Session consumeSession2;
    protected MessageConsumer consumer2;
    protected MessageProducer producer2;
    protected Destination consumerDestination2;
    BrokerService broker;
    private Connection connection3;
    private Session consumeSession3;
    private TopicSubscriber consumer3;

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.test.JmsTopicSendReceiveTest, org.apache.activemq.test.JmsSendReceiveTestSupport
    public void setUp() throws Exception {
        this.durable = true;
        this.broker = createBroker();
        super.setUp();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.test.JmsTopicSendReceiveTest
    public void tearDown() throws Exception {
        super.tearDown();
        this.broker.stop();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.TestSupport
    public ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://localhost?async=false");
        Properties properties = new Properties();
        properties.put("prefetchPolicy.durableTopicPrefetch", "5");
        properties.put("prefetchPolicy.optimizeDurableTopicPrefetch", "5");
        activeMQConnectionFactory.setProperties(properties);
        return activeMQConnectionFactory;
    }

    protected BrokerService createBroker() throws Exception {
        BrokerService brokerService = new BrokerService();
        configureBroker(brokerService);
        brokerService.start();
        return brokerService;
    }

    protected void configureBroker(BrokerService brokerService) throws Exception {
        brokerService.setDeleteAllMessagesOnStartup(true);
    }

    public void testSlowReceiver() throws Exception {
        Message receive;
        this.connection2 = createConnection();
        this.connection2.setClientID("test");
        this.connection2.start();
        this.consumeSession2 = this.connection2.createSession(false, 1);
        this.session2 = this.connection2.createSession(false, 1);
        this.consumerDestination2 = this.session2.createTopic(getConsumerSubject() + "2");
        this.consumer2 = this.consumeSession2.createDurableSubscriber(this.consumerDestination2, getName());
        this.consumer2.close();
        this.connection2.close();
        new Thread(new Runnable() { // from class: org.apache.activemq.bugs.JmsDurableTopicSlowReceiveTest.1
            @Override // java.lang.Runnable
            public void run() {
                int i = 0;
                for (int i2 = 0; i2 < 4; i2++) {
                    try {
                        JmsDurableTopicSlowReceiveTest.this.connection2 = JmsDurableTopicSlowReceiveTest.this.createConnection();
                        JmsDurableTopicSlowReceiveTest.this.connection2.start();
                        JmsDurableTopicSlowReceiveTest.this.session2 = JmsDurableTopicSlowReceiveTest.this.connection2.createSession(false, 1);
                        JmsDurableTopicSlowReceiveTest.this.producer2 = JmsDurableTopicSlowReceiveTest.this.session2.createProducer((Destination) null);
                        JmsDurableTopicSlowReceiveTest.this.producer2.setDeliveryMode(JmsDurableTopicSlowReceiveTest.this.deliveryMode);
                        Thread.sleep(1000L);
                        for (int i3 = 0; i3 < 50; i3++) {
                            BytesMessage createBytesMessage = JmsDurableTopicSlowReceiveTest.this.session2.createBytesMessage();
                            createBytesMessage.writeBytes(new byte[JmsDurableTopicSlowReceiveTest.MSIZE]);
                            createBytesMessage.setStringProperty("test", "test");
                            createBytesMessage.setIntProperty(JmsDurableTopicSlowReceiveTest.COUNT_PROPERY_NAME, i);
                            createBytesMessage.setJMSType("test");
                            JmsDurableTopicSlowReceiveTest.this.producer2.send(JmsDurableTopicSlowReceiveTest.this.consumerDestination2, createBytesMessage);
                            Thread.sleep(50L);
                            if (JmsDurableTopicSlowReceiveTest.this.verbose) {
                                JmsDurableTopicSlowReceiveTest.LOG.debug("Sent(" + i2 + "): " + i3);
                            }
                            i++;
                        }
                        JmsDurableTopicSlowReceiveTest.this.producer2.close();
                        JmsDurableTopicSlowReceiveTest.this.connection2.stop();
                        JmsDurableTopicSlowReceiveTest.this.connection2.close();
                    } catch (Throwable th) {
                        th.printStackTrace();
                        return;
                    }
                }
            }
        }, "SENDER Thread").start();
        this.connection3 = createConnection();
        this.connection3.setClientID("test");
        this.connection3.start();
        this.consumeSession3 = this.connection3.createSession(false, 2);
        this.consumer3 = this.consumeSession3.createDurableSubscriber(this.consumerDestination2, getName());
        this.connection3.close();
        int i = 0;
        for (int i2 = 0; i2 < 4; i2++) {
            this.connection3 = createConnection();
            this.connection3.setClientID("test");
            this.connection3.start();
            this.consumeSession3 = this.connection3.createSession(false, 2);
            this.consumer3 = this.consumeSession3.createDurableSubscriber(this.consumerDestination2, getName());
            int i3 = 0;
            while (i3 < 50 && (receive = this.consumer3.receive(DurableSubProcessWithRestartTest.BROKER_RESTART)) != null) {
                if (this.verbose) {
                    LOG.debug("Received(" + i2 + "): " + i3 + " count = " + receive.getIntProperty(COUNT_PROPERY_NAME));
                }
                assertNotNull(receive);
                assertEquals(receive.getJMSType(), "test");
                assertEquals(receive.getStringProperty("test"), "test");
                assertEquals("Messages received out of order", i, receive.getIntProperty(COUNT_PROPERY_NAME));
                Thread.sleep(500L);
                receive.acknowledge();
                i++;
                i3++;
            }
            this.consumer3.close();
            assertEquals("Receiver " + i2, 50, i3);
            this.connection3.close();
        }
    }
}
