package org.apache.activemq.broker.virtual;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import java.util.Arrays;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualTopic;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.usecases.DurableSubProcessConcurrentCommitActivateNoDuplicateTest;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/activemq/broker/virtual/VirtualTopicFlowControlDiscardTest.class */
public class VirtualTopicFlowControlDiscardTest {
    private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicFlowControlDiscardTest.class);
    final String payload = new String(new byte[155]);
    int numConsumers = 2;
    int total = DurableSubProcessConcurrentCommitActivateNoDuplicateTest.SERVER_SLEEP;

    @Parameterized.Parameter(0)
    public boolean concurrentSend;

    @Parameterized.Parameter(1)
    public boolean transactedSend;

    @Parameterized.Parameter(2)
    public boolean sendFailGlobal;

    @Parameterized.Parameter(3)
    public boolean persistentBroker;
    BrokerService brokerService;
    ConnectionFactory connectionFactory;

    @Before
    public void createBroker() throws Exception {
        this.brokerService = new BrokerService();
        this.brokerService.setPersistent(this.persistentBroker);
        this.brokerService.setDeleteAllMessagesOnStartup(true);
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setCursorMemoryHighWaterMark(50);
        policyEntry.setMemoryLimit(5000L);
        policyEntry.setCursorMemoryHighWaterMark(110);
        if (this.sendFailGlobal) {
            this.brokerService.getSystemUsage().setSendFailIfNoSpace(true);
        } else {
            policyEntry.setSendFailIfNoSpace(true);
            policyEntry.setSendFailIfNoSpaceAfterTimeout(0L);
        }
        policyMap.put(new ActiveMQQueue("Consumer.0.VirtualTopic.TEST"), policyEntry);
        this.brokerService.setDestinationPolicy(policyMap);
        this.brokerService.start();
        for (VirtualDestinationInterceptor virtualDestinationInterceptor : this.brokerService.getDestinationInterceptors()) {
            for (VirtualTopic virtualTopic : virtualDestinationInterceptor.getVirtualDestinations()) {
                if (virtualTopic instanceof VirtualTopic) {
                    virtualTopic.setConcurrentSend(this.concurrentSend);
                    virtualTopic.setTransactedSend(this.transactedSend);
                    virtualTopic.setDropOnResourceLimit(true);
                }
            }
        }
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://localhost");
        ActiveMQPrefetchPolicy activeMQPrefetchPolicy = new ActiveMQPrefetchPolicy();
        activeMQPrefetchPolicy.setAll(0);
        activeMQConnectionFactory.setPrefetchPolicy(activeMQPrefetchPolicy);
        this.connectionFactory = activeMQConnectionFactory;
    }

    @After
    public void stopBroker() throws Exception {
        this.brokerService.stop();
    }

    @Parameterized.Parameters(name = "cS=#{0},tS=#{1},g=#{2},persist=#{3}")
    public static Iterable<Object[]> parameters() {
        return Arrays.asList(new Object[]{Boolean.TRUE, Boolean.TRUE, Boolean.TRUE, Boolean.TRUE}, new Object[]{Boolean.TRUE, Boolean.TRUE, Boolean.TRUE, Boolean.FALSE}, new Object[]{Boolean.FALSE, Boolean.TRUE, Boolean.TRUE, Boolean.FALSE}, new Object[]{Boolean.TRUE, Boolean.FALSE, Boolean.TRUE, Boolean.FALSE}, new Object[]{Boolean.FALSE, Boolean.FALSE, Boolean.TRUE, Boolean.FALSE}, new Object[]{Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE}, new Object[]{Boolean.TRUE, Boolean.TRUE, Boolean.FALSE, Boolean.FALSE}, new Object[]{Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.TRUE});
    }

    @Test
    public void testFanoutWithResourceException() throws Exception {
        Connection createConnection = this.connectionFactory.createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        for (int i = 0; i < this.numConsumers; i++) {
            createSession.createConsumer(new ActiveMQQueue("Consumer." + i + ".VirtualTopic.TEST"));
        }
        Connection createConnection2 = this.connectionFactory.createConnection();
        createConnection2.start();
        Session createSession2 = createConnection2.createSession(false, 1);
        MessageProducer createProducer = createSession2.createProducer(new ActiveMQTopic("VirtualTopic.TEST"));
        long currentTimeMillis = System.currentTimeMillis();
        LOG.info("Starting producer: " + currentTimeMillis);
        for (int i2 = 0; i2 < this.total; i2++) {
            createProducer.send(createSession2.createTextMessage(this.payload));
        }
        LOG.info("Done producer, duration: " + (System.currentTimeMillis() - currentTimeMillis));
        Destination destination = this.brokerService.getDestination(new ActiveMQQueue("Consumer.0.VirtualTopic.TEST"));
        LOG.info("Dest 0 size: " + destination.getDestinationStatistics().getEnqueues().getCount());
        Assert.assertTrue("did not get all", destination.getDestinationStatistics().getEnqueues().getCount() < ((long) this.total));
        Assert.assertTrue("got all", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.broker.virtual.VirtualTopicFlowControlDiscardTest.1
            public boolean isSatisified() throws Exception {
                Destination destination2 = VirtualTopicFlowControlDiscardTest.this.brokerService.getDestination(new ActiveMQQueue("Consumer.1.VirtualTopic.TEST"));
                VirtualTopicFlowControlDiscardTest.LOG.info("Dest 1 size: " + destination2.getDestinationStatistics().getEnqueues().getCount());
                return ((long) VirtualTopicFlowControlDiscardTest.this.total) == destination2.getDestinationStatistics().getEnqueues().getCount();
            }
        }));
        try {
            createConnection.close();
        } catch (Exception e) {
        }
        try {
            createConnection2.close();
        } catch (Exception e2) {
        }
    }
}
