package org.apache.activemq.usecases;

import jakarta.jms.MessageConsumer;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.LinkedList;
import java.util.List;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.util.MessageIdList;

/* loaded from: input_file:org/apache/activemq/usecases/TwoBrokerVirtualTopicForwardingTest.class */
/**
 * Checks how the queues that back virtual-topic consumers
 * ({@code Consumer.<name>.VirtualTopic.<topic>}) behave across a network bridge
 * between two brokers, both when those queues are bridged and when they are
 * listed as excluded destinations on the network connector.
 *
 * <p>Each test bridges {@code BrokerA} to {@code BrokerB}, attaches one queue
 * consumer per broker, publishes a single message to the virtual topic on
 * {@code BrokerA} and asserts the exact number of messages each consumer got.
 */
public class TwoBrokerVirtualTopicForwardingTest extends JmsMultipleBrokersTestSupport {
    private static final String BROKER_A = "BrokerA";
    private static final String BROKER_B = "BrokerB";
    private static final String CONSUMER_QUEUE_A = "Consumer.A.VirtualTopic.tempTopic";
    private static final String CONSUMER_QUEUE_B = "Consumer.B.VirtualTopic.tempTopic";
    private static final String VIRTUAL_TOPIC = "VirtualTopic.tempTopic";
    private static final String EXCLUDED_CONSUMER_QUEUES = "Consumer.*.VirtualTopic.>";
    private static final String BROKER_OPTIONS = "?useJmx=false&deleteAllMessagesOnStartup=true";

    /** Pause after creating consumers; presumably lets consumer advisories cross the bridge. */
    private static final long PROPAGATION_DELAY_MS = 500L;

    /** Pause before the final count assertions so unexpected extra deliveries have time to arrive. */
    private static final long SETTLE_DELAY_MS = 2000L;

    /**
     * With a plain bridge, the consumer created on {@code BrokerB} is expected to be
     * visible as a consumer of its queue on {@code BrokerA}, the virtual topic itself
     * exists on neither broker, and one published message reaches both consumers.
     */
    public void testBridgeVirtualTopicQueues() throws Exception {
        bridgeAndConfigureBrokers(BROKER_A, BROKER_B);
        startAllBrokers();
        waitForBridgeFormation();

        MessageConsumer clientA = createConsumer(BROKER_A, createDestination(CONSUMER_QUEUE_A, false));
        MessageConsumer clientB = createConsumer(BROKER_B, createDestination(CONSUMER_QUEUE_B, false));
        Thread.sleep(PROPAGATION_DELAY_MS);

        assertQueueConsumerCount(BROKER_A, CONSUMER_QUEUE_A, 1);
        assertQueueConsumerCount(BROKER_A, CONSUMER_QUEUE_B, 1);

        ActiveMQTopic virtualTopic = new ActiveMQTopic(VIRTUAL_TOPIC);
        assertNull(org.apache.activemq.TestSupport.getDestination(brokerNamed(BROKER_A), virtualTopic));
        assertNull(org.apache.activemq.TestSupport.getDestination(brokerNamed(BROKER_B), virtualTopic));

        publishAndVerifyCounts(virtualTopic, clientA, 1, clientB, 1);
    }

    /**
     * With the consumer queues excluded from the bridge and only queue consumers
     * present, {@code BrokerA} never learns about {@code BrokerB}'s consumer queue,
     * so only the consumer on {@code BrokerA} receives the published message.
     */
    public void testDontBridgeQueuesWithOnlyQueueConsumers() throws Exception {
        dontBridgeVirtualTopicConsumerQueues(BROKER_A, BROKER_B);
        startAllBrokers();
        waitForBridgeFormation();

        MessageConsumer clientA = createConsumer(BROKER_A, createDestination(CONSUMER_QUEUE_A, false));
        MessageConsumer clientB = createConsumer(BROKER_B, createDestination(CONSUMER_QUEUE_B, false));
        Thread.sleep(PROPAGATION_DELAY_MS);

        assertQueueConsumerCount(BROKER_A, CONSUMER_QUEUE_A, 1);
        assertNull(org.apache.activemq.TestSupport.getDestination(brokerNamed(BROKER_A), new ActiveMQQueue(CONSUMER_QUEUE_B)));

        ActiveMQTopic virtualTopic = new ActiveMQTopic(VIRTUAL_TOPIC);
        assertNull(org.apache.activemq.TestSupport.getDestination(brokerNamed(BROKER_A), virtualTopic));
        assertNull(org.apache.activemq.TestSupport.getDestination(brokerNamed(BROKER_B), virtualTopic));

        publishAndVerifyCounts(virtualTopic, clientA, 1, clientB, 0);
    }

    /**
     * With the consumer queues excluded from the bridge but an additional direct
     * topic consumer on {@code BrokerB}, the virtual topic exists on both brokers
     * and the published message reaches both queue consumers.
     */
    public void testDontBridgeQueuesWithBothTypesConsumers() throws Exception {
        dontBridgeVirtualTopicConsumerQueues(BROKER_A, BROKER_B);
        startAllBrokers();
        waitForBridgeFormation();

        MessageConsumer clientA = createConsumer(BROKER_A, createDestination(CONSUMER_QUEUE_A, false));
        MessageConsumer clientB = createConsumer(BROKER_B, createDestination(CONSUMER_QUEUE_B, false));
        // Direct subscriber on the topic itself; its handle is not needed afterwards.
        createConsumer(BROKER_B, createDestination(VIRTUAL_TOPIC, true));
        Thread.sleep(PROPAGATION_DELAY_MS);

        assertQueueConsumerCount(BROKER_A, CONSUMER_QUEUE_A, 1);
        assertNull(org.apache.activemq.TestSupport.getDestination(brokerNamed(BROKER_A), new ActiveMQQueue(CONSUMER_QUEUE_B)));

        ActiveMQTopic virtualTopic = new ActiveMQTopic(VIRTUAL_TOPIC);
        assertNotNull(org.apache.activemq.TestSupport.getDestination(brokerNamed(BROKER_A), virtualTopic));
        assertNotNull(org.apache.activemq.TestSupport.getDestination(brokerNamed(BROKER_B), virtualTopic));

        publishAndVerifyCounts(virtualTopic, clientA, 1, clientB, 1);
    }

    /** Returns the broker service registered under {@code brokerName} in the inherited broker map. */
    private BrokerService brokerNamed(String brokerName) {
        return this.brokers.get(brokerName).broker;
    }

    /**
     * Asserts that {@code queueName} exists on the given broker and has exactly
     * {@code expected} consumers. The explicit existence check turns a missing
     * queue into a descriptive assertion failure instead of a NullPointerException.
     */
    private void assertQueueConsumerCount(String brokerName, String queueName, int expected) throws Exception {
        ActiveMQQueue queue = new ActiveMQQueue(queueName);
        assertNotNull("queue " + queueName + " should exist on " + brokerName,
                org.apache.activemq.TestSupport.getDestination(brokerNamed(brokerName), queue));
        assertEquals(expected,
                org.apache.activemq.TestSupport.getDestination(brokerNamed(brokerName), queue).getConsumers().size());
    }

    /**
     * Publishes one message to {@code topic} on {@code BrokerA}, waits for the
     * expected deliveries, then asserts the exact message count seen by each consumer.
     *
     * @param topic     destination to publish to
     * @param clientA   consumer attached to {@code BrokerA}
     * @param expectedA messages {@code clientA} must have received
     * @param clientB   consumer attached to {@code BrokerB}
     * @param expectedB messages {@code clientB} must have received
     */
    private void publishAndVerifyCounts(ActiveMQTopic topic, MessageConsumer clientA, int expectedA,
            MessageConsumer clientB, int expectedB) throws Exception {
        sendMessages(BROKER_A, topic, 1);

        MessageIdList receivedByA = getConsumerMessages(BROKER_A, clientA);
        MessageIdList receivedByB = getConsumerMessages(BROKER_B, clientB);
        receivedByA.waitForMessagesToArrive(expectedA);
        receivedByB.waitForMessagesToArrive(expectedB);

        // Give stray or duplicate deliveries a chance to show up before checking exact counts.
        Thread.sleep(SETTLE_DELAY_MS);
        assertEquals(expectedA, receivedByA.getMessageCount());
        assertEquals(expectedB, receivedByB.getMessageCount());
    }

    /** Bridges {@code local} to {@code remote} with decreased network consumer priority. */
    private void bridgeAndConfigureBrokers(String local, String remote) throws Exception {
        bridgeBrokers(local, remote).setDecreaseNetworkConsumerPriority(true);
    }

    /**
     * Bridges {@code local} to {@code remote} like {@link #bridgeAndConfigureBrokers},
     * and additionally excludes every {@code Consumer.*.VirtualTopic.>} queue from the bridge.
     */
    private void dontBridgeVirtualTopicConsumerQueues(String local, String remote) throws Exception {
        NetworkConnector bridge = bridgeBrokers(local, remote);
        bridge.setDecreaseNetworkConsumerPriority(true);

        // Typed list (was a raw LinkedList); kept mutable in case the connector appends to it.
        List<ActiveMQDestination> excluded = new LinkedList<>();
        excluded.add(new ActiveMQQueue(EXCLUDED_CONSUMER_QUEUES));
        bridge.setExcludedDestinations(excluded);
    }

    /**
     * Enables auto-fail, runs the base set-up, then creates both brokers on fixed
     * local TCP ports (61616 and 61617) with JMX disabled and stores wiped on start.
     */
    @Override
    public void setUp() throws Exception {
        super.setAutoFail(true);
        super.setUp();
        createAndConfigureBroker(new URI("broker:(tcp://localhost:61616)/" + BROKER_A + BROKER_OPTIONS));
        createAndConfigureBroker(new URI("broker:(tcp://localhost:61617)/" + BROKER_B + BROKER_OPTIONS));
    }

    /** Creates a broker from {@code uri} and attaches its persistence adapter. */
    private BrokerService createAndConfigureBroker(URI uri) throws Exception {
        BrokerService broker = createBroker(uri);
        configurePersistenceAdapter(broker);
        return broker;
    }

    /**
     * Gives the broker a KahaDB store located under
     * {@code target/test-amq-data/kahadb/<brokerName>}.
     *
     * @param brokerService broker to configure
     * @throws IOException if the persistence adapter cannot be set
     */
    protected void configurePersistenceAdapter(BrokerService brokerService) throws IOException {
        File dataDir = new File("target/test-amq-data/kahadb/" + brokerService.getBrokerName());
        KahaDBStore store = new KahaDBStore();
        store.setDirectory(dataDir);
        brokerService.setPersistenceAdapter(store);
    }
}
