package org.apache.activemq.bugs;

import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import jakarta.jms.TransactionRolledBackException;
import java.io.IOException;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.transport.failover.FailoverTransport;
import org.apache.activemq.usecases.DurableSubProcessWithRestartTest;
import org.apache.activemq.util.DefaultIOExceptionHandler;
import org.apache.activemq.util.Wait;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.filter.AbstractFilter;
import org.apache.logging.log4j.core.layout.MessageLayout;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/bugs/AMQ5844Test.class */
public class AMQ5844Test {
    protected BrokerService broker;
    protected long checkPeriod = 2000;
    protected long maxSlowDuration = 4000;
    private String uri;
    private static final String QUEUE_NAME = "TEST.QUEUE";
    static final Logger LOG = LoggerFactory.getLogger(AMQ5844Test.class);
    static boolean abortingSlowConsumer = false;
    static boolean successfullyReconnected = false;
    static final AbstractAppender appender = new AbstractAppender("testAppender", new AbstractFilter() { // from class: org.apache.activemq.bugs.AMQ5844Test.1
    }, new MessageLayout(), false, new Property[0]) { // from class: org.apache.activemq.bugs.AMQ5844Test.2
        public void append(LogEvent logEvent) {
            if (logEvent.getMessage().getFormattedMessage().contains("aborting slow consumer")) {
                AMQ5844Test.abortingSlowConsumer = true;
            }
            if (logEvent.getMessage().getFormattedMessage().contains("Successfully reconnected to")) {
                AMQ5844Test.successfullyReconnected = true;
            }
        }
    };

    @BeforeClass
    public static void setUp() throws Exception {
        org.apache.logging.log4j.core.Logger rootLogger = LogManager.getRootLogger();
        rootLogger.get().addAppender(appender, Level.DEBUG, new AbstractFilter() { // from class: org.apache.activemq.bugs.AMQ5844Test.3
        });
        rootLogger.addAppender(appender);
    }

    @Before
    public void createMaster() throws Exception {
        this.broker = new BrokerService();
        this.broker.setDeleteAllMessagesOnStartup(true);
        TransportConnector addConnector = this.broker.addConnector("tcp://0.0.0.0:0");
        this.broker.setIoExceptionHandler(new DefaultIOExceptionHandler());
        this.broker.setBrokerName("Main");
        PolicyEntry policyEntry = new PolicyEntry();
        AbortSlowAckConsumerStrategy abortSlowAckConsumerStrategy = new AbortSlowAckConsumerStrategy();
        abortSlowAckConsumerStrategy.setAbortConnection(false);
        abortSlowAckConsumerStrategy.setCheckPeriod(this.checkPeriod);
        abortSlowAckConsumerStrategy.setMaxSlowDuration(this.maxSlowDuration);
        abortSlowAckConsumerStrategy.setMaxTimeSinceLastAck(this.maxSlowDuration);
        policyEntry.setSlowConsumerStrategy(abortSlowAckConsumerStrategy);
        policyEntry.setQueuePrefetch(0);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(policyEntry);
        this.broker.setDestinationPolicy(policyMap);
        this.broker.start();
        this.uri = addConnector.getPublishableConnectString();
    }

    @After
    public void stopBroker() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
            this.broker.waitUntilStopped();
        }
        LogManager.getRootLogger().removeAppender(appender);
    }

    @Test
    public void testRecreateAbortedConsumer() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("failover:(" + this.uri + ")");
        activeMQConnectionFactory.setWatchTopicAdvisories(false);
        ActiveMQConnection createConnection = activeMQConnectionFactory.createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(true, 0);
        Queue createQueue = createSession.createQueue(QUEUE_NAME);
        MessageProducer createProducer = createSession.createProducer(createQueue);
        TextMessage createTextMessage = createSession.createTextMessage("Plain Text Message");
        createProducer.send(createTextMessage, 1, 1, 0L);
        createProducer.send(createTextMessage, 1, 1, 0L);
        createSession.commit();
        createProducer.close();
        ActiveMQMessageConsumer createConsumer = createSession.createConsumer(createQueue);
        Assert.assertNotNull(createConsumer.receive());
        Assert.assertTrue("The browser aborts the slow consumer", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.AMQ5844Test.4
            public boolean isSatisified() throws Exception {
                return AMQ5844Test.abortingSlowConsumer;
            }
        }, DurableSubProcessWithRestartTest.BROKER_RESTART));
        ((FailoverTransport) createConnection.getTransport().narrow(FailoverTransport.class)).handleTransportFailure(new IOException());
        Assert.assertTrue("The broker aborts the slow consumer", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.AMQ5844Test.5
            public boolean isSatisified() throws Exception {
                return AMQ5844Test.successfullyReconnected;
            }
        }, 4000L));
        try {
            Assert.assertNull(createConsumer.receive(2000L));
            createSession.commit();
            Assert.fail("Expect the commit to fail and a rollback to happen");
        } catch (TransactionRolledBackException e) {
            Assert.assertTrue(e.getMessage().contains("rolling back transaction"));
        }
        createConnection.close();
    }

    static {
        appender.start();
    }
}
