package org.apache.activemq.usecases;

import jakarta.jms.MessageConsumer;
import java.lang.Thread;
import java.net.URI;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.ObjectName;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.class */
public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTestSupport implements Thread.UncaughtExceptionHandler {
    public static final int BROKER_COUNT = 3;
    public static final int CONSUMER_COUNT = 5;
    public static final int MESSAGE_COUNT = 0;
    public static final boolean DUPLEX = false;
    public static final boolean CONDUIT = true;
    public static final int NETWORK_TTL = 6;
    private static final Logger LOG = LoggerFactory.getLogger(VerifyNetworkConsumersDisconnectTest.class);
    public static final int TIMEOUT = 30000;
    protected Map<String, MessageConsumer> consumerMap;
    Map<Thread, Throwable> unhandledExceptions = new HashMap();

    private void assertNoUnhandledExceptions() {
        for (Map.Entry<Thread, Throwable> entry : this.unhandledExceptions.entrySet()) {
            LOG.error("Thread:" + entry.getKey() + " Had unexpected: " + entry.getValue());
        }
        assertTrue("There are no unhandled exceptions, see: log for detail on: " + this.unhandledExceptions, this.unhandledExceptions.isEmpty());
    }

    public NetworkConnector bridge(String str, String str2, boolean z) throws Exception {
        NetworkConnector bridgeBrokers = bridgeBrokers(str, str2, true, 6, true);
        bridgeBrokers.setSuppressDuplicateQueueSubscriptions(true);
        bridgeBrokers.setDecreaseNetworkConsumerPriority(true);
        bridgeBrokers.setDuplex(false);
        bridgeBrokers.setConduitNetworkQueueSubscriptions(z);
        return bridgeBrokers;
    }

    public NetworkConnector bridge(String str, String str2) throws Exception {
        return bridge(str, str2, false);
    }

    public void testConsumerOnEachBroker() throws Exception {
        bridge("Broker0", "Broker1");
        bridge("Broker1", "Broker0");
        bridge("Broker1", "Broker2");
        bridge("Broker2", "Broker1");
        startAllBrokers();
        waitForBridgeFormation(this.brokers.get("Broker0").broker, 1, 0);
        waitForBridgeFormation(this.brokers.get("Broker2").broker, 1, 0);
        waitForBridgeFormation(this.brokers.get("Broker1").broker, 1, 0);
        waitForBridgeFormation(this.brokers.get("Broker1").broker, 1, 1);
        ActiveMQDestination createDestination = createDestination("TEST.FOO", false);
        for (int i = 0; i < 3; i++) {
            this.consumerMap.put("Consumer:" + i + ":0", createConsumer("Broker" + i, createDestination));
        }
        assertExactConsumersConnect("Broker0", 3, 1, 30000L);
        assertExactConsumersConnect("Broker2", 3, 1, 30000L);
        assertExactConsumersConnect("Broker1", 3, 1, 30000L);
        assertNoUnhandledExceptions();
        LOG.info("Complete the mesh - 0->2");
        NetworkConnector bridge = bridge("Broker0", "Broker2");
        bridge.setBrokerName("Broker0");
        bridge.start();
        LOG.info("... complete the mesh - 2->0");
        NetworkConnector bridge2 = bridge("Broker2", "Broker0");
        bridge2.setBrokerName("Broker2");
        bridge2.start();
        for (int i2 = 0; i2 < 3; i2++) {
            assertExactConsumersConnect("Broker" + i2, 3, 1, 30000L);
        }
        this.consumerMap.get("Consumer:2:0").close();
        TimeUnit.SECONDS.sleep(1L);
        this.consumerMap.get("Consumer:1:0").close();
        TimeUnit.SECONDS.sleep(1L);
        this.consumerMap.get("Consumer:0:0").close();
        LOG.info("Check for no consumers..");
        for (int i3 = 0; i3 < 3; i3++) {
            assertExactConsumersConnect("Broker" + i3, 0, 0, 30000L);
        }
    }

    public void testConsumerOnEachBrokerNetworkQueueConduitSubs() throws Exception {
        bridge("Broker0", "Broker1", true);
        bridge("Broker1", "Broker0", true);
        bridge("Broker1", "Broker2", true);
        bridge("Broker2", "Broker1", true);
        startAllBrokers();
        waitForBridgeFormation(this.brokers.get("Broker0").broker, 1, 0);
        waitForBridgeFormation(this.brokers.get("Broker2").broker, 1, 0);
        waitForBridgeFormation(this.brokers.get("Broker1").broker, 1, 0);
        waitForBridgeFormation(this.brokers.get("Broker1").broker, 1, 1);
        ActiveMQDestination createDestination = createDestination("TEST.FOO", false);
        for (int i = 0; i < 3; i++) {
            this.consumerMap.put("Consumer:" + i + ":0", createConsumer("Broker" + i, createDestination));
        }
        assertExactConsumersConnect("Broker0", 2, 1, 30000L);
        assertExactConsumersConnect("Broker2", 2, 1, 30000L);
        assertExactConsumersConnect("Broker1", 3, 1, 30000L);
        assertNoUnhandledExceptions();
        LOG.info("Complete the mesh - 0->2");
        NetworkConnector bridge = bridge("Broker0", "Broker2");
        bridge.setBrokerName("Broker0");
        bridge.start();
        LOG.info("... complete the mesh - 2->0");
        NetworkConnector bridge2 = bridge("Broker2", "Broker0");
        bridge2.setBrokerName("Broker2");
        bridge2.start();
        this.consumerMap.get("Consumer:2:0").close();
        TimeUnit.SECONDS.sleep(1L);
        this.consumerMap.get("Consumer:1:0").close();
        TimeUnit.SECONDS.sleep(1L);
        this.consumerMap.get("Consumer:0:0").close();
        LOG.info("Check for no consumers..");
        for (int i2 = 0; i2 < 3; i2++) {
            assertExactConsumersConnect("Broker" + i2, 0, 0, 30000L);
        }
    }

    public void testXConsumerOnEachBroker() throws Exception {
        bridge("Broker0", "Broker1");
        bridge("Broker1", "Broker0");
        bridge("Broker1", "Broker2");
        bridge("Broker2", "Broker1");
        startAllBrokers();
        waitForBridgeFormation(this.brokers.get("Broker0").broker, 1, 0);
        waitForBridgeFormation(this.brokers.get("Broker2").broker, 1, 0);
        waitForBridgeFormation(this.brokers.get("Broker1").broker, 1, 0);
        waitForBridgeFormation(this.brokers.get("Broker1").broker, 1, 1);
        ActiveMQDestination createDestination = createDestination("TEST.FOO", false);
        for (int i = 0; i < 3; i++) {
            for (int i2 = 0; i2 < 5; i2++) {
                this.consumerMap.put("Consumer:" + i + ":" + i2, createConsumer("Broker" + i, createDestination));
            }
        }
        for (int i3 = 0; i3 < 3; i3++) {
            assertExactConsumersConnect("Broker" + i3, 7, 1, 30000L);
        }
        assertNoUnhandledExceptions();
        LOG.info("Complete the mesh - 0->2");
        NetworkConnector bridge = bridge("Broker0", "Broker2");
        bridge.setBrokerName("Broker0");
        bridge.start();
        waitForBridgeFormation(this.brokers.get("Broker0").broker, 1, 1);
        LOG.info("... complete the mesh - 2->0");
        NetworkConnector bridge2 = bridge("Broker2", "Broker0");
        bridge2.setBrokerName("Broker2");
        bridge2.start();
        waitForBridgeFormation(this.brokers.get("Broker2").broker, 1, 1);
        for (int i4 = 0; i4 < 3; i4++) {
            assertExactConsumersConnect("Broker" + i4, 7, 1, 30000L);
        }
        for (int i5 = 0; i5 < 5; i5++) {
            this.consumerMap.get("Consumer:2:" + i5).close();
            TimeUnit.SECONDS.sleep(1L);
            this.consumerMap.get("Consumer:1:" + i5).close();
            TimeUnit.SECONDS.sleep(1L);
            this.consumerMap.get("Consumer:0:" + i5).close();
        }
        LOG.info("Check for no consumers..");
        for (int i6 = 0; i6 < 3; i6++) {
            assertExactConsumersConnect("Broker" + i6, 0, 0, 30000L);
        }
    }

    protected void assertExactConsumersConnect(final String str, final int i, final int i2, long j) throws Exception {
        final ManagementContext managementContext = this.brokers.get(str).broker.getManagementContext();
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        assertTrue("Expected consumers count: " + i + " on: " + str, Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.usecases.VerifyNetworkConsumersDisconnectTest.1
            public boolean isSatisified() throws Exception {
                try {
                    QueueViewMBean queueViewMBean = (QueueViewMBean) managementContext.newProxyInstance(((JmsMultipleBrokersTestSupport.BrokerItem) VerifyNetworkConsumersDisconnectTest.this.brokers.get(str)).broker.getAdminView().getQueues()[0], QueueViewMBean.class, false);
                    long consumerCount = queueViewMBean.getConsumerCount();
                    VerifyNetworkConsumersDisconnectTest.LOG.info("On " + str + " current consumer count for " + queueViewMBean + ", " + consumerCount);
                    LinkedList linkedList = new LinkedList();
                    for (ObjectName objectName : queueViewMBean.getSubscriptions()) {
                        linkedList.add(objectName.getKeyProperty("consumerId"));
                    }
                    VerifyNetworkConsumersDisconnectTest.LOG.info("Sub IDs: " + linkedList);
                    if (consumerCount == i) {
                        atomicInteger.incrementAndGet();
                    } else {
                        atomicInteger.set(0);
                    }
                    return atomicInteger.get() > i2;
                } catch (Exception e) {
                    VerifyNetworkConsumersDisconnectTest.LOG.warn(": ", e);
                    return false;
                }
            }
        }, j));
    }

    @Override // org.apache.activemq.JmsMultipleBrokersTestSupport
    public void setUp() throws Exception {
        super.setAutoFail(true);
        super.setUp();
        this.unhandledExceptions.clear();
        Thread.setDefaultUncaughtExceptionHandler(this);
        for (int i = 0; i < 3; i++) {
            createBroker(new URI("broker:(tcp://localhost:6161" + i + ")/Broker" + i + "?persistent=false&useJmx=true&brokerId=Broker" + i));
        }
        this.consumerMap = new LinkedHashMap();
    }

    @Override // org.apache.activemq.JmsMultipleBrokersTestSupport
    protected void configureBroker(BrokerService brokerService) {
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setExpireMessagesPeriod(0L);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(policyEntry);
        brokerService.setDestinationPolicy(policyMap);
    }

    @Override // java.lang.Thread.UncaughtExceptionHandler
    public void uncaughtException(Thread thread, Throwable th) {
        synchronized (this.unhandledExceptions) {
            this.unhandledExceptions.put(thread, th);
        }
    }
}
