package io.vertx.mqtt.test;

import io.vertx.core.buffer.Buffer;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttTopicSubscription;
import java.util.List;
import java.util.stream.Collectors;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/vertx/mqtt/test/MqttServerPublishTest.class */
public class MqttServerPublishTest extends MqttBaseTest {
    private static final Logger log = LoggerFactory.getLogger(MqttServerPublishTest.class);
    private Async async;
    private static final String MQTT_TOPIC = "/my_topic";
    private static final String MQTT_MESSAGE = "Hello Vert.x MQTT Server";
    private String topic;
    private String message;

    @Before
    public void before(TestContext testContext) {
        setUp(testContext);
    }

    @After
    public void after(TestContext testContext) {
        tearDown(testContext);
    }

    @Test
    public void publishQos0(TestContext testContext) {
        publish(testContext, MQTT_TOPIC, MQTT_MESSAGE, 0);
    }

    @Test
    public void publishQos1(TestContext testContext) {
        publish(testContext, MQTT_TOPIC, MQTT_MESSAGE, 1);
    }

    @Test
    public void publishQos2(TestContext testContext) {
        publish(testContext, MQTT_TOPIC, MQTT_MESSAGE, 2);
    }

    private void publish(TestContext testContext, String str, String str2, int i) {
        this.topic = str;
        this.message = str2;
        this.async = testContext.async();
        try {
            MqttClient mqttClient = new MqttClient(String.format("tcp://%s:%d", "localhost", 1883), "12345", new MemoryPersistence());
            mqttClient.connect();
            mqttClient.subscribe(str, i, new IMqttMessageListener() { // from class: io.vertx.mqtt.test.MqttServerPublishTest.1
                public void messageArrived(String str3, MqttMessage mqttMessage) throws Exception {
                    MqttServerPublishTest.log.info("Just received message [" + mqttMessage.toString() + "] on topic [" + str3 + "] with QoS [" + mqttMessage.getQos() + "]");
                    if (mqttMessage.getQos() == 0) {
                        MqttServerPublishTest.this.async.complete();
                    }
                }
            });
            this.async.await();
            testContext.assertTrue(true);
        } catch (MqttException e) {
            testContext.assertTrue(false);
            e.printStackTrace();
        }
    }

    @Override // io.vertx.mqtt.test.MqttBaseTest
    protected void endpointHandler(MqttEndpoint mqttEndpoint) {
        mqttEndpoint.subscribeHandler(mqttSubscribeMessage -> {
            mqttEndpoint.subscribeAcknowledge(mqttSubscribeMessage.messageId(), (List) mqttSubscribeMessage.topicSubscriptions().stream().map((v0) -> {
                return v0.qualityOfService();
            }).collect(Collectors.toList()));
            mqttEndpoint.publish(this.topic, Buffer.buffer(this.message), ((MqttTopicSubscription) mqttSubscribeMessage.topicSubscriptions().get(0)).qualityOfService(), false, false);
        }).publishAcknowledgeHandler(num -> {
            log.info("Message [" + num + "] acknowledged");
            this.async.complete();
        }).publishReceivedHandler(num2 -> {
            mqttEndpoint.publishRelease(num2.intValue());
        }).publishCompleteHandler(num3 -> {
            log.info("Message [" + num3 + "] acknowledged");
            this.async.complete();
        });
        mqttEndpoint.accept(false);
    }
}
