package org.apache.activemq;

import jakarta.jms.Connection;
import jakarta.jms.DeliveryMode;
import jakarta.jms.JMSException;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.QueueBrowser;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.net.URI;
import java.util.Enumeration;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.usecases.DurableSubProcessWithRestartTest;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/JmsQueueBrowserExpirationTest.class */
/**
 * Checks that messages sent with a time-to-live stop being returned by a
 * {@link QueueBrowser} once that time-to-live has elapsed.
 *
 * <p>Each test runs against a fresh, non-persistent broker reached through a
 * {@code vm://localhost} connector.
 */
public class JmsQueueBrowserExpirationTest {
    /** Number of messages published by {@link #sendTestMessages()}. */
    private static final int MESSAGES_TO_SEND = 50;
    /** Time-to-live, in milliseconds, applied to every message the tests send. */
    private static final long TTL = 1000;
    private static final Logger LOG = LoggerFactory.getLogger(JmsQueueBrowserExpirationTest.class);
    private BrokerService broker;
    private URI connectUri;
    private ActiveMQConnectionFactory factory;
    private final ActiveMQQueue queue = new ActiveMQQueue("TEST");

    /**
     * Starts a non-persistent broker and builds a connection factory pointing at it.
     *
     * @throws Exception if the broker cannot be configured or started
     */
    @Before
    public void startBroker() throws Exception {
        this.broker = new BrokerService();
        this.broker.setPersistent(false);
        TransportConnector addConnector = this.broker.addConnector("vm://localhost");
        this.broker.deleteAllMessages();
        this.broker.start();
        this.broker.waitUntilStarted();
        this.connectUri = addConnector.getConnectUri();
        this.factory = new ActiveMQConnectionFactory(this.connectUri);
    }

    /**
     * Stops the broker started by {@link #startBroker()}, if one was created.
     *
     * @throws Exception if the broker fails to stop
     */
    @After
    public void stopBroker() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
            this.broker.waitUntilStopped();
        }
    }

    /**
     * Sends {@link #MESSAGES_TO_SEND} messages, asserts that a browse sees all of
     * them, then keeps browsing every 100 ms until the browse comes back empty.
     * The loop has no exit of its own; the test timeout bounds it.
     *
     * @throws JMSException if any JMS operation fails
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    // NOTE(review): the timeout reuses a constant from an unrelated test class; its
    // value is not visible here. Consider a local constant instead - confirm intent.
    @Test(timeout = DurableSubProcessWithRestartTest.BROKER_RESTART)
    public void testBrowsingExpiration() throws JMSException, InterruptedException {
        sendTestMessages();
        // try-with-resources: the connection is closed even when an assertion fails.
        try (Connection connection = this.factory.createConnection()) {
            connection.start();
            int browsed = browse(this.queue, connection);
            Assert.assertEquals(MESSAGES_TO_SEND, browsed);
            long startNanos = System.nanoTime();
            while (browsed != 0) {
                Thread.sleep(100L);
                browsed = browse(this.queue, connection);
                long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
                LOG.info("[{}ms] found {}", elapsedMs, browsed);
            }
            LOG.info("Finished");
        }
    }

    /**
     * Sends one message with a time-to-live of {@link #TTL}, asserts it is
     * browsable, sleeps for {@code TTL + 1000} ms and asserts the browse is empty.
     *
     * @throws Exception if any JMS operation fails or the sleep is interrupted
     */
    @Test(timeout = DurableSubProcessWithRestartTest.BROKER_RESTART)
    public void testDoNotReceiveExpiredMessage() throws Exception {
        // Resources are released in reverse order: producer, session, connection.
        try (Connection connection = this.factory.createConnection()) {
            connection.start();
            try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
                Queue testQueue = session.createQueue("MyTestQueue");
                try (MessageProducer producer = session.createProducer(testQueue)) {
                    producer.setTimeToLive(TTL);
                    producer.send(testQueue, session.createTextMessage("Test message"));
                    Assert.assertEquals(1L, getMessageCount(testQueue, session));
                    // Wait past the time-to-live, with one extra second of slack.
                    Thread.sleep(TTL + 1000);
                    Assert.assertEquals(0L, getMessageCount(testQueue, session));
                }
            }
        }
    }

    /**
     * Counts the messages a new browser on {@code queue} enumerates.
     *
     * @param queue the queue to browse
     * @param session the session used to create the browser; left open
     * @return the number of elements the browser's enumeration produced
     * @throws Exception if creating, reading or closing the browser fails
     */
    private int getMessageCount(Queue queue, Session session) throws Exception {
        int count = 0;
        try (QueueBrowser browser = session.createBrowser(queue)) {
            Enumeration<?> messages = browser.getEnumeration();
            while (messages.hasMoreElements()) {
                count++;
                messages.nextElement();
            }
        }
        return count;
    }

    /**
     * Browses {@code activeMQQueue} on a new session of {@code connection},
     * logging each message text at debug level.
     *
     * @param activeMQQueue the queue to browse
     * @param connection the connection on which the session is created; left open
     * @return the number of messages the browser enumerated
     * @throws JMSException if any JMS operation fails
     */
    private int browse(ActiveMQQueue activeMQQueue, Connection connection) throws JMSException {
        int count = 0;
        try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                QueueBrowser browser = session.createBrowser(activeMQQueue)) {
            Enumeration<?> messages = browser.getEnumeration();
            while (messages.hasMoreElements()) {
                count++;
                // Every element is cast to TextMessage, matching what sendTestMessages() publishes.
                TextMessage message = (TextMessage) messages.nextElement();
                LOG.debug("B[{}]: {}", count, message.getText());
            }
        }
        return count;
    }

    /**
     * Publishes {@link #MESSAGES_TO_SEND} non-persistent text messages with a
     * time-to-live of {@link #TTL} to the test queue, then closes the producer,
     * session and connection it opened.
     *
     * @throws JMSException if any JMS operation fails
     */
    protected void sendTestMessages() throws JMSException {
        try (Connection connection = this.factory.createConnection()) {
            connection.start();
            try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    MessageProducer producer = session.createProducer(this.queue)) {
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                producer.setTimeToLive(TTL);
                for (int i = 1; i <= MESSAGES_TO_SEND; i++) {
                    String text = "Message: " + i;
                    producer.send(session.createTextMessage(text));
                    LOG.info("P&C: {}", text);
                }
            }
        }
    }
}
