package org.apache.activemq.bugs;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.StoreUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usecases.DurableSubProcessWithRestartTest;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/bugs/DuplicateFromStoreTest.class */
public class DuplicateFromStoreTest {
    /** Broker connect string handed to every client connection factory. */
    String activemqURL;
    /** Embedded broker the test runs against. */
    BrokerService broker;
    /** Name of the destination shared by all producers and consumers. */
    protected static final String DESTNAME = "TEST";
    /** Number of producer workers (and size of the producer latch). */
    protected static final int NUM_PRODUCERS = 100;
    /** Number of consumer workers (and size of the consumer latch). */
    protected static final int NUM_CONSUMERS = 20;
    /** Total number of messages sent across all producers. */
    protected static final int NUM_MSGS = 20000;
    /** Intended per-message pause for consumers, in milliseconds. */
    protected static final int CONSUMER_SLEEP = 0;
    /** Intended per-message pause for producers, in milliseconds. */
    protected static final int PRODUCER_SLEEP = 10;
    /** Remaining send budget; producers decrement it and stop once it drops below zero. */
    public AtomicInteger totalMessagesToSend = new AtomicInteger(NUM_MSGS);
    /**
     * Number of messages actually sent. Starts at zero so the value logged at the
     * end of the test is the real send count (it previously started at NUM_MSGS,
     * which made the reported figure twice the true one).
     */
    public AtomicInteger totalMessagesSent = new AtomicInteger(0);
    /** Number of messages received across all consumers. */
    public AtomicInteger totalReceived = new AtomicInteger(0);
    // NOTE(review): not read anywhere in this file; the producer builds its payload from its own size.
    public int messageSize = 16000;
    static Logger LOG = LoggerFactory.getLogger(DuplicateFromStoreTest.class);
    // The latches are static, so they are shared by every instance of this test
    // class and are not reset between test methods.
    /** Counted down once by each producer when it exits. */
    public static CountDownLatch producersFinished = new CountDownLatch(NUM_PRODUCERS);
    /** Counted down once by each consumer when it exits. */
    public static CountDownLatch consumersFinished = new CountDownLatch(NUM_CONSUMERS);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/activemq/bugs/DuplicateFromStoreTest$Consumer.class */
    public class Consumer implements Runnable {
        protected String queueName;
        boolean isTopic;
        public Object init = new Object();
        Logger log = DuplicateFromStoreTest.LOG;

        public Consumer(String str, boolean z) {
            this.queueName = DuplicateFromStoreTest.DESTNAME;
            this.isTopic = false;
            this.isTopic = z;
            this.queueName = str;
        }

        /* JADX WARN: Finally extract failed */
        @Override // java.lang.Runnable
        public void run() {
            Connection connection = null;
            try {
                try {
                    Connection createConnection = new ActiveMQConnectionFactory(DuplicateFromStoreTest.this.activemqURL).createConnection();
                    createConnection.setExceptionListener(new ExceptionListener() { // from class: org.apache.activemq.bugs.DuplicateFromStoreTest.Consumer.1
                        public void onException(JMSException jMSException) {
                            jMSException.printStackTrace();
                        }
                    });
                    createConnection.start();
                    Session createSession = createConnection.createSession(false, 1);
                    MessageConsumer createConsumer = createSession.createConsumer(this.isTopic ? createSession.createTopic(this.queueName) : createSession.createQueue(this.queueName));
                    synchronized (this.init) {
                        this.init.notifyAll();
                    }
                    long j = 0;
                    while (DuplicateFromStoreTest.this.totalReceived.get() < 20000) {
                        TextMessage receive = createConsumer.receive(5000L);
                        if (!(receive instanceof TextMessage)) {
                            if (DuplicateFromStoreTest.this.totalReceived.get() >= 20000) {
                                break;
                            } else {
                                this.log.error("Received message of unsupported type. Expecting TextMessage. count: " + DuplicateFromStoreTest.this.totalReceived.get());
                            }
                        } else {
                            this.log.debug("Received: " + receive.getText().substring(0, 50));
                        }
                        if (receive != null) {
                            j++;
                            DuplicateFromStoreTest.this.totalReceived.incrementAndGet();
                            if (j % DurableSubProcessWithRestartTest.BROKER_RESTART == 0) {
                                this.log.info("received " + j + " messages");
                            }
                            Thread.sleep(0L);
                        }
                    }
                    try {
                        if (createConnection != null) {
                            try {
                                createConnection.close();
                            } catch (Exception e) {
                                DuplicateFromStoreTest.consumersFinished.countDown();
                                return;
                            }
                        }
                        DuplicateFromStoreTest.consumersFinished.countDown();
                    } catch (Throwable th) {
                        DuplicateFromStoreTest.consumersFinished.countDown();
                        throw th;
                    }
                } catch (Exception e2) {
                    try {
                        this.log.error("Error in Consumer: " + e2.getMessage());
                        if (0 != 0) {
                            try {
                                connection.close();
                            } catch (Exception e3) {
                                DuplicateFromStoreTest.consumersFinished.countDown();
                                return;
                            }
                        }
                        DuplicateFromStoreTest.consumersFinished.countDown();
                    } catch (Throwable th2) {
                        DuplicateFromStoreTest.consumersFinished.countDown();
                        throw th2;
                    }
                }
            } catch (Throwable th3) {
                if (0 != 0) {
                    try {
                        try {
                            connection.close();
                        } catch (Exception e4) {
                            DuplicateFromStoreTest.consumersFinished.countDown();
                            throw th3;
                        }
                    } catch (Throwable th4) {
                        DuplicateFromStoreTest.consumersFinished.countDown();
                        throw th4;
                    }
                }
                DuplicateFromStoreTest.consumersFinished.countDown();
                throw th3;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/activemq/bugs/DuplicateFromStoreTest$Producer.class */
    public class Producer implements Runnable {
        Logger log = DuplicateFromStoreTest.LOG;
        protected String destName;
        protected boolean isTopicDest;

        public Producer(String str, boolean z, int i) {
            this.destName = DuplicateFromStoreTest.DESTNAME;
            this.isTopicDest = false;
            this.destName = str;
            this.isTopicDest = z;
        }

        /* JADX WARN: Finally extract failed */
        @Override // java.lang.Runnable
        public void run() {
            Connection connection = null;
            try {
                try {
                    connection = new ActiveMQConnectionFactory(DuplicateFromStoreTest.this.activemqURL).createConnection();
                    connection.setExceptionListener(new ExceptionListener() { // from class: org.apache.activemq.bugs.DuplicateFromStoreTest.Producer.1
                        public void onException(JMSException jMSException) {
                            jMSException.printStackTrace();
                        }
                    });
                    connection.start();
                    Session createSession = connection.createSession(false, 1);
                    MessageProducer createProducer = createSession.createProducer(this.isTopicDest ? createSession.createTopic(this.destName) : createSession.createQueue(this.destName));
                    long j = 0;
                    StringBuilder sb = new StringBuilder();
                    sb.setLength(16384 + 15);
                    sb.append("Message: ");
                    sb.append(0L);
                    for (int i = 0; i < 16384 / 10; i++) {
                        sb.append("XXXXXXXXXX");
                    }
                    TextMessage createTextMessage = createSession.createTextMessage(sb.toString());
                    while (DuplicateFromStoreTest.this.totalMessagesToSend.decrementAndGet() >= 0) {
                        try {
                            createProducer.send(createTextMessage);
                            DuplicateFromStoreTest.this.totalMessagesSent.incrementAndGet();
                            this.log.debug("Sent message: " + j);
                            j++;
                            if (j % DurableSubProcessWithRestartTest.BROKER_RESTART == 0) {
                                this.log.info("sent " + j + " messages");
                            }
                            Thread.sleep(10L);
                        } catch (Throwable th) {
                            DuplicateFromStoreTest.producersFinished.countDown();
                            throw th;
                        }
                    }
                    if (connection != null) {
                        try {
                            connection.close();
                        } catch (Exception e) {
                            DuplicateFromStoreTest.producersFinished.countDown();
                        }
                    }
                    DuplicateFromStoreTest.producersFinished.countDown();
                    this.log.debug("Closing producer for " + this.destName);
                } catch (Throwable th2) {
                    if (connection != null) {
                        try {
                            try {
                                connection.close();
                            } catch (Exception e2) {
                                DuplicateFromStoreTest.producersFinished.countDown();
                                throw th2;
                            }
                        } catch (Throwable th3) {
                            DuplicateFromStoreTest.producersFinished.countDown();
                            throw th3;
                        }
                    }
                    DuplicateFromStoreTest.producersFinished.countDown();
                    throw th2;
                }
            } catch (Exception e3) {
                this.log.error(e3.toString());
                if (connection != null) {
                    try {
                        try {
                            connection.close();
                        } catch (Exception e4) {
                            DuplicateFromStoreTest.producersFinished.countDown();
                            return;
                        }
                    } catch (Throwable th4) {
                        DuplicateFromStoreTest.producersFinished.countDown();
                        throw th4;
                    }
                }
                DuplicateFromStoreTest.producersFinished.countDown();
            }
        }
    }

    /**
     * Creates and starts the embedded broker used by the test, then records the
     * connect string of its TCP connector in {@code activemqURL}.
     *
     * @throws Exception if the broker cannot be configured or started
     */
    @Before
    public void startBroker() throws Exception {
        this.broker = new BrokerService();

        // System-wide usage limits.
        final MemoryUsage heapShare = new MemoryUsage();
        heapShare.setPercentOfJvmHeap(50);
        final StoreUsage storeLimit = new StoreUsage();
        storeLimit.setLimit(0L);
        final SystemUsage usage = new SystemUsage();
        usage.setMemoryUsage(heapShare);
        usage.setStoreUsage(storeLimit);

        // Destination policy keyed on the ">" pattern (presumably the match-all wildcard).
        final ActiveMQQueue policyTarget = new ActiveMQQueue(">");
        final PolicyEntry queuePolicy = new PolicyEntry();
        queuePolicy.setDestination(policyTarget);
        queuePolicy.setMemoryLimit(10L * 1024 * 1024);
        queuePolicy.setExpireMessagesPeriod(0L);
        queuePolicy.setEnableAudit(false);
        queuePolicy.setQueuePrefetch(100);
        final PolicyMap policies = new PolicyMap();
        policies.put(policyTarget, queuePolicy);

        // KahaDB store with concurrent store-and-dispatch enabled for queues.
        final KahaDBStore store = new KahaDBStore();
        store.setConcurrentStoreAndDispatchQueues(true);

        final BrokerService service = this.broker;
        service.setDeleteAllMessagesOnStartup(true);
        // Port 0 in the connector URI; the actual connect string is read back after start.
        service.addConnector("tcp://0.0.0.0:0");
        service.setDestinationPolicy(policies);
        service.setSystemUsage(usage);
        service.setPersistenceAdapter(store);
        service.start();
        service.waitUntilStarted();

        this.activemqURL = service.getTransportConnectorByScheme("tcp").getPublishableConnectString();
    }

    /**
     * Stops the embedded broker after each test, if one was created.
     *
     * @throws Exception if the broker fails to stop
     */
    @After
    public void stopBroker() throws Exception {
        final BrokerService running = this.broker;
        if (running == null) {
            // Nothing to tear down: the broker was never created.
            return;
        }
        running.stop();
    }

    /**
     * Runs all producers and consumers against the broker, waits for both groups
     * to finish, and then verifies that the broker reports no pending messages.
     *
     * <p>The test fails explicitly if either group does not finish within the
     * wait limit, and the worker pools are always shut down.
     *
     * @throws Exception if interrupted while waiting or if broker statistics cannot be read
     */
    @Test
    public void testDuplicateMessage() throws Exception {
        LOG.info("Testing for duplicate messages.");
        ExecutorService producerPool = Executors.newFixedThreadPool(NUM_PRODUCERS);
        ExecutorService consumerPool = Executors.newFixedThreadPool(NUM_CONSUMERS);
        try {
            createOpenwireClients(producerPool, consumerPool);
            LOG.info("All producers and consumers got started. Awaiting their termination");
            // await() returns false on timeout; treat that as a failure rather than as completion.
            Assert.assertTrue("producers did not finish in time", producersFinished.await(100L, TimeUnit.MINUTES));
            LOG.info("All producers have terminated. remaining to send: " + this.totalMessagesToSend.get() + ", sent:" + this.totalMessagesSent.get());
            Assert.assertTrue("consumers did not finish in time", consumersFinished.await(100L, TimeUnit.MINUTES));
            LOG.info("All consumers have terminated.");
        } finally {
            // Always release the worker threads, including when a wait fails or is interrupted.
            producerPool.shutdownNow();
            consumerPool.shutdownNow();
        }
        Assert.assertEquals("no messages pending, i.e. dlq empty", 0L, this.broker.getRegionBroker().getDestinationStatistics().getMessages().getCount());
    }

    /**
     * Starts all consumers, waiting for each one to signal that it is set up
     * before starting the next, and then starts all producers.
     *
     * @param executorService  pool that runs the producers
     * @param executorService2 pool that runs the consumers
     */
    protected void createOpenwireClients(ExecutorService executorService, ExecutorService executorService2) {
        // Upper bound on the wait for one consumer's ready signal, so a consumer
        // that never signals cannot block test setup indefinitely.
        final long initWaitMillis = TimeUnit.SECONDS.toMillis(30);
        for (int i = 0; i < NUM_CONSUMERS; i++) {
            LOG.trace("Creating consumer for destination TEST");
            Consumer consumer = new Consumer(DESTNAME, false);
            // Take the monitor BEFORE submitting: the consumer needs the same monitor to
            // call notifyAll(), so it cannot signal until wait() below has released it.
            // Submitting first would allow the signal to be sent before we start waiting.
            synchronized (consumer.init) {
                executorService2.submit(consumer);
                try {
                    consumer.init.wait(initWaitMillis);
                } catch (InterruptedException e) {
                    LOG.error(e.toString(), e);
                    // Preserve the interrupt for the caller instead of swallowing it.
                    Thread.currentThread().interrupt();
                }
            }
        }
        for (int i = 0; i < NUM_PRODUCERS; i++) {
            LOG.trace("Creating producer for destination TEST");
            executorService.submit(new Producer(DESTNAME, false, 0));
        }
    }
}
