package org.apache.activemq.broker.region.cursors;

import java.io.IOException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.IndexListener;
import org.apache.activemq.store.memory.MemoryMessageStore;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usecases.DurableSubProcessWithRestartTest;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/broker/region/cursors/MemoryMessageStoreQueueCursorTest.class */
public class MemoryMessageStoreQueueCursorTest {
    private static final Logger LOG = LoggerFactory.getLogger(MemoryMessageStoreQueueCursorTest.class);
    BrokerService brokerService;
    static final String mesageIdRoot = "11111:22222:0:";
    ActiveMQQueue destination = new ActiveMQQueue("queue-" + MemoryMessageStoreQueueCursorTest.class.getSimpleName());
    final int messageBytesSize = 1024;
    final String text = new String(new byte[1024]);

    @Before
    public void setUp() throws Exception {
        this.brokerService = createBroker();
        this.brokerService.setUseJmx(false);
        this.brokerService.setPersistent(false);
        this.brokerService.start();
    }

    protected BrokerService createBroker() throws Exception {
        return new BrokerService();
    }

    @After
    public void tearDown() throws Exception {
        this.brokerService.stop();
    }

    @Test(timeout = DurableSubProcessWithRestartTest.BROKER_RESTART)
    public void testRecoverNextMessages2() throws Exception {
        MemoryMessageStore memoryMessageStore = new MemoryMessageStore(this.destination);
        Queue queue = new Queue(this.brokerService, this.destination, memoryMessageStore, new DestinationStatistics(), (TaskRunnerFactory) null);
        memoryMessageStore.start();
        memoryMessageStore.registerIndexListener((IndexListener) null);
        QueueStorePrefetch queueStorePrefetch = new QueueStorePrefetch(queue, this.brokerService.getBroker());
        SystemUsage systemUsage = new SystemUsage();
        systemUsage.getMemoryUsage().setLimit(5120L);
        queueStorePrefetch.setSystemUsage(systemUsage);
        queueStorePrefetch.setEnableAudit(false);
        queueStorePrefetch.start();
        Assert.assertTrue("cache enabled", queueStorePrefetch.isUseCache() && queueStorePrefetch.isCacheEnabled());
        ActiveMQTextMessage message = getMessage(0);
        message.setMemoryUsage(systemUsage.getMemoryUsage());
        memoryMessageStore.addMessage((ConnectionContext) null, message);
        queueStorePrefetch.addMessageLast(message);
        message.decrementReferenceCount();
        if (queueStorePrefetch.hasNext()) {
            MessageReference next = queueStorePrefetch.next();
            LOG.info("Received message: {} with body: ({})", next.getMessageId(), next.getMessage().getText());
            queueStorePrefetch.remove();
            try {
                memoryMessageStore.removeMessage(next.getMessageId());
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        systemUsage.getMemoryUsage().increaseUsage(10240L);
        ActiveMQTextMessage message2 = getMessage(1);
        message2.setMemoryUsage(systemUsage.getMemoryUsage());
        memoryMessageStore.addMessage((ConnectionContext) null, message2);
        queueStorePrefetch.addMessageLast(message2);
        message2.decrementReferenceCount();
        boolean z = true;
        while (z) {
            if (queueStorePrefetch.hasNext()) {
                MessageReference next2 = queueStorePrefetch.next();
                LOG.info("Received message: {} with body: ({})", next2.getMessageId(), next2.getMessage().getText());
                queueStorePrefetch.remove();
                try {
                    memoryMessageStore.removeMessage(next2.getMessageId());
                } catch (IOException e2) {
                    e2.printStackTrace();
                }
                z = false;
            }
        }
    }

    private ActiveMQTextMessage getMessage(int i) throws Exception {
        ActiveMQTextMessage activeMQTextMessage = new ActiveMQTextMessage();
        MessageId messageId = new MessageId("11111:22222:0:" + i);
        messageId.setBrokerSequenceId(i);
        messageId.setProducerSequenceId(i);
        activeMQTextMessage.setMessageId(messageId);
        activeMQTextMessage.setDestination(this.destination);
        activeMQTextMessage.setPersistent(true);
        activeMQTextMessage.setResponseRequired(true);
        activeMQTextMessage.setText("Msg:" + i + " " + this.text);
        Assert.assertEquals(activeMQTextMessage.getMessageId().getProducerSequenceId(), i);
        return activeMQTextMessage;
    }
}
