package org.apache.activemq.transport.failover;

import java.net.URI;
import java.util.Iterator;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TransactionRolledBackException;
import junit.framework.Assert;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.util.DestinationPathSeparatorBroker;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.usecases.DurableSubProcessWithRestartTest;
import org.apache.activemq.util.SocketProxy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

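/**
 * Exercises JMS sends, commits and acknowledgements over the {@code failover:} transport while
 * an embedded broker (or a socket proxy in front of it) is stopped and restarted mid-operation.
 *
 * <p>Decompiled from: input_file:org/apache/activemq/transport/failover/FailoverTransactionTest.class
 */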
public class FailoverTransactionTest extends TestSupport {
    /** Logger used by the tests and by the anonymous helper classes they create. */
    private static final Logger LOG = LoggerFactory.getLogger(FailoverTransactionTest.class);
    /** Base name of the queue the tests produce to and consume from. */
    private static final String QUEUE_NAME = "Failover.WithTx";
    /**
     * Bind address using port 0.
     * NOTE(review): not referenced in the visible part of this file; createBroker(boolean)
     * repeats the same literal - confirm before removing or reusing.
     */
    private static final String TRANSPORT_URI = "tcp://localhost:0";
    /** Connect URI of the broker's first transport connector, captured in createBroker(boolean, String). */
    private String url;
    /** Broker currently under test; reassigned every time a broker is created. */
    BrokerService broker;

    /**
     * Builds the suite for this test class through the inherited {@code suite(Class)} helper.
     *
     * @return the suite covering {@link FailoverTransactionTest}
     */
    public static Test suite() {
        final Class<FailoverTransactionTest> testClass = FailoverTransactionTest.class;
        return suite(testClass);
    }

    /**
     * Raises the maximum test time to 20 minutes, switches auto-fail on and then runs the
     * inherited set-up.
     *
     * @throws Exception if the inherited set-up fails
     */
    @Override // org.apache.activemq.AutoFailTestSupport
    public void setUp() throws Exception {
        // 20 minutes == 1,200,000 ms
        final long maxTestTimeMillis = TimeUnit.MINUTES.toMillis(20L);
        super.setMaxTestTime(maxTestTimeMillis);
        super.setAutoFail(true);
        super.setUp();
    }

    /**
     * Stops the broker under test and then runs the inherited tear-down.
     *
     * <p>{@link #setUp()} enables auto-fail and delegates to {@code super.setUp()}, so the
     * matching {@code super.tearDown()} must run as well; otherwise whatever the superclass
     * started during set-up (presumably the auto-fail watchdog - confirm against
     * AutoFailTestSupport) is never released.
     *
     * @throws Exception if stopping the broker or the inherited tear-down fails
     */
    @Override // org.apache.activemq.AutoFailTestSupport
    public void tearDown() throws Exception {
        try {
            stopBroker();
        } finally {
            // Always give the superclass its chance to clean up, even if the broker stop failed.
            super.tearDown();
        }
    }

    /**
     * Stops the current broker, if one has been created.
     *
     * @throws Exception if the broker fails to stop
     */
    public void stopBroker() throws Exception {
        final BrokerService current = this.broker;
        if (current == null) {
            // nothing was ever started
            return;
        }
        current.stop();
    }

    /**
     * Starts a broker with the delete-all-messages-on-startup flag set.
     *
     * @throws Exception if the broker cannot be created or started
     */
    private void startCleanBroker() throws Exception {
        final boolean deleteAllMessagesOnStartup = true;
        startBroker(deleteAllMessagesOnStartup);
    }

    /**
     * Creates a broker via {@link #createBroker(boolean)}, records it as the current broker and
     * starts it.
     *
     * @param deleteAllMessagesOnStartup passed through to {@link #createBroker(boolean)}
     * @throws Exception if the broker cannot be created or started
     */
    public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception {
        final BrokerService created = createBroker(deleteAllMessagesOnStartup);
        this.broker = created;
        created.start();
    }

    /**
     * Creates a broker via {@link #createBroker(boolean, String)}, records it as the current
     * broker and starts it.
     *
     * @param deleteAllMessagesOnStartup passed through to {@link #createBroker(boolean, String)}
     * @param bindAddress connector address passed through to {@link #createBroker(boolean, String)}
     * @throws Exception if the broker cannot be created or started
     */
    public void startBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
        final BrokerService created = createBroker(deleteAllMessagesOnStartup, bindAddress);
        this.broker = created;
        created.start();
    }

    /**
     * Creates (without starting) a broker bound to {@code tcp://localhost:0}.
     *
     * @param deleteAllMessagesOnStartup passed through to {@link #createBroker(boolean, String)}
     * @return the newly created broker
     * @throws Exception if the broker cannot be configured
     */
    public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception {
        final String bindAddress = "tcp://localhost:0";
        return createBroker(deleteAllMessagesOnStartup, bindAddress);
    }

    /**
     * Creates (without starting) a broker with JMX and advisory support switched off and a single
     * connector on the given address. The new broker becomes the current {@code broker} and the
     * connect URI of its first transport connector is stored in {@code url}.
     *
     * @param deleteAllMessagesOnStartup value for the broker's delete-all-messages-on-startup flag
     * @param bindAddress address handed to {@code addConnector}
     * @return the newly created broker (the same instance now held in {@code broker})
     * @throws Exception if the connector cannot be added or its URI cannot be read
     */
    public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
        final BrokerService service = new BrokerService();
        // publish the instance first so the field is set even if configuration fails below
        this.broker = service;
        service.setUseJmx(false);
        service.setAdvisorySupport(false);
        service.addConnector(bindAddress);
        service.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);

        final TransportConnector firstConnector = (TransportConnector) service.getTransportConnectors().get(0);
        this.url = firstConnector.getConnectUri().toString();
        return service;
    }

    /**
     * Hook the tests in this class call on each connection factory right after constructing it.
     * It does nothing here.
     * NOTE(review): presumably intended to be overridden by subclasses - confirm.
     *
     * @param activeMQConnectionFactory the factory that is about to be used to create a connection
     */
    public void configureConnectionFactory(ActiveMQConnectionFactory activeMQConnectionFactory) {
    }

    /**
     * Produces a message in a transacted session, restarts the broker on the same URL before the
     * commit, and expects the message to be receivable after the commit.
     *
     * @throws Exception on any broker or JMS failure
     */
    public void testFailoverProducerCloseBeforeTransaction() throws Exception {
        startCleanBroker();

        final String failoverUri = "failover:(" + this.url + ")";
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(failoverUri);
        configureConnectionFactory(factory);
        Connection connection = factory.createConnection();
        connection.start();

        Session txSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        Queue destination = txSession.createQueue(QUEUE_NAME);
        MessageConsumer consumer = txSession.createConsumer(destination);
        produceMessage(txSession, destination);

        // bounce the broker while the transaction is still open
        this.broker.stop();
        startBroker(false, this.url);

        txSession.commit();
        Message delivered = consumer.receive(20000L);
        assertNotNull("we got the message", delivered);
        txSession.commit();
        connection.close();
    }

    /**
     * Registers KahaDB, AMQ and JDBC as the combination values for
     * {@code defaultPersistenceAdapter}.
     */
    public void initCombosForTestFailoverCommitReplyLost() {
        final Object[] adapterChoices = {
            TestSupport.PersistenceAdapterChoice.KahaDB,
            TestSupport.PersistenceAdapterChoice.AMQ,
            TestSupport.PersistenceAdapterChoice.JDBC
        };
        addCombinationValues("defaultPersistenceAdapter", adapterChoices);
    }

    /**
     * Commits a transacted send while a broker plugin, after performing the commit, flags the
     * connection context not to send the response and stops the broker from another thread.
     *
     * <p>After the broker is recreated on the same URL the test expects the asynchronous commit to
     * finish within 30 seconds, exactly one message to be receivable, and - after one more broker
     * restart - no message to be left on the queue.
     *
     * @throws Exception on any broker or JMS failure
     */
    public void testFailoverCommitReplyLost() throws Exception {
        this.broker = createBroker(true);
        setDefaultPersistenceAdapter(this.broker);

        final BrokerPlugin dropCommitReplyThenStop = new BrokerPluginSupport() {
            public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
                super.commitTransaction(context, xid, onePhase);
                context.setDontSendReponse(true);
                final Runnable stopBrokerTask = new Runnable() {
                    @Override
                    public void run() {
                        FailoverTransactionTest.LOG.info("Stopping broker post commit...");
                        try {
                            FailoverTransactionTest.this.broker.stop();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                };
                // stop from a separate thread so this broker call can return
                Executors.newSingleThreadExecutor().execute(stopBrokerTask);
            }
        };
        this.broker.setPlugins(new BrokerPlugin[]{dropCommitReplyThenStop});
        this.broker.start();

        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        configureConnectionFactory(factory);
        Connection connection = factory.createConnection();
        connection.start();
        final Session txSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        Queue destination = txSession.createQueue(QUEUE_NAME);
        MessageConsumer consumer = txSession.createConsumer(destination);
        produceMessage(txSession, destination);

        final CountDownLatch commitDone = new CountDownLatch(1);
        final Runnable asyncCommit = new Runnable() {
            @Override
            public void run() {
                LOG.info("doing async commit...");
                try {
                    txSession.commit();
                } catch (JMSException e) {
                    Assert.assertTrue(e instanceof TransactionRolledBackException);
                    LOG.info("got commit exception: ", e);
                }
                commitDone.countDown();
                LOG.info("done async commit");
            }
        };
        Executors.newSingleThreadExecutor().execute(asyncCommit);

        // the plugin stops the broker; bring a fresh one up on the same URL
        this.broker.waitUntilStopped();
        this.broker = createBroker(false, this.url);
        setDefaultPersistenceAdapter(this.broker);
        this.broker.start();

        assertTrue("tx committed trough failover", commitDone.await(30L, TimeUnit.SECONDS));
        Message first = consumer.receive(20000L);
        LOG.info("Received: " + first);
        assertNotNull("we got the message", first);
        assertNull("we got just one message", consumer.receive(2000L));
        txSession.commit();
        consumer.close();
        connection.close();

        this.broker.stop();
        this.broker.waitUntilStopped();

        LOG.info("Checking for remaining/hung messages..");
        this.broker = createBroker(false, this.url);
        setDefaultPersistenceAdapter(this.broker);
        this.broker.start();

        ActiveMQConnectionFactory verifyFactory = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        configureConnectionFactory(verifyFactory);
        Connection verifyConnection = verifyFactory.createConnection();
        verifyConnection.start();
        Session verifySession = verifyConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer verifyConsumer = verifySession.createConsumer(destination);

        // short wait first, then a longer second attempt before declaring the queue empty
        Message leftover = verifyConsumer.receive(1000L);
        if (leftover == null) {
            leftover = verifyConsumer.receive(5000L);
        }
        LOG.info("Received: " + leftover);
        assertNull("no messges left dangling but got: " + leftover, leftover);
        verifyConnection.close();
    }

    /**
     * Same scenario as the commit-reply-lost test, but every broker instance also runs a
     * {@link DestinationPathSeparatorBroker} plugin and the queue name uses {@code '/'} in place of
     * {@code '.'} with {@code consumer.prefetchSize=0}.
     *
     * <p>Besides the message checks, the test finally expects the region broker to report exactly
     * one destination.
     *
     * @throws Exception on any broker or JMS failure
     */
    public void testFailoverCommitReplyLostWithDestinationPathSeparator() throws Exception {
        this.broker = createBroker(true);
        setDefaultPersistenceAdapter(this.broker);

        final BrokerPlugin pathSeparatorPlugin = new DestinationPathSeparatorBroker();
        final BrokerPlugin dropCommitReplyThenStop = new BrokerPluginSupport() {
            public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
                super.commitTransaction(context, xid, onePhase);
                context.setDontSendReponse(true);
                final Runnable stopBrokerTask = new Runnable() {
                    @Override
                    public void run() {
                        FailoverTransactionTest.LOG.info("Stopping broker post commit...");
                        try {
                            FailoverTransactionTest.this.broker.stop();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                };
                // stop from a separate thread so this broker call can return
                Executors.newSingleThreadExecutor().execute(stopBrokerTask);
            }
        };
        this.broker.setPlugins(new BrokerPlugin[]{pathSeparatorPlugin, dropCommitReplyThenStop});
        this.broker.start();

        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        configureConnectionFactory(factory);
        Connection connection = factory.createConnection();
        connection.start();
        final Session txSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        final String slashSeparatedName = QUEUE_NAME.replace('.', '/') + "?consumer.prefetchSize=0";
        Queue destination = txSession.createQueue(slashSeparatedName);
        MessageConsumer consumer = txSession.createConsumer(destination);
        produceMessage(txSession, destination);

        final CountDownLatch commitDone = new CountDownLatch(1);
        final Runnable asyncCommit = new Runnable() {
            @Override
            public void run() {
                LOG.info("doing async commit...");
                try {
                    txSession.commit();
                } catch (JMSException e) {
                    Assert.assertTrue(e instanceof TransactionRolledBackException);
                    LOG.info("got commit exception: ", e);
                }
                commitDone.countDown();
                LOG.info("done async commit");
            }
        };
        Executors.newSingleThreadExecutor().execute(asyncCommit);

        // the plugin stops the broker; bring a fresh one up on the same URL
        this.broker.waitUntilStopped();
        this.broker = createBroker(false, this.url);
        setDefaultPersistenceAdapter(this.broker);
        this.broker.setPlugins(new BrokerPlugin[]{new DestinationPathSeparatorBroker()});
        this.broker.start();

        assertTrue("tx committed trough failover", commitDone.await(30L, TimeUnit.SECONDS));
        Message first = consumer.receive(20000L);
        LOG.info("Received: " + first);
        assertNotNull("we got the message", first);
        assertNull("we got just one message", consumer.receive(2000L));
        txSession.commit();
        consumer.close();
        connection.close();

        this.broker.stop();
        this.broker.waitUntilStopped();

        LOG.info("Checking for remaining/hung messages..");
        this.broker = createBroker(false, this.url);
        setDefaultPersistenceAdapter(this.broker);
        this.broker.setPlugins(new BrokerPlugin[]{new DestinationPathSeparatorBroker()});
        this.broker.start();

        ActiveMQConnectionFactory verifyFactory = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        configureConnectionFactory(verifyFactory);
        Connection verifyConnection = verifyFactory.createConnection();
        verifyConnection.start();
        Session verifySession = verifyConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer verifyConsumer = verifySession.createConsumer(destination);

        // short wait first, then a longer second attempt before declaring the queue empty
        Message leftover = verifyConsumer.receive(1000L);
        if (leftover == null) {
            leftover = verifyConsumer.receive(5000L);
        }
        LOG.info("Received: " + leftover);
        assertNull("no messges left dangling but got: " + leftover, leftover);
        verifyConnection.close();

        for (ActiveMQDestination known : this.broker.getRegionBroker().getDestinations()) {
            LOG.info("Destinations list: " + known);
        }
        final int destinationCount = this.broker.getRegionBroker().getDestinations().length;
        assertEquals("Only one destination", 1, destinationCount);
    }

    /**
     * Registers KahaDB and JDBC as the combination values for {@code defaultPersistenceAdapter}.
     */
    public void initCombosForTestFailoverSendReplyLost() {
        final Object[] adapterChoices = {
            TestSupport.PersistenceAdapterChoice.KahaDB,
            TestSupport.PersistenceAdapterChoice.JDBC
        };
        addCombinationValues("defaultPersistenceAdapter", adapterChoices);
    }

    /**
     * Sends a message on a non-transacted session while a broker plugin, after performing the
     * send, flags the connection context not to send the response and stops the broker from
     * another thread.
     *
     * <p>After the broker is recreated on the same URL the test expects the asynchronous send to
     * finish within 30 seconds, exactly one message to be receivable, the restarted broker to
     * report 0 enqueues and 1 dequeue, and - after one more restart - an empty queue.
     *
     * @throws Exception on any broker or JMS failure
     */
    public void testFailoverSendReplyLost() throws Exception {
        this.broker = createBroker(true);
        setDefaultPersistenceAdapter(this.broker);

        final BrokerPlugin dropSendReplyThenStop = new BrokerPluginSupport() {
            public void send(ProducerBrokerExchange exchange, org.apache.activemq.command.Message sent) throws Exception {
                super.send(exchange, sent);
                exchange.getConnectionContext().setDontSendReponse(true);
                final Runnable stopBrokerTask = new Runnable() {
                    @Override
                    public void run() {
                        FailoverTransactionTest.LOG.info("Stopping broker post send...");
                        try {
                            FailoverTransactionTest.this.broker.stop();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                };
                // stop from a separate thread so this broker call can return
                Executors.newSingleThreadExecutor().execute(stopBrokerTask);
            }
        };
        this.broker.setPlugins(new BrokerPlugin[]{dropSendReplyThenStop});
        this.broker.start();

        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(" + this.url + ")?jms.watchTopicAdvisories=false");
        configureConnectionFactory(factory);
        Connection connection = factory.createConnection();
        connection.start();
        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final Queue destination = session.createQueue(QUEUE_NAME);
        MessageConsumer consumer = session.createConsumer(destination);

        final CountDownLatch sendDone = new CountDownLatch(1);
        final Runnable asyncSend = new Runnable() {
            @Override
            public void run() {
                LOG.info("doing async send...");
                try {
                    produceMessage(session, destination);
                } catch (JMSException e) {
                    LOG.error("got send exception: ", e);
                    Assert.fail("got unexpected send exception" + e);
                }
                sendDone.countDown();
                LOG.info("done async send");
            }
        };
        Executors.newSingleThreadExecutor().execute(asyncSend);

        // the plugin stops the broker; bring a fresh one up on the same URL
        this.broker.waitUntilStopped();
        this.broker = createBroker(false, this.url);
        setDefaultPersistenceAdapter(this.broker);
        LOG.info("restarting....");
        this.broker.start();

        assertTrue("message sent through failover", sendDone.await(30L, TimeUnit.SECONDS));
        Message first = consumer.receive(20000L);
        LOG.info("Received: " + first);
        assertNotNull("we got the message", first);
        assertNull("we got just one message", consumer.receive(2000L));
        consumer.close();
        connection.close();

        final long enqueues = this.broker.getRegionBroker().getDestinationStatistics().getEnqueues().getCount();
        assertEquals("no newly queued messages", 0L, enqueues);
        final long dequeues = this.broker.getRegionBroker().getDestinationStatistics().getDequeues().getCount();
        assertEquals("1 dequeue", 1L, dequeues);

        this.broker.stop();
        this.broker.waitUntilStopped();

        LOG.info("Checking for remaining/hung messages with second restart..");
        this.broker = createBroker(false, this.url);
        setDefaultPersistenceAdapter(this.broker);
        this.broker.start();

        ActiveMQConnectionFactory verifyFactory = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        configureConnectionFactory(verifyFactory);
        Connection verifyConnection = verifyFactory.createConnection();
        verifyConnection.start();
        Session verifySession = verifyConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer verifyConsumer = verifySession.createConsumer(destination);

        // short wait first, then a longer second attempt before declaring the queue empty
        Message leftover = verifyConsumer.receive(1000L);
        if (leftover == null) {
            leftover = verifyConsumer.receive(5000L);
        }
        LOG.info("Received: " + leftover);
        assertNull("no messges left dangling but got: " + leftover, leftover);
        verifyConnection.close();
    }

    /**
     * Registers KahaDB and JDBC as the combination values for {@code defaultPersistenceAdapter}.
     */
    public void initCombosForTestFailoverConnectionSendReplyLost() {
        final Object[] adapterChoices = {
            TestSupport.PersistenceAdapterChoice.KahaDB,
            TestSupport.PersistenceAdapterChoice.JDBC
        };
        addCombinationValues("defaultPersistenceAdapter", adapterChoices);
    }

    /**
     * Sends a message through a {@link SocketProxy} while a broker plugin, on the first send only,
     * flags the connection context not to send the response and closes the proxy from another
     * thread. The broker itself keeps running.
     *
     * <p>After the proxy is reopened the test expects the asynchronous send to finish within 30
     * seconds, exactly one message to be receivable, the broker to report one enqueue, and - after
     * a broker restart - an empty queue.
     *
     * @throws Exception on any broker, proxy or JMS failure
     */
    public void testFailoverConnectionSendReplyLost() throws Exception {
        this.broker = createBroker(true);

        // This test runs with both KahaDB and JDBC (see
        // initCombosForTestFailoverConnectionSendReplyLost), so the adapter must not be narrowed to
        // KahaDBPersistenceAdapter before the instanceof check has confirmed its type.
        final Object adapter = setDefaultPersistenceAdapter(this.broker);
        if (adapter instanceof KahaDBPersistenceAdapter) {
            ((KahaDBPersistenceAdapter) adapter).setConcurrentStoreAndDispatchQueues(false);
        }

        final SocketProxy proxy = new SocketProxy();
        final BrokerPlugin dropFirstSendReplyThenCloseProxy = new BrokerPluginSupport() {
            // only the first send is disturbed; later sends pass through untouched
            private boolean firstSend = true;

            public void send(ProducerBrokerExchange exchange, org.apache.activemq.command.Message sent) throws Exception {
                super.send(exchange, sent);
                if (this.firstSend) {
                    this.firstSend = false;
                    exchange.getConnectionContext().setDontSendReponse(true);
                    Executors.newSingleThreadExecutor().execute(new Runnable() {
                        @Override
                        public void run() {
                            FailoverTransactionTest.LOG.info("Stopping connection post send...");
                            try {
                                proxy.close();
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    });
                }
            }
        };
        this.broker.setPlugins(new BrokerPlugin[]{dropFirstSendReplyThenCloseProxy});
        this.broker.start();

        proxy.setTarget(new URI(this.url));
        proxy.open();

        // the client connects through the proxy, not to the broker directly
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(" + proxy.getUrl().toASCIIString() + ")?jms.watchTopicAdvisories=false");
        configureConnectionFactory(factory);
        Connection connection = factory.createConnection();
        connection.start();
        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final Queue destination = session.createQueue(QUEUE_NAME);
        MessageConsumer consumer = session.createConsumer(destination);

        final CountDownLatch sendDone = new CountDownLatch(1);
        Executors.newSingleThreadExecutor().execute(new Runnable() {
            @Override
            public void run() {
                LOG.info("doing async send...");
                try {
                    produceMessage(session, destination);
                } catch (JMSException e) {
                    LOG.info("got send exception: ", e);
                }
                sendDone.countDown();
                LOG.info("done async send");
            }
        });

        assertTrue("proxy was closed", proxy.waitUntilClosed(30L));
        LOG.info("restarting proxy");
        proxy.open();

        assertTrue("message sent through failover", sendDone.await(30L, TimeUnit.SECONDS));
        Message first = consumer.receive(20000L);
        LOG.info("Received: " + first);
        assertNotNull("we got the message", first);
        assertNull("we got just one message", consumer.receive(2000L));
        consumer.close();
        connection.close();
        // The only client that used the proxy is closed; release the proxy's sockets as well.
        proxy.close();

        assertEquals("one queued message", 1L, this.broker.getRegionBroker().getDestinationStatistics().getEnqueues().getCount());

        this.broker.stop();
        this.broker.waitUntilStopped();

        LOG.info("Checking for remaining/hung messages with restart..");
        this.broker = createBroker(false, this.url);
        setDefaultPersistenceAdapter(this.broker);
        this.broker.start();

        ActiveMQConnectionFactory verifyFactory = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        configureConnectionFactory(verifyFactory);
        Connection verifyConnection = verifyFactory.createConnection();
        verifyConnection.start();
        Session verifySession = verifyConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer verifyConsumer = verifySession.createConsumer(destination);

        // short wait first, then a longer second attempt before declaring the queue empty
        Message leftover = verifyConsumer.receive(1000L);
        if (leftover == null) {
            leftover = verifyConsumer.receive(5000L);
        }
        LOG.info("Received: " + leftover);
        assertNull("no messges left dangling but got: " + leftover, leftover);
        verifyConnection.close();
    }

    /**
     * Same steps as the producer-close-before-transaction test, but the failover URI carries
     * {@code trackTransactionProducers=false}; here the test expects no message to arrive
     * within 5 seconds after the commit.
     *
     * @throws Exception on any broker or JMS failure
     */
    public void testFailoverProducerCloseBeforeTransactionFailWhenDisabled() throws Exception {
        startCleanBroker();

        final String failoverUri = "failover:(" + this.url + ")?trackTransactionProducers=false";
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(failoverUri);
        configureConnectionFactory(factory);
        Connection connection = factory.createConnection();
        connection.start();

        Session txSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        Queue destination = txSession.createQueue(QUEUE_NAME);
        MessageConsumer consumer = txSession.createConsumer(destination);
        produceMessage(txSession, destination);

        // bounce the broker while the transaction is still open
        this.broker.stop();
        startBroker(false, this.url);

        txSession.commit();
        Message delivered = consumer.receive(5000L);
        assertNull("we got the message", delivered);
        txSession.commit();
        connection.close();
    }

    /**
     * Sends ten messages in one transacted session, each through its own producer that is closed
     * right after the send, restarts the broker before the commit, and expects all ten messages
     * to be receivable afterwards.
     *
     * @throws Exception on any broker or JMS failure
     */
    public void testFailoverMultipleProducerCloseBeforeTransaction() throws Exception {
        startCleanBroker();

        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        configureConnectionFactory(factory);
        Connection connection = factory.createConnection();
        connection.start();

        Session txSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        Queue destination = txSession.createQueue(QUEUE_NAME);
        MessageConsumer consumer = txSession.createConsumer(destination);

        final int count = 10;
        int sent = 0;
        while (sent < count) {
            MessageProducer producer = txSession.createProducer(destination);
            TextMessage body = txSession.createTextMessage("Test message: " + count);
            producer.send(body);
            producer.close();
            sent++;
        }

        // bounce the broker while the transaction is still open
        this.broker.stop();
        startBroker(false, this.url);

        txSession.commit();
        int received = 0;
        while (received < count) {
            assertNotNull("we got all the message: " + count, consumer.receive(20000L));
            received++;
        }
        txSession.commit();
        connection.close();
    }

    /**
     * Like the multiple-producer test, but a connection consumer backed by a one-session
     * {@link ServerSessionPool} is registered on the same queue before the ten sends.
     *
     * <p>After the broker restart and commit the regular consumer is expected to receive nine
     * messages, and the pool's server session must have been started at least once within 10
     * seconds of closing the connection.
     *
     * @throws Exception on any broker or JMS failure
     */
    public void testFailoverWithConnectionConsumer() throws Exception {
        startCleanBroker();

        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        configureConnectionFactory(factory);
        Connection connection = factory.createConnection();
        connection.start();

        Session txSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        Queue destination = txSession.createQueue(QUEUE_NAME);

        final CountDownLatch serverSessionStarted = new CountDownLatch(1);
        final Session poolSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final ServerSessionPool singleSessionPool = new ServerSessionPool() {
            public ServerSession getServerSession() throws JMSException {
                return new ServerSession() {
                    public Session getSession() throws JMSException {
                        return poolSession;
                    }

                    public void start() throws JMSException {
                        // record that the connection consumer handed work to the session
                        serverSessionStarted.countDown();
                        poolSession.run();
                    }
                };
            }
        };
        connection.createConnectionConsumer(destination, (String) null, singleSessionPool, 1);

        MessageConsumer consumer = txSession.createConsumer(destination);
        final int count = 10;
        int sent = 0;
        while (sent < count) {
            MessageProducer producer = txSession.createProducer(destination);
            TextMessage body = txSession.createTextMessage("Test message: " + count);
            producer.send(body);
            producer.close();
            sent++;
        }

        // bounce the broker while the transaction is still open
        this.broker.stop();
        startBroker(false, this.url);

        txSession.commit();
        // one fewer than was sent is expected on the regular consumer
        int received = 0;
        while (received < count - 1) {
            assertNotNull("Failed to get message: " + count, consumer.receive(20000L));
            received++;
        }
        txSession.commit();
        connection.close();

        final boolean started = serverSessionStarted.await(10L, TimeUnit.SECONDS);
        assertTrue("connectionconsumer did not get a message", started);
    }

    /**
     * Runs {@link #doTestFailoverConsumerAckLost(int)} three times with the iteration index
     * (0, 1, 2) as argument, stopping the broker after every iteration whether it passed or threw.
     *
     * @throws Exception whatever the iteration or the broker stop throws
     */
    public void testFailoverConsumerAckLost() throws Exception {
        for (int iteration = 0; iteration < 3; iteration++) {
            try {
                LOG.info("Iteration: " + iteration);
                doTestFailoverConsumerAckLost(iteration);
            } finally {
                // runs on success and on failure alike
                stopBroker();
            }
        }
    }

    public void doTestFailoverConsumerAckLost(final int i) throws Exception {
        this.broker = createBroker(true);
        setDefaultPersistenceAdapter(this.broker);
        this.broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() { // from class: org.apache.activemq.transport.failover.FailoverTransactionTest.10
            public void acknowledge(ConsumerBrokerExchange consumerBrokerExchange, final MessageAck messageAck) throws Exception {
                consumerBrokerExchange.getConnectionContext().setDontSendReponse(true);
                Executors.newSingleThreadExecutor().execute(new Runnable() { // from class: org.apache.activemq.transport.failover.FailoverTransactionTest.10.1
                    @Override // java.lang.Runnable
                    public void run() {
                        FailoverTransactionTest.LOG.info("Stopping broker on ack: " + messageAck);
                        try {
                            FailoverTransactionTest.this.broker.stop();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        }});
        this.broker.start();
        Vector vector = new Vector();
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        configureConnectionFactory(activeMQConnectionFactory);
        Connection createConnection = activeMQConnectionFactory.createConnection();
        createConnection.start();
        vector.add(createConnection);
        Session createSession = createConnection.createSession(false, 1);
        Queue createQueue = createSession.createQueue("Failover.WithTx?consumer.prefetchSize=1");
        Connection createConnection2 = activeMQConnectionFactory.createConnection();
        createConnection2.start();
        vector.add(createConnection2);
        final Session createSession2 = createConnection2.createSession(true, 1);
        Connection createConnection3 = activeMQConnectionFactory.createConnection();
        createConnection3.start();
        vector.add(createConnection3);
        Session createSession3 = createConnection3.createSession(true, 1);
        final MessageConsumer createConsumer = createSession2.createConsumer(createQueue);
        MessageConsumer createConsumer2 = createSession3.createConsumer(createQueue);
        produceMessage(createSession, createQueue);
        produceMessage(createSession, createQueue);
        final Vector vector2 = new Vector();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Executors.newSingleThreadExecutor().execute(new Runnable() { // from class: org.apache.activemq.transport.failover.FailoverTransactionTest.11
            @Override // java.lang.Runnable
            public void run() {
                FailoverTransactionTest.LOG.info("doing async commit after consume...");
                try {
                    Message receive = createConsumer.receive(20000L);
                    FailoverTransactionTest.LOG.info("consumer1 first attempt got message: " + receive);
                    vector2.add(receive);
                    TimeUnit.SECONDS.sleep(i * 2);
                    Message receive2 = createConsumer.receive(5000L);
                    FailoverTransactionTest.LOG.info("consumer1 second attempt got message: " + receive2);
                    if (receive2 != null) {
                        vector2.add(receive2);
                    }
                    FailoverTransactionTest.LOG.info("committing consumer1 session: " + vector2.size() + " messsage(s)");
                    try {
                        createSession2.commit();
                    } catch (JMSException e) {
                        FailoverTransactionTest.LOG.info("got exception ex on commit", e);
                        if (!(e instanceof TransactionRolledBackException)) {
                            throw e;
                        }
                        atomicBoolean.set(true);
                    }
                    countDownLatch.countDown();
                    FailoverTransactionTest.LOG.info("done async commit");
                } catch (Exception e2) {
                    e2.printStackTrace();
                }
            }
        });
        this.broker.waitUntilStopped();
        this.broker = createBroker(false, this.url);
        setDefaultPersistenceAdapter(this.broker);
        this.broker.start();
        assertTrue("tx committed trough failover", countDownLatch.await(30L, TimeUnit.SECONDS));
        LOG.info("received message count: " + vector2.size());
        Message receive = createConsumer.receive(atomicBoolean.get() ? 5000L : 20000L);
        LOG.info("post: from consumer1 received: " + receive);
        if (atomicBoolean.get()) {
            assertNotNull("should be available again after commit rollback ex", receive);
        } else {
            assertNull("should be nothing left for consumer as recieve should have committed", receive);
        }
        createSession2.commit();
        if (atomicBoolean.get() || (!atomicBoolean.get() && vector2.size() == 1)) {
            Message receive2 = createConsumer2.receive(DurableSubProcessWithRestartTest.BROKER_RESTART);
            LOG.info("post: from consumer2 received: " + receive2);
            assertNotNull("got second message on consumer2", receive2);
            createSession3.commit();
        }
        Iterator it = vector.iterator();
        while (it.hasNext()) {
            ((Connection) it.next()).close();
        }
        this.broker.stop();
        this.broker.waitUntilStopped();
        LOG.info("Checking for remaining/hung messages..");
        this.broker = createBroker(false, this.url);
        setDefaultPersistenceAdapter(this.broker);
        this.broker.start();
        ActiveMQConnectionFactory activeMQConnectionFactory2 = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        configureConnectionFactory(activeMQConnectionFactory2);
        Connection createConnection4 = activeMQConnectionFactory2.createConnection();
        createConnection4.start();
        MessageConsumer createConsumer3 = createConnection4.createSession(false, 1).createConsumer(createQueue);
        Message receive3 = createConsumer3.receive(1000L);
        if (receive3 == null) {
            receive3 = createConsumer3.receive(5000L);
        }
        LOG.info("Sweep received: " + receive3);
        assertNull("no messges left dangling but got: " + receive3, receive3);
        createConnection4.close();
    }

    /**
     * Receives one message inside a transacted session, replaces the broker with one
     * configured for the JDBC persistence adapter before the commit is attempted, and expects
     * that commit to be rejected with a {@link TransactionRolledBackException}.
     *
     * <p>The broker is then replaced once more, this time without the JDBC adapter, and the
     * same consumer must receive a message again within 20 seconds.
     *
     * @throws Exception if broker lifecycle or JMS operations fail unexpectedly
     */
    public void testAutoRollbackWithMissingRedeliveries() throws Exception {
        // NOTE(review): the boolean passed to createBroker presumably requests a clean store - confirm.
        this.broker = createBroker(true);
        this.broker.start();

        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        configureConnectionFactory(factory);
        Connection connection = factory.createConnection();
        connection.start();

        Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1");
        Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
        MessageConsumer consumer = consumerSession.createConsumer(destination);

        produceMessage(producerSession, destination);
        Message delivered = consumer.receive(20000L);
        assertNotNull(delivered);

        // Swap in a broker that uses the JDBC persistence adapter choice before committing.
        this.broker.stop();
        this.broker = createBroker(false, this.url);
        setPersistenceAdapter(this.broker, TestSupport.PersistenceAdapterChoice.JDBC);
        this.broker.start();

        boolean rolledBack = false;
        try {
            consumerSession.commit();
        } catch (TransactionRolledBackException expected) {
            // This is the outcome the test is looking for.
            rolledBack = true;
        }
        if (!rolledBack) {
            fail("expected transaciton rolledback ex");
        }

        // Bring up a broker without the JDBC adapter; the message must be delivered again.
        this.broker.stop();
        this.broker = createBroker(false, this.url);
        this.broker.start();

        Message redelivered = consumer.receive(20000L);
        assertNotNull("should get rolledback message from original restarted broker", redelivered);
        connection.close();
    }

    /**
     * Receives one message inside a transacted session, replaces the broker with one
     * configured for the JDBC persistence adapter, and starts the commit on a background
     * thread. The broker is then replaced again without that adapter.
     *
     * <p>The connection URL sets {@code jms.consumerFailoverRedeliveryWaitPeriod=30000}. The
     * test expects the commit to complete within 30 seconds and the consumer to receive
     * nothing further in the following 5 seconds.
     *
     * @throws Exception if broker lifecycle or JMS operations fail unexpectedly
     */
    public void testWaitForMissingRedeliveries() throws Exception {
        LOG.info("testWaitForMissingRedeliveries()");
        this.broker = createBroker(true);
        this.broker.start();

        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(" + this.url + ")?jms.consumerFailoverRedeliveryWaitPeriod=30000");
        configureConnectionFactory(factory);
        Connection connection = factory.createConnection();
        connection.start();

        Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue destination = producerSession.createQueue(QUEUE_NAME);
        final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
        MessageConsumer consumer = consumerSession.createConsumer(destination);

        produceMessage(producerSession, destination);
        Message message = consumer.receive(20000L);
        if (message == null) {
            // Capture thread state to help diagnose why nothing was delivered.
            AutoFailTestSupport.dumpAllThreads("missing-");
        }
        assertNotNull("got message just produced", message);

        // Swap in a broker that uses the JDBC persistence adapter choice.
        this.broker.stop();
        this.broker = createBroker(false, this.url);
        setPersistenceAdapter(this.broker, TestSupport.PersistenceAdapterChoice.JDBC);
        this.broker.start();

        final CountDownLatch commitDone = new CountDownLatch(1);
        // Fully qualified because ExecutorService is not among this file's imports.
        java.util.concurrent.ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(new Runnable() {
            @Override
            public void run() {
                FailoverTransactionTest.LOG.info("doing async commit...");
                try {
                    consumerSession.commit();
                    commitDone.countDown();
                } catch (JMSException e) {
                    // The latch stays closed, so the assertion below fails; log the cause
                    // so the failure is not just an unexplained timeout.
                    FailoverTransactionTest.LOG.warn("async commit failed", e);
                }
            }
        });
        // Lets the submitted task finish, then releases the worker thread.
        executor.shutdown();

        // Replace the broker again, this time without the JDBC adapter.
        this.broker.stop();
        this.broker = createBroker(false, this.url);
        this.broker.start();

        assertTrue("commit was successful", commitDone.await(30L, TimeUnit.SECONDS));
        assertNull("should not get committed message", consumer.receive(5000L));
        connection.close();
    }

    /**
     * Receives one message through a prefetch-0 consumer in a transacted session, adds a
     * second prefetch-1 consumer on the same session and queue, restarts the broker, and
     * commits on a background thread.
     *
     * <p>While the commit is in flight the second consumer must receive nothing. After the
     * commit finishes (successfully or not) the first consumer must receive nothing either.
     * If the commit failed with a {@link JMSException}, the message is expected on the
     * {@code ActiveMQ.DLQ} queue with its original text.
     *
     * @throws Exception if broker lifecycle or JMS operations fail unexpectedly
     */
    public void testPoisonOnDeliveryWhilePending() throws Exception {
        LOG.info("testPoisonOnDeliveryWhilePending()");
        this.broker = createBroker(true);
        this.broker.start();

        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(" + this.url + ")?jms.consumerFailoverRedeliveryWaitPeriod=10000");
        configureConnectionFactory(factory);
        Connection connection = factory.createConnection();
        connection.start();

        Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=0");
        final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
        MessageConsumer consumer = consumerSession.createConsumer(destination);

        produceMessage(producerSession, destination);
        Message message = consumer.receive(20000L);
        if (message == null) {
            // Capture thread state to help diagnose why nothing was delivered.
            AutoFailTestSupport.dumpAllThreads("missing-");
        }
        assertNotNull("got message just produced", message);

        // Second consumer on the same transacted session and queue, with a prefetch of 1.
        Queue prefetchOneDestination = consumerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1");
        MessageConsumer prefetchOneConsumer = consumerSession.createConsumer(prefetchOneDestination);

        this.broker.stop();
        this.broker = createBroker(false, this.url);
        this.broker.start();

        final CountDownLatch commitDone = new CountDownLatch(1);
        final Vector<JMSException> commitFailures = new Vector<JMSException>();
        // Fully qualified because ExecutorService is not among this file's imports.
        java.util.concurrent.ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(new Runnable() {
            @Override
            public void run() {
                FailoverTransactionTest.LOG.info("doing async commit...");
                try {
                    consumerSession.commit();
                } catch (JMSException e) {
                    // Remembered so the main thread knows to look in the DLQ.
                    commitFailures.add(e);
                } finally {
                    // Always release the waiting test thread, whatever the outcome.
                    commitDone.countDown();
                }
            }
        });
        // Lets the submitted task finish, then releases the worker thread.
        executor.shutdown();

        assertNull("consumer2 not get a message while pending to 1 or consumed by 1", prefetchOneConsumer.receive(2000L));
        assertTrue("commit completed ", commitDone.await(15L, TimeUnit.SECONDS));
        assertNull("consumer should not get rolledback on non redelivered message or duplicate", consumer.receive(5000L));

        if (!commitFailures.isEmpty()) {
            MessageConsumer dlqConsumer = consumerSession.createConsumer(consumerSession.createQueue("ActiveMQ.DLQ"));
            // receive() returns Message; the cast is required to read the text body.
            TextMessage dlqMessage = (TextMessage) dlqConsumer.receive(5000L);
            assertNotNull("found message in dlq", dlqMessage);
            assertEquals("text matches", "Test message", dlqMessage.getText());
            consumerSession.commit();
        }
        connection.close();
    }

    /**
     * Sends a single text message with the body {@code "Test message"} to {@code queue},
     * using a producer that is created from {@code session} and closed before returning.
     *
     * <p>Declared public because the decompiler widened the access modifier from private.
     *
     * @param session session used to create both the producer and the text message
     * @param queue destination the message is sent to
     * @throws JMSException if creating the producer, sending the message, or closing the
     *     producer fails
     */
    public void produceMessage(Session session, Queue queue) throws JMSException {
        MessageProducer producer = session.createProducer(queue);
        try {
            producer.send(session.createTextMessage("Test message"));
        } finally {
            // Release the producer even when the send fails.
            producer.close();
        }
    }
}
