/*
 * Decompiled with CFR 0.152.
 */
package com.github.tocrhz.mqtt.publisher;

import com.github.tocrhz.mqtt.convert.MqttConversionService;
import com.github.tocrhz.mqtt.properties.MqttConfigAdapter;
import com.github.tocrhz.mqtt.subscriber.MqttSubscriber;
import com.github.tocrhz.mqtt.subscriber.TopicPair;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Objects;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;

public record SimpleMqttClient(String id, IMqttAsyncClient client, MqttConnectOptions options, boolean enableShared, int qos, LinkedList<MqttSubscriber> subscribers, MqttConfigAdapter adapter) {
    private static final Logger log = LoggerFactory.getLogger(SimpleMqttClient.class);
    private static final ScheduledExecutorService scheduled = Executors.newSingleThreadScheduledExecutor();

    public void connect() {
        try {
            this.adapter.beforeConnect(this.id, this.options);
            this.client.connect(this.options, null, new IMqttActionListener(){

                public void onSuccess(IMqttToken asyncActionToken) {
                    log.info("connect success. client_id is [{}], brokers is [{}].", (Object)SimpleMqttClient.this.id, (Object)String.join((CharSequence)",", SimpleMqttClient.this.options.getServerURIs()));
                    SimpleMqttClient.this.subscribe();
                }

                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    log.error("connect failure. client_id is [{}], brokers is [{}]. retry after {} ms.", new Object[]{SimpleMqttClient.this.id, String.join((CharSequence)",", SimpleMqttClient.this.options.getServerURIs()), SimpleMqttClient.this.options.getMaxReconnectDelay()});
                    scheduled.schedule(() -> SimpleMqttClient.this.connect(), (long)SimpleMqttClient.this.options.getMaxReconnectDelay(), TimeUnit.MILLISECONDS);
                }
            });
            this.client.setCallback((MqttCallback)new MqttCallbackExtended(){

                public void connectComplete(boolean reconnect, String serverURI) {
                    if (reconnect) {
                        log.info("mqtt reconnection success.");
                        SimpleMqttClient.this.subscribe();
                    }
                }

                public void connectionLost(Throwable cause) {
                    log.warn("mqtt connection lost.");
                }

                public void messageArrived(String topic, MqttMessage message) {
                    for (MqttSubscriber subscriber : SimpleMqttClient.this.subscribers) {
                        subscriber.accept(SimpleMqttClient.this.id, topic, message);
                    }
                }

                public void deliveryComplete(IMqttDeliveryToken token) {
                }
            });
        }
        catch (MqttException e) {
            log.error("connect error: {}", (Object)e.getMessage(), (Object)e);
        }
    }

    private Set<TopicPair> mergeTopics(String clientId, boolean enableShared) {
        HashSet<TopicPair> topicPairs = new HashSet<TopicPair>();
        for (MqttSubscriber subscriber : this.subscribers) {
            if (!subscriber.containsClientId(clientId)) continue;
            topicPairs.addAll(subscriber.getTopics());
        }
        if (topicPairs.isEmpty()) {
            return topicPairs;
        }
        TopicPair[] pairs = new TopicPair[topicPairs.size()];
        block1: for (TopicPair topic : topicPairs) {
            for (int i = 0; i < pairs.length; ++i) {
                TopicPair pair = pairs[i];
                if (pair == null) {
                    pairs[i] = topic;
                    continue block1;
                }
                if (pair.getQos() != topic.getQos()) continue;
                String temp = pair.getTopic(enableShared).replace('+', '\u0000').replace("#", "\u0000/\u0000");
                if (MqttTopic.isMatched((String)topic.getTopic(enableShared), (String)temp)) {
                    pairs[i] = topic;
                    continue;
                }
                temp = topic.getTopic(enableShared).replace('+', '\u0000').replace("#", "\u0000/\u0000");
                if (MqttTopic.isMatched((String)pair.getTopic(enableShared), (String)temp)) continue block1;
            }
        }
        return Arrays.stream(pairs).filter(Objects::nonNull).collect(Collectors.toSet());
    }

    private void subscribe() {
        try {
            Set<TopicPair> topics = this.mergeTopics(this.id, this.enableShared);
            this.adapter.beforeSubscribe(this.id, topics);
            if (topics.isEmpty()) {
                log.info("there is no topic has been found for client '{}'.", (Object)this.id);
            } else {
                StringJoiner sj = new StringJoiner(",");
                String[] topic = new String[topics.size()];
                int[] qos = new int[topics.size()];
                int i = 0;
                for (TopicPair pair : topics) {
                    topic[i] = pair.getTopic(this.enableShared);
                    qos[i] = pair.getQos();
                    sj.add("('" + topic[i] + "', " + qos[i] + ")");
                    ++i;
                }
                this.client.subscribe(topic, qos);
                log.info("mqtt client '{}' subscribe success. topics : " + sj, (Object)this.id);
            }
        }
        catch (MqttException e) {
            log.error("mqtt client '{}' subscribe failure.", (Object)this.id, (Object)e);
        }
    }

    public void close() {
        try (IMqttAsyncClient imac = this.client();){
            if (imac.isConnected()) {
                imac.disconnect();
            }
        }
        catch (MqttException e) {
            log.error("mqtt client '{}' disconnect error: {}", new Object[]{this.id, e.getMessage(), e});
        }
    }

    public void send(String topic, Object payload) {
        this.send(topic, payload, this.qos(), false, null);
    }

    public void send(String topic, Object payload, boolean retained) {
        this.send(topic, payload, this.qos(), retained, null);
    }

    public void send(String topic, Object payload, IMqttActionListener callback) {
        this.send(topic, payload, this.qos(), false, callback);
    }

    public void send(String topic, Object payload, boolean retained, IMqttActionListener callback) {
        this.send(topic, payload, this.qos(), retained, callback);
    }

    public void send(String topic, Object payload, int qos) {
        this.send(topic, payload, qos, false, null);
    }

    public void send(String topic, Object payload, int qos, boolean retained) {
        this.send(topic, payload, qos, retained, null);
    }

    public void send(String topic, Object payload, int qos, IMqttActionListener callback) {
        this.send(topic, payload, qos, false, callback);
    }

    public void send(String topic, Object payload, int qos, boolean retained, IMqttActionListener callback) {
        Assert.isTrue((topic != null && !topic.isBlank() ? 1 : 0) != 0, (String)"topic cannot be blank.");
        byte[] bytes = MqttConversionService.getSharedInstance().toBytes(payload);
        if (bytes == null) {
            return;
        }
        MqttMessage message = this.toMessage(bytes, qos, retained);
        try {
            this.client.publish(topic, message, null, callback);
        }
        catch (Throwable throwable) {
            log.error("message publish error: {}", (Object)throwable.getMessage(), (Object)throwable);
        }
    }

    private MqttMessage toMessage(byte[] payload, int qos, boolean retained) {
        MqttMessage message = new MqttMessage();
        message.setPayload(payload);
        message.setQos(qos);
        message.setRetained(retained);
        return message;
    }
}

