package org.apache.activemq.usecases;

import jakarta.jms.Destination;
import java.io.IOException;
import java.net.URI;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import junit.framework.Test;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.DiscoveryEvent;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Assert;

/* loaded from: input_file:org/apache/activemq/usecases/NetworkBridgeProducerFlowControlTest.class */
public class NetworkBridgeProducerFlowControlTest extends JmsMultipleBrokersTestSupport {
    // Upper bound passed to setMaxTestTime(...) in setUp(); presumably milliseconds - confirm against the support class.
    private static final long MAX_TEST_TIME = 120000;
    private static final Log LOG = LogFactory.getLog(NetworkBridgeProducerFlowControlTest.class);
    // Combination option registered in initCombosForTestFastAndSlowRemoteConsumers(); copied into
    // persistentDelivery before messages are sent in testFastAndSlowRemoteConsumers().
    public boolean persistentTestMessages;
    // Combination option registered in initCombosForTestFastAndSlowRemoteConsumers(); forwarded to
    // NetworkConnector.setAlwaysSyncSend(...) in testFastAndSlowRemoteConsumers().
    public boolean networkIsAlwaysSendSync;
    // Failures recorded by the latch-waiter threads; the tests assert that this stays empty.
    private Vector<Throwable> exceptions = new Vector<>();

    /* loaded from: input_file:org/apache/activemq/usecases/NetworkBridgeProducerFlowControlTest$DummySimpleDiscoveryAgent.class */
    class DummySimpleDiscoveryAgent extends SimpleDiscoveryAgent {
        boolean isServiceFailed = false;

        DummySimpleDiscoveryAgent() {
        }

        public void serviceFailed(DiscoveryEvent discoveryEvent) throws IOException {
            NetworkBridgeProducerFlowControlTest.LOG.info("!!!!! DummySimpleDiscoveryAgent.serviceFailed() invoked with event:" + discoveryEvent + "!!!!!!");
            this.isServiceFailed = true;
            super.serviceFailed(discoveryEvent);
        }
    }

    /**
     * Builds the test suite for this class by handing the class object to the
     * {@code suite(Class)} factory available in this class's scope.
     *
     * @return the suite covering this test class
     */
    public static Test suite() {
        final Class<NetworkBridgeProducerFlowControlTest> testClass = NetworkBridgeProducerFlowControlTest.class;
        return suite(testClass);
    }

    /**
     * Registers both boolean values for each of the two public option fields
     * ({@code persistentTestMessages} and {@code networkIsAlwaysSendSync}) used by
     * {@code testFastAndSlowRemoteConsumers}.
     */
    public void initCombosForTestFastAndSlowRemoteConsumers() {
        final String[] booleanOptions = {"persistentTestMessages", "networkIsAlwaysSendSync"};
        for (String option : booleanOptions) {
            // A fresh array per option, exactly as two separate calls would pass.
            addCombinationValues(option, new Object[] {Boolean.TRUE, Boolean.FALSE});
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Enables auto-fail with a limit of {@code MAX_TEST_TIME}, then runs the parent set-up.
     *
     * @throws Exception if the parent set-up fails
     */
    @Override
    public void setUp() throws Exception {
        // Configure the auto-fail limit first, then delegate to the parent.
        this.setAutoFail(true);
        this.setMaxTestTime(MAX_TEST_TIME);
        super.setUp();
    }

    /**
     * Bridges broker0 to broker1, queues 100 messages on broker0 for a "fast" queue and 100 for a
     * "slow" queue (whose memory limit on broker1 is 5120), then consumes both on broker1 with the
     * slow consumer given a processing delay of 100. The time each consumer needs to receive all
     * of its messages is measured and compared.
     *
     * <p>Runs once per combination of {@code persistentTestMessages} and
     * {@code networkIsAlwaysSendSync}.
     *
     * @throws Exception if broker set-up, messaging, or waiting fails
     */
    public void testFastAndSlowRemoteConsumers() throws Exception {
        final int messageCount = 100;
        final String baseName = NetworkBridgeProducerFlowControlTest.class.getSimpleName();
        final ActiveMQQueue slowQueue = new ActiveMQQueue(baseName + ".slow.shared?consumer.prefetchSize=1");
        final ActiveMQQueue fastQueue = new ActiveMQQueue(baseName + ".fast.shared?consumer.prefetchSize=1");

        createBroker(new URI("broker:(tcp://localhost:0)?brokerName=broker0&persistent=false&useJmx=true"));
        final BrokerService remoteBroker = createBroker(new URI("broker:(tcp://localhost:0)?brokerName=broker1&persistent=false&useJmx=true"));

        // Only the slow queue gets a small memory limit on the remote broker.
        final PolicyEntry slowQueuePolicy = new PolicyEntry();
        slowQueuePolicy.setMemoryLimit(5120L);
        final PolicyMap policyMap = new PolicyMap();
        policyMap.put(slowQueue, slowQueuePolicy);
        remoteBroker.setDestinationPolicy(policyMap);

        final NetworkConnector bridge = bridgeBrokers("broker0", "broker1");
        bridge.setAlwaysSyncSend(this.networkIsAlwaysSendSync);
        bridge.setPrefetchSize(1);

        startAllBrokers();
        waitForBridgeFormation();

        this.persistentDelivery = this.persistentTestMessages;
        sendMessages("broker0", fastQueue, messageCount);
        sendMessages("broker0", slowQueue, messageCount);

        final CountDownLatch fastDone = new CountDownLatch(messageCount);
        final CountDownLatch slowDone = new CountDownLatch(messageCount);
        final long startTime = System.currentTimeMillis();
        final AtomicLong fastDuration = new AtomicLong();
        final AtomicLong slowDuration = new AtomicLong();

        // Each waiter records how long its latch took to reach zero. An interrupt is recorded in
        // 'exceptions' (checked below on the test thread) and the interrupt flag is restored;
        // failing an assertion here would only kill the waiter, not fail the test.
        final Thread fastWaiter = new Thread(() -> {
            try {
                fastDone.await();
                fastDuration.set(System.currentTimeMillis() - startTime);
            } catch (InterruptedException e) {
                this.exceptions.add(e);
                Thread.currentThread().interrupt();
            }
        });
        final Thread slowWaiter = new Thread(() -> {
            try {
                slowDone.await();
                slowDuration.set(System.currentTimeMillis() - startTime);
            } catch (InterruptedException e) {
                this.exceptions.add(e);
                Thread.currentThread().interrupt();
            }
        });
        fastWaiter.start();
        slowWaiter.start();

        createConsumer("broker1", (Destination) fastQueue, fastDone);
        this.brokers.get("broker1").consumers.get(createConsumer("broker1", (Destination) slowQueue, slowDone)).setProcessingDelay(100L);

        fastWaiter.join();
        slowWaiter.join();

        assertTrue("no exceptions on the wait threads:" + this.exceptions, this.exceptions.isEmpty());
        final long fastMs = fastDuration.get();
        final long slowMs = slowDuration.get();
        LOG.info("Fast consumer duration (ms): " + fastMs);
        LOG.info("Slow consumer duration (ms): " + slowMs);

        if (this.networkIsAlwaysSendSync) {
            Assert.assertTrue("fast consumer (" + fastMs + " ms) should finish in under 1/20 of the slow consumer's time (" + slowMs + " ms)",
                    fastMs < slowMs / 20);
        } else {
            // The fast consumer is expected to be well ahead only when the test messages are
            // persistent. NOTE(review): presumably because persistent messages are forwarded
            // synchronously regardless of alwaysSyncSend - confirm against the bridge implementation.
            Assert.assertEquals("fast consumer (" + fastMs + " ms) under 1/10 of slow consumer (" + slowMs + " ms) should match persistentTestMessages",
                    Boolean.valueOf(this.persistentTestMessages), Boolean.valueOf(fastMs < slowMs / 10));
        }
    }

    /**
     * Runs the shared send-fail-if-no-space scenario with a slow and a fast queue.
     *
     * @throws Exception if the shared scenario fails
     */
    public void testSendFailIfNoSpaceDoesNotBlockQueueNetwork() throws Exception {
        final String baseName = NetworkBridgeProducerFlowControlTest.class.getSimpleName();
        final ActiveMQQueue slowQueue = new ActiveMQQueue(baseName + ".slow.shared?consumer.prefetchSize=1");
        final ActiveMQQueue fastQueue = new ActiveMQQueue(baseName + ".fast.shared?consumer.prefetchSize=1");
        doTestSendFailIfNoSpaceDoesNotBlockNetwork(slowQueue, fastQueue);
    }

    /**
     * Runs the shared send-fail-if-no-space scenario with a slow and a fast topic.
     *
     * @throws Exception if the shared scenario fails
     */
    public void testSendFailIfNoSpaceDoesNotBlockTopicNetwork() throws Exception {
        final String baseName = NetworkBridgeProducerFlowControlTest.class.getSimpleName();
        final ActiveMQTopic slowTopic = new ActiveMQTopic(baseName + ".slow.shared?consumer.prefetchSize=1");
        final ActiveMQTopic fastTopic = new ActiveMQTopic(baseName + ".fast.shared?consumer.prefetchSize=1");
        doTestSendFailIfNoSpaceDoesNotBlockNetwork(slowTopic, fastTopic);
    }

    /**
     * Shared scenario: broker0 is bridged to broker1 (always-sync-send, prefetch 1), broker1 has
     * send-fail-if-no-space enabled and a memory limit of 5120 on the slow destination. Consumers
     * for both destinations are attached to broker1 (the slow one with a processing delay of 100),
     * 100 non-persistent messages per destination are sent to broker0, and the test asserts that
     * the fast consumer finishes in under a tenth of the slow consumer's time.
     *
     * @param activeMQDestination  the slow, memory-limited destination
     * @param activeMQDestination2 the fast destination
     * @throws Exception if broker set-up, messaging, or waiting fails
     */
    public void doTestSendFailIfNoSpaceDoesNotBlockNetwork(ActiveMQDestination activeMQDestination, ActiveMQDestination activeMQDestination2) throws Exception {
        final int messageCount = 100;
        final long joinTimeoutMillis = TimeUnit.SECONDS.toMillis(60L);

        createBroker(new URI("broker:(tcp://localhost:0)?brokerName=broker0&persistent=false&useJmx=true"));
        final BrokerService remoteBroker = createBroker(new URI("broker:(tcp://localhost:0)?brokerName=broker1&persistent=false&useJmx=true"));
        remoteBroker.getSystemUsage().setSendFailIfNoSpace(true);

        // Only the slow destination gets a small memory limit on the remote broker.
        final PolicyEntry slowPolicy = new PolicyEntry();
        slowPolicy.setMemoryLimit(5120L);
        final PolicyMap policyMap = new PolicyMap();
        policyMap.put(activeMQDestination, slowPolicy);
        remoteBroker.setDestinationPolicy(policyMap);

        final NetworkConnector bridge = bridgeBrokers("broker0", "broker1");
        bridge.setAlwaysSyncSend(true);
        bridge.setPrefetchSize(1);

        startAllBrokers();
        waitForBridgeFormation();

        final CountDownLatch fastDone = new CountDownLatch(messageCount);
        final CountDownLatch slowDone = new CountDownLatch(messageCount);
        final long startTime = System.currentTimeMillis();
        final AtomicLong fastDuration = new AtomicLong();
        final AtomicLong slowDuration = new AtomicLong();

        // Each waiter records how long its latch took to reach zero. An interrupt is recorded in
        // 'exceptions' and the interrupt flag is restored; failing an assertion on a waiter
        // thread would only kill that thread, not fail the test.
        final Thread fastWaiter = new Thread(() -> {
            try {
                fastDone.await();
                fastDuration.set(System.currentTimeMillis() - startTime);
            } catch (InterruptedException e) {
                this.exceptions.add(e);
                Thread.currentThread().interrupt();
            }
        });
        final Thread slowWaiter = new Thread(() -> {
            try {
                slowDone.await();
                slowDuration.set(System.currentTimeMillis() - startTime);
            } catch (InterruptedException e) {
                this.exceptions.add(e);
                Thread.currentThread().interrupt();
            }
        });
        fastWaiter.start();
        slowWaiter.start();

        try {
            createConsumer("broker1", (Destination) activeMQDestination2, fastDone);
            this.brokers.get("broker1").consumers.get(createConsumer("broker1", (Destination) activeMQDestination, slowDone)).setProcessingDelay(100L);

            this.persistentDelivery = false;
            sendMessages("broker0", activeMQDestination2, messageCount);
            sendMessages("broker0", activeMQDestination, messageCount);

            fastWaiter.join(joinTimeoutMillis);
            slowWaiter.join(joinTimeoutMillis);

            assertTrue("no exceptions on the wait threads:" + this.exceptions, this.exceptions.isEmpty());
            final long fastMs = fastDuration.get();
            final long slowMs = slowDuration.get();
            LOG.info("Fast consumer duration (ms): " + fastMs);
            LOG.info("Slow consumer duration (ms): " + slowMs);

            // A duration of zero means the corresponding waiter never saw its latch released.
            assertTrue("fast time set", fastMs > 0);
            assertTrue("slow time set", slowMs > 0);
            Assert.assertTrue("fast consumer (" + fastMs + " ms) should finish in under 1/10 of the slow consumer's time (" + slowMs + " ms)",
                    fastMs < slowMs / 10);
        } finally {
            // The joins above are bounded, so a waiter may still be parked on its latch;
            // interrupt it so it does not stay blocked forever. No effect on finished threads.
            fastWaiter.interrupt();
            slowWaiter.interrupt();
        }
    }

    /**
     * Reverse-direction variant over a duplex bridge: broker0 has send-fail-if-no-space enabled
     * and a memory limit of 5120 on the slow queue, the consumers are attached to broker0 (the
     * slow one with a processing delay of 100), and 100 non-persistent messages per queue are
     * sent to broker1. Asserts that the fast consumer finishes in under a tenth of the slow
     * consumer's time.
     *
     * @throws Exception if broker set-up, messaging, or waiting fails
     */
    public void testSendFailIfNoSpaceReverseDoesNotBlockQueueNetwork() throws Exception {
        final int messageCount = 100;
        final long joinTimeoutMillis = TimeUnit.SECONDS.toMillis(60L);
        final String baseName = NetworkBridgeProducerFlowControlTest.class.getSimpleName();
        final ActiveMQQueue slowQueue = new ActiveMQQueue(baseName + ".slow.shared?consumer.prefetchSize=1");
        final ActiveMQQueue fastQueue = new ActiveMQQueue(baseName + ".fast.shared?consumer.prefetchSize=1");

        final BrokerService localBroker = createBroker(new URI("broker:(tcp://localhost:0)?brokerName=broker0&persistent=false&useJmx=true"));
        createBroker(new URI("broker:(tcp://localhost:0)?brokerName=broker1&persistent=false&useJmx=true"));
        localBroker.getSystemUsage().setSendFailIfNoSpace(true);

        // Only the slow queue gets a small memory limit, this time on broker0.
        final PolicyEntry slowQueuePolicy = new PolicyEntry();
        slowQueuePolicy.setMemoryLimit(5120L);
        final PolicyMap policyMap = new PolicyMap();
        policyMap.put(slowQueue, slowQueuePolicy);
        localBroker.setDestinationPolicy(policyMap);

        final NetworkConnector bridge = bridgeBrokers("broker0", "broker1");
        bridge.setAlwaysSyncSend(true);
        bridge.setPrefetchSize(1);
        bridge.setDuplex(true);

        startAllBrokers();
        waitForBridgeFormation();

        final CountDownLatch fastDone = new CountDownLatch(messageCount);
        final CountDownLatch slowDone = new CountDownLatch(messageCount);
        final long startTime = System.currentTimeMillis();
        final AtomicLong fastDuration = new AtomicLong();
        final AtomicLong slowDuration = new AtomicLong();

        // Each waiter records how long its latch took to reach zero. An interrupt is recorded in
        // 'exceptions' and the interrupt flag is restored; failing an assertion on a waiter
        // thread would only kill that thread, not fail the test.
        final Thread fastWaiter = new Thread(() -> {
            try {
                fastDone.await();
                fastDuration.set(System.currentTimeMillis() - startTime);
            } catch (InterruptedException e) {
                this.exceptions.add(e);
                Thread.currentThread().interrupt();
            }
        });
        final Thread slowWaiter = new Thread(() -> {
            try {
                slowDone.await();
                slowDuration.set(System.currentTimeMillis() - startTime);
            } catch (InterruptedException e) {
                this.exceptions.add(e);
                Thread.currentThread().interrupt();
            }
        });
        fastWaiter.start();
        slowWaiter.start();

        try {
            createConsumer("broker0", (Destination) fastQueue, fastDone);
            this.brokers.get("broker0").consumers.get(createConsumer("broker0", (Destination) slowQueue, slowDone)).setProcessingDelay(100L);

            this.persistentDelivery = false;
            sendMessages("broker1", fastQueue, messageCount);
            sendMessages("broker1", slowQueue, messageCount);

            fastWaiter.join(joinTimeoutMillis);
            slowWaiter.join(joinTimeoutMillis);

            assertTrue("no exceptions on the wait threads:" + this.exceptions, this.exceptions.isEmpty());
            final long fastMs = fastDuration.get();
            final long slowMs = slowDuration.get();
            LOG.info("Fast consumer duration (ms): " + fastMs);
            LOG.info("Slow consumer duration (ms): " + slowMs);

            // A duration of zero means the corresponding waiter never saw its latch released.
            assertTrue("fast time set", fastMs > 0);
            assertTrue("slow time set", slowMs > 0);
            Assert.assertTrue("fast consumer (" + fastMs + " ms) should finish in under 1/10 of the slow consumer's time (" + slowMs + " ms)",
                    fastMs < slowMs / 10);
        } finally {
            // The joins above are bounded, so a waiter may still be parked on its latch;
            // interrupt it so it does not stay blocked forever. No effect on finished threads.
            fastWaiter.interrupt();
            slowWaiter.interrupt();
        }
    }

    /**
     * Duplex-bridge scenario on a topic: broker0 has send-fail-if-no-space enabled and a memory
     * limit of 5120 on the topic, a consumer with a processing delay of 100 is attached to
     * broker0, and 100 non-persistent messages are sent to broker1. The bridge is given a
     * {@link DummySimpleDiscoveryAgent}, and the test asserts that its {@code serviceFailed}
     * callback has not been invoked after waiting up to five seconds.
     *
     * @throws Exception if broker set-up, messaging, or waiting fails
     */
    public void testDuplexSendFailIfNoSpaceDoesNotBlockNetwork() throws Exception {
        final int messageCount = 100;
        final ActiveMQTopic topic = new ActiveMQTopic(NetworkBridgeProducerFlowControlTest.class.getSimpleName() + ".duplexTest?consumer.prefetchSize=1");

        final BrokerService localBroker = createBroker(new URI("broker:(tcp://localhost:0)?brokerName=broker0&persistent=false&useJmx=true"));
        createBroker(new URI("broker:(tcp://localhost:0)?brokerName=broker1&persistent=false&useJmx=true"));
        localBroker.getSystemUsage().setSendFailIfNoSpace(true);

        final PolicyEntry topicPolicy = new PolicyEntry();
        topicPolicy.setMemoryLimit(5120L);
        final PolicyMap policyMap = new PolicyMap();
        policyMap.put(topic, topicPolicy);
        localBroker.setDestinationPolicy(policyMap);

        // Explicit cast: elsewhere in this class the same call is assigned to NetworkConnector,
        // so the narrower type is not guaranteed by the method's declared return type.
        final DiscoveryNetworkConnector bridge = (DiscoveryNetworkConnector) bridgeBrokers("broker0", "broker1");
        final URI remoteUri = bridge.getUri();
        bridge.setAlwaysSyncSend(true);
        bridge.setPrefetchSize(1);
        bridge.setDuplex(true);

        // Swap in the recording agent, pointing it at the same services as the bridge URI.
        // The text handed over is everything from index 8 up to the last ')'.
        // NOTE(review): index 8 presumably skips a "static:(" prefix - confirm the URI format.
        final DummySimpleDiscoveryAgent discoveryAgent = new DummySimpleDiscoveryAgent();
        final String remoteUriText = remoteUri.toString();
        discoveryAgent.setServices(remoteUriText.substring(8, remoteUriText.lastIndexOf(')')));
        bridge.setDiscoveryAgent(discoveryAgent);

        startAllBrokers();
        waitForBridgeFormation();

        final CountDownLatch received = new CountDownLatch(messageCount);
        this.brokers.get("broker0").consumers.get(createConsumer("broker0", (Destination) topic, received)).setProcessingDelay(100L);

        this.persistentDelivery = false;
        sendMessages("broker1", topic, messageCount);

        // The result of await is not asserted: it acts as a bounded observation window of up to
        // five seconds before the failure flag is inspected.
        received.await(5L, TimeUnit.SECONDS);
        assertFalse("dummySimpleDiscoveryAgent.serviceFail has been invoked - should not have been", discoveryAgent.isServiceFailed);
    }
}
