package org.apache.activemq.broker.region;

import java.io.IOException;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.InvalidSelectorException;
import javax.management.ObjectName;
import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.transport.nio.NIOSSLLoadTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Checks that a {@link Queue} backed by a persistent message store hands its
 * messages to a subscription strictly in producer-sequence order while earlier
 * messages are being acknowledged in batches.
 *
 * <p>The test sends {@link #count} persistent messages, attaches a stub
 * {@link Subscription} that accepts at most {@link #fullWindow} unacknowledged
 * messages, and acknowledges messages in batches from the test thread. Any
 * message that arrives out of sequence is recorded as an error and fails the
 * test.
 */
public class QueueDuplicatesFromStoreTest extends TestCase {
    private static final Logger LOG = LoggerFactory.getLogger(QueueDuplicatesFromStoreTest.class);

    /** Prefix of every message id; the producer sequence id is appended to it. */
    static final String mesageIdRoot = "11111:22222:";

    /** Broker under test; created in {@link #setUp()} and stopped in {@link #tearDown()}. */
    BrokerService brokerService;

    /** Queue destination named after this test class. */
    ActiveMQQueue destination = new ActiveMQQueue("queue-" + QueueDuplicatesFromStoreTest.class.getSimpleName());

    /** Number of padding characters appended to every message body. */
    final int messageBytesSize = 256;

    /**
     * Padding appended to every message body: {@link #messageBytesSize} NUL
     * characters. Built from a char array so the content does not depend on
     * the platform default charset.
     */
    final String text = new String(new char[messageBytesSize]);

    /** Acknowledgements only begin once more than this many messages were received. */
    final int ackStartIndex = 100;

    /** A batch is acknowledged once the receiver is at least this far ahead of the acks. */
    final int ackWindow = 50;

    /** Number of messages acknowledged per batch. */
    final int ackBatchSize = 50;

    /** Maximum number of received-but-unacknowledged messages before the subscription reports full. */
    final int fullWindow = 200;

    /** Number of messages sent to the queue by each test. */
    protected int count = 5000;

    /**
     * Creates and starts a broker with JMX disabled and all previously stored
     * messages deleted.
     *
     * @throws Exception if the broker cannot be created or started
     */
    public void setUp() throws Exception {
        this.brokerService = createBroker();
        this.brokerService.setUseJmx(false);
        this.brokerService.deleteAllMessages();
        this.brokerService.start();
    }

    /**
     * Factory hook for the broker under test; subclasses may override it to
     * supply a differently configured broker.
     *
     * @return a new, not yet started broker
     * @throws Exception if the broker cannot be created
     */
    protected BrokerService createBroker() throws Exception {
        return new BrokerService();
    }

    /**
     * Stops the broker started in {@link #setUp()}.
     *
     * @throws Exception if the broker fails to stop
     */
    public void tearDown() throws Exception {
        this.brokerService.stop();
    }

    /**
     * Runs the scenario with a max audit depth of 10240, which is larger than
     * the number of messages sent by default.
     *
     * @throws Exception on any broker or assertion failure
     */
    public void testNoDuplicateAfterCacheFullAndAckedWithLargeAuditDepth() throws Exception {
        doTestNoDuplicateAfterCacheFullAndAcked(10240);
    }

    /**
     * Runs the scenario with a max audit depth of 512, which is smaller than
     * the number of messages sent by default.
     *
     * @throws Exception on any broker or assertion failure
     */
    public void testNoDuplicateAfterCacheFullAndAckedWithSmallAuditDepth() throws Exception {
        doTestNoDuplicateAfterCacheFullAndAcked(512);
    }

    /**
     * Sends {@link #count} messages to a freshly created queue, consumes them
     * through a stub subscription that verifies ordering, and acknowledges
     * them in batches while dispatch is in progress.
     *
     * @param auditDepth value passed to {@code Queue.setMaxAuditDepth}
     * @throws Exception on any broker or assertion failure
     */
    public void doTestNoDuplicateAfterCacheFullAndAcked(int auditDepth) throws Exception {
        final PersistenceAdapter persistenceAdapter = this.brokerService.getPersistenceAdapter();
        final MessageStore queueMessageStore = persistenceAdapter.createQueueMessageStore(this.destination);
        final ConnectionContext connectionContext = new ConnectionContext();
        final ConsumerInfo consumerInfo = new ConsumerInfo();
        final DestinationStatistics destinationStatistics = new DestinationStatistics();
        consumerInfo.setExclusive(true);

        final Queue queue = new Queue(this.brokerService, this.destination, queueMessageStore,
                destinationStatistics, this.brokerService.getTaskRunnerFactory());
        // 10 MiB memory limit for the queue.
        queue.systemUsage.getMemoryUsage().setLimit(10L * 1024 * 1024);
        queue.setMaxAuditDepth(auditDepth);
        queue.initialize();
        queue.start();

        final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
        producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
        producerExchange.setConnectionContext(connectionContext);

        final CountDownLatch receivedLatch = new CountDownLatch(this.count);
        final AtomicLong ackedCount = new AtomicLong(0L);
        final AtomicLong receivedCount = new AtomicLong(0L);
        // Vector is synchronized; presumably the subscription callback runs on a
        // broker thread while this thread polls the list — TODO confirm.
        final Vector<String> errors = new Vector<String>();

        for (int sequence = 0; sequence < this.count; sequence++) {
            queue.send(producerExchange, getMessage(sequence));
        }
        assertEquals("store count is correct", this.count, queueMessageStore.getMessageCount());

        // Stub subscription: only add(MessageReference), isFull() and
        // countBeforeFull() carry logic; every other method is an inert default.
        final Subscription subscription = new Subscription() {
            public void add(MessageReference messageReference) throws Exception {
                final long expectedSequence = receivedCount.get();
                final long actualSequence = messageReference.getMessageId().getProducerSequenceId();
                if (expectedSequence != actualSequence) {
                    // Recorded as well as asserted so the polling loop in the test thread can stop.
                    errors.add("Not in sequence at: " + expectedSequence + ", received: " + actualSequence);
                }
                Assert.assertEquals("is in order", expectedSequence, actualSequence);
                receivedLatch.countDown();
                receivedCount.incrementAndGet();
                messageReference.decrementReferenceCount();
            }

            public void add(ConnectionContext context, Destination dest) throws Exception {
            }

            public int countBeforeFull() {
                if (isFull()) {
                    return 0;
                }
                return fullWindow - ((int) (receivedCount.get() - ackedCount.get()));
            }

            public void destroy() {
            }

            public void gc() {
            }

            public ConsumerInfo getConsumerInfo() {
                return consumerInfo;
            }

            public ConnectionContext getContext() {
                return null;
            }

            public long getDequeueCounter() {
                return 0L;
            }

            public long getDispatchedCounter() {
                return 0L;
            }

            public int getDispatchedQueueSize() {
                return 0;
            }

            public long getEnqueueCounter() {
                return 0L;
            }

            public int getInFlightSize() {
                return 0;
            }

            public int getInFlightUsage() {
                return 0;
            }

            public ObjectName getObjectName() {
                return null;
            }

            public int getPendingQueueSize() {
                return 0;
            }

            public int getPrefetchSize() {
                return 0;
            }

            public String getSelector() {
                return null;
            }

            public boolean isBrowser() {
                return false;
            }

            public boolean isFull() {
                // Full once the unacknowledged backlog reaches the window size.
                return receivedCount.get() - ackedCount.get() >= fullWindow;
            }

            public boolean isHighWaterMark() {
                return false;
            }

            public boolean isLowWaterMark() {
                return false;
            }

            public boolean isRecoveryRequired() {
                return false;
            }

            public boolean isSlave() {
                return false;
            }

            public boolean matches(MessageReference messageReference, MessageEvaluationContext messageEvaluationContext) throws IOException {
                return true;
            }

            public boolean matches(ActiveMQDestination activeMQDestination) {
                return true;
            }

            public void processMessageDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
            }

            public Response pullMessage(ConnectionContext context, MessagePull messagePull) throws Exception {
                return null;
            }

            public List<MessageReference> remove(ConnectionContext context, Destination dest) throws Exception {
                return null;
            }

            public void setObjectName(ObjectName objectName) {
            }

            public void setSelector(String selector) throws InvalidSelectorException, UnsupportedOperationException {
            }

            public void updateConsumerPrefetch(int newPrefetch) {
            }

            public boolean addRecoveredMessage(ConnectionContext context, MessageReference messageReference) throws Exception {
                return false;
            }

            public ActiveMQDestination getActiveMQDestination() {
                return QueueDuplicatesFromStoreTest.this.destination;
            }

            public void acknowledge(ConnectionContext context, MessageAck messageAck) throws Exception {
            }

            public int getCursorMemoryHighWaterMark() {
                return 0;
            }

            public void setCursorMemoryHighWaterMark(int highWaterMark) {
            }

            public boolean isSlowConsumer() {
                return false;
            }

            public void unmatched(MessageReference messageReference) throws IOException {
            }
        };

        queue.addSubscription(connectionContext, subscription);

        // Index of the next message to acknowledge; equals the number acked so far.
        int removeIndex = 0;
        // NOTE(review): the latch is polled with a zero timeout, so this loop spins,
        // and it has no overall deadline — it ends only when every message has been
        // received or an ordering error was recorded. A stalled dispatch hangs the test.
        do {
            final long received = receivedCount.get();
            if (received > ackStartIndex && received >= removeIndex + ackWindow) {
                for (int acked = 0; acked < ackBatchSize; acked++, removeIndex++) {
                    ackedCount.incrementAndGet();
                    final MessageAck messageAck = new MessageAck();
                    messageAck.setLastMessageId(new MessageId(mesageIdRoot + removeIndex));
                    messageAck.setMessageCount(1);
                    queue.removeMessage(connectionContext, subscription,
                            new IndirectMessageReference(getMessage(removeIndex)), messageAck);
                    queue.wakeup();
                }
                // NOTE(review): the checkpoint interval is borrowed from an unrelated
                // test's constant; this looks like a decompiler substitution for a
                // literal — confirm the value and replace it with a local constant.
                if (removeIndex % NIOSSLLoadTest.MESSAGE_COUNT == 0) {
                    LOG.info("acked: {}", removeIndex);
                    persistenceAdapter.checkpoint(true);
                }
            }
        } while (!receivedLatch.await(0L, TimeUnit.MILLISECONDS) && errors.isEmpty());

        assertTrue("There are no errors: " + errors, errors.isEmpty());
        assertEquals(this.count, receivedCount.get());
        assertEquals("store count is correct", this.count - removeIndex, queueMessageStore.getMessageCount());
    }

    /**
     * Builds the persistent text message with the given producer sequence id.
     * The same id always yields a message with the same message id, which lets
     * the test re-create a message in order to acknowledge it.
     *
     * @param i producer sequence id, appended to {@link #mesageIdRoot} to form the message id
     * @return a new message addressed to {@link #destination}
     * @throws Exception if the message body cannot be set
     */
    private Message getMessage(int i) throws Exception {
        final ActiveMQTextMessage message = new ActiveMQTextMessage();
        message.setMessageId(new MessageId(mesageIdRoot + i));
        message.setDestination(this.destination);
        message.setPersistent(true);
        message.setResponseRequired(true);
        message.setText("Msg:" + i + " " + this.text);
        // Sanity check: the sequence id parsed out of the message id matches the input.
        assertEquals(i, message.getMessageId().getProducerSequenceId());
        return message;
    }
}
