package org.apache.activemq.usecases;

import jakarta.jms.BytesMessage;
import jakarta.jms.Connection;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import java.net.URI;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.bugs.AMQ4607Test;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.junit.After;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Pushes selector-filtered topic traffic across a one-way network bridge from
 * broker "A" to broker "B", once with conduit subscriptions enabled and once
 * with them disabled.
 *
 * <p>Producers connect to broker A and publish messages tagged alternately
 * {@code COLOUR = RED} / {@code COLOUR = BLUE}; consumers connect to broker B and
 * subscribe with a matching selector. The test is observational: it logs timing
 * and destination statistics but does not assert on them.
 */
public class TopicBridgeSelectorConduitOnOff {
    private static final Logger LOG = LoggerFactory.getLogger(TopicBridgeSelectorConduitOnOff.class);
    BrokerService brokerA;
    BrokerService brokerB;
    final int numProducers = 20;
    final int numConsumers = 20;
    final int numberOfMessagesToSendPerProducer = 5000;
    final ActiveMQTopic destination = new ActiveMQTopic("TOPIC");

    /**
     * Stops whichever brokers were created. Broker B is stopped even when
     * stopping broker A throws, and brokers that were never created are skipped.
     *
     * @throws Exception if stopping a broker fails
     */
    @After
    public void stopBrokers() throws Exception {
        try {
            if (this.brokerA != null) {
                this.brokerA.stop();
            }
        } finally {
            if (this.brokerB != null) {
                this.brokerB.stop();
            }
        }
    }

    @Test
    public void testForwardsWithConduitSubsTrue() throws Exception {
        doTestWithConduit(true);
    }

    @Test
    public void testForwardsWithConduitSubsFalse() throws Exception {
        doTestWithConduit(false);
    }

    /**
     * Runs the full scenario: start and bridge the brokers, subscribe the
     * consumers on B, publish from A, then log how long delivery took and the
     * topic statistics seen on A.
     *
     * @param conduitSubscriptions value passed to the network connector's
     *        conduit-subscriptions setting
     * @throws Exception if broker setup fails or the thread is interrupted
     */
    private void doTestWithConduit(boolean conduitSubscriptions) throws Exception {
        this.brokerA = newBroker("A");
        this.brokerB = newBroker("B");
        // B is started first: bridgeBrokers() reads B's publishable connect string.
        this.brokerB.start();
        NetworkConnector connector = bridgeBrokers(this.brokerA, this.brokerB, conduitSubscriptions);
        this.brokerA.start();
        // NOTE: polls without an upper bound until a bridge is reported.
        while (connector.activeBridges().isEmpty()) {
            LOG.info("num bridges: " + connector.activeBridges());
            TimeUnit.SECONDS.sleep(1L);
        }

        final ActiveMQConnectionFactory producerFactory = new ActiveMQConnectionFactory(clientUrl(this.brokerA));
        final ActiveMQConnectionFactory consumerFactory = new ActiveMQConnectionFactory(clientUrl(this.brokerB));

        final CountDownLatch allReceived = new CountDownLatch(expectedDeliveries());
        final AtomicInteger received = new AtomicInteger(0);
        final List<Connection> consumerConnections = Collections.synchronizedList(new LinkedList<Connection>());
        final ExecutorService consumerPool = Executors.newFixedThreadPool(this.numConsumers);
        try {
            startConsumers(consumerPool, consumerFactory, consumerConnections, received, allReceived);

            Topic topicOnA = (Topic) this.brokerA.getDestination(this.destination);
            // The loop below waits for a single consumer on A when conduit is on,
            // and for one per remote consumer when it is off.
            int expectedConsumersOnA = conduitSubscriptions ? 1 : this.numConsumers;
            LOG.info("Num consumers: " + topicOnA.getConsumers().size());
            // NOTE: polls without an upper bound until the expected count is reached.
            while (topicOnA.getConsumers().size() != expectedConsumersOnA) {
                LOG.info("Num consumers: " + topicOnA.getConsumers().size());
                TimeUnit.SECONDS.sleep(1L);
            }

            runProducers(producerFactory);

            long producersDone = System.currentTimeMillis();
            boolean complete = allReceived.await(5L, TimeUnit.MINUTES);
            LOG.info("Duration to Receive after producers complete: " + (System.currentTimeMillis() - producersDone) + "ms");
            if (!complete) {
                LOG.warn("Timed out waiting for deliveries, still outstanding: " + allReceived.getCount());
            }

            logStatistics(topicOnA, received);
            // Log a second snapshot after a pause so late deliveries/forwards show up.
            TimeUnit.SECONDS.sleep(10L);
            logStatistics(topicOnA, received);
        } finally {
            consumerPool.shutdown();
            try {
                consumerPool.awaitTermination(5L, TimeUnit.MINUTES);
            } finally {
                closeQuietly(consumerConnections);
            }
        }
    }

    /**
     * Number of listener invocations expected if every message reaches each
     * consumer whose selector matches it exactly once. Even-numbered consumers
     * select RED and even-numbered messages are RED; the odd ones are BLUE.
     */
    private int expectedDeliveries() {
        int redConsumers = (this.numConsumers + 1) / 2;
        int blueConsumers = this.numConsumers / 2;
        int redPerProducer = (this.numberOfMessagesToSendPerProducer + 1) / 2;
        int bluePerProducer = this.numberOfMessagesToSendPerProducer / 2;
        return this.numProducers * (redPerProducer * redConsumers + bluePerProducer * blueConsumers);
    }

    /** Builds the client URL for a broker's tcp connector with topic advisories switched off. */
    private static String clientUrl(BrokerService broker) throws Exception {
        return broker.getTransportConnectorByScheme("tcp").getPublishableConnectString() + "?jms.watchTopicAdvisories=false";
    }

    /**
     * Subscribes {@code numConsumers} selector consumers concurrently and waits up
     * to 20 seconds for all of them to report ready.
     */
    private void startConsumers(ExecutorService pool, final ActiveMQConnectionFactory factory,
            final List<Connection> connections, final AtomicInteger received,
            final CountDownLatch allReceived) throws InterruptedException {
        final CountDownLatch consumersReady = new CountDownLatch(this.numConsumers);
        for (int c = 0; c < this.numConsumers; c++) {
            final int consumerId = c;
            pool.execute(() -> {
                try {
                    Connection connection = factory.createConnection();
                    // Register immediately so cleanup closes it even if the setup below fails.
                    connections.add(connection);
                    connection.start();
                    String selector = consumerId % 2 == 0 ? "COLOUR = 'RED'" : "COLOUR = 'BLUE'";
                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    session.createConsumer(this.destination, selector).setMessageListener(message -> {
                        int total = received.incrementAndGet();
                        allReceived.countDown();
                        // AMQ4607Test.TIMEOUT is reused here only as a log-sampling interval.
                        if (total % AMQ4607Test.TIMEOUT == 0) {
                            try {
                                LOG.info("Consumer id: " + consumerId + ", message COLOUR:" + message.getStringProperty("COLOUR"));
                            } catch (JMSException e) {
                                LOG.warn("Consumer " + consumerId + " could not read COLOUR property", e);
                            }
                        }
                    });
                    consumersReady.countDown();
                } catch (Exception e) {
                    LOG.error("Consumer " + consumerId + " failed to subscribe", e);
                }
            });
        }
        consumersReady.await(20L, TimeUnit.SECONDS);
    }

    /**
     * Publishes from {@code numProducers} concurrent producers and waits up to
     * five minutes for them to finish.
     */
    private void runProducers(final ActiveMQConnectionFactory factory) throws InterruptedException {
        ExecutorService producerPool = Executors.newFixedThreadPool(this.numProducers);
        for (int p = 0; p < this.numProducers; p++) {
            producerPool.execute(() -> publish(factory));
        }
        producerPool.shutdown();
        producerPool.awaitTermination(5L, TimeUnit.MINUTES);
    }

    /** Sends one producer's batch of alternating RED/BLUE messages, closing the connection afterwards. */
    private void publish(ActiveMQConnectionFactory factory) {
        try (Connection connection = factory.createConnection()) {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(this.destination);
            producer.setDeliveryMode(jakarta.jms.DeliveryMode.NON_PERSISTENT);
            for (int m = 0; m < this.numberOfMessagesToSendPerProducer; m++) {
                BytesMessage message = session.createBytesMessage();
                message.setStringProperty("COLOUR", m % 2 == 0 ? "RED" : "BLUE");
                producer.send(message);
            }
        } catch (Exception e) {
            LOG.error("Producer failed", e);
        }
    }

    /** Logs the topic's enqueue and forward counters alongside the number of messages received so far. */
    private static void logStatistics(Topic topic, AtomicInteger received) {
        LOG.info("Topic enqueues: " + topic.getDestinationStatistics().getEnqueues().getCount()
                + ", Total received: " + received.get()
                + ", forwards: " + topic.getDestinationStatistics().getForwards().getCount());
    }

    /** Closes every connection, logging (rather than propagating) any failure. */
    private static void closeQuietly(List<Connection> connections) {
        // Iterating a synchronizedList requires holding its monitor.
        synchronized (connections) {
            for (Connection connection : connections) {
                try {
                    connection.close();
                } catch (Exception e) {
                    LOG.debug("Ignoring failure while closing consumer connection", e);
                }
            }
        }
    }

    /**
     * Creates (but does not start) a non-persistent, JMX-less broker listening on
     * an ephemeral tcp port, with a default destination policy that disables the
     * expiry period and enables audit.
     *
     * @param brokerName name assigned to the broker
     * @return the configured, unstarted broker
     * @throws Exception if the connector or policy cannot be configured
     */
    private BrokerService newBroker(String brokerName) throws Exception {
        BrokerService broker = new BrokerService();
        broker.setPersistent(false);
        broker.setUseJmx(false);
        broker.setBrokerName(brokerName);
        broker.addConnector("tcp://0.0.0.0:0");

        PolicyEntry defaultPolicy = new PolicyEntry();
        defaultPolicy.setExpireMessagesPeriod(0L);
        defaultPolicy.setEnableAudit(true);
        PolicyMap policies = new PolicyMap();
        policies.setDefaultEntry(defaultPolicy);
        broker.setDestinationPolicy(policies);
        return broker;
    }

    /**
     * Adds a static network connector on the first broker that points at the
     * second broker's tcp connector.
     *
     * @param brokerService broker that owns the connector
     * @param brokerService2 broker the connector points at; its tcp connector's
     *        publishable connect string is read here
     * @param z value for the connector's conduit-subscriptions setting
     * @return the connector that was added to {@code brokerService}
     * @throws Exception if the connector URI cannot be built or added
     */
    protected NetworkConnector bridgeBrokers(BrokerService brokerService, BrokerService brokerService2, boolean z) throws Exception {
        String remoteUri = brokerService2.getTransportConnectorByScheme("tcp").getPublishableConnectString();
        DiscoveryNetworkConnector connector = new DiscoveryNetworkConnector(new URI("static:(" + remoteUri + ")"));
        connector.setName(brokerService.getBrokerName() + "-to-" + brokerService2.getBrokerName());
        connector.setConduitSubscriptions(z);
        brokerService.addNetworkConnector(connector);
        LOG.info("Bridging with conduit subs:" + z);
        return connector;
    }
}
