package org.apache.activemq.broker.region.cursors;

import jakarta.jms.Connection;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TopicSubscriber;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.MessageStoreSubscriptionStatistics;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.usecases.DurableSubProcessWithRestartTest;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.activemq.util.Wait;
import org.apache.commons.io.FileUtils;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Checks pending-message and message-store statistics for durable topic subscriptions when the
 * broker persists to KahaDB.
 *
 * <p>Every test runs once per combination of the two parameters named in {@link #data()}:
 * prioritized messages and per-subscription store statistics.
 */
@RunWith(Parameterized.class)
public class KahaDBPendingMessageCursorTest extends AbstractPendingMessageCursorTest {
    protected static final Logger LOG = LoggerFactory.getLogger(KahaDBPendingMessageCursorTest.class);

    private static final String CLIENT_ID = "clientId";
    private static final String SUB_1 = "sub1";
    private static final String SUB_2 = "sub2";

    /** Number of messages published per call to {@code publishTestMessagesDurable}. */
    private static final int MESSAGE_COUNT = 200;

    // Trailing argument of publishTestMessagesDurable. Presumably the JMS delivery mode
    // (DeliveryMode.PERSISTENT == 2, DeliveryMode.NON_PERSISTENT == 1) -- TODO confirm against
    // the base class, which is not visible from this file.
    private static final int PERSISTENT = 2;
    private static final int NON_PERSISTENT = 1;

    /** KahaDB data directory, created under {@code target} for each test. */
    @Rule
    public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));

    /**
     * Supplies the parameter matrix: every combination of the two boolean switches.
     *
     * @return four rows of {@code {prioritizedMessages, enableSubscriptionStatistics}}
     */
    @Parameterized.Parameters(name = "prioritizedMessages={0},enableSubscriptionStatistics={1}")
    public static Collection<Object[]> data() {
        return Arrays.asList(
                new Object[]{true, true},
                new Object[]{true, false},
                new Object[]{false, true},
                new Object[]{false, false});
    }

    /**
     * @param prioritizedMessages forwarded to the base-class constructor
     * @param enableSubscriptionStatistics whether the KahaDB adapter tracks per-subscription statistics
     */
    public KahaDBPendingMessageCursorTest(boolean prioritizedMessages, boolean enableSubscriptionStatistics) {
        super(prioritizedMessages);
        this.enableSubscriptionStatistics = enableSubscriptionStatistics;
    }

    /**
     * Optionally empties the data directory, then delegates to the base class.
     *
     * @param clearDataDir when {@code true} and the data directory exists, its contents are deleted first
     */
    @Override
    public void setUpBroker(boolean clearDataDir) throws Exception {
        File dataDir = this.dataFileDir.getRoot();
        if (clearDataDir && dataDir.exists()) {
            FileUtils.cleanDirectory(dataDir);
        }
        super.setUpBroker(clearDataDir);
    }

    /**
     * Installs a KahaDB persistence adapter rooted at {@link #dataFileDir}.
     *
     * <p>NOTE(review): this configures the inherited {@code broker} field rather than the
     * {@code brokerService} argument. That is only equivalent if the caller passes the same
     * instance -- confirm against the base class before switching to the parameter.
     */
    @Override
    protected void initPersistence(BrokerService brokerService) throws IOException {
        this.broker.setPersistent(true);
        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
        adapter.setDirectory(this.dataFileDir.getRoot());
        adapter.setEnableSubscriptionStatistics(this.enableSubscriptionStatistics);
        this.broker.setPersistenceAdapter(adapter);
    }

    /**
     * The pending size of a durable subscription must survive a broker restart, and a second
     * batch published after the restart must be added on top of the first.
     */
    @Test
    public void testDurableMessageSizeAfterRestartAndPublish() throws Exception {
        AtomicLong publishedSize = new AtomicLong();
        SubscriptionKey subKey = new SubscriptionKey(CLIENT_ID, SUB_1);
        long pendingSizeBeforeRestart;

        Connection connection = openConnection();
        try {
            Topic topic = publishTestMessagesDurable(
                    connection, new String[]{SUB_1}, MESSAGE_COUNT, publishedSize, PERSISTENT);
            verifyPendingStats(topic, subKey, MESSAGE_COUNT, publishedSize.get());
            verifyStoreStats(topic, MESSAGE_COUNT, publishedSize.get());

            pendingSizeBeforeRestart = durableSub(topic, subKey).getPendingMessageSize();
            Assert.assertEquals(pendingSizeBeforeRestart,
                    topic.getMessageStore().getMessageStoreStatistics().getMessageSize().getTotalSize());
        } finally {
            // Disconnect cleanly before the broker this connection talks to goes away.
            connection.close();
        }

        stopBroker();
        setUpBroker(false);

        Topic restartedTopic = (Topic) getBroker().getDestination(new ActiveMQTopic(this.defaultTopicName));
        Assert.assertEquals(pendingSizeBeforeRestart,
                durableSub(restartedTopic, subKey).getPendingMessageSize());

        Connection reconnected = openConnection();
        try {
            Topic topic = publishTestMessagesDurable(
                    reconnected, new String[]{SUB_1}, MESSAGE_COUNT, publishedSize, PERSISTENT);
            // Two batches have been published in total and none consumed.
            int expectedTotal = 2 * MESSAGE_COUNT;
            verifyPendingStats(topic, subKey, expectedTotal, publishedSize.get());
            verifyStoreStats(topic, expectedTotal, publishedSize.get());
        } finally {
            reconnected.close();
        }
    }

    /**
     * Consuming from one of two durable subscriptions must shrink only that subscription's
     * pending statistics; the other subscription and the store keep the full batch.
     */
    @Test
    public void testMessageSizeTwoDurablesPartialConsumption() throws Exception {
        AtomicLong publishedSize = new AtomicLong();
        SubscriptionKey subKey = new SubscriptionKey(CLIENT_ID, SUB_1);
        SubscriptionKey subKey2 = new SubscriptionKey(CLIENT_ID, SUB_2);

        Connection connection = openConnection();
        try {
            Topic topic = publishTestMessagesDurable(
                    connection, new String[]{SUB_1, SUB_2}, MESSAGE_COUNT, publishedSize, PERSISTENT);
            verifyPendingStats(topic, subKey, MESSAGE_COUNT, publishedSize.get());
            verifyStoreStats(topic, MESSAGE_COUNT, publishedSize.get());

            consumeDurableTestMessages(connection, SUB_1, 50, publishedSize);

            verifyPendingStats(topic, subKey, 150, publishedSize.get());
            verifyPendingStats(topic, subKey2, MESSAGE_COUNT, publishedSize.get());
            verifyStoreStats(topic, MESSAGE_COUNT, publishedSize.get());
        } finally {
            connection.close();
        }
    }

    /**
     * Messages published with the non-persistent mode must show up as pending for the durable
     * subscription while the store statistics stay at zero.
     */
    @Test
    public void testNonPersistentDurableMessageSize() throws Exception {
        AtomicLong publishedSize = new AtomicLong();

        Connection connection = openConnection();
        try {
            Topic topic = publishTestMessagesDurable(
                    connection, new String[]{SUB_1}, MESSAGE_COUNT, publishedSize, NON_PERSISTENT);
            verifyPendingStats(topic, new SubscriptionKey(CLIENT_ID, SUB_1), MESSAGE_COUNT, publishedSize.get());
            verifyStoreStats(topic, 0, 0L);
        } finally {
            connection.close();
        }
    }

    /**
     * Per-subscription store statistics must be all zero when the feature is disabled, and must
     * add up to the aggregate (before and after deleting one subscription) when it is enabled.
     */
    @Test
    public void testEnabledSubscriptionStatistics() throws Exception {
        AtomicLong publishedSize = new AtomicLong();
        SubscriptionKey subKey = new SubscriptionKey(CLIENT_ID, SUB_1);
        SubscriptionKey subKey2 = new SubscriptionKey(CLIENT_ID, SUB_2);
        String statKey = subKey.toString();
        String statKey2 = subKey2.toString();

        Connection connection = openConnection();
        try {
            TopicMessageStore store = (TopicMessageStore) publishTestMessagesDurable(
                    connection, new String[]{SUB_1, SUB_2}, MESSAGE_COUNT, publishedSize, PERSISTENT)
                    .getMessageStore();
            MessageStoreSubscriptionStatistics subStats = store.getMessageStoreSubStatistics();

            if (!this.enableSubscriptionStatistics) {
                Assert.assertEquals(0L, subStats.getMessageCount(statKey).getCount());
                Assert.assertEquals(0L, subStats.getMessageSize(statKey).getTotalSize());
                Assert.assertEquals(0L, subStats.getMessageCount(statKey2).getCount());
                Assert.assertEquals(0L, subStats.getMessageSize(statKey2).getTotalSize());
                Assert.assertEquals(0L, subStats.getMessageCount().getCount());
                Assert.assertEquals(0L, subStats.getMessageSize().getTotalSize());
                return;
            }

            Assert.assertEquals(MESSAGE_COUNT, subStats.getMessageCount(statKey).getCount());
            Assert.assertTrue(subStats.getMessageSize(statKey).getTotalSize() > 0);
            Assert.assertEquals(MESSAGE_COUNT, subStats.getMessageCount(statKey2).getCount());
            Assert.assertTrue(subStats.getMessageSize(statKey2).getTotalSize() > 0);

            // The aggregate must be the sum over both subscriptions, for counts and for sizes.
            Assert.assertEquals(subStats.getMessageCount().getCount(),
                    subStats.getMessageCount(statKey).getCount()
                            + subStats.getMessageCount(statKey2).getCount());
            Assert.assertEquals(subStats.getMessageSize().getTotalSize(),
                    subStats.getMessageSize(statKey).getTotalSize()
                            + subStats.getMessageSize(statKey2).getTotalSize());

            store.deleteSubscription(subKey2.getClientId(), subKey2.getSubscriptionName());

            // With the second subscription gone, the aggregate equals the first subscription alone.
            Assert.assertEquals(subStats.getMessageCount().getCount(),
                    subStats.getMessageCount(statKey).getCount());
            Assert.assertEquals(subStats.getMessageSize().getTotalSize(),
                    subStats.getMessageSize(statKey).getTotalSize());
            Assert.assertEquals(0L, subStats.getMessageCount(statKey2).getCount());
            Assert.assertEquals(0L, subStats.getMessageSize(statKey2).getTotalSize());
        } finally {
            connection.close();
        }
    }

    /**
     * Replacing a stored message with a larger one must grow the pending size of both durable
     * subscriptions that still hold it, and keep them equal to the store's message size.
     */
    @Test
    public void testUpdateMessageSubSize() throws Exception {
        Connection connection = openConnection();
        try {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            jakarta.jms.Topic jmsTopic = session.createTopic(this.defaultTopicName);
            session.createDurableSubscriber(jmsTopic, SUB_1);
            session.createDurableSubscriber(jmsTopic, SUB_2);
            MessageProducer producer = session.createProducer(jmsTopic);

            ActiveMQTextMessage sent = new ActiveMQTextMessage();
            sent.setText("SmallMessage");
            producer.send(sent);

            Topic topic = (Topic) getBroker().getDestination(new ActiveMQTopic(this.defaultTopicName));
            DurableTopicSubscription sub = durableSub(topic, new SubscriptionKey(CLIENT_ID, SUB_1));
            DurableTopicSubscription sub2 = durableSub(topic, new SubscriptionKey(CLIENT_ID, SUB_2));
            long sizeBeforeUpdate = sub.getPendingMessageSize();

            ActiveMQTextMessage stored =
                    (ActiveMQTextMessage) topic.getMessageStore().getMessage(sent.getMessageId());
            stored.setText("LargerMessageLargerMessage");
            topic.getMessageStore().updateMessage(stored);

            Assert.assertTrue(sub.getPendingMessageSize() > sizeBeforeUpdate + 10);
            Assert.assertEquals(sub.getPendingMessageSize(), topic.getMessageStore().getMessageSize());
            Assert.assertEquals(sub.getPendingMessageSize(), sub2.getPendingMessageSize());
        } finally {
            connection.close();
        }
    }

    /**
     * Replacing a stored message with a larger one must only affect subscriptions that have not
     * consumed it yet: the subscription that already received it keeps its pending size.
     */
    @Test
    public void testUpdateMessageSubSizeAfterConsume() throws Exception {
        Connection connection = openConnection();
        try {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            jakarta.jms.Topic jmsTopic = session.createTopic(this.defaultTopicName);
            session.createDurableSubscriber(jmsTopic, SUB_1);
            TopicSubscriber consumer2 = session.createDurableSubscriber(jmsTopic, SUB_2);
            MessageProducer producer = session.createProducer(jmsTopic);

            ActiveMQTextMessage first = new ActiveMQTextMessage();
            first.setText("SmallMessage");
            ActiveMQTextMessage second = new ActiveMQTextMessage();
            second.setText("SmallMessage2");
            producer.send(first);
            producer.send(second);

            // The second subscription takes one message; the first subscription takes none.
            consumer2.receive();

            Topic topic = (Topic) getBroker().getDestination(new ActiveMQTopic(this.defaultTopicName));
            final DurableTopicSubscription sub = durableSub(topic, new SubscriptionKey(CLIENT_ID, SUB_1));
            final DurableTopicSubscription sub2 = durableSub(topic, new SubscriptionKey(CLIENT_ID, SUB_2));

            // Fail here, with a clear message, if the receive never shows up in the statistics.
            Assert.assertTrue("pending size of " + SUB_2 + " never dropped below " + SUB_1,
                    Wait.waitFor(new Wait.Condition() {
                        @Override
                        public boolean isSatisified() throws Exception {
                            return sub.getPendingMessageSize() > sub2.getPendingMessageSize();
                        }
                    }));

            long sizeBeforeUpdate = sub.getPendingMessageSize();
            long size2BeforeUpdate = sub2.getPendingMessageSize();

            ActiveMQTextMessage stored =
                    (ActiveMQTextMessage) topic.getMessageStore().getMessage(first.getMessageId());
            stored.setText("LargerMessageLargerMessage");
            topic.getMessageStore().updateMessage(stored);

            Assert.assertTrue(sub.getPendingMessageSize() > sizeBeforeUpdate + 10);
            Assert.assertEquals(sub.getPendingMessageSize(), topic.getMessageStore().getMessageSize());
            Assert.assertTrue(sub.getPendingMessageSize() > 2 * sub2.getPendingMessageSize());
            Assert.assertEquals(size2BeforeUpdate, sub2.getPendingMessageSize());
        } finally {
            connection.close();
        }
    }

    /** Opens a started connection to the current broker URI using the shared client id. */
    private Connection openConnection() throws Exception {
        Connection connection = new ActiveMQConnectionFactory(this.brokerConnectURI).createConnection();
        try {
            connection.setClientID(CLIENT_ID);
            connection.start();
            return connection;
        } catch (Exception e) {
            // Do not leak a half-initialised connection; keep the original failure as the cause.
            try {
                connection.close();
            } catch (Exception closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /** Looks up the broker-side durable subscription registered on {@code topic} under {@code key}. */
    private static DurableTopicSubscription durableSub(Topic topic, SubscriptionKey key) {
        return (DurableTopicSubscription) topic.getDurableTopicSubs().get(key);
    }
}
