package org.apache.activemq.store.kahadb;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.kahadb.disk.journal.DataFile;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.usecases.DurableSubProcessWithRestartTest;
import org.apache.activemq.util.Wait;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.TrueFileFilter;
import org.apache.commons.io.filefilter.WildcardFileFilter;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/store/kahadb/SubscriptionRecoveryTest.class */
/**
 * Verifies that an inactive durable topic subscription is still reported by the broker
 * after a plain restart and after a restart that is preceded by deleting every
 * {@code *.data} file under the KahaDB directory.
 */
public class SubscriptionRecoveryTest {
    private static final Logger LOG = LoggerFactory.getLogger(SubscriptionRecoveryTest.class);

    /** Default number of messages sent by {@link #sendMessages(Destination)}. */
    private static final int MSG_COUNT = 256;

    /** Broker name; also embedded in the VM transport URI. */
    private static final String BROKER_NAME = "InactiveSubTest";

    /** Used both as the JMS client ID and as the durable subscription name. */
    private static final String SUBSCRIPTION_ID = "Inactive";

    private BrokerService service;
    private String connectionUri;
    private ActiveMQConnectionFactory cf;

    /** Starts a fresh broker with all previously stored messages deleted. */
    @Before
    public void setUp() throws Exception {
        createBroker(true, false);
    }

    /**
     * Creates and starts a persistent broker backed by a KahaDB store in the relative
     * directory {@code KahaDB}, then points the connection factory at it.
     *
     * @param deleteAllMessages passed to {@code setDeleteAllMessagesOnStartup}
     * @param deleteDataFiles   when {@code true}, every {@code *.data} file found
     *                          (recursively) under the KahaDB directory is deleted
     *                          before the broker starts
     * @throws Exception if the broker fails to start
     */
    public void createBroker(boolean deleteAllMessages, boolean deleteDataFiles) throws Exception {
        service = new BrokerService();
        service.setBrokerName(BROKER_NAME);
        service.setDeleteAllMessagesOnStartup(deleteAllMessages);
        service.setPersistent(true);

        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
        File directory = new File("KahaDB");
        adapter.setDirectory(directory);
        // A small max journal file length so the tests roll over to several journal files.
        adapter.setJournalMaxFileLength(10240);
        adapter.setPreallocationScope(Journal.PreallocationScope.ENTIRE_JOURNAL.name());
        adapter.setCheckpointInterval(TimeUnit.SECONDS.toMillis(5L));
        adapter.setCleanupInterval(TimeUnit.SECONDS.toMillis(5L));

        if (deleteDataFiles) {
            // NOTE(review): presumably this removes the KahaDB index so that the broker has
            // to rebuild it from the journal on startup - confirm against KahaDB file naming.
            for (File dataFile : FileUtils.listFiles(directory, new WildcardFileFilter("*.data"), TrueFileFilter.INSTANCE)) {
                LOG.info("deleting: {}", dataFile);
                FileUtils.deleteQuietly(dataFile);
            }
        }

        service.setPersistenceAdapter(adapter);
        service.start();
        service.waitUntilStarted();

        connectionUri = "vm://" + BROKER_NAME + "?create=false";
        cf = new ActiveMQConnectionFactory(connectionUri);
    }

    /** Stops the broker and starts it again, keeping stored messages and data files. */
    private void restartBroker() throws Exception {
        stopBroker();
        createBroker(false, false);
    }

    /** Stops the broker and starts it again after deleting the {@code *.data} files. */
    private void recoverBroker() throws Exception {
        stopBroker();
        createBroker(false, true);
    }

    /** Stops the current broker, if any, and waits until it has fully stopped. */
    @After
    public void stopBroker() throws Exception {
        if (service != null) {
            service.stop();
            service.waitUntilStopped();
            service = null;
        }
    }

    /**
     * Fills the journal with queue messages around the creation of an inactive durable
     * subscription, restarts, removes the queue, publishes and consumes {@link #MSG_COUNT}
     * topic messages, and finally checks that the subscription is still present after a
     * restart with the {@code *.data} files deleted.
     */
    @Test
    public void testDurableSubPrefetchRecovered() throws Exception {
        ActiveMQQueue queue = new ActiveMQQueue("MyQueue");
        ActiveMQTopic topic = new ActiveMQTopic("MyDurableTopic");

        sendMessages(queue);
        LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());

        createInactiveDurableSub(topic);
        assertInactiveDurableSubPresent();

        sendMessages(queue);
        LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());
        Assert.assertTrue(getNumberOfJournalFiles() > 1);

        LOG.info("Restarting the broker.");
        restartBroker();
        LOG.info("Restarted the broker.");

        LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());
        Assert.assertTrue(getNumberOfJournalFiles() > 1);
        assertInactiveDurableSubPresent();

        service.getAdminView().removeQueue(queue.getQueueName());
        assertJournalFileCount("At most two journal files expected",
            journalFilesAtMost(2), TimeUnit.MINUTES.toMillis(2L));

        LOG.info("Sending {} Messages to the Topic.", MSG_COUNT);
        sendMessages(topic);

        LOG.info("Attempt to consume {} messages from the Topic.", MSG_COUNT);
        Assert.assertEquals(MSG_COUNT, consumeFromInactiveDurableSub(topic));

        LOG.info("Recovering the broker.");
        recoverBroker();
        LOG.info("Recovered the broker.");

        assertInactiveDurableSubPresent();
    }

    /**
     * Consumes a single topic message on the durable subscription, then checks that the
     * message is not delivered again after a restart, nor after a restart with the
     * {@code *.data} files deleted, and that the journal shrinks to a single file.
     */
    @Test
    public void testDurableAcksNotDropped() throws Exception {
        ActiveMQQueue queue = new ActiveMQQueue("MyQueue");
        ActiveMQTopic topic = new ActiveMQTopic("MyDurableTopic");

        createInactiveDurableSub(topic);
        assertInactiveDurableSubPresent();

        sendMessages(topic, 1);
        sendMessages(queue);
        LOG.info("Before consume there are currently [{}] journal log files.", getNumberOfJournalFiles());

        consumeDurableMessages(topic, 1);
        LOG.info("After consume there are currently [{}] journal log files.", getNumberOfJournalFiles());

        sendMessages(queue);
        LOG.info("More Queued. There are currently [{}] journal log files.", getNumberOfJournalFiles());
        Assert.assertTrue(getNumberOfJournalFiles() > 1);

        LOG.info("Restarting the broker.");
        restartBroker();
        LOG.info("Restarted the broker.");

        LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());
        Assert.assertTrue(getNumberOfJournalFiles() > 1);
        assertInactiveDurableSubPresent();

        service.getAdminView().removeQueue(queue.getQueueName());
        assertJournalFileCount("At most three journal files expected",
            journalFilesAtMost(3), TimeUnit.MINUTES.toMillis(3L));

        tryConsumeExpectNone(topic);
        LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());

        LOG.info("Recovering the broker.");
        recoverBroker();
        LOG.info("Recovered the broker.");

        LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());
        assertInactiveDurableSubPresent();

        tryConsumeExpectNone(topic);
        assertJournalFileCount("Exactly one journal file expected",
            journalFilesExactly(1), TimeUnit.MINUTES.toMillis(1L));
    }

    /**
     * Waits (with the default {@code Wait.waitFor} timeout) until the broker reports
     * exactly one inactive durable topic subscriber, and fails the test otherwise.
     */
    private void assertInactiveDurableSubPresent() throws Exception {
        Assert.assertTrue("Should have an inactive durable sub", Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                // Read the field on every poll: the broker instance is replaced on restart.
                ObjectName[] subscribers = service.getAdminView().getInactiveDurableTopicSubscribers();
                return subscribers != null && subscribers.length == 1;
            }
        }));
    }

    /**
     * Waits for {@code condition} and fails with a message that contains the journal file
     * count observed <em>after</em> the wait, so the failure reports the final state.
     *
     * @param expectation   human-readable description of the expected count
     * @param condition     condition on the journal file count
     * @param timeoutMillis maximum time to wait, in milliseconds
     */
    private void assertJournalFileCount(String expectation, Wait.Condition condition, long timeoutMillis)
            throws Exception {
        boolean satisfied = Wait.waitFor(condition, timeoutMillis);
        Assert.assertTrue(expectation + ", was " + getNumberOfJournalFiles(), satisfied);
    }

    /** Returns a condition that holds once there are no more than {@code max} journal files. */
    private Wait.Condition journalFilesAtMost(final int max) {
        return new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return getNumberOfJournalFiles() <= max;
            }
        };
    }

    /** Returns a condition that holds once there are exactly {@code expected} journal files. */
    private Wait.Condition journalFilesExactly(final int expected) {
        return new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return getNumberOfJournalFiles() == expected;
            }
        };
    }

    /**
     * Counts the non-null entries in the file map of the running broker's KahaDB journal.
     *
     * @return the number of journal data files currently tracked
     * @throws IOException if the persistence adapter cannot be obtained
     */
    private int getNumberOfJournalFiles() throws IOException {
        // getStore() is declared on the KahaDB adapter, not on the generic adapter interface.
        KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) service.getPersistenceAdapter();
        int count = 0;
        for (DataFile dataFile : adapter.getStore().getJournal().getFileMap().values()) {
            if (dataFile != null) {
                count++;
            }
        }
        return count;
    }

    /**
     * Registers the durable subscription on {@code topic} and immediately closes it,
     * leaving it inactive.
     */
    private void createInactiveDurableSub(Topic topic) throws Exception {
        Connection connection = cf.createConnection();
        try {
            connection.setClientID(SUBSCRIPTION_ID);
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            session.createDurableSubscriber(topic, SUBSCRIPTION_ID).close();
        } finally {
            // Always release the connection so the client ID can be reused by later helpers.
            connection.close();
        }
    }

    /**
     * Receives exactly {@code i} messages on the durable subscription, failing the test
     * if any single receive does not return a message within ten seconds.
     */
    private void consumeDurableMessages(Topic topic, int i) throws Exception {
        Connection connection = cf.createConnection();
        try {
            connection.setClientID(SUBSCRIPTION_ID);
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            TopicSubscriber subscriber = session.createDurableSubscriber(topic, SUBSCRIPTION_ID);
            connection.start();
            for (int received = 0; received < i; received++) {
                if (subscriber.receive(TimeUnit.SECONDS.toMillis(10L)) == null) {
                    Assert.fail("should have received a message");
                }
            }
            subscriber.close();
        } finally {
            connection.close();
        }
    }

    /**
     * Re-activates the durable subscription and fails the test if a message arrives
     * within ten seconds.
     */
    private void tryConsumeExpectNone(Topic topic) throws Exception {
        Connection connection = cf.createConnection();
        try {
            connection.setClientID(SUBSCRIPTION_ID);
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            TopicSubscriber subscriber = session.createDurableSubscriber(topic, SUBSCRIPTION_ID);
            connection.start();
            if (subscriber.receive(TimeUnit.SECONDS.toMillis(10L)) != null) {
                Assert.fail("Should be no messages for this durable.");
            }
            subscriber.close();
        } finally {
            connection.close();
        }
    }

    /**
     * Re-activates the durable subscription and drains it until a receive returns
     * {@code null}.
     *
     * @return the number of messages received
     */
    private int consumeFromInactiveDurableSub(Topic topic) throws Exception {
        Connection connection = cf.createConnection();
        try {
            connection.setClientID(SUBSCRIPTION_ID);
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            TopicSubscriber subscriber = session.createDurableSubscriber(topic, SUBSCRIPTION_ID);
            int count = 0;
            // NOTE(review): the receive timeout is borrowed from an unrelated test's constant
            // whose value is not visible here - confirm it is the intended timeout.
            while (subscriber.receive(DurableSubProcessWithRestartTest.BROKER_RESTART) != null) {
                count++;
            }
            subscriber.close();
            return count;
        } finally {
            connection.close();
        }
    }

    /** Sends {@link #MSG_COUNT} persistent text messages to {@code destination}. */
    private void sendMessages(Destination destination) throws Exception {
        sendMessages(destination, MSG_COUNT);
    }

    /** Sends {@code i} persistent text messages to {@code destination}. */
    private void sendMessages(Destination destination, int i) throws Exception {
        Connection connection = cf.createConnection();
        try {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            for (int sent = 0; sent < i; sent++) {
                producer.send(session.createTextMessage("Message #" + sent + " for destination: " + destination));
            }
        } finally {
            connection.close();
        }
    }
}
