package org.apache.activemq.transport.failover;

import jakarta.jms.Connection;
import jakarta.jms.ExceptionListener;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import jakarta.jms.TransactionRolledBackException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.util.ServiceStopper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/transport/failover/AMQ1925Test.class */
public class AMQ1925Test extends TestCase implements ExceptionListener {
    private static final Logger log = LoggerFactory.getLogger(AMQ1925Test.class);
    private static final String QUEUE_NAME = "test.amq1925";
    private static final String PROPERTY_MSG_NUMBER = "NUMBER";
    private static final int MESSAGE_COUNT = 10000;
    private BrokerService bs;
    private URI tcpUri;
    private ActiveMQConnectionFactory cf;
    private JMSException exception;

    public void XtestAMQ1925_TXInProgress() throws Exception {
        Connection createConnection = this.cf.createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(true, 0);
        MessageConsumer createConsumer = createSession.createConsumer(createSession.createQueue(QUEUE_NAME));
        final Object obj = new Object();
        final AtomicBoolean atomicBoolean = new AtomicBoolean();
        new Thread(new Runnable() { // from class: org.apache.activemq.transport.failover.AMQ1925Test.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    synchronized (obj) {
                        obj.wait();
                    }
                    AMQ1925Test.this.bs.stop();
                    AMQ1925Test.this.bs = new BrokerService();
                    AMQ1925Test.this.bs.setPersistent(true);
                    AMQ1925Test.this.bs.setUseJmx(true);
                    AMQ1925Test.this.bs.addConnector(AMQ1925Test.this.tcpUri);
                    AMQ1925Test.this.bs.start();
                    atomicBoolean.set(true);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
        synchronized (obj) {
            obj.notifyAll();
        }
        for (int i = 0; i < 10000; i++) {
            Message receive = createConsumer.receive(500L);
            assertNotNull("No Message " + i + " found", receive);
            if (i < 10) {
                assertFalse("Timing problem, restarted too soon", atomicBoolean.get());
            }
            if (i == 10) {
                synchronized (obj) {
                    obj.notifyAll();
                }
            }
            if (i > 9900) {
                assertTrue("Timing problem, restarted too late", atomicBoolean.get());
            }
            assertEquals(i, receive.getIntProperty(PROPERTY_MSG_NUMBER));
            createSession.commit();
        }
        assertNull(createConsumer.receive(500L));
        createConsumer.close();
        createSession.close();
        createConnection.close();
        assertQueueEmpty();
    }

    public void XtestAMQ1925_TXInProgress_TwoConsumers() throws Exception {
        Connection createConnection = this.cf.createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(true, 0);
        MessageConsumer createConsumer = createSession.createConsumer(createSession.createQueue(QUEUE_NAME));
        Session createSession2 = createConnection.createSession(true, 0);
        MessageConsumer createConsumer2 = createSession2.createConsumer(createSession2.createQueue(QUEUE_NAME));
        final Object obj = new Object();
        final AtomicBoolean atomicBoolean = new AtomicBoolean();
        new Thread(new Runnable() { // from class: org.apache.activemq.transport.failover.AMQ1925Test.2
            @Override // java.lang.Runnable
            public void run() {
                try {
                    synchronized (obj) {
                        obj.wait();
                    }
                    AMQ1925Test.this.bs.stop();
                    AMQ1925Test.this.bs = new BrokerService();
                    AMQ1925Test.this.bs.setPersistent(true);
                    AMQ1925Test.this.bs.setUseJmx(true);
                    AMQ1925Test.this.bs.addConnector(AMQ1925Test.this.tcpUri);
                    AMQ1925Test.this.bs.start();
                    atomicBoolean.set(true);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
        synchronized (obj) {
            obj.notifyAll();
        }
        ArrayList arrayList = new ArrayList(10000);
        int i = 0;
        while (true) {
            if (i >= 10000) {
                break;
            }
            Message receive = createConsumer.receive(20L);
            Message receive2 = createConsumer2.receive(20L);
            if (receive != null || receive2 != null) {
                if (i < 10) {
                    assertFalse("Timing problem, restarted too soon", atomicBoolean.get());
                }
                if (i == 10) {
                    synchronized (obj) {
                        obj.notifyAll();
                    }
                }
                if (i > 9950) {
                    assertTrue("Timing problem, restarted too late", atomicBoolean.get());
                }
                if (receive != null) {
                    arrayList.add(Integer.valueOf(receive.getIntProperty(PROPERTY_MSG_NUMBER)));
                    createSession.commit();
                }
                if (receive2 != null) {
                    arrayList.add(Integer.valueOf(receive2.getIntProperty(PROPERTY_MSG_NUMBER)));
                    createSession2.commit();
                }
                i++;
            } else if (arrayList.size() < 10000) {
                Message receive3 = createConsumer.receive(500L);
                Message receive4 = createConsumer2.receive(500L);
                if (receive3 != null || receive4 == null) {
                }
            }
        }
        assertNull(createConsumer.receive(500L));
        assertNull(createConsumer2.receive(500L));
        createConsumer.close();
        createSession.close();
        createConsumer2.close();
        createSession2.close();
        createConnection.close();
        int tryToFetchMissingMessages = arrayList.size() < 10000 ? tryToFetchMissingMessages() : 0;
        for (int i2 = 0; i2 < 10000; i2++) {
            assertTrue("Message-Nr " + i2 + " not found (" + arrayList.size() + " total, " + tryToFetchMissingMessages + " have been found 'lingering' in the queue)", arrayList.contains(Integer.valueOf(i2)));
        }
        assertQueueEmpty();
    }

    private int tryToFetchMissingMessages() throws JMSException {
        Connection createConnection = this.cf.createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(true, 0);
        MessageConsumer createConsumer = createSession.createConsumer(createSession.createQueue(QUEUE_NAME));
        int i = 0;
        while (true) {
            Message receive = createConsumer.receive(500L);
            if (receive == null) {
                createConsumer.close();
                createSession.close();
                createConnection.close();
                return i;
            }
            log.info("Found \"missing\" message: " + receive);
            i++;
        }
    }

    public void testAMQ1925_TXBegin() throws Exception {
        Connection createConnection = this.cf.createConnection();
        createConnection.start();
        createConnection.setExceptionListener(this);
        Session createSession = createConnection.createSession(true, 0);
        MessageConsumer createConsumer = createSession.createConsumer(createSession.createQueue(QUEUE_NAME));
        boolean z = false;
        int i = 0;
        while (i < 10000) {
            Message receive = createConsumer.receive(5000L);
            assertNotNull(receive);
            if (i == 222 && !z) {
                this.bs.stop();
                this.bs = new BrokerService();
                this.bs.setPersistent(true);
                this.bs.setUseJmx(true);
                this.bs.addConnector(this.tcpUri);
                this.bs.start();
                z = true;
            }
            assertEquals(i, receive.getIntProperty(PROPERTY_MSG_NUMBER));
            try {
                createSession.commit();
            } catch (TransactionRolledBackException e) {
                log.info("got rollback: " + e);
                i--;
            }
            i++;
        }
        assertNull(createConsumer.receive(500L));
        createConsumer.close();
        createSession.close();
        createConnection.close();
        assertQueueEmpty();
        assertNull("no exception on connection listener: " + this.exception, this.exception);
    }

    public void testAMQ1925_TXCommited() throws Exception {
        Connection createConnection = this.cf.createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(true, 0);
        MessageConsumer createConsumer = createSession.createConsumer(createSession.createQueue(QUEUE_NAME));
        for (int i = 0; i < 10000; i++) {
            Message receive = createConsumer.receive(5000L);
            assertNotNull(receive);
            assertEquals(i, receive.getIntProperty(PROPERTY_MSG_NUMBER));
            createSession.commit();
            if (i == 222) {
                this.bs.stop();
                this.bs = new BrokerService();
                this.bs.setPersistent(true);
                this.bs.setUseJmx(true);
                this.bs.addConnector(this.tcpUri);
                this.bs.start();
            }
        }
        assertNull(createConsumer.receive(500L));
        createConsumer.close();
        createSession.close();
        createConnection.close();
        assertQueueEmpty();
    }

    private void assertQueueEmpty() throws Exception {
        Connection createConnection = this.cf.createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(true, 0);
        MessageConsumer createConsumer = createSession.createConsumer(createSession.createQueue(QUEUE_NAME));
        Message receive = createConsumer.receive(500L);
        if (receive != null) {
            fail(receive.toString());
        }
        createConsumer.close();
        createSession.close();
        createConnection.close();
        assertQueueLength(0);
    }

    private void assertQueueLength(int i) throws Exception, IOException {
        assertEquals(i, ((Queue) this.bs.getBroker().getDestinations(new ActiveMQQueue(QUEUE_NAME)).iterator().next()).getMessageStore().getMessageCount());
    }

    private void sendMessagesToQueue() throws Exception {
        Connection createConnection = this.cf.createConnection();
        Session createSession = createConnection.createSession(true, 0);
        MessageProducer createProducer = createSession.createProducer(createSession.createQueue(QUEUE_NAME));
        createProducer.setDeliveryMode(2);
        for (int i = 0; i < 10000; i++) {
            TextMessage createTextMessage = createSession.createTextMessage("Test message " + i);
            createTextMessage.setIntProperty(PROPERTY_MSG_NUMBER, i);
            createProducer.send(createTextMessage);
        }
        createSession.commit();
        createProducer.close();
        createSession.close();
        createConnection.close();
        assertQueueLength(10000);
    }

    protected void setUp() throws Exception {
        this.exception = null;
        this.bs = new BrokerService();
        this.bs.setDeleteAllMessagesOnStartup(true);
        this.bs.setPersistent(true);
        this.bs.setUseJmx(true);
        TransportConnector addConnector = this.bs.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT);
        this.bs.start();
        this.tcpUri = addConnector.getConnectUri();
        this.cf = new ActiveMQConnectionFactory("failover://(" + this.tcpUri + ")");
        sendMessagesToQueue();
    }

    protected void tearDown() throws Exception {
        new ServiceStopper().stop(this.bs);
    }

    public void onException(JMSException jMSException) {
        this.exception = jMSException;
    }
}
