package ai.konduit.serving.vertx.protocols.mqtt.verticle;

import ai.konduit.serving.pipeline.api.data.Data;
import ai.konduit.serving.vertx.verticle.InferenceVerticle;
import com.google.common.base.Strings;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.PemKeyCertOptions;
import io.vertx.core.net.SelfSignedCertificate;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.MqttServerOptions;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ai/konduit/serving/vertx/protocols/mqtt/verticle/InferenceVerticleMqtt.class */
/**
 * Inference verticle that exposes the pipeline executor over MQTT.
 *
 * <p>On start it runs {@code initialize()} on a worker thread, resolves the listening port
 * (the {@code KONDUIT_SERVING_PORT} environment variable takes precedence over the configured
 * port), optionally configures TLS, and starts an MQTT server. Every message a client publishes
 * on topic {@code T} is parsed as JSON {@link Data}, run through the pipeline, and the JSON
 * result (or a JSON object with an {@code errorMessage} field on failure) is published back to
 * the same client on topic {@code T-out}.
 */
public class InferenceVerticleMqtt extends InferenceVerticle {
    private static final Logger log = LoggerFactory.getLogger(InferenceVerticleMqtt.class);

    /** Environment variable that, when set, overrides the configured port. */
    private static final String PORT_ENV_VARIABLE = "KONDUIT_SERVING_PORT";

    /** Suffix appended to the incoming topic name to form the response topic. */
    private static final String OUTPUT_TOPIC_SUFFIX = "-out";

    /**
     * Initializes the verticle and starts the MQTT server.
     *
     * @param promise completed once the server is listening and the deployment config has been
     *                updated with the actual port; failed if initialization, port resolution or
     *                server start-up fails
     */
    public void start(Promise<Void> promise) {
        this.vertx.executeBlocking(blockingPromise -> {
            try {
                initialize();
                blockingPromise.complete();
            } catch (Exception e) {
                // Fail only the blocking promise here. The result handler below forwards the
                // failure to the start promise; failing it in both places would attempt to
                // complete the same promise twice.
                blockingPromise.fail(e);
            }
        }, initResult -> {
            if (initResult.failed()) {
                if (initResult.cause() != null) {
                    promise.fail(initResult.cause());
                } else {
                    promise.fail("Failed to start. Unknown cause.");
                }
                return;
            }

            int port;
            String portOverride = System.getenv(PORT_ENV_VARIABLE);
            if (portOverride != null) {
                try {
                    port = Integer.parseInt(portOverride);
                } catch (NumberFormatException e) {
                    log.error("Environment variable \"{}={}\" isn't a valid port number.", PORT_ENV_VARIABLE, portOverride);
                    promise.fail(e);
                    return;
                }
            } else {
                port = this.inferenceConfiguration.port();
            }
            if (port < 0 || port > 65535) {
                promise.fail(new Exception("Valid port range is 0 <= port <= 65535. The given port was " + port));
                return;
            }

            MqttServerOptions options = new MqttServerOptions()
                    .setHost(this.inferenceConfiguration.host())
                    .setPort(port);
            if (this.inferenceConfiguration.useSsl()) {
                configureSsl(options);
            }

            MqttServer.create(this.vertx, options)
                    .endpointHandler(this::handleEndpoint)
                    .listen(listenResult -> {
                        if (listenResult.failed()) {
                            promise.fail(listenResult.cause());
                            return;
                        }
                        // Port 0 means "pick any free port", so record the port actually bound.
                        int actualPort = listenResult.result().actualPort();
                        this.inferenceConfiguration.port(actualPort);
                        try {
                            this.context.getDeployment().deploymentOptions()
                                    .setConfig(new JsonObject(this.inferenceConfiguration.toJson()));
                            saveInspectionDataIfRequired(getPid());
                            log.info("MQTT server listening on host: '{}'", this.inferenceConfiguration.host());
                            log.info("MQTT server started on port {}", actualPort);
                            promise.complete();
                        } catch (Throwable th) {
                            promise.fail(th);
                        }
                    });
        });
    }

    /**
     * Enables TLS on the given server options, using the configured PEM key/certificate pair or,
     * when either path is missing, an auto-generated self-signed certificate.
     *
     * @param options server options to configure in place
     */
    private void configureSsl(MqttServerOptions options) {
        String sslKeyPath = this.inferenceConfiguration.sslKeyPath();
        String sslCertificatePath = this.inferenceConfiguration.sslCertificatePath();

        // Key/cert options alone do not turn TLS on; it has to be enabled explicitly.
        options.setSsl(true);

        boolean keyMissing = Strings.isNullOrEmpty(sslKeyPath);
        boolean certificateMissing = Strings.isNullOrEmpty(sslCertificatePath);
        if (keyMissing || certificateMissing) {
            if (keyMissing) {
                log.warn("No pem key file specified for SSL.");
            }
            if (certificateMissing) {
                log.warn("No pem certificate file specified for SSL.");
            }
            log.info("Using an auto generated self signed pem key and certificate with SSL.");
            options.setKeyCertOptions(SelfSignedCertificate.create().keyCertOptions());
            return;
        }

        String keyPath = new File(sslKeyPath).getAbsolutePath();
        String certificatePath = new File(sslCertificatePath).getAbsolutePath();
        log.info("Using SSL with PEM Key: {} and certificate {}.", keyPath, certificatePath);
        options.setPemKeyCertOptions(new PemKeyCertOptions().setKeyPath(keyPath).setCertPath(certificatePath));
    }

    /**
     * Accepts a connecting MQTT client and installs all protocol handlers on its endpoint.
     *
     * @param endpoint endpoint representing the connecting client
     */
    private void handleEndpoint(MqttEndpoint endpoint) {
        log.info("MQTT client [{}] request to connect, clean session = {}", endpoint.clientIdentifier(), endpoint.isCleanSession());
        if (endpoint.auth() != null) {
            // Credentials must never reach the logs, so only the username is reported.
            log.info("[username = {}]", endpoint.auth().getUsername());
        }
        if (endpoint.will() != null) {
            log.info("[will topic = {} msg = {} QoS = {} isRetain = {}]",
                    endpoint.will().getWillTopic(),
                    endpoint.will().getWillMessageBytes(),
                    endpoint.will().getWillQos(),
                    endpoint.will().isWillRetain());
        }
        log.info("[keep alive timeout = {}]", endpoint.keepAliveTimeSeconds());

        endpoint.accept(false)
                .disconnectHandler(disconnect -> log.info("Received disconnect from client"))
                .subscribeHandler(subscribe -> {
                    // Grant every subscription at the QoS the client asked for.
                    List<MqttQoS> grantedQos = subscribe.topicSubscriptions().stream()
                            .map(subscription -> {
                                log.info("Subscription for {} with QoS {}", subscription.topicName(), subscription.qualityOfService());
                                return subscription.qualityOfService();
                            })
                            .collect(Collectors.toList());
                    endpoint.subscribeAcknowledge(subscribe.messageId(), grantedQos);
                })
                .unsubscribeHandler(unsubscribe -> {
                    unsubscribe.topics().forEach(unsubscribedTopic -> log.info("Unsubscription for {}", unsubscribedTopic));
                    endpoint.unsubscribeAcknowledge(unsubscribe.messageId());
                })
                .publishHandler(message -> {
                    String topic = message.topicName();
                    String payload = message.payload().toString(StandardCharsets.UTF_8);
                    log.info("Just received message [{}] with QoS [{}] in topic [{}]", payload, message.qosLevel(), topic);

                    // Acknowledge receipt according to the QoS level before running inference.
                    if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
                        endpoint.publishAcknowledge(message.messageId());
                    } else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) {
                        endpoint.publishReceived(message.messageId());
                    }
                    processMessage(endpoint, topic, payload);
                })
                // Second half of the QoS 2 handshake for messages the client sent to us.
                .publishReleaseHandler(endpoint::publishComplete)
                .publishAcknowledgeHandler(messageId -> log.info("Received ack for message = {}", messageId))
                // First half of the QoS 2 handshake for messages we sent to the client.
                .publishReceivedHandler(endpoint::publishRelease)
                .publishCompletionHandler(messageId -> log.info("Received ack for message = {}", messageId))
                .pingHandler(ping -> log.info("Ping received from client"));
    }

    /**
     * Runs the pipeline on a JSON payload and publishes the outcome to {@code topic + "-out"}.
     * Any failure is logged and reported to the client as a JSON object with an
     * {@code errorMessage} field on the same output topic.
     *
     * @param endpoint endpoint of the client that published the message
     * @param topic    topic the message was received on
     * @param payload  message payload decoded as UTF-8, expected to be JSON-encoded {@link Data}
     */
    private void processMessage(MqttEndpoint endpoint, String topic, String payload) {
        String outputTopic = topic + OUTPUT_TOPIC_SUFFIX;
        try {
            String json = this.pipelineExecutor.exec(Data.fromJson(payload)).toJson();
            log.debug("Publishing message: {} to topic: {}", json, outputTopic);
            endpoint.publish(outputTopic, Buffer.buffer(json), MqttQoS.EXACTLY_ONCE, false, false);
            log.debug("Message published to topic: {}", outputTopic);
        } catch (Throwable th) {
            log.error("Unable to publish data due to the following error", th);
            // Some throwables carry no message; fall back to their string form so the client
            // always receives a non-null reason.
            String reason = th.getMessage() != null ? th.getMessage() : th.toString();
            endpoint.publish(outputTopic,
                    Buffer.buffer(new JsonObject().put("errorMessage", reason).encodePrettily()),
                    MqttQoS.EXACTLY_ONCE, false, false);
        }
    }
}
