package org.apache.activemq.network;

import jakarta.jms.Connection;
import jakarta.jms.JMSException;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.util.TestUtils;
import org.junit.After;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/network/DuplexStartNpeTest.class */
public class DuplexStartNpeTest {
    final ActiveMQQueue dest = new ActiveMQQueue("QQ");
    final List<BrokerService> brokerServices = new ArrayList();
    final List<Connection> connections = new ArrayList();
    static final int NUM_MESSAGES = 10;
    private static final Logger LOG = LoggerFactory.getLogger(DuplexStartNpeTest.class);
    static final String urlString = "tcp://localhost:" + TestUtils.findOpenPort();

    @Test
    public void reproduceNpe() throws Exception {
        BrokerService createBroker = createBroker();
        NetworkConnector addNetworkConnector = createBroker.addNetworkConnector("masterslave:(" + urlString + "," + urlString + ")");
        addNetworkConnector.setDuplex(true);
        addNetworkConnector.setStaticBridge(true);
        addNetworkConnector.setStaticallyIncludedDestinations(Arrays.asList(this.dest));
        createBroker.start();
        publish(createBroker.getVmConnectorURI());
        BrokerService createBroker2 = createBroker();
        createBroker2.addConnector(urlString);
        createBroker2.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() { // from class: org.apache.activemq.network.DuplexStartNpeTest.1
            public void addConnection(ConnectionContext connectionContext, ConnectionInfo connectionInfo) throws Exception {
                super.addConnection(connectionContext, connectionInfo);
                if (connectionInfo.getClientId() == null || !connectionInfo.getClientId().contains("_duplex_")) {
                    return;
                }
                DuplexStartNpeTest.LOG.info("New connection for broker1: " + connectionInfo);
                TimeUnit.MILLISECONDS.sleep(500L);
            }
        }});
        createBroker2.start();
        consume(new URI(urlString));
    }

    private void consume(URI uri) throws Exception {
        MessageConsumer createConsumer = connectionFactory(uri).createConnection().createSession(false, 1).createConsumer(this.dest);
        for (int i = 0; i < 10; i++) {
            TestCase.assertNotNull("got message: " + i, createConsumer.receive(5000L));
        }
    }

    private void publish(URI uri) throws Exception {
        MessageProducer createProducer = connectionFactory(uri).createConnection().createSession(false, 1).createProducer(this.dest);
        for (int i = 0; i < 10; i++) {
            createProducer.send(new ActiveMQTextMessage());
        }
    }

    private ActiveMQConnectionFactory connectionFactory(URI uri) {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(uri) { // from class: org.apache.activemq.network.DuplexStartNpeTest.2
            public Connection createConnection() throws JMSException {
                Connection createConnection = super.createConnection();
                DuplexStartNpeTest.this.connections.add(createConnection);
                createConnection.start();
                return createConnection;
            }
        };
        activeMQConnectionFactory.setWatchTopicAdvisories(false);
        return activeMQConnectionFactory;
    }

    private BrokerService createBroker() {
        BrokerService brokerService = new BrokerService();
        brokerService.setBrokerName("B" + this.brokerServices.size());
        brokerService.setBrokerId(brokerService.getBrokerName());
        brokerService.setPersistent(false);
        brokerService.setUseJmx(false);
        brokerService.setAdvisorySupport(false);
        this.brokerServices.add(brokerService);
        return brokerService;
    }

    @After
    public void tearDown() throws Exception {
        Iterator<Connection> it = this.connections.iterator();
        while (it.hasNext()) {
            try {
                it.next().close();
            } catch (Exception e) {
            }
        }
        this.connections.clear();
        Iterator<BrokerService> it2 = this.brokerServices.iterator();
        while (it2.hasNext()) {
            try {
                it2.next().stop();
            } catch (Exception e2) {
            }
        }
        this.brokerServices.clear();
    }
}
