package org.apache.activemq.store.jdbc;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.ExceptionListener;
import jakarta.jms.JMSException;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.util.DefaultIOExceptionHandler;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.filter.AbstractFilter;
import org.apache.logging.log4j.core.layout.MessageLayout;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/store/jdbc/JDBCConcurrentDLQTest.class */
public class JDBCConcurrentDLQTest {
    private static final Logger LOG = LoggerFactory.getLogger(JDBCConcurrentDLQTest.class);
    BrokerService broker;
    JDBCPersistenceAdapter jdbcPersistenceAdapter;
    Appender appender = null;
    final AtomicBoolean gotError = new AtomicBoolean(false);

    @Before
    public void setUp() throws Exception {
        this.gotError.set(false);
        this.broker = createBroker();
        this.broker.start();
        this.broker.waitUntilStarted();
        org.apache.logging.log4j.core.Logger logger = (org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.core.Logger.class.cast(LogManager.getLogger(JDBCPersistenceAdapter.class));
        org.apache.logging.log4j.core.Logger logger2 = (org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.core.Logger.class.cast(LogManager.getLogger(RegionBroker.class));
        this.appender = new AbstractAppender("testAppender", new AbstractFilter() { // from class: org.apache.activemq.store.jdbc.JDBCConcurrentDLQTest.1
        }, new MessageLayout(), false, new Property[0]) { // from class: org.apache.activemq.store.jdbc.JDBCConcurrentDLQTest.2
            public void append(LogEvent logEvent) {
                if (logEvent.getLevel().isMoreSpecificThan(Level.WARN)) {
                    JDBCConcurrentDLQTest.LOG.error("Got error from log:" + logEvent.getMessage().getFormattedMessage());
                    JDBCConcurrentDLQTest.this.gotError.set(true);
                }
            }
        };
        this.appender.start();
        logger.get().addAppender(this.appender, Level.DEBUG, new AbstractFilter() { // from class: org.apache.activemq.store.jdbc.JDBCConcurrentDLQTest.3
        });
        logger.addAppender(this.appender);
        logger2.get().addAppender(this.appender, Level.DEBUG, new AbstractFilter() { // from class: org.apache.activemq.store.jdbc.JDBCConcurrentDLQTest.4
        });
        logger2.addAppender(this.appender);
    }

    @After
    public void tearDown() throws Exception {
        LogManager.getLogger(RegionBroker.class).removeAppender(this.appender);
        LogManager.getLogger(JDBCPersistenceAdapter.class).removeAppender(this.appender);
        this.broker.stop();
    }

    protected BrokerService createBroker() throws Exception {
        this.broker = new BrokerService();
        this.broker.setUseJmx(true);
        this.broker.setAdvisorySupport(false);
        this.jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
        this.jdbcPersistenceAdapter.setUseLock(false);
        this.broker.setPersistenceAdapter(this.jdbcPersistenceAdapter);
        this.broker.setDeleteAllMessagesOnStartup(true);
        this.broker.addConnector("tcp://0.0.0.0:0");
        return this.broker;
    }

    @Test
    public void testConcurrentDlqOk() throws Exception {
        final ActiveMQQueue activeMQQueue = new ActiveMQQueue("DD");
        final ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(this.broker.getTransportConnectorByScheme("tcp").getPublishableConnectString());
        activeMQConnectionFactory.setWatchTopicAdvisories(false);
        this.broker.setIoExceptionHandler(new DefaultIOExceptionHandler() { // from class: org.apache.activemq.store.jdbc.JDBCConcurrentDLQTest.5
            public void handle(IOException iOException) {
                JDBCConcurrentDLQTest.LOG.error("handle IOException from store", iOException);
                JDBCConcurrentDLQTest.this.gotError.set(true);
            }
        });
        org.apache.logging.log4j.core.Logger logger = (org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.core.Logger.class.cast(LogManager.getLogger(RegionBroker.class));
        org.apache.logging.log4j.core.Logger logger2 = (org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.core.Logger.class.cast(LogManager.getLogger(JDBCPersistenceAdapter.class));
        logger.get().addAppender(this.appender, Level.DEBUG, new AbstractFilter() { // from class: org.apache.activemq.store.jdbc.JDBCConcurrentDLQTest.6
        });
        logger.addAppender(this.appender);
        logger2.get().addAppender(this.appender, Level.DEBUG, new AbstractFilter() { // from class: org.apache.activemq.store.jdbc.JDBCConcurrentDLQTest.7
        });
        logger2.addAppender(this.appender);
        final AtomicInteger atomicInteger = new AtomicInteger(100);
        produceMessages(activeMQConnectionFactory, activeMQQueue, 100);
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        for (int i = 0; i < 50; i++) {
            newCachedThreadPool.execute(new Runnable() { // from class: org.apache.activemq.store.jdbc.JDBCConcurrentDLQTest.8
                @Override // java.lang.Runnable
                public void run() {
                    ActiveMQConnection activeMQConnection = null;
                    try {
                        try {
                            activeMQConnection = activeMQConnectionFactory.createConnection();
                            activeMQConnection.setExceptionListener(new ExceptionListener() { // from class: org.apache.activemq.store.jdbc.JDBCConcurrentDLQTest.8.1
                                public void onException(JMSException jMSException) {
                                    jMSException.printStackTrace();
                                }
                            });
                            RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
                            redeliveryPolicy.setMaximumRedeliveries(0);
                            activeMQConnection.setRedeliveryPolicy(redeliveryPolicy);
                            activeMQConnection.start();
                            Session createSession = activeMQConnection.createSession(true, 0);
                            MessageConsumer createConsumer = createSession.createConsumer(activeMQQueue);
                            while (atomicInteger.get() > 0 && !JDBCConcurrentDLQTest.this.gotError.get()) {
                                if (createConsumer.receive(4000L) != null) {
                                    atomicInteger.decrementAndGet();
                                    createSession.rollback();
                                }
                            }
                            if (activeMQConnection != null) {
                                try {
                                    activeMQConnection.close();
                                } catch (Exception e) {
                                }
                            }
                        } catch (Exception e2) {
                            JDBCConcurrentDLQTest.LOG.error("Error on consumption", e2);
                            JDBCConcurrentDLQTest.this.gotError.set(true);
                            if (activeMQConnection != null) {
                                try {
                                    activeMQConnection.close();
                                } catch (Exception e3) {
                                }
                            }
                        }
                    } catch (Throwable th) {
                        if (activeMQConnection != null) {
                            try {
                                activeMQConnection.close();
                            } catch (Exception e4) {
                                throw th;
                            }
                        }
                        throw th;
                    }
                }
            });
        }
        newCachedThreadPool.shutdown();
        boolean awaitTermination = newCachedThreadPool.awaitTermination(60L, TimeUnit.SECONDS);
        newCachedThreadPool.shutdownNow();
        LOG.info("Total messages: " + this.broker.getAdminView().getTotalMessageCount());
        LOG.info("Total enqueues: " + this.broker.getAdminView().getTotalEnqueueCount());
        LOG.info("Total deueues: " + this.broker.getAdminView().getTotalDequeueCount());
        Assert.assertTrue(awaitTermination);
        Assert.assertEquals("all consumed", 0L, atomicInteger.get());
        Assert.assertEquals("all messages get to the dlq", 200L, this.broker.getAdminView().getTotalEnqueueCount());
        Assert.assertEquals("all messages acked", 100L, this.broker.getAdminView().getTotalDequeueCount());
        Assert.assertFalse("no error", this.gotError.get());
    }

    private void produceMessages(ActiveMQConnectionFactory activeMQConnectionFactory, Destination destination, int i) throws JMSException {
        Connection createConnection = activeMQConnectionFactory.createConnection();
        createConnection.setExceptionListener(new ExceptionListener() { // from class: org.apache.activemq.store.jdbc.JDBCConcurrentDLQTest.9
            public void onException(JMSException jMSException) {
                jMSException.printStackTrace();
            }
        });
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(destination);
        long j = 0;
        TextMessage createTextMessage = createSession.createTextMessage();
        for (int i2 = 0; i2 < i; i2++) {
            createProducer.send(createTextMessage);
            j++;
            if (j % 50 == 0) {
                LOG.info("sent " + j + " messages");
            }
        }
        if (createConnection != null) {
            createConnection.close();
        }
    }
}
