package org.apache.activemq.usecases;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Checks how grouped messages are spread over queue consumers that keep reconnecting.
 *
 * <p>An embedded, non-persistent broker is started with a cached message-group map. A fixed
 * number of consumers share one connection; each consumer closes and re-creates its
 * {@link MessageConsumer} after every batch of messages (and after every receive timeout).
 * Once all messages have been consumed, the test asserts that every consumer's total lies
 * within +/- 50% of the first consumer's total.
 *
 * <p>The test is parameterized on the number of consumers and on whether the destination
 * policy uses consumer priority.
 */
@RunWith(Parameterized.class)
public class MessageGroupReconnectDistributionTest {
    public static final Logger LOG = LoggerFactory.getLogger(MessageGroupReconnectDistributionTest.class);
    protected Connection connection;
    protected Session session;
    protected MessageProducer producer;
    protected TransportConnector connector;
    ActiveMQConnectionFactory connFactory;
    BrokerService broker;
    final Random random = new Random();
    protected ActiveMQQueue destination = new ActiveMQQueue("GroupQ");
    /** Total number of messages produced, and the number the consumers must drain. */
    int numMessages = 10000;
    /** Number of consecutive messages that share one {@code JMSXGroupID}. */
    int groupSize = 10;
    /** Base batch size; consumer {@code i} reconnects every {@code (i + 1) * batchSize} messages. */
    int batchSize = 20;

    @Parameterized.Parameter(0)
    public int numConsumers = 4;

    @Parameterized.Parameter(1)
    public boolean consumerPriority = true;

    /**
     * Supplies the parameter sets for the runner.
     *
     * @return one {@code {numConsumers, consumerPriority}} pair per test run
     */
    @Parameterized.Parameters(name = "numConsumers={0},consumerPriority={1}")
    public static Iterable<Object[]> combinations() {
        return Arrays.asList(new Object[]{4, true}, new Object[]{10, true});
    }

    /**
     * Starts the broker and opens the shared connection, producer session and producer.
     *
     * @throws Exception if the broker or the JMS client objects cannot be created
     */
    @Before
    public void setUp() throws Exception {
        this.broker = createBroker();
        this.broker.start();
        this.connFactory = new ActiveMQConnectionFactory(this.connector.getConnectUri() + "?jms.prefetchPolicy.all=200");
        this.connFactory.setWatchTopicAdvisories(false);
        this.connection = this.connFactory.createConnection();
        this.session = this.connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        this.producer = this.session.createProducer(this.destination);
        this.connection.start();
    }

    /**
     * Builds (but does not start) the broker under test and records its transport connector
     * in {@link #connector}.
     *
     * @return the configured broker
     * @throws Exception if the transport connector cannot be added
     */
    protected BrokerService createBroker() throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setAdvisorySupport(false);
        brokerService.setPersistent(false);
        brokerService.setUseJmx(true);

        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setUseConsumerPriority(this.consumerPriority);
        // The group-map cache is sized one below the consumer count.
        policyEntry.setMessageGroupMapFactoryType("cached?cacheSize=" + (this.numConsumers - 1));

        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(policyEntry);
        brokerService.setDestinationPolicy(policyMap);

        this.connector = brokerService.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT);
        return brokerService;
    }

    /**
     * Closes the client objects and stops the broker.
     *
     * <p>Every field is null-checked because JUnit runs {@code @After} methods even when
     * {@link #setUp()} failed part-way, and the broker is stopped in a {@code finally} block
     * so that a failing client close cannot leave it running.
     *
     * @throws Exception if closing a client object or stopping the broker fails
     */
    @After
    public void tearDown() throws Exception {
        try {
            if (this.producer != null) {
                this.producer.close();
            }
            if (this.session != null) {
                this.session.close();
            }
            if (this.connection != null) {
                this.connection.close();
            }
        } finally {
            if (this.broker != null) {
                this.broker.stop();
            }
        }
    }

    /**
     * Starts the reconnecting consumers, produces all messages, waits for the consumers to
     * drain them and then checks the per-consumer distribution.
     *
     * @throws Exception if messaging fails or the test thread is interrupted
     */
    @Test(timeout = 300000)
    public void testReconnect() throws Exception {
        final AtomicLong totalConsumed = new AtomicLong(0L);
        final ArrayList<AtomicLong> consumedCounters = new ArrayList<>(this.numConsumers);
        final ArrayList<AtomicLong> batchCounters = new ArrayList<>(this.numConsumers);

        ExecutorService executor = Executors.newFixedThreadPool(this.numConsumers);
        try {
            for (int i = 0; i < this.numConsumers; i++) {
                final int consumerId = i;
                final AtomicLong consumed = new AtomicLong(0L);
                final AtomicLong batches = new AtomicLong(0L);
                consumedCounters.add(consumed);
                batchCounters.add(batches);
                executor.submit(() -> consumeInBatches(consumerId, totalConsumed, consumed, batches));
                // Start the consumers 200 ms apart.
                TimeUnit.MILLISECONDS.sleep(200L);
            }
            TimeUnit.SECONDS.sleep(1L);
            produceMessages(this.numMessages);
            executor.shutdown();
            Assert.assertTrue("threads done on time", executor.awaitTermination(10L, TimeUnit.MINUTES));
        } finally {
            // No-op after a clean shutdown; otherwise stops workers that would outlive the test.
            executor.shutdownNow();
        }

        Assert.assertEquals("All consumed", this.numMessages, totalConsumed.intValue());
        LOG.info("Distribution: " + consumedCounters);
        LOG.info("Batches: " + batchCounters);

        // The first consumer's total is the reference for the +/- 50% band.
        final long reference = consumedCounters.get(0).longValue();
        final double upperBound = reference * 1.5d;
        final double lowerBound = reference * 0.5d;
        for (AtomicLong counter : consumedCounters) {
            final double count = counter.longValue();
            Assert.assertTrue(
                    "Even +/- 50% distribution on consumed:" + consumedCounters + ", outlier:" + counter.get(),
                    count < upperBound && count > lowerBound);
        }
    }

    /**
     * Body of one consumer thread: receives messages until {@code totalConsumed} reaches
     * {@link #numMessages}, re-creating the {@link MessageConsumer} after each full batch and
     * after each receive timeout. Any exception is logged and ends this consumer.
     *
     * @param consumerId zero-based consumer index; determines this consumer's batch size
     * @param totalConsumed counter shared by all consumers
     * @param consumed this consumer's message counter
     * @param batches this consumer's reconnect counter
     */
    private void consumeInBatches(int consumerId, AtomicLong totalConsumed, AtomicLong consumed, AtomicLong batches) {
        final int consumerBatchSize = (consumerId + 1) * this.batchSize;
        try {
            Session consumerSession = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = consumerSession.createConsumer(destWithPrefetch(this.destination));
            LOG.info("Consumer: {}, batchSize:{}, totalConsumed:{}, consumed:{}",
                    consumerId, consumerBatchSize, totalConsumed.get(), consumed.get());

            while (totalConsumed.get() < this.numMessages) {
                // NOTE(review): the timeout is a constant owned by an unrelated test class;
                // presumably an artifact of how this source was recovered - confirm the intended value.
                if (consumer.receive(DurableSubProcessWithRestartTest.BROKER_RESTART) == null) {
                    LOG.info("Consumer: {}, batchSize:{}, null message (totalConsumed:{}) consumed:{}",
                            consumerId, consumerBatchSize, totalConsumed.get(), consumed.get());
                    consumer.close();
                    if (totalConsumed.get() == this.numMessages) {
                        break;
                    }
                    consumer = consumerSession.createConsumer(destWithPrefetch(this.destination));
                    batches.incrementAndGet();
                } else {
                    final long consumedSoFar = consumed.incrementAndGet();
                    totalConsumed.incrementAndGet();
                    // Only this thread updates 'consumed', so the value just returned is current.
                    if (consumedSoFar % consumerBatchSize == 0) {
                        consumer.close();
                        consumer = consumerSession.createConsumer(destWithPrefetch(this.destination));
                        batches.incrementAndGet();
                    }
                }
            }
        } catch (Exception e) {
            LOG.error("Consumer: {} failed", consumerId, e);
        }
    }

    /**
     * Hook for deriving the consumer destination; currently returns the queue unchanged.
     *
     * @param activeMQQueue the queue to consume from
     * @return the destination passed to {@code createConsumer}
     * @throws Exception declared for callers; the current body does not throw
     */
    private Destination destWithPrefetch(ActiveMQQueue activeMQQueue) throws Exception {
        return activeMQQueue;
    }

    /**
     * Sends {@code i} text messages, assigning every {@link #groupSize} consecutive messages
     * to the same {@code JMSXGroupID} ({@code Group-0}, {@code Group-1}, ...).
     *
     * @param i number of messages to send
     * @throws JMSException if a message cannot be created or sent
     */
    private void produceMessages(int i) throws JMSException {
        for (int index = 0; index < i; index++) {
            TextMessage message = this.session.createTextMessage("hello " + index);
            message.setStringProperty("JMSXGroupID", "Group-" + (index / this.groupSize));
            this.producer.send(message);
        }
    }
}
