package org.noear.solon.cloud.extend.mqtt.service;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.noear.solon.Solon;
import org.noear.solon.Utils;
import org.noear.solon.cloud.CloudProps;
import org.noear.solon.cloud.extend.mqtt.event.MqttDeliveryCompleteEvent;
import org.noear.solon.cloud.extend.mqtt.service.MqttClientManager;
import org.noear.solon.cloud.model.EventObserver;
import org.noear.solon.cloud.service.CloudEventObserverManger;
import org.noear.solon.core.event.EventBus;
import org.noear.solon.core.util.RunUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/noear/solon/cloud/extend/mqtt/service/MqttClientManagerImpl.class */
public class MqttClientManagerImpl implements MqttClientManager, MqttCallbackExtended {
    private static final Logger log = LoggerFactory.getLogger(MqttClientManagerImpl.class);
    private static final String PROP_EVENT_clientId = "event.clientId";
    private final String server;
    private final String username;
    private final String password;
    private final CloudEventObserverManger observerManger;
    private final String eventChannelName;
    private final MqttConnectOptions options;
    private String clientId;
    private boolean async = true;
    private final Set<MqttClientManager.ConnectCallback> connectCallbacks = Collections.synchronizedSet(new HashSet());
    IMqttAsyncClient client;

    public MqttClientManagerImpl(CloudEventObserverManger cloudEventObserverManger, CloudProps cloudProps) {
        this.observerManger = cloudEventObserverManger;
        this.eventChannelName = cloudProps.getEventChannel();
        this.server = cloudProps.getEventServer();
        this.username = cloudProps.getUsername();
        this.password = cloudProps.getPassword();
        this.clientId = cloudProps.getValue(PROP_EVENT_clientId);
        if (Utils.isEmpty(this.clientId)) {
            this.clientId = Solon.cfg().appName() + "-" + Utils.guid();
        }
        this.options = new MqttConnectOptions();
        if (Utils.isNotEmpty(this.username)) {
            this.options.setUserName(this.username);
        } else {
            this.options.setUserName(Solon.cfg().appName());
        }
        if (Utils.isNotEmpty(this.password)) {
            this.options.setPassword(this.password.toCharArray());
        }
        this.options.setWill("client.close", this.clientId.getBytes(StandardCharsets.UTF_8), 1, false);
        this.options.setConnectionTimeout(30);
        this.options.setKeepAliveInterval(20);
        this.options.setServerURIs(new String[]{this.server});
        this.options.setCleanSession(false);
        this.options.setAutomaticReconnect(true);
        this.options.setMaxInflight(128);
        Properties eventClientProps = cloudProps.getEventClientProps();
        if (eventClientProps.size() > 0) {
            Utils.injectProperties(this.options, eventClientProps);
        }
        EventBus.publish(this.options);
    }

    public synchronized void connectionLost(Throwable th) {
        log.warn("MQTT connection lost, clientId={}", this.clientId, th);
        if (this.options.isAutomaticReconnect()) {
            return;
        }
        this.client = null;
    }

    public void messageArrived(String str, MqttMessage mqttMessage) throws Exception {
        if (log.isTraceEnabled()) {
            log.trace("MQTT message arrived, clientId={}, messageId={}", this.clientId, Integer.valueOf(mqttMessage.getId()));
        }
        EventObserver byTopic = this.observerManger.getByTopic(str);
        if (byTopic != null) {
            MqttMessageHandler mqttMessageHandler = new MqttMessageHandler(this, this.eventChannelName, byTopic, str, mqttMessage);
            if (getAsync()) {
                RunUtil.parallel(mqttMessageHandler);
            } else {
                mqttMessageHandler.run();
            }
        }
    }

    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        if (iMqttDeliveryToken.getMessageId() > 0) {
            if (log.isDebugEnabled()) {
                log.debug("MQTT message delivery completed, clientId={}, messageId={}", this.clientId, Integer.valueOf(iMqttDeliveryToken.getMessageId()));
            }
            EventBus.publish(new MqttDeliveryCompleteEvent(this.clientId, iMqttDeliveryToken.getMessageId(), iMqttDeliveryToken));
        }
    }

    public void connectComplete(boolean z, String str) {
        this.connectCallbacks.forEach(connectCallback -> {
            connectCallback.connectComplete(z);
        });
    }

    @Override // org.noear.solon.cloud.extend.mqtt.service.MqttClientManager
    public synchronized IMqttAsyncClient getClient() {
        if (this.client == null) {
            this.client = createClient();
        }
        return this.client;
    }

    @Override // org.noear.solon.cloud.extend.mqtt.service.MqttClientManager
    public String getClientId() {
        return this.clientId;
    }

    @Override // org.noear.solon.cloud.extend.mqtt.service.MqttClientManager
    public void setAsync(boolean z) {
        this.async = z;
    }

    @Override // org.noear.solon.cloud.extend.mqtt.service.MqttClientManager
    public boolean getAsync() {
        return this.async;
    }

    @Override // org.noear.solon.cloud.extend.mqtt.service.MqttClientManager
    public void addCallback(MqttClientManager.ConnectCallback connectCallback) {
        this.connectCallbacks.add(connectCallback);
    }

    @Override // org.noear.solon.cloud.extend.mqtt.service.MqttClientManager
    public boolean removeCallback(MqttClientManager.ConnectCallback connectCallback) {
        return this.connectCallbacks.remove(connectCallback);
    }

    private IMqttAsyncClient createClient() {
        try {
            this.client = new MqttAsyncClient(this.server, this.clientId, new MemoryPersistence());
            this.client.setManualAcks(true);
            this.client.setCallback(this);
            this.client.connect(this.options).waitForCompletion(this.options.getConnectionTimeout() * 1000);
            subscribe();
            return this.client;
        } catch (MqttException e) {
            throw new IllegalStateException((Throwable) e);
        }
    }

    private void subscribe() throws MqttException {
        if (this.observerManger.topicSize() < 1) {
            return;
        }
        String[] strArr = (String[]) this.observerManger.topicAll().toArray(new String[0]);
        int[] iArr = new int[strArr.length];
        IMqttMessageListener[] iMqttMessageListenerArr = new IMqttMessageListener[strArr.length];
        int length = iArr.length;
        for (int i = 0; i < length; i++) {
            EventObserver byTopic = this.observerManger.getByTopic(strArr[i]);
            iArr[i] = byTopic.getQos();
            iMqttMessageListenerArr[i] = new MqttMessageListenerImpl(this, this.eventChannelName, byTopic);
        }
        getClient().subscribe(strArr, iArr, iMqttMessageListenerArr).waitForCompletion(this.options.getConnectionTimeout() * 1000);
    }
}
