package org.apache.activemq.transport.mqtt;

import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.perf.NetworkedSyncTest;
import org.apache.activemq.util.ByteSequence;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/transport/mqtt/MQTTTest.class */
public class MQTTTest {
    protected static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class);
    protected BrokerService brokerService;
    protected int numberOfMessages;
    protected Vector<Throwable> exceptions = new Vector<>();
    AutoFailTestSupport autoFailTestSupport = new AutoFailTestSupport() { // from class: org.apache.activemq.transport.mqtt.MQTTTest.1
    };

    @Before
    public void startBroker() throws Exception {
        this.autoFailTestSupport.startAutoFailThread();
        this.exceptions.clear();
        this.brokerService = new BrokerService();
        this.brokerService.setPersistent(false);
        this.brokerService.setAdvisorySupport(false);
        this.numberOfMessages = 2000;
    }

    @After
    public void stopBroker() throws Exception {
        if (this.brokerService != null) {
            this.brokerService.stop();
        }
        this.autoFailTestSupport.stopAutoFailThread();
    }

    @Test
    public void testSendAndReceiveMQTT() throws Exception {
        addMQTTConnector(this.brokerService);
        this.brokerService.start();
        MQTT mqtt = new MQTT();
        final BlockingConnection blockingConnection = mqtt.blockingConnection();
        blockingConnection.connect();
        Topic topic = new Topic("foo/bah", QoS.AT_MOST_ONCE);
        blockingConnection.subscribe(new Topic[]{topic});
        final CountDownLatch countDownLatch = new CountDownLatch(this.numberOfMessages);
        new Thread(new Runnable() { // from class: org.apache.activemq.transport.mqtt.MQTTTest.2
            @Override // java.lang.Runnable
            public void run() {
                for (int i = 0; i < MQTTTest.this.numberOfMessages; i++) {
                    try {
                        blockingConnection.receive().ack();
                        countDownLatch.countDown();
                    } catch (Exception e) {
                        e.printStackTrace();
                        return;
                    }
                }
            }
        }).start();
        BlockingConnection blockingConnection2 = mqtt.blockingConnection();
        blockingConnection2.connect();
        for (int i = 0; i < this.numberOfMessages; i++) {
            blockingConnection2.publish(topic.name().toString(), ("Message " + i).getBytes(), QoS.AT_LEAST_ONCE, false);
        }
        countDownLatch.await(10L, TimeUnit.SECONDS);
        Assert.assertEquals(0L, countDownLatch.getCount());
    }

    @Test
    public void testSendAndReceiveAtMostOnce() throws Exception {
        addMQTTConnector(this.brokerService);
        this.brokerService.start();
        MQTT createMQTTConnection = createMQTTConnection();
        createMQTTConnection.setKeepAlive(Short.MAX_VALUE);
        BlockingConnection blockingConnection = createMQTTConnection.blockingConnection();
        blockingConnection.connect();
        blockingConnection.subscribe(new Topic[]{new Topic(UTF8Buffer.utf8("foo"), QoS.AT_MOST_ONCE)});
        for (int i = 0; i < this.numberOfMessages; i++) {
            String str = "Test Message: " + i;
            blockingConnection.publish("foo", str.getBytes(), QoS.AT_MOST_ONCE, false);
            Assert.assertEquals(str, new String(blockingConnection.receive().getPayload()));
        }
        blockingConnection.disconnect();
    }

    @Test
    public void testSendAndReceiveAtLeastOnce() throws Exception {
        addMQTTConnector(this.brokerService);
        this.brokerService.start();
        MQTT createMQTTConnection = createMQTTConnection();
        createMQTTConnection.setKeepAlive(Short.MAX_VALUE);
        BlockingConnection blockingConnection = createMQTTConnection.blockingConnection();
        blockingConnection.connect();
        blockingConnection.subscribe(new Topic[]{new Topic(UTF8Buffer.utf8("foo"), QoS.AT_LEAST_ONCE)});
        for (int i = 0; i < this.numberOfMessages; i++) {
            String str = "Test Message: " + i;
            blockingConnection.publish("foo", str.getBytes(), QoS.AT_LEAST_ONCE, false);
            Message receive = blockingConnection.receive();
            receive.ack();
            Assert.assertEquals(str, new String(receive.getPayload()));
        }
        blockingConnection.disconnect();
    }

    @Test
    public void testSendAndReceiveExactlyOnce() throws Exception {
        addMQTTConnector(this.brokerService);
        this.brokerService.start();
        BlockingConnection blockingConnection = createMQTTConnection().blockingConnection();
        blockingConnection.connect();
        BlockingConnection blockingConnection2 = createMQTTConnection().blockingConnection();
        blockingConnection2.connect();
        blockingConnection2.subscribe(new Topic[]{new Topic(UTF8Buffer.utf8("foo"), QoS.EXACTLY_ONCE)});
        for (int i = 0; i < this.numberOfMessages; i++) {
            String str = "Test Message: " + i;
            blockingConnection.publish("foo", str.getBytes(), QoS.EXACTLY_ONCE, false);
            Message receive = blockingConnection2.receive();
            receive.ack();
            Assert.assertEquals(str, new String(receive.getPayload()));
        }
        blockingConnection2.disconnect();
        blockingConnection.disconnect();
    }

    @Test
    public void testSendAndReceiveLargeMessages() throws Exception {
        byte[] bArr = new byte[32768];
        for (int i = 0; i < bArr.length; i++) {
            bArr[i] = 50;
        }
        addMQTTConnector(this.brokerService);
        this.brokerService.start();
        BlockingConnection blockingConnection = createMQTTConnection().blockingConnection();
        blockingConnection.connect();
        BlockingConnection blockingConnection2 = createMQTTConnection().blockingConnection();
        blockingConnection2.connect();
        blockingConnection2.subscribe(new Topic[]{new Topic(UTF8Buffer.utf8("foo"), QoS.AT_LEAST_ONCE)});
        for (int i2 = 0; i2 < 10; i2++) {
            blockingConnection.publish("foo", bArr, QoS.AT_LEAST_ONCE, false);
            Message receive = blockingConnection2.receive();
            receive.ack();
            Assert.assertArrayEquals(bArr, receive.getPayload());
        }
        blockingConnection2.disconnect();
        blockingConnection.disconnect();
    }

    @Test
    public void testSendMQTTReceiveJMS() throws Exception {
        addMQTTConnector(this.brokerService);
        this.brokerService.addConnector(NetworkedSyncTest.broker1URL);
        this.brokerService.start();
        BlockingConnection blockingConnection = createMQTTConnection().blockingConnection();
        blockingConnection.connect();
        ActiveMQConnection createConnection = new ActiveMQConnectionFactory().createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        MessageConsumer createConsumer = createSession.createConsumer(createSession.createTopic("foo.*"));
        for (int i = 0; i < this.numberOfMessages; i++) {
            String str = "Test Message: " + i;
            blockingConnection.publish("foo/bah", str.getBytes(), QoS.AT_LEAST_ONCE, false);
            ByteSequence content = createConsumer.receive().getContent();
            Assert.assertEquals(str, new String(content.data, content.offset, content.length));
        }
        createConnection.close();
        blockingConnection.disconnect();
    }

    @Test
    public void testSendJMSReceiveMQTT() throws Exception {
        addMQTTConnector(this.brokerService);
        this.brokerService.addConnector(NetworkedSyncTest.broker1URL);
        this.brokerService.start();
        MQTT createMQTTConnection = createMQTTConnection();
        createMQTTConnection.setKeepAlive(Short.MAX_VALUE);
        BlockingConnection blockingConnection = createMQTTConnection.blockingConnection();
        blockingConnection.connect();
        ActiveMQConnection createConnection = new ActiveMQConnectionFactory().createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(createSession.createTopic("foo.far"));
        blockingConnection.subscribe(new Topic[]{new Topic(UTF8Buffer.utf8("foo/+"), QoS.AT_MOST_ONCE)});
        for (int i = 0; i < this.numberOfMessages; i++) {
            String str = "This is Test Message: " + i;
            createProducer.send(createSession.createTextMessage(str));
            Message receive = blockingConnection.receive();
            receive.ack();
            Assert.assertEquals(str, new String(receive.getPayload()));
        }
        blockingConnection.disconnect();
    }

    protected void addMQTTConnector(BrokerService brokerService) throws Exception {
        brokerService.addConnector("mqtt://localhost:1883");
    }

    protected MQTT createMQTTConnection() throws Exception {
        MQTT mqtt = new MQTT();
        mqtt.setHost("localhost", 1883);
        return mqtt;
    }
}
