package org.apache.activemq.transport.failover;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.Message;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/transport/failover/FailoverDuplicateTest.class */
public class FailoverDuplicateTest extends TestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(FailoverDuplicateTest.class);
    private static final String QUEUE_NAME = "TestQueue";
    private static final String TRANSPORT_URI = "tcp://localhost:0";
    private String url;
    BrokerService broker;

    public void tearDown() throws Exception {
        stopBroker();
    }

    public void stopBroker() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
        }
    }

    public void startBroker(boolean z) throws Exception {
        this.broker = createBroker(z);
        this.broker.start();
    }

    public void startBroker(boolean z, String str) throws Exception {
        this.broker = createBroker(z, str);
        this.broker.start();
    }

    public BrokerService createBroker(boolean z) throws Exception {
        return createBroker(z, "tcp://localhost:0");
    }

    public BrokerService createBroker(boolean z, String str) throws Exception {
        this.broker = new BrokerService();
        this.broker.setUseJmx(false);
        this.broker.setAdvisorySupport(false);
        this.broker.addConnector(str);
        this.broker.setDeleteAllMessagesOnStartup(z);
        this.url = ((TransportConnector) this.broker.getTransportConnectors().get(0)).getConnectUri().toString();
        return this.broker;
    }

    public void configureConnectionFactory(ActiveMQConnectionFactory activeMQConnectionFactory) {
        activeMQConnectionFactory.setAuditMaximumProducerNumber(2048);
        activeMQConnectionFactory.setOptimizeAcknowledge(true);
    }

    public void testFailoverSendReplyLost() throws Exception {
        this.broker = createBroker(true);
        setDefaultPersistenceAdapter(this.broker);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final CountDownLatch countDownLatch2 = new CountDownLatch(1);
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        this.broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() { // from class: org.apache.activemq.transport.failover.FailoverDuplicateTest.1
            public void send(final ProducerBrokerExchange producerBrokerExchange, Message message) throws Exception {
                super.send(producerBrokerExchange, message);
                if (atomicBoolean.compareAndSet(false, true)) {
                    producerBrokerExchange.getConnectionContext().setDontSendReponse(true);
                    Executors.newSingleThreadExecutor().execute(new Runnable() { // from class: org.apache.activemq.transport.failover.FailoverDuplicateTest.1.1
                        @Override // java.lang.Runnable
                        public void run() {
                            try {
                                FailoverDuplicateTest.LOG.info("Waiting for recepit");
                                TestCase.assertTrue("message received on time", countDownLatch.await(60L, TimeUnit.SECONDS));
                                TestCase.assertTrue("new producers done on time", countDownLatch2.await(120L, TimeUnit.SECONDS));
                                FailoverDuplicateTest.LOG.info("Stopping connection post send and receive and multiple producers");
                                producerBrokerExchange.getConnectionContext().getConnection().stop();
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    });
                }
            }
        }});
        this.broker.start();
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("failover:(" + this.url + ")?jms.watchTopicAdvisories=false");
        configureConnectionFactory(activeMQConnectionFactory);
        Connection createConnection = activeMQConnectionFactory.createConnection();
        createConnection.start();
        final Session createSession = createConnection.createSession(false, 1);
        final Queue createQueue = createSession.createQueue(QUEUE_NAME);
        final AtomicInteger atomicInteger = new AtomicInteger();
        MessageListener messageListener = new MessageListener() { // from class: org.apache.activemq.transport.failover.FailoverDuplicateTest.2
            public void onMessage(javax.jms.Message message) {
                countDownLatch.countDown();
                atomicInteger.incrementAndGet();
            }
        };
        Connection createConnection2 = activeMQConnectionFactory.createConnection();
        createConnection2.start();
        Session createSession2 = createConnection2.createSession(false, 1);
        createSession2.createConsumer(createQueue).setMessageListener(messageListener);
        final CountDownLatch countDownLatch3 = new CountDownLatch(1);
        Executors.newSingleThreadExecutor().execute(new Runnable() { // from class: org.apache.activemq.transport.failover.FailoverDuplicateTest.3
            @Override // java.lang.Runnable
            public void run() {
                FailoverDuplicateTest.LOG.info("doing async send...");
                try {
                    FailoverDuplicateTest.this.produceMessage(createSession, createQueue, "will resend", 1);
                } catch (JMSException e) {
                    FailoverDuplicateTest.LOG.error("got send exception: ", e);
                    TestCase.fail("got unexpected send exception" + e);
                }
                countDownLatch3.countDown();
                FailoverDuplicateTest.LOG.info("done async send");
            }
        });
        assertTrue("one message got through on time", countDownLatch.await(20L, TimeUnit.SECONDS));
        for (int i = 0; i < 1050; i++) {
            produceMessage(createSession2, createQueue, "new producer " + i, 2);
            if (i == 1025) {
                LOG.info("count down producers done");
                countDownLatch2.countDown();
            }
        }
        assertTrue("message sent complete through failover", countDownLatch3.await(30L, TimeUnit.SECONDS));
        Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.transport.failover.FailoverDuplicateTest.4
            public boolean isSatisified() throws Exception {
                FailoverDuplicateTest.LOG.info("received count:" + atomicInteger.get());
                return 2101 <= atomicInteger.get();
            }
        });
        assertEquals("we got all produced messages", 2101, atomicInteger.get());
        createConnection.close();
        createConnection2.close();
        assertEquals("expect all messages are dequeued with one duplicate to dlq", 2103L, this.broker.getRegionBroker().getDestinationStatistics().getEnqueues().getCount());
        Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.transport.failover.FailoverDuplicateTest.5
            public boolean isSatisified() throws Exception {
                FailoverDuplicateTest.LOG.info("dequeues : " + FailoverDuplicateTest.this.broker.getRegionBroker().getDestinationStatistics().getDequeues().getCount());
                return 2102 <= FailoverDuplicateTest.this.broker.getRegionBroker().getDestinationStatistics().getDequeues().getCount();
            }
        });
        assertEquals("dequeue correct, including duplicate dispatch poisoned", 2102L, this.broker.getRegionBroker().getDestinationStatistics().getDequeues().getCount());
        this.broker.stop();
        this.broker.waitUntilStopped();
        LOG.info("Checking for remaining/hung messages with second restart..");
        this.broker = createBroker(false, this.url);
        setDefaultPersistenceAdapter(this.broker);
        this.broker.start();
        ActiveMQConnectionFactory activeMQConnectionFactory2 = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        configureConnectionFactory(activeMQConnectionFactory2);
        Connection createConnection3 = activeMQConnectionFactory2.createConnection();
        createConnection3.start();
        MessageConsumer createConsumer = createConnection3.createSession(false, 1).createConsumer(createQueue);
        javax.jms.Message receive = createConsumer.receive(1000L);
        if (receive == null) {
            receive = createConsumer.receive(5000L);
        }
        assertNull("no messges left dangling but got: " + receive, receive);
        createConnection3.close();
    }

    private void produceMessage(Session session, Queue queue, String str, int i) throws JMSException {
        MessageProducer createProducer = session.createProducer(queue);
        for (int i2 = 0; i2 < i; i2++) {
            createProducer.send(session.createTextMessage(str + ", count:" + i2));
        }
        createProducer.close();
    }
}
