package org.noear.solon.cloud.extend.mqtt.service;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.noear.solon.Solon;
import org.noear.solon.Utils;
import org.noear.solon.cloud.CloudEventHandler;
import org.noear.solon.cloud.annotation.EventLevel;
import org.noear.solon.cloud.exception.CloudEventException;
import org.noear.solon.cloud.extend.mqtt.MqttProps;
import org.noear.solon.cloud.model.Event;
import org.noear.solon.cloud.service.CloudEventObserverEntity;
import org.noear.solon.cloud.service.CloudEventService;

/* loaded from: input_file:org/noear/solon/cloud/extend/mqtt/service/CloudEventServiceMqttImp.class */
/**
 * {@link CloudEventService} implementation that publishes and receives events through an
 * MQTT broker using the Eclipse Paho client.
 *
 * <p>The client connection is opened eagerly from the constructors. Observers are collected
 * via {@link #attention} and handed to the client callback when {@link #subscribe()} is called.
 */
public class CloudEventServiceMqttImp implements CloudEventService {
    /** Value passed to {@code MqttConnectOptions.setConnectionTimeout}. NOTE(review): Paho documents this in seconds — confirm 1000 is intended. */
    private static final int CONNECTION_TIMEOUT = 1000;
    /** Value passed to {@code MqttConnectOptions.setKeepAliveInterval}. */
    private static final int KEEP_ALIVE_INTERVAL = 100;
    /** Topic of the will message registered with the broker at connect time. */
    private static final String WILL_TOPIC = "client.close";
    /** QoS used for the will message. */
    private static final int WILL_QOS = 1;
    /** Maximum time, in milliseconds, to wait for a publish with QoS above 0 to complete. */
    private static final long PUBLISH_WAIT_MILLIS = 30000L;

    private static CloudEventServiceMqttImp instance;
    private final String server;
    private final String username;
    private final String password;
    private MqttClient client;
    private String clientId;
    private MqttCallbackImp clientCallback;
    /** Registered observers, keyed by the third string argument given to {@link #attention}. */
    Map<String, CloudEventObserverEntity> observerMap;

    /**
     * Returns the shared instance, creating (and connecting) it on first use from the
     * settings exposed by {@code MqttProps.instance}.
     *
     * @return the lazily created shared instance
     */
    public static synchronized CloudEventServiceMqttImp getInstance() {
        if (instance == null) {
            instance = new CloudEventServiceMqttImp();
        }
        return instance;
    }

    /** Builds the shared instance from {@code MqttProps.instance} and connects immediately. */
    private CloudEventServiceMqttImp() {
        this.observerMap = new HashMap<>();
        this.server = MqttProps.instance.getEventServer();
        this.username = MqttProps.instance.getUsername();
        this.password = MqttProps.instance.getPassword();
        connect();
    }

    /**
     * Builds an instance from explicit properties and connects immediately.
     *
     * @param properties source of the {@code server}, {@code username} and {@code password} values
     * @throws RuntimeException if the MQTT client cannot be created or connected
     */
    public CloudEventServiceMqttImp(Properties properties) {
        this.observerMap = new HashMap<>();
        this.server = properties.getProperty("server");
        this.username = properties.getProperty("username");
        this.password = properties.getProperty("password");
        connect();
    }

    /**
     * Creates the MQTT client and connects it to {@link #server}; does nothing when a client
     * already exists.
     *
     * @throws RuntimeException wrapping the {@link MqttException} when creation or connect fails
     */
    private synchronized void connect() {
        if (this.client != null) {
            return;
        }

        this.clientId = MqttProps.clientId();
        if (Utils.isEmpty(this.clientId)) {
            // No configured id: derive one from the application name plus a generated guid.
            this.clientId = Solon.cfg().appName() + "-" + Utils.guid();
        }

        MqttConnectOptions options = new MqttConnectOptions();
        // Without a configured username the application name is used instead.
        if (Utils.isNotEmpty(this.username)) {
            options.setUserName(this.username);
        } else {
            options.setUserName(Solon.cfg().appName());
        }
        if (Utils.isNotEmpty(this.password)) {
            options.setPassword(this.password.toCharArray());
        }
        options.setCleanSession(false);
        options.setConnectionTimeout(CONNECTION_TIMEOUT);
        options.setKeepAliveInterval(KEEP_ALIVE_INTERVAL);
        options.setServerURIs(new String[]{this.server});

        // Injected after the values above are set — presumably so configured client
        // properties can override them; verify against Utils.injectProperties.
        Properties eventClientProps = MqttProps.instance.getEventClientProps();
        if (!eventClientProps.isEmpty()) {
            Utils.injectProperties(options, eventClientProps);
        }

        // Will message: the client id (UTF-8) on WILL_TOPIC, not retained.
        options.setWill(WILL_TOPIC, this.clientId.getBytes(StandardCharsets.UTF_8), WILL_QOS, false);

        try {
            this.client = new MqttClient(this.server, this.clientId, new MemoryPersistence());
            this.clientCallback = new MqttCallbackImp(this.client);
            this.client.setCallback(this.clientCallback);
            this.client.connect(options);
        } catch (MqttException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Publishes an event to the MQTT topic named by {@code event.topic()}.
     *
     * <p>The event content is encoded as UTF-8. For QoS 0 (or lower) the method returns
     * {@code true} right after handing the message to the client; otherwise it waits up to
     * {@link #PUBLISH_WAIT_MILLIS} milliseconds and reports whether delivery completed.
     *
     * @param event the event to publish
     * @return {@code true} if the message was sent (QoS 0) or delivery completed in time
     * @throws CloudEventException wrapping any failure raised while building or sending the message
     */
    public boolean publish(Event event) throws CloudEventException {
        try {
            MqttMessage message = new MqttMessage();
            message.setQos(event.qos());
            message.setRetained(event.retained());
            // Explicit charset: the platform default would make the wire format machine-dependent.
            message.setPayload(event.content().getBytes(StandardCharsets.UTF_8));

            MqttDeliveryToken token = this.client.getTopic(event.topic()).publish(message);
            if (event.qos() <= 0) {
                return true;
            }
            token.waitForCompletion(PUBLISH_WAIT_MILLIS);
            return token.isComplete();
        } catch (Throwable th) {
            throw new CloudEventException(th);
        }
    }

    /**
     * Registers an observer under the key {@code str3}; the first registration for a key wins
     * and later ones are ignored.
     *
     * @param eventLevel        passed through to the {@link CloudEventObserverEntity}
     * @param str               not used by this implementation
     * @param str2              passed as the second argument of the observer entity
     *                          (NOTE(review): presumably the group — confirm against the interface)
     * @param str3              map key and third argument of the observer entity
     *                          (NOTE(review): presumably the topic — confirm against the interface)
     * @param cloudEventHandler handler stored in the observer entity
     */
    public void attention(EventLevel eventLevel, String str, String str2, String str3, CloudEventHandler cloudEventHandler) {
        if (this.observerMap.containsKey(str3)) {
            return;
        }
        this.observerMap.put(str3, new CloudEventObserverEntity(eventLevel, str2, str3, cloudEventHandler));
    }

    /**
     * Hands all registered observers to the client callback for subscription; does nothing
     * when no observer has been registered.
     *
     * @throws RuntimeException wrapping any failure raised by the callback
     */
    public void subscribe() {
        try {
            if (!this.observerMap.isEmpty()) {
                this.clientCallback.subscribe(this.observerMap);
            }
        } catch (Throwable th) {
            throw new RuntimeException(th);
        }
    }
}
