package org.apache.activemq.bugs;

import java.lang.Thread;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.util.Wait;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.omg.CORBA.IntHolder;

/* loaded from: input_file:org/apache/activemq/bugs/AMQ2102Test.class */
public class AMQ2102Test extends CombinationTestSupport implements Thread.UncaughtExceptionHandler {
    static final int MESSAGE_COUNT = 12120;
    static final int NUM_CONSUMERS = 10;
    static final int CONSUME_ALL = -1;
    private static final Log LOG = LogFactory.getLog(AMQ2102Test.class);
    private static final Map<Thread, Throwable> exceptions = new ConcurrentHashMap();
    final BrokerService master = new BrokerService();
    BrokerService slave = new BrokerService();
    String masterUrl;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/activemq/bugs/AMQ2102Test$Consumer.class */
    public class Consumer implements Runnable, ExceptionListener {
        private ActiveMQConnectionFactory connectionFactory;
        private String name;
        private String queueName;
        private boolean running;
        private IntHolder startup;
        private Thread thread;
        private int numToProcessPerIteration;

        Consumer(ActiveMQConnectionFactory activeMQConnectionFactory, String str, IntHolder intHolder, int i, int i2) {
            this.connectionFactory = activeMQConnectionFactory;
            this.queueName = str;
            this.startup = intHolder;
            this.name = "Consumer-" + str + "-" + i;
            this.numToProcessPerIteration = i2;
            this.thread = new Thread(this, this.name);
        }

        private String getClientId() {
            try {
                return InetAddress.getLocalHost().getHostName() + ":" + this.name;
            } catch (UnknownHostException e) {
                return "localhost:" + this.name;
            }
        }

        synchronized boolean isRunning() {
            return this.running;
        }

        void join() {
            try {
                this.thread.join(Wait.MAX_WAIT_MILLIS);
            } catch (InterruptedException e) {
                AMQ2102Test.error("Interrupted waiting for " + this.name + " to stop", e);
            }
        }

        public void onException(JMSException jMSException) {
            AMQ2102Test.exceptions.put(Thread.currentThread(), jMSException);
            AMQ2102Test.error("JMS exception: ", jMSException);
        }

        private void processMessage(Session session, MessageProducer messageProducer, Message message) throws Exception {
            if (!(message instanceof TextMessage)) {
                AMQ2102Test.error("Consumer cannot process " + message.getClass().getSimpleName());
                return;
            }
            TextMessage textMessage = (TextMessage) message;
            Destination jMSReplyTo = textMessage.getJMSReplyTo();
            if (jMSReplyTo == null) {
                AMQ2102Test.debug("no reply to message => " + textMessage.getText());
                return;
            }
            TextMessage createTextMessage = session.createTextMessage("reply-" + textMessage.getText());
            createTextMessage.setJMSCorrelationID(textMessage.getJMSCorrelationID());
            messageProducer.send(jMSReplyTo, createTextMessage);
            AMQ2102Test.debug("replied via " + jMSReplyTo + " for message => " + textMessage.getText());
        }

        private void processMessages() throws JMSException {
            ActiveMQConnection activeMQConnection = null;
            try {
                activeMQConnection = (ActiveMQConnection) this.connectionFactory.createConnection();
                RedeliveryPolicy redeliveryPolicy = activeMQConnection.getRedeliveryPolicy();
                redeliveryPolicy.setMaximumRedeliveries(6);
                redeliveryPolicy.setInitialRedeliveryDelay(1000L);
                redeliveryPolicy.setUseCollisionAvoidance(false);
                redeliveryPolicy.setCollisionAvoidancePercent((short) 15);
                redeliveryPolicy.setUseExponentialBackOff(false);
                redeliveryPolicy.setBackOffMultiplier(5.0d);
                activeMQConnection.setClientID(getClientId());
                activeMQConnection.setExceptionListener(this);
                activeMQConnection.start();
                processMessages((Connection) activeMQConnection);
                activeMQConnection.close();
            } catch (Throwable th) {
                activeMQConnection.close();
                throw th;
            }
        }

        private void processMessages(Connection connection) throws JMSException {
            Session session = null;
            try {
                Session createSession = connection.createSession(true, 0);
                if (this.numToProcessPerIteration > 0) {
                    while (isRunning()) {
                        processMessages(createSession);
                    }
                } else {
                    processMessages(createSession);
                }
                if (createSession != null) {
                    createSession.close();
                }
            } catch (Throwable th) {
                if (0 != 0) {
                    session.close();
                }
                throw th;
            }
        }

        private void processMessages(Session session) throws JMSException {
            MessageConsumer messageConsumer = null;
            try {
                messageConsumer = session.createConsumer(session.createQueue(this.queueName), (String) null);
                processMessages(session, messageConsumer);
                if (messageConsumer != null) {
                    messageConsumer.close();
                }
            } catch (Throwable th) {
                if (messageConsumer != null) {
                    messageConsumer.close();
                }
                throw th;
            }
        }

        private void processMessages(Session session, MessageConsumer messageConsumer) throws JMSException {
            MessageProducer messageProducer = null;
            try {
                messageProducer = session.createProducer((Destination) null);
                messageProducer.setDeliveryMode(1);
                processMessages(session, messageConsumer, messageProducer);
                if (messageProducer != null) {
                    messageProducer.close();
                }
            } catch (Throwable th) {
                if (messageProducer != null) {
                    messageProducer.close();
                }
                throw th;
            }
        }

        private void processMessages(Session session, MessageConsumer messageConsumer, MessageProducer messageProducer) throws JMSException {
            AMQ2102Test.debug("waiting for messages...");
            if (this.startup != null) {
                synchronized (this.startup) {
                    this.startup.value--;
                    this.startup.notify();
                }
                this.startup = null;
            }
            int i = this.numToProcessPerIteration;
            do {
                Message receive = messageConsumer.receive(5000L);
                if (receive != null) {
                    try {
                        processMessage(session, messageProducer, receive);
                        session.commit();
                    } catch (Throwable th) {
                        AMQ2102Test.error("message=" + receive + " failure", th);
                        session.rollback();
                    }
                }
                if (i != -1) {
                    i--;
                    if (i <= 0) {
                        return;
                    }
                }
            } while (isRunning());
        }

        @Override // java.lang.Runnable
        public void run() {
            setRunning(true);
            while (isRunning()) {
                try {
                    processMessages();
                } catch (Throwable th) {
                    AMQ2102Test.error("Unexpected consumer problem: ", th);
                }
            }
        }

        synchronized void setRunning(boolean z) {
            this.running = z;
        }

        void start() {
            this.thread.start();
        }
    }

    /* loaded from: input_file:org/apache/activemq/bugs/AMQ2102Test$Producer.class */
    private class Producer implements ExceptionListener {
        private ActiveMQConnectionFactory connectionFactory;
        private String queueName;

        Producer(ActiveMQConnectionFactory activeMQConnectionFactory, String str) {
            this.connectionFactory = activeMQConnectionFactory;
            this.queueName = str;
        }

        void execute(String[] strArr) {
            try {
                sendMessages();
            } catch (Exception e) {
                AMQ2102Test.error("Producer failed", e);
            }
        }

        private void sendMessages() throws JMSException {
            ActiveMQConnection activeMQConnection = null;
            try {
                activeMQConnection = (ActiveMQConnection) this.connectionFactory.createConnection();
                activeMQConnection.setExceptionListener(this);
                activeMQConnection.start();
                sendMessages(activeMQConnection);
                if (activeMQConnection != null) {
                    try {
                        activeMQConnection.close();
                    } catch (JMSException e) {
                        AMQ2102Test.error("Problem closing connection", e);
                    }
                }
            } catch (Throwable th) {
                if (activeMQConnection != null) {
                    try {
                        activeMQConnection.close();
                    } catch (JMSException e2) {
                        AMQ2102Test.error("Problem closing connection", e2);
                    }
                }
                throw th;
            }
        }

        private void sendMessages(ActiveMQConnection activeMQConnection) throws JMSException {
            Session session = null;
            try {
                try {
                    session = activeMQConnection.createSession(true, 1);
                    sendMessages(session);
                    if (session != null) {
                        session.close();
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                    AMQ2102Test.exceptions.put(Thread.currentThread(), e);
                    if (session != null) {
                        session.rollback();
                    }
                    if (session != null) {
                        session.close();
                    }
                }
            } catch (Throwable th) {
                if (session != null) {
                    session.close();
                }
                throw th;
            }
        }

        private void sendMessages(Session session) throws JMSException {
            Destination destination = null;
            try {
                destination = session.createTemporaryQueue();
                sendMessages(session, destination);
                if (destination != null) {
                    destination.delete();
                }
            } catch (Throwable th) {
                if (destination != null) {
                    destination.delete();
                }
                throw th;
            }
        }

        private void sendMessages(Session session, Destination destination) throws JMSException {
            MessageConsumer messageConsumer = null;
            try {
                messageConsumer = session.createConsumer(destination);
                sendMessages(session, destination, messageConsumer);
                messageConsumer.close();
                session.commit();
            } catch (Throwable th) {
                messageConsumer.close();
                session.commit();
                throw th;
            }
        }

        private void sendMessages(Session session, Destination destination, int i) throws JMSException {
            MessageProducer messageProducer = null;
            try {
                messageProducer = session.createProducer(session.createQueue(this.queueName));
                messageProducer.setDeliveryMode(2);
                messageProducer.setTimeToLive(0L);
                messageProducer.setPriority(4);
                for (int i2 = 0; i2 < i; i2++) {
                    TextMessage createTextMessage = session.createTextMessage("message#" + i2);
                    createTextMessage.setJMSReplyTo(destination);
                    messageProducer.send(createTextMessage);
                }
                if (messageProducer != null) {
                    messageProducer.close();
                }
            } catch (Throwable th) {
                if (messageProducer != null) {
                    messageProducer.close();
                }
                throw th;
            }
        }

        private void sendMessages(final Session session, Destination destination, MessageConsumer messageConsumer) throws JMSException {
            final IntHolder intHolder = new IntHolder(AMQ2102Test.MESSAGE_COUNT);
            messageConsumer.setMessageListener(new MessageListener() { // from class: org.apache.activemq.bugs.AMQ2102Test.Producer.1
                public void onMessage(Message message) {
                    if (!(message instanceof TextMessage)) {
                        AMQ2102Test.error("Producer cannot process " + message.getClass().getSimpleName());
                        return;
                    }
                    TextMessage textMessage = (TextMessage) message;
                    synchronized (intHolder) {
                        try {
                            AMQ2102Test.debug("receive reply#" + intHolder.value + " " + textMessage.getText());
                        } catch (JMSException e) {
                            AMQ2102Test.error("Problem processing reply", e);
                        }
                        intHolder.value--;
                        if (intHolder.value % 200 == 0) {
                            AMQ2102Test.info("acking via session commit: messageCount=" + intHolder.value);
                            try {
                                session.commit();
                            } catch (JMSException e2) {
                                AMQ2102Test.error("Failed to commit with count: " + intHolder.value, e2);
                            }
                        }
                        intHolder.notify();
                    }
                }
            });
            sendMessages(session, destination, intHolder.value);
            session.commit();
            synchronized (intHolder) {
                while (intHolder.value > 0) {
                    try {
                        intHolder.wait();
                    } catch (InterruptedException e) {
                        AMQ2102Test.error("Interrupted waiting for replies", e);
                    }
                }
            }
            session.commit();
            AMQ2102Test.debug("All replies received...");
        }

        public void onException(JMSException jMSException) {
            AMQ2102Test.LOG.error(jMSException);
            AMQ2102Test.exceptions.put(Thread.currentThread(), jMSException);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void debug(String str) {
        LOG.debug(str);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void info(String str) {
        LOG.info(str);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void error(String str) {
        LOG.error(str);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void error(String str, Throwable th) {
        th.printStackTrace();
        String str2 = str + ": " + (th.getMessage() != null ? th.getMessage() : th.toString());
        LOG.error(str2, th);
        exceptions.put(Thread.currentThread(), th);
        fail(str2);
    }

    private ArrayList<Consumer> createConsumers(ActiveMQConnectionFactory activeMQConnectionFactory, String str, int i, int i2) {
        ArrayList<Consumer> arrayList = new ArrayList<>(i);
        IntHolder intHolder = new IntHolder(i);
        for (int i3 = 0; i3 < i; i3++) {
            arrayList.add(new Consumer(activeMQConnectionFactory, str, intHolder, i3, i2));
        }
        Iterator<Consumer> it = arrayList.iterator();
        while (it.hasNext()) {
            it.next().start();
        }
        synchronized (intHolder) {
            while (intHolder.value > 0) {
                try {
                    intHolder.wait();
                } catch (InterruptedException e) {
                    error("Interrupted waiting for consumers to start", e);
                }
            }
        }
        return arrayList;
    }

    @Override // org.apache.activemq.AutoFailTestSupport
    public void setUp() throws Exception {
        setAutoFail(true);
        this.master.setBrokerName("Master");
        this.master.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT);
        this.master.deleteAllMessages();
        this.master.setWaitForSlave(true);
        new Thread() { // from class: org.apache.activemq.bugs.AMQ2102Test.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    AMQ2102Test.this.master.start();
                } catch (Exception e) {
                    e.printStackTrace();
                    AMQ2102Test.exceptions.put(Thread.currentThread(), e);
                }
            }
        }.start();
        Thread.sleep(2000L);
        this.masterUrl = ((TransportConnector) this.master.getTransportConnectors().get(0)).getConnectUri().toString();
        debug("masterUrl: " + this.masterUrl);
        this.slave.setBrokerName("Slave");
        this.slave.deleteAllMessages();
        this.slave.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT);
        this.slave.setMasterConnectorURI(this.masterUrl);
        this.slave.start();
    }

    @Override // org.apache.activemq.AutoFailTestSupport
    public void tearDown() throws Exception {
        this.master.stop();
        this.slave.stop();
        exceptions.clear();
    }

    public void testMasterSlaveBug() throws Exception {
        Thread.setDefaultUncaughtExceptionHandler(this);
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("failover:(" + this.masterUrl + ")?randomize=false");
        ArrayList<Consumer> createConsumers = createConsumers(activeMQConnectionFactory, "MasterSlaveBug", 10, -1);
        new Producer(activeMQConnectionFactory, "MasterSlaveBug").execute(new String[0]);
        Iterator<Consumer> it = createConsumers.iterator();
        while (it.hasNext()) {
            it.next().setRunning(false);
        }
        Iterator<Consumer> it2 = createConsumers.iterator();
        while (it2.hasNext()) {
            it2.next().join();
        }
        assertTrue(exceptions.isEmpty());
    }

    public void testMasterSlaveBugWithStopStartConsumers() throws Exception {
        Thread.setDefaultUncaughtExceptionHandler(this);
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("failover:(" + this.masterUrl + ")?randomize=false");
        ArrayList<Consumer> createConsumers = createConsumers(activeMQConnectionFactory, "MasterSlaveBug", 10, 10);
        new Producer(activeMQConnectionFactory, "MasterSlaveBug").execute(new String[0]);
        Iterator<Consumer> it = createConsumers.iterator();
        while (it.hasNext()) {
            it.next().setRunning(false);
        }
        Iterator<Consumer> it2 = createConsumers.iterator();
        while (it2.hasNext()) {
            it2.next().join();
        }
        assertTrue(exceptions.isEmpty());
    }

    @Override // java.lang.Thread.UncaughtExceptionHandler
    public void uncaughtException(Thread thread, Throwable th) {
        error("" + thread + th);
        exceptions.put(thread, th);
    }
}
