package org.apache.activemq.usecases;

import jakarta.jms.Connection;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Vector;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/usecases/JdbcDurableSubDupTest.class */
public class JdbcDurableSubDupTest {
    /** Logger shared by the test methods and the nested consumer/producer helpers. */
    private static final Logger LOG = LoggerFactory.getLogger(JdbcDurableSubDupTest.class);
    /** Prefetch size; same value as the {@code consumer.prefetchSize} option embedded in {@link #queueName}. */
    final int prefetchVal = 150;
    /** Query string appended to the broker connect URI when {@link #url} is built in startBroker(). */
    String urlOptions = "jms.watchTopicAdvisories=false";
    /** Client connection URL; null until startBroker() assigns it. */
    String url = null;
    /** Destination name. Despite the field name it is passed to {@code createTopic(...)} by both helpers. */
    String queueName = "topicTest?consumer.prefetchSize=150";
    /** Prefix of every produced message body; the producer appends the sequence number and priority. */
    String xmlMessage = "<Example 01234567890123456789012345678901234567890123456789 MessageText>";
    /** Message selector handed to {@code createDurableSubscriber}; empty here. */
    String selector = "";
    /** Suffix appended to the client id and the subscription name below. */
    String clntVersion = "87";
    /** JMS client id set on the consumer connection. */
    String clntId = "timsClntId345" + this.clntVersion;
    /** Name of the durable subscription created by the consumer. */
    String subscriptionName = "subscriptionName-y" + this.clntVersion;
    /** Formats timestamps (hours:minutes:seconds) for the consumer's periodic log line. */
    SimpleDateFormat dtf = new SimpleDateFormat("HH:mm:ss");
    /** Expected number of messages per test run (5000). */
    final int TO_RECEIVE = 5000;
    /** Broker under test; created in startBroker() and stopped in stopBroker(). */
    BrokerService broker = null;
    /** Failures recorded by the consumer and producer helpers; the tests assert that it stays empty. */
    Vector<Throwable> exceptions = new Vector<>();
    /** Length of {@link #dupChecker}; only SeqNo values below this bound are duplicate-checked. */
    final int MAX_MESSAGES = 100000;
    /** Indexed by the message's SeqNo property: 0 = not yet received, 1 = received. */
    int[] dupChecker = new int[100000];

    /* loaded from: input_file:org/apache/activemq/usecases/JdbcDurableSubDupTest$JmsConsumerDup.class */
    class JmsConsumerDup implements MessageListener {
        long count = 0;
        AtomicBoolean done = new AtomicBoolean(false);

        JmsConsumerDup() {
        }

        public void run() {
            Connection connection = null;
            try {
                try {
                    connection = new ActiveMQConnectionFactory(JdbcDurableSubDupTest.this.url).createConnection("MyUsername", "MyPassword");
                    connection.setClientID(JdbcDurableSubDupTest.this.clntId);
                    connection.start();
                    Session createSession = connection.createSession(false, 1);
                    createSession.createDurableSubscriber(createSession.createTopic(JdbcDurableSubDupTest.this.queueName), JdbcDurableSubDupTest.this.subscriptionName, JdbcDurableSubDupTest.this.selector, false).setMessageListener(this);
                    JdbcDurableSubDupTest.LOG.info("Waiting for messages...");
                    while (!this.done.get()) {
                        TimeUnit.SECONDS.sleep(5L);
                        if (this.count == 5000 || !JdbcDurableSubDupTest.this.exceptions.isEmpty()) {
                            this.done.set(true);
                        }
                    }
                    if (connection != null) {
                        try {
                            JdbcDurableSubDupTest.LOG.info("consumer done (" + JdbcDurableSubDupTest.this.exceptions.isEmpty() + "), closing connection");
                            connection.close();
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                } catch (Throwable th) {
                    if (connection != null) {
                        try {
                            JdbcDurableSubDupTest.LOG.info("consumer done (" + JdbcDurableSubDupTest.this.exceptions.isEmpty() + "), closing connection");
                            connection.close();
                        } catch (JMSException e2) {
                            e2.printStackTrace();
                        }
                    }
                    throw th;
                }
            } catch (Exception e3) {
                JdbcDurableSubDupTest.LOG.error("caught", e3);
                JdbcDurableSubDupTest.this.exceptions.add(e3);
                throw new RuntimeException(e3);
            }
        }

        public void onMessage(Message message) {
            this.count++;
            try {
                Thread.sleep(0L);
            } catch (InterruptedException e) {
            }
            try {
                TextMessage textMessage = (TextMessage) message;
                if (this.count % 100 == 0) {
                    Logger logger = JdbcDurableSubDupTest.LOG;
                    long j = this.count;
                    String text = textMessage.getText();
                    String format = JdbcDurableSubDupTest.this.dtf.format(new Date(textMessage.getJMSTimestamp()));
                    String format2 = JdbcDurableSubDupTest.this.dtf.format(new Date());
                    String format3 = JdbcDurableSubDupTest.this.dtf.format(new Date(textMessage.getJMSExpiration()));
                    textMessage.getJMSMessageID();
                    logger.info("Rcvd Msg #-" + j + " " + logger + " Sent->" + text + " Recv->" + format + " Expr->" + format2 + ", mid: " + format3);
                }
                int intProperty = textMessage.getIntProperty("SeqNo");
                if (intProperty < 100000) {
                    if (JdbcDurableSubDupTest.this.dupChecker[intProperty] == 1) {
                        Logger logger2 = JdbcDurableSubDupTest.LOG;
                        long j2 = this.count;
                        textMessage.getJMSMessageID();
                        logger2.error("Duplicate message received at count: " + j2 + ", id: " + logger2);
                        JdbcDurableSubDupTest.this.exceptions.add(new RuntimeException("Got Duplicate at: " + textMessage.getJMSMessageID()));
                    } else {
                        JdbcDurableSubDupTest.this.dupChecker[intProperty] = 1;
                    }
                }
            } catch (JMSException e2) {
                JdbcDurableSubDupTest.LOG.error("caught ", e2);
                JdbcDurableSubDupTest.this.exceptions.add(e2);
            }
        }
    }

    /* loaded from: input_file:org/apache/activemq/usecases/JdbcDurableSubDupTest$JmsProvider.class */
    class JmsProvider implements Runnable {
        int priorityModulator = 10;

        JmsProvider() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                Session createSession = new ActiveMQConnectionFactory(JdbcDurableSubDupTest.this.url).createConnection("MyUserName", "MyPassword").createSession(false, 1);
                MessageProducer createProducer = createSession.createProducer(createSession.createTopic(JdbcDurableSubDupTest.this.queueName));
                createProducer.setPriority(3);
                createProducer.setTimeToLive(0L);
                createProducer.setDeliveryMode(2);
                int i = 0;
                int i2 = 5000 / 1000;
                for (int i3 = 0; i3 < i2; i3++) {
                    TextMessage createTextMessage = createSession.createTextMessage();
                    for (int i4 = 0; i4 < 1000; i4++) {
                        int i5 = this.priorityModulator <= 10 ? i % this.priorityModulator : i >= this.priorityModulator ? 9 : 0;
                        createTextMessage.setText(JdbcDurableSubDupTest.this.xmlMessage + i + "-" + i5);
                        createTextMessage.setJMSPriority(i5);
                        createTextMessage.setIntProperty("SeqNo", i);
                        if (i4 > 0 && i4 % 100 == 0) {
                            JdbcDurableSubDupTest.LOG.info("Sending message: " + createTextMessage.getText());
                        }
                        createProducer.send(createTextMessage, 2, createTextMessage.getJMSPriority(), 0L);
                        i++;
                    }
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        JdbcDurableSubDupTest.this.exceptions.add(e);
                    }
                }
            } catch (JMSException e2) {
                JdbcDurableSubDupTest.LOG.error("caught ", e2);
                e2.printStackTrace();
                JdbcDurableSubDupTest.this.exceptions.add(e2);
            }
        }
    }

    /**
     * Clears the recorded failures and the duplicate-tracking array, then configures and
     * starts a broker with a JDBC persistence adapter and a default destination policy
     * (audit depth 3000, page size 150, prioritized messages). Once the broker is up, the
     * client {@code url} is derived from its first transport connector plus {@code urlOptions}.
     *
     * @throws Exception if the broker cannot be configured or started
     */
    @Before
    public void startBroker() throws Exception {
        exceptions.clear();
        int slot = 0;
        while (slot < 100000) {
            dupChecker[slot] = 0;
            slot++;
        }

        broker = new BrokerService();
        broker.setAdvisorySupport(false);
        broker.setPersistenceAdapter(new JDBCPersistenceAdapter());

        PolicyEntry defaultPolicy = new PolicyEntry();
        defaultPolicy.setMaxAuditDepth(3000);
        defaultPolicy.setMaxPageSize(150);
        defaultPolicy.setPrioritizedMessages(true);
        PolicyMap destinationPolicies = new PolicyMap();
        destinationPolicies.setDefaultEntry(defaultPolicy);
        broker.setDestinationPolicy(destinationPolicies);

        broker.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT);
        broker.setDeleteAllMessagesOnStartup(true);
        broker.start();
        broker.waitUntilStarted();

        TransportConnector firstConnector = (TransportConnector) broker.getTransportConnectors().get(0);
        String connectUri = firstConnector.getConnectUri().toString();
        url = connectUri + "?" + urlOptions;
    }

    /**
     * Stops the broker created by startBroker(), if there is one.
     *
     * @throws Exception if stopping the broker fails
     */
    @After
    public void stopBroker() throws Exception {
        if (broker == null) {
            return;
        }
        broker.stop();
    }

    /**
     * Registers the durable subscription, produces all messages with the default priority
     * spread ({@code seqNo % 10}) while the subscriber is offline, then reconnects and checks
     * that every message arrives exactly once.
     */
    @Test
    public void testNoDupsOnSlowConsumerReconnect() throws Exception {
        JmsConsumerDup consumer = registerDurableSubscription();
        LOG.info("serial production then consumption");
        produceThenConsume(new JmsProvider(), consumer);
    }

    /**
     * Same scenario with a priority modulator of 2500, i.e. the first 2500 messages are sent
     * with priority 0 and the remaining ones with priority 9.
     */
    @Test
    public void testNoDupsOnSlowConsumerLargePriorityGapReconnect() throws Exception {
        JmsConsumerDup consumer = registerDurableSubscription();
        JmsProvider provider = new JmsProvider();
        provider.priorityModulator = 2500;
        produceThenConsume(provider, consumer);
    }

    /**
     * Runs a consumer once with {@code done} preset so that it only connects, creates the
     * durable subscription and disconnects again; the flag is cleared for the later real run.
     */
    private JmsConsumerDup registerDurableSubscription() {
        JmsConsumerDup consumer = new JmsConsumerDup();
        consumer.done.set(true);
        consumer.run();
        consumer.done.set(false);
        return consumer;
    }

    /**
     * Runs the producer to completion, then the consumer, and asserts that no failure was
     * recorded and that each of the first {@code TO_RECEIVE} sequence numbers was received.
     */
    private void produceThenConsume(JmsProvider provider, JmsConsumerDup consumer) {
        provider.run();
        consumer.run();
        Assert.assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
        for (int seqNo = 0; seqNo < TO_RECEIVE; seqNo++) {
            // assertEquals reports the actual marker value when a message is missing.
            Assert.assertEquals("got message " + seqNo, 1, dupChecker[seqNo]);
        }
    }
}
