package org.apache.activemq.test.retroactive;

import jakarta.jms.Connection;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import jakarta.jms.Topic;
import jakarta.jms.TopicSubscriber;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.FixedCountSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/test/retroactive/RetroactiveConsumerBrokerRestartedTest.class */
public class RetroactiveConsumerBrokerRestartedTest extends TestCase {
    private static final Logger log = LoggerFactory.getLogger(RetroactiveConsumerBrokerRestartedTest.class);
    private static final String ACTIVEMQ_BROKER_URI = "tcp://localhost:62626";
    private BrokerService broker;
    Connection connection;

    protected void setUp() throws Exception {
        createBroker();
        this.connection = getConnection();
    }

    protected void tearDown() throws Exception {
        this.broker.stop();
    }

    protected void restartBroker() throws Exception {
        if (this.connection != null) {
            this.connection.close();
        }
        if (this.broker != null) {
            this.broker.stop();
            this.broker.waitUntilStopped();
        }
        createRestartedBroker();
    }

    private void createBroker() throws Exception {
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setTopic(">");
        policyEntry.setDispatchPolicy(new SimpleDispatchPolicy());
        FixedCountSubscriptionRecoveryPolicy fixedCountSubscriptionRecoveryPolicy = new FixedCountSubscriptionRecoveryPolicy();
        fixedCountSubscriptionRecoveryPolicy.setMaximumSize(10);
        policyEntry.setSubscriptionRecoveryPolicy(fixedCountSubscriptionRecoveryPolicy);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(policyEntry);
        this.broker = new BrokerService();
        this.broker.setBrokerName("durable-broker");
        this.broker.setDeleteAllMessagesOnStartup(true);
        this.broker.setPersistenceAdapter(createPersistenceAdapter());
        this.broker.setPersistent(true);
        this.broker.setDestinationPolicy(policyMap);
        this.broker.addConnector(ACTIVEMQ_BROKER_URI);
        this.broker.start();
        this.broker.waitUntilStarted();
    }

    private void createRestartedBroker() throws Exception {
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setTopic(">");
        policyEntry.setDispatchPolicy(new SimpleDispatchPolicy());
        FixedCountSubscriptionRecoveryPolicy fixedCountSubscriptionRecoveryPolicy = new FixedCountSubscriptionRecoveryPolicy();
        fixedCountSubscriptionRecoveryPolicy.setMaximumSize(10);
        policyEntry.setSubscriptionRecoveryPolicy(fixedCountSubscriptionRecoveryPolicy);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(policyEntry);
        this.broker = new BrokerService();
        this.broker.setBrokerName("durable-broker");
        this.broker.setDeleteAllMessagesOnStartup(false);
        this.broker.setPersistenceAdapter(createPersistenceAdapter());
        this.broker.setPersistent(true);
        this.broker.setDestinationPolicy(policyMap);
        this.broker.addConnector(ACTIVEMQ_BROKER_URI);
        this.broker.start();
        this.broker.waitUntilStarted();
    }

    protected PersistenceAdapter createPersistenceAdapter() throws Exception {
        return new KahaDBPersistenceAdapter();
    }

    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        return new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_URI);
    }

    private Connection getConnection() throws Exception {
        Connection createConnection = createConnectionFactory().createConnection();
        createConnection.setClientID("cliId1");
        return createConnection;
    }

    public void testFixedCountSubscriptionRecoveryPolicy() throws Exception {
        this.connection.start();
        Session createSession = this.connection.createSession(false, 1);
        Topic createTopic = createSession.createTopic("TestTopic?consumer.retroactive=true");
        Topic createTopic2 = createSession.createTopic("TestTopic");
        createSession.createDurableSubscriber(createTopic, "sub1");
        MessageProducer createProducer = createSession.createProducer(createTopic2);
        createProducer.setDeliveryMode(2);
        createProducer.send(createSession.createTextMessage("Msg:1"));
        createProducer.send(createSession.createTextMessage("Msg:2"));
        createProducer.send(createSession.createTextMessage("Msg:3"));
        restartBroker();
        this.connection = getConnection();
        this.connection.start();
        Session createSession2 = this.connection.createSession(false, 1);
        MessageProducer createProducer2 = createSession2.createProducer(createTopic2);
        createProducer2.setDeliveryMode(2);
        createProducer2.send(createSession2.createTextMessage("Msg:4"));
        TopicSubscriber createDurableSubscriber = createSession2.createDurableSubscriber(createTopic, "sub1");
        assertTextMessageEquals("Msg:1", createDurableSubscriber.receive(1000L));
        assertTextMessageEquals("Msg:2", createDurableSubscriber.receive(1000L));
        assertTextMessageEquals("Msg:3", createDurableSubscriber.receive(1000L));
        assertTextMessageEquals("Msg:4", createDurableSubscriber.receive(1000L));
    }

    private void assertTextMessageEquals(String str, Message message) throws JMSException {
        assertNotNull("Message was null", message);
        assertTrue("Message is not a TextMessage", message instanceof TextMessage);
        assertEquals(str, ((TextMessage) message).getText());
    }
}
