/*
 * Decompiled with CFR 0.152.
 */
package org.noear.solon.cloud.extend.mqtt5.service;

import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttClientPersistence;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttToken;
import org.eclipse.paho.mqttv5.client.MqttTopic;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.noear.solon.Solon;
import org.noear.solon.Utils;
import org.noear.solon.cloud.CloudEventHandler;
import org.noear.solon.cloud.CloudProps;
import org.noear.solon.cloud.annotation.EventLevel;
import org.noear.solon.cloud.exception.CloudEventException;
import org.noear.solon.cloud.extend.mqtt5.service.MqttCallbackImpl;
import org.noear.solon.cloud.extend.mqtt5.service.MqttUtil;
import org.noear.solon.cloud.model.Event;
import org.noear.solon.cloud.service.CloudEventObserverManger;
import org.noear.solon.cloud.service.CloudEventServicePlus;

/**
 * Cloud event service backed by an Eclipse Paho MQTT v5 client.
 *
 * <p>The constructor reads the server address, credentials and publish timeout
 * from the supplied {@link CloudProps} and connects immediately. Observers are
 * registered through {@link #attention} and the broker subscriptions are
 * created by {@link #subscribe()}.
 */
public class CloudEventServiceMqtt5 implements CloudEventServicePlus {
    /** Property key (resolved through {@link CloudProps#getValue}) for a fixed client id. */
    private static final String PROP_EVENT_CLIENT_ID = "event.clientId";

    private final CloudProps cloudProps;
    private final String server;
    private final String username;
    private final String password;
    /** Passed to {@code MqttToken.waitForCompletion}; presumably milliseconds — confirm against the Paho docs. */
    private final long publishTimeout;

    private MqttClient client;
    private String clientId;
    private MqttCallbackImpl clientCallback;
    CloudEventObserverManger observerMap = new CloudEventObserverManger();

    // Lazily cached copies of the configured event channel / group.
    private String channel;
    private String group;

    /**
     * Returns the underlying MQTT client created by the constructor.
     *
     * @return the Paho client instance
     */
    public MqttClient getClient() {
        return this.client;
    }

    /**
     * Creates the service and connects to the configured server.
     *
     * @param cloudProps configuration source for server, credentials, timeout and client props
     * @throws IllegalArgumentException if creating or connecting the MQTT client fails
     */
    public CloudEventServiceMqtt5(CloudProps cloudProps) {
        this.cloudProps = cloudProps;
        this.server = cloudProps.getEventServer();
        this.username = cloudProps.getUsername();
        this.password = cloudProps.getPassword();
        this.publishTimeout = cloudProps.getEventPublishTimeout();
        this.connect();
    }

    /**
     * Builds the connection options, creates the client and connects it.
     * Does nothing when a client has already been created.
     *
     * @throws IllegalArgumentException wrapping any {@link MqttException} raised by Paho
     */
    private synchronized void connect() {
        if (this.client != null) {
            return;
        }

        // Fall back to "<appName>-<guid>" when no client id is configured.
        this.clientId = this.getEventClientId();
        if (Utils.isEmpty(this.clientId)) {
            this.clientId = Solon.cfg().appName() + "-" + Utils.guid();
        }

        MqttConnectionOptions options = new MqttConnectionOptions();

        // The application name doubles as the user name when none is configured.
        if (Utils.isNotEmpty(this.username)) {
            options.setUserName(this.username);
        } else {
            options.setUserName(Solon.cfg().appName());
        }
        if (Utils.isNotEmpty(this.password)) {
            options.setPassword(this.password.getBytes(StandardCharsets.UTF_8));
        }

        options.setConnectionTimeout(1000);
        options.setKeepAliveInterval(100);
        options.setServerURIs(new String[]{this.server});

        // Client props are injected after the defaults above, so they can
        // presumably override them — confirm against Utils.injectProperties.
        Properties props = this.cloudProps.getEventClientProps();
        if (props != null && !props.isEmpty()) {
            Utils.injectProperties(options, props);
        }

        // Will message: the client id is sent to "client.close" with QoS 1, not retained.
        options.setWill("client.close",
                new MqttMessage(this.clientId.getBytes(StandardCharsets.UTF_8), 1, false, null));

        try {
            this.client = new MqttClient(this.server, this.clientId, new MemoryPersistence());
            this.clientCallback = new MqttCallbackImpl(this.client, this.observerMap, this.cloudProps);
            this.client.setCallback(this.clientCallback);
            this.client.connect(options);
        } catch (MqttException ex) {
            throw new IllegalArgumentException(ex);
        }
    }

    /**
     * Publishes an event to the topic named by {@code event.topic()}.
     *
     * <p>The payload is the event content encoded as UTF-8. For QoS 0 the
     * method returns {@code true} right after handing the message to the
     * client; for higher QoS it waits up to the configured publish timeout
     * and reports whether the token completed.
     *
     * @param event the event to publish
     * @return {@code true} for QoS 0, otherwise whether the publish token completed
     * @throws CloudEventException wrapping any error raised while publishing or waiting
     */
    public boolean publish(Event event) throws CloudEventException {
        MqttMessage message = new MqttMessage();
        message.setQos(event.qos());
        message.setRetained(event.retained());
        // Explicit charset: the platform default would corrupt non-ASCII
        // content on hosts that are not configured for UTF-8.
        message.setPayload(event.content().getBytes(StandardCharsets.UTF_8));

        MqttTopic mqttTopic = this.client.getTopic(event.topic());
        try {
            MqttToken token = mqttTopic.publish(message);
            if (event.qos() > 0) {
                token.waitForCompletion(this.publishTimeout);
                return token.isComplete();
            }
            return true;
        } catch (Throwable ex) {
            throw new CloudEventException(ex);
        }
    }

    /**
     * Registers an observer for a topic in the local observer map.
     *
     * <p>The {@code channel} argument is not used by this implementation.
     * {@code topic} is passed twice to {@code observerMap.add}: as the first
     * argument (presumably the lookup key — confirm against
     * {@code CloudEventObserverManger}) and as the topic itself.
     *
     * @param level    event level of the observer
     * @param channel  ignored here
     * @param group    observer group
     * @param topic    topic to observe
     * @param tag      tag associated with the registration
     * @param qos      requested quality of service
     * @param observer handler to register
     */
    public void attention(EventLevel level, String channel, String group, String topic, String tag, int qos, CloudEventHandler observer) {
        this.observerMap.add(topic, level, group, topic, tag, qos, observer);
    }

    /**
     * Subscribes the client to the registered topics via {@link MqttUtil#subscribe}.
     * Nothing happens when no topics have been registered.
     *
     * @throws RuntimeException wrapping any error raised while subscribing
     */
    public void subscribe() {
        try {
            if (this.observerMap.topicSize() > 0) {
                MqttUtil.subscribe(this.client, this.cloudProps.getEventChannel(), this.observerMap);
            }
        } catch (Throwable ex) {
            throw new RuntimeException(ex);
        }
    }

    /**
     * Returns the configured event channel, reading it from the props on
     * first use and caching any non-null result.
     *
     * @return the event channel as reported by {@link CloudProps#getEventChannel()}
     */
    public String getChannel() {
        if (this.channel == null) {
            this.channel = this.cloudProps.getEventChannel();
        }
        return this.channel;
    }

    /**
     * Returns the configured event group, reading it from the props on
     * first use and caching any non-null result.
     *
     * @return the event group as reported by {@link CloudProps#getEventGroup()}
     */
    public String getGroup() {
        if (this.group == null) {
            this.group = this.cloudProps.getEventGroup();
        }
        return this.group;
    }

    /**
     * Returns the client id configured under {@code "event.clientId"}.
     *
     * @return the value returned by {@link CloudProps#getValue} for that key
     */
    public String getEventClientId() {
        return this.cloudProps.getValue(PROP_EVENT_CLIENT_ID);
    }
}

