package org.apache.activemq.bugs;

import jakarta.jms.Connection;
import jakarta.jms.JMSException;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import jakarta.jms.TransactionRolledBackException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.transaction.xa.XAException;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.AsyncCallback;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/bugs/AMQ3166Test.class */
public class AMQ3166Test {
    private static final Logger LOG = LoggerFactory.getLogger(AMQ3166Test.class);
    private BrokerService brokerService;
    private AtomicInteger sendAttempts = new AtomicInteger(0);

    @Test
    public void testCommitThroughAsyncErrorNoForceRollback() throws Exception {
        startBroker(false);
        Connection createConnection = createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(true, 0);
        MessageProducer createProducer = createSession.createProducer(createSession.createQueue("QAT"));
        for (int i = 0; i < 10; i++) {
            createProducer.send(createSession.createTextMessage("Hello A"));
        }
        createSession.commit();
        Assert.assertTrue("only one message made it through", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.AMQ3166Test.1
            public boolean isSatisified() throws Exception {
                return AMQ3166Test.this.brokerService.getAdminView().getTotalEnqueueCount() == 1;
            }
        }));
        createConnection.close();
    }

    @Test
    public void testCommitThroughAsyncErrorForceRollback() throws Exception {
        startBroker(true);
        Connection createConnection = createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(true, 0);
        MessageProducer createProducer = createSession.createProducer(createSession.createQueue("QAT"));
        for (int i = 0; i < 10; i++) {
            try {
                createProducer.send(createSession.createTextMessage("Hello A"));
            } catch (JMSException e) {
                Assert.assertTrue(e.getCause() instanceof XAException);
            }
        }
        createSession.commit();
        Assert.fail("Expect TransactionRolledBackException");
        Assert.assertTrue("only one message made it through", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.AMQ3166Test.2
            public boolean isSatisified() throws Exception {
                return AMQ3166Test.this.brokerService.getAdminView().getTotalEnqueueCount() == 0;
            }
        }));
        createConnection.close();
    }

    @Test
    public void testAckCommitThroughAsyncErrorForceRollback() throws Exception {
        startBroker(true);
        Connection createConnection = createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(true, 0);
        Queue createQueue = createSession.createQueue("QAT");
        MessageProducer createProducer = createSession.createProducer(createQueue);
        createProducer.send(createSession.createTextMessage("Hello A"));
        createProducer.close();
        createSession.commit();
        Assert.assertNotNull("got message", createSession.createConsumer(createQueue).receive(4000L));
        try {
            createSession.commit();
            Assert.fail("Expect TransactionRolledBackException");
        } catch (JMSException e) {
            Assert.assertTrue(e.getCause() instanceof XAException);
            Assert.assertTrue(e.getCause().getCause() instanceof TransactionRolledBackException);
            Assert.assertTrue(e.getCause().getCause().getCause() instanceof RuntimeException);
        }
        Assert.assertTrue("one message still there!", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.AMQ3166Test.3
            public boolean isSatisified() throws Exception {
                return AMQ3166Test.this.brokerService.getAdminView().getTotalMessageCount() == 1;
            }
        }));
        createConnection.close();
    }

    @Test
    public void testErrorOnSyncSend() throws Exception {
        startBroker(false);
        ActiveMQConnection createConnection = createConnection();
        createConnection.setAlwaysSyncSend(true);
        createConnection.start();
        Session createSession = createConnection.createSession(true, 0);
        MessageProducer createProducer = createSession.createProducer(createSession.createQueue("QAT"));
        for (int i = 0; i < 10; i++) {
            try {
                createProducer.send(createSession.createTextMessage("Hello A"));
            } catch (JMSException e) {
                LOG.info("Got expected: " + e);
                createSession.rollback();
            }
        }
        createSession.commit();
        Assert.assertTrue("only one message made it through", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.AMQ3166Test.4
            public boolean isSatisified() throws Exception {
                return AMQ3166Test.this.brokerService.getAdminView().getTotalEnqueueCount() == 0;
            }
        }));
        createConnection.close();
    }

    @Test
    public void testRollbackOnAsyncErrorAmqApi() throws Exception {
        startBroker(false);
        ActiveMQConnection createConnection = createConnection();
        createConnection.start();
        final ActiveMQSession createSession = createConnection.createSession(true, 0);
        final CountDownLatch countDownLatch = new CountDownLatch(10);
        ActiveMQMessageProducer createProducer = createSession.createProducer(createSession.createQueue("QAT"));
        for (int i = 0; i < 10; i++) {
            createProducer.send(createSession.createTextMessage("Hello A"), new AsyncCallback() { // from class: org.apache.activemq.bugs.AMQ3166Test.5
                public void onSuccess() {
                    countDownLatch.countDown();
                }

                public void onException(JMSException jMSException) {
                    createSession.getTransactionContext().setRollbackOnly(true);
                    countDownLatch.countDown();
                }
            });
            if (i == 0) {
                createSession.getTransactionContext().addSynchronization(new Synchronization() { // from class: org.apache.activemq.bugs.AMQ3166Test.6
                    public void beforeEnd() throws Exception {
                        if (!countDownLatch.await(10L, TimeUnit.SECONDS)) {
                            AMQ3166Test.LOG.error("TimedOut waiting for aync send requests!");
                            createSession.getTransactionContext().setRollbackOnly(true);
                        }
                        super.beforeEnd();
                    }
                });
            }
        }
        try {
            createSession.commit();
            Assert.fail("expect rollback on async error");
        } catch (TransactionRolledBackException e) {
        }
        Assert.assertTrue("only one message made it through", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.AMQ3166Test.7
            public boolean isSatisified() throws Exception {
                return AMQ3166Test.this.brokerService.getAdminView().getTotalEnqueueCount() == 0;
            }
        }));
        createConnection.close();
    }

    private Connection createConnection() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(((TransportConnector) this.brokerService.getTransportConnectors().get(0)).getPublishableConnectString());
        activeMQConnectionFactory.setWatchTopicAdvisories(false);
        return activeMQConnectionFactory.createConnection();
    }

    public void startBroker(boolean z) throws Exception {
        this.brokerService = createBroker(z);
        this.brokerService.start();
        this.brokerService.waitUntilStarted();
    }

    @After
    public void tearDown() throws Exception {
        if (this.brokerService != null) {
            this.brokerService.stop();
            this.brokerService.waitUntilStopped();
            this.brokerService = null;
        }
    }

    protected BrokerService createBroker(boolean z) throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setPersistent(true);
        brokerService.setDeleteAllMessagesOnStartup(true);
        brokerService.setAdvisorySupport(false);
        brokerService.setRollbackOnlyOnAsyncException(z);
        brokerService.addConnector("tcp://0.0.0.0:0");
        brokerService.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() { // from class: org.apache.activemq.bugs.AMQ3166Test.8
            public void acknowledge(ConsumerBrokerExchange consumerBrokerExchange, MessageAck messageAck) throws Exception {
                if (messageAck.isStandardAck()) {
                    throw new RuntimeException("no way, won't allow any standard ack");
                }
                super.acknowledge(consumerBrokerExchange, messageAck);
            }

            public void send(ProducerBrokerExchange producerBrokerExchange, Message message) throws Exception {
                if (AMQ3166Test.this.sendAttempts.incrementAndGet() > 1) {
                    throw new RuntimeException("no way, won't accept any messages");
                }
                super.send(producerBrokerExchange, message);
            }
        }});
        return brokerService;
    }
}
