package org.apache.activemq.broker.ft;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.ExceptionListener;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.JmsTopicSendReceiveWithTwoConnectionsTest;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.store.jdbc.DataSourceServiceSupport;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.util.DefaultIOExceptionHandler;
import org.apache.activemq.util.IOHelper;
import org.apache.derby.jdbc.EmbeddedDataSource;
import org.junit.After;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.class */
public class DbRestartJDBCQueueTest extends JmsTopicSendReceiveWithTwoConnectionsTest implements ExceptionListener {
    private static final transient Logger LOG = LoggerFactory.getLogger(DbRestartJDBCQueueTest.class);
    EmbeddedDataSource sharedDs;
    BrokerService broker;
    public boolean transactedSends = false;
    public int failureCount = 25;
    int inflightMessageCount = 0;
    final CountDownLatch restartDBLatch = new CountDownLatch(1);

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.JmsTopicSendReceiveWithTwoConnectionsTest, org.apache.activemq.JmsSendReceiveTestSupport
    public void setUp() throws Exception {
        setAutoFail(true);
        this.topic = false;
        this.verbose = true;
        this.sharedDs = DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory());
        this.broker = new BrokerService();
        DefaultIOExceptionHandler defaultIOExceptionHandler = new DefaultIOExceptionHandler();
        defaultIOExceptionHandler.setIgnoreSQLExceptions(false);
        defaultIOExceptionHandler.setStopStartConnectors(true);
        this.broker.setIoExceptionHandler(defaultIOExceptionHandler);
        this.broker.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT);
        this.broker.setUseJmx(false);
        this.broker.setPersistent(true);
        this.broker.setDeleteAllMessagesOnStartup(true);
        JDBCPersistenceAdapter jDBCPersistenceAdapter = new JDBCPersistenceAdapter();
        jDBCPersistenceAdapter.setDataSource(this.sharedDs);
        jDBCPersistenceAdapter.setUseLock(false);
        jDBCPersistenceAdapter.setLockKeepAlivePeriod(500L);
        jDBCPersistenceAdapter.getLocker().setLockAcquireSleepInterval(500L);
        this.broker.setPersistenceAdapter(jDBCPersistenceAdapter);
        this.broker.start();
        super.setUp();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.JmsTopicSendReceiveWithTwoConnectionsTest
    public void tearDown() throws Exception {
        super.tearDown();
        this.broker.stop();
    }

    @After
    public void shutDownDerby() {
        DataSourceServiceSupport.shutdownDefaultDataSource(this.sharedDs);
    }

    @Override // org.apache.activemq.JmsTopicSendReceiveWithTwoConnectionsTest
    protected Session createSendSession(Connection connection) throws Exception {
        return this.transactedSends ? connection.createSession(true, 0) : connection.createSession(false, 1);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.JmsTopicSendReceiveWithTwoConnectionsTest, org.apache.activemq.TestSupport
    public ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("failover://" + ((TransportConnector) this.broker.getTransportConnectors().get(0)).getPublishableConnectString());
        activeMQConnectionFactory.setExceptionListener(this);
        return activeMQConnectionFactory;
    }

    @Override // org.apache.activemq.JmsSendReceiveTestSupport
    protected void messageSent() throws Exception {
        int i = this.inflightMessageCount + 1;
        this.inflightMessageCount = i;
        if (i == this.failureCount) {
            LOG.info("STOPPING DB!@!!!!");
            final EmbeddedDataSource embeddedDataSource = this.sharedDs;
            embeddedDataSource.setShutdownDatabase("shutdown");
            embeddedDataSource.setCreateDatabase("not_any_more");
            try {
                embeddedDataSource.getConnection();
            } catch (Exception e) {
            }
            LOG.info("DB STOPPED!@!!!!");
            new Thread("db-re-start-thread") { // from class: org.apache.activemq.broker.ft.DbRestartJDBCQueueTest.1
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    DbRestartJDBCQueueTest.LOG.info("Sleeping for 10 seconds before allowing db restart");
                    try {
                        DbRestartJDBCQueueTest.this.restartDBLatch.await(10L, TimeUnit.SECONDS);
                    } catch (InterruptedException e2) {
                        e2.printStackTrace();
                    }
                    embeddedDataSource.setShutdownDatabase("false");
                    DbRestartJDBCQueueTest.LOG.info("DB RESTARTED!@!!!!");
                }
            }.start();
        }
    }

    @Override // org.apache.activemq.JmsSendReceiveTestSupport
    protected void sendToProducer(MessageProducer messageProducer, Destination destination, Message message) throws JMSException {
        boolean z = false;
        do {
            try {
                messageProducer.send(destination, message);
                if (this.transactedSends && ((this.inflightMessageCount + 1) % 10 == 0 || this.inflightMessageCount + 1 >= this.messageCount)) {
                    LOG.info("committing on send: " + this.inflightMessageCount + " message: " + message);
                    this.session.commit();
                }
                z = true;
            } catch (JMSException e) {
                LOG.info("Exception on producer send:", e);
                try {
                    Thread.sleep(2000L);
                } catch (InterruptedException e2) {
                }
            }
        } while (!z);
    }

    public void onException(JMSException jMSException) {
        LOG.error("exception on connection: ", jMSException);
    }
}
