package org.ikasan.testharness.flow.jms;

import java.io.IOException;
import java.util.Iterator;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerRegistry;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.ActiveMQDestination;

/* loaded from: input_file:org/ikasan/testharness/flow/jms/ActiveMqHelper.class */
/**
 * Test-harness helper that operates on every ActiveMQ broker currently registered in this JVM's
 * {@link BrokerRegistry}: it can stop all of them or clear the messages held by their
 * destinations.
 *
 * <p>Any failure raised by the underlying broker calls is rethrown wrapped in a
 * {@link RuntimeException}, with the original exception preserved as the cause.
 */
public class ActiveMqHelper {

    /**
     * Wraps a single {@link Broker} and clears the messages of each of its destinations.
     *
     * <p>Declared {@code static} because it never uses the enclosing {@link ActiveMqHelper}
     * instance.
     */
    private static final class ActiveMQBrokerExtension {
        private final Broker broker;

        /**
         * @param broker the broker whose destinations will be cleared
         */
        ActiveMQBrokerExtension(Broker broker) {
            this.broker = broker;
        }

        /**
         * Clears the messages of every topic and queue in the broker's destination map.
         * Destinations that report themselves as neither a topic nor a queue are left untouched.
         *
         * @throws Exception if clearing a topic store or purging a queue fails
         */
        void clearAllMessages() throws Exception {
            for (Destination destination : this.broker.getDestinationMap().values()) {
                ActiveMQDestination activeMQDestination = destination.getActiveMQDestination();
                if (activeMQDestination.isTopic()) {
                    clearAllMessages((Topic) destination);
                } else if (activeMQDestination.isQueue()) {
                    clearAllMessages((Queue) destination);
                }
            }
        }

        /**
         * Removes all messages from the topic's message store, once per current consumer and
         * using that consumer's connection context.
         *
         * <p>A topic with no consumers is therefore left as it is, since there is no context to
         * pass to the store.
         *
         * @param topic the topic to clear
         * @throws IOException if the message store fails to remove its messages
         */
        private void clearAllMessages(Topic topic) throws IOException {
            // Some topics may have no backing message store (presumably advisory or temporary
            // topics - confirm against the ActiveMQ version in use); there is nothing to clear
            // for them, and dereferencing the store would throw a NullPointerException.
            if (topic.getMessageStore() == null) {
                return;
            }
            for (Subscription consumer : topic.getConsumers()) {
                topic.getMessageStore().removeAllMessages(consumer.getContext());
            }
        }

        /**
         * Purges every message from the given queue.
         *
         * @param queue the queue to purge
         * @throws Exception if the purge fails
         */
        private void clearAllMessages(Queue queue) throws Exception {
            queue.purge();
        }
    }

    /**
     * Stops every broker registered in the {@link BrokerRegistry} and waits for each one to
     * report that it has stopped, printing progress to standard output.
     *
     * @throws RuntimeException wrapping any exception raised while stopping a broker; brokers
     *     after the failing one are not stopped
     */
    public void shutdownBroker() {
        System.out.println("Shutdown Broker called -  will shutdown any brokers still running");
        try {
            // Iterate over a snapshot rather than the live registry view: stopping a broker
            // unbinds it from the registry, which would modify the map while it is being
            // iterated and can raise ConcurrentModificationException when several brokers
            // are registered.
            BrokerService[] registeredBrokers =
                    BrokerRegistry.getInstance().getBrokers().values().toArray(new BrokerService[0]);
            for (BrokerService brokerService : registeredBrokers) {
                System.out.println("Waiting for broker " + brokerService.getBrokerName() + " to be stopped");
                brokerService.stop();
                brokerService.waitUntilStopped();
                System.out.println("Broker " + brokerService.getBrokerName() + " is stopped, check = " + brokerService.isStopped());
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Clears the messages held by the topics and queues of every broker registered in the
     * {@link BrokerRegistry}.
     *
     * @throws RuntimeException wrapping any exception raised while obtaining a broker or
     *     clearing its destinations
     */
    public void removeAllMessages() {
        try {
            for (BrokerService brokerService : BrokerRegistry.getInstance().getBrokers().values()) {
                new ActiveMQBrokerExtension(brokerService.getBroker()).clearAllMessages();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
