package org.apache.activemq.usecases;

import jakarta.jms.Connection;
import jakarta.jms.Message;
import jakarta.jms.MessageListener;
import jakarta.jms.Session;
import java.io.File;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.InstanceNotFoundException;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;

/* loaded from: input_file:org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.class */
public class DurableSubscriptionUnsubscribeTest extends org.apache.activemq.TestSupport {
    ActiveMQTopic topic;
    BrokerService broker = null;
    Connection connection = null;
    private final AtomicInteger advisories = new AtomicInteger(0);

    public void testJMXSubscriptionUnsubscribe() throws Exception {
        doJMXUnsubscribe(false);
    }

    public void testJMXSubscriptionUnsubscribeWithRestart() throws Exception {
        doJMXUnsubscribe(true);
    }

    public void testConnectionSubscriptionUnsubscribe() throws Exception {
        doConnectionUnsubscribe(false);
    }

    public void testConnectionSubscriptionUnsubscribeWithRestart() throws Exception {
        doConnectionUnsubscribe(true);
    }

    public void testDirectSubscriptionUnsubscribe() throws Exception {
        doDirectUnsubscribe(false);
    }

    public void testDirectubscriptionUnsubscribeWithRestart() throws Exception {
        doDirectUnsubscribe(true);
    }

    public void doJMXUnsubscribe(boolean z) throws Exception {
        createSubscriptions();
        createAdvisorySubscription();
        Thread.sleep(1000L);
        assertCount(100, 0);
        if (z) {
            restartBroker();
            createAdvisorySubscription();
            assertCount(100, 0);
        }
        ObjectName[] inactiveDurableTopicSubscribers = this.broker.getAdminView().getInactiveDurableTopicSubscribers();
        for (int i = 0; i < inactiveDurableTopicSubscribers.length; i++) {
            ((DurableSubscriptionViewMBean) this.broker.getManagementContext().newProxyInstance(inactiveDurableTopicSubscribers[i], DurableSubscriptionViewMBean.class, true)).destroy();
            if (i % 20 == 0) {
                Thread.sleep(1000L);
                assertCount((100 - i) - 1, 0);
            }
        }
        Thread.sleep(1000L);
        assertCount(0, 0);
        if (z) {
            restartBroker();
            createAdvisorySubscription();
            assertCount(0, 0);
        }
    }

    public void doConnectionUnsubscribe(boolean z) throws Exception {
        createSubscriptions();
        createAdvisorySubscription();
        Thread.sleep(1000L);
        assertCount(100, 0);
        Session createSession = this.connection.createSession(false, 1);
        createSession.createDurableSubscriber(this.topic, "SubsId1");
        Thread.sleep(1000L);
        assertCount(100, 1);
        Session createSession2 = this.connection.createSession(false, 1);
        createSession2.createDurableSubscriber(this.topic, "SubsId2");
        Thread.sleep(1000L);
        assertCount(100, 2);
        createSession.close();
        Thread.sleep(1000L);
        assertCount(100, 1);
        createSession2.close();
        Thread.sleep(1000L);
        assertCount(100, 0);
        if (z) {
            restartBroker();
            createAdvisorySubscription();
            assertCount(100, 0);
        }
        for (int i = 0; i < 100; i++) {
            Session createSession3 = this.connection.createSession(false, 1);
            createSession3.unsubscribe("SubsId" + i);
            createSession3.close();
            if (i % 20 == 0) {
                Thread.sleep(1000L);
                assertCount((100 - i) - 1, 0);
            }
        }
        Thread.sleep(1000L);
        assertCount(0, 0);
        if (z) {
            restartBroker();
            createAdvisorySubscription();
            assertCount(0, 0);
        }
    }

    public void doDirectUnsubscribe(boolean z) throws Exception {
        createSubscriptions();
        createAdvisorySubscription();
        Thread.sleep(1000L);
        assertCount(100, 0);
        if (z) {
            restartBroker();
            createAdvisorySubscription();
            assertCount(100, 0);
        }
        for (int i = 0; i < 100; i++) {
            RemoveSubscriptionInfo removeSubscriptionInfo = new RemoveSubscriptionInfo();
            removeSubscriptionInfo.setClientId(getName());
            removeSubscriptionInfo.setSubscriptionName("SubsId" + i);
            ConnectionContext connectionContext = new ConnectionContext();
            connectionContext.setBroker(this.broker.getRegionBroker());
            connectionContext.setClientId(getName());
            this.broker.getBroker().removeSubscription(connectionContext, removeSubscriptionInfo);
            if (i % 20 == 0) {
                Thread.sleep(1000L);
                assertCount((100 - i) - 1, 0);
            }
        }
        assertCount(0, 0);
        if (z) {
            restartBroker();
            createAdvisorySubscription();
            assertCount(0, 0);
        }
    }

    private void createSubscriptions() throws Exception {
        for (int i = 0; i < 100; i++) {
            Session createSession = this.connection.createSession(false, 1);
            createSession.createDurableSubscriber(this.topic, "SubsId" + i);
            createSession.close();
        }
    }

    private void createAdvisorySubscription() throws Exception {
        this.connection.createSession(false, 1).createConsumer(AdvisorySupport.getConsumerAdvisoryTopic(this.topic)).setMessageListener(new MessageListener() { // from class: org.apache.activemq.usecases.DurableSubscriptionUnsubscribeTest.1
            public void onMessage(Message message) {
                if (((ActiveMQMessage) message).getDataStructure() instanceof RemoveSubscriptionInfo) {
                    DurableSubscriptionUnsubscribeTest.this.advisories.incrementAndGet();
                }
            }
        });
    }

    private void assertCount(int i, int i2) throws Exception {
        int i3 = i - i2;
        int i4 = 0;
        int i5 = 0;
        for (DurableTopicSubscription durableTopicSubscription : this.broker.getDestination(this.topic).getConsumers()) {
            if (durableTopicSubscription instanceof DurableTopicSubscription) {
                if (durableTopicSubscription.isActive()) {
                    i4++;
                } else {
                    i5++;
                }
            }
        }
        assertEquals(i2, i4);
        assertEquals(i3, i5);
        assertEquals(i2, this.broker.getAdminView().getDurableTopicSubscribers().length);
        assertEquals(i3, this.broker.getAdminView().getInactiveDurableTopicSubscribers().length);
        if (i == 0) {
            assertEquals(0, countMBean());
        }
        assertEquals(100, i + this.advisories.get());
    }

    private int countMBean() throws MalformedObjectNameException, InstanceNotFoundException {
        int i = 0;
        for (int i2 = 0; i2 < 100; i2++) {
            try {
                this.broker.getManagementContext().getObjectInstance(new ObjectName("org.apache.activemq:BrokerName=" + getName() + ",Type=Subscription,active=false,name=" + getName() + "_SubsId" + i2));
                i++;
            } catch (InstanceNotFoundException e) {
            }
        }
        return i;
    }

    private void startBroker(boolean z) throws Exception {
        this.broker = BrokerFactory.createBroker("broker:(vm://" + getName() + ")");
        this.broker.setUseJmx(true);
        this.broker.getManagementContext().setCreateConnector(false);
        this.broker.setBrokerName(getName());
        this.broker.setPersistent(true);
        KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter();
        kahaDBPersistenceAdapter.setDirectory(new File("activemq-data/" + getName()));
        this.broker.setPersistenceAdapter(kahaDBPersistenceAdapter);
        if (z) {
            this.broker.setDeleteAllMessagesOnStartup(true);
        }
        this.broker.setKeepDurableSubsActive(true);
        this.broker.start();
        this.broker.waitUntilStarted();
        this.connection = createConnection();
    }

    private void stopBroker() throws Exception {
        if (this.connection != null) {
            this.connection.close();
        }
        this.connection = null;
        if (this.broker != null) {
            this.broker.stop();
            this.broker.waitUntilStopped();
        }
        this.broker = null;
    }

    private void restartBroker() throws Exception {
        stopBroker();
        startBroker(false);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.TestSupport
    public ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        return new ActiveMQConnectionFactory("vm://" + getName() + "?waitForStart=5000&create=false");
    }

    protected void setUp() throws Exception {
        super.setUp();
        this.topic = createDestination();
        startBroker(true);
    }

    protected void tearDown() throws Exception {
        stopBroker();
        super.tearDown();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.TestSupport
    public Connection createConnection() throws Exception {
        Connection createConnection = super.createConnection();
        createConnection.setClientID(getName());
        createConnection.start();
        return createConnection;
    }
}
