package org.jetlinks.rule.engine.executor.node.mqtt.vertx;

import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.buffer.Buffer;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.jetlinks.core.message.codec.MqttMessage;
import org.jetlinks.core.message.codec.SimpleMqttMessage;
import org.jetlinks.rule.engine.executor.node.mqtt.MqttClient;
import org.jetlinks.supports.utils.MqttTopicUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/rule/engine/executor/node/mqtt/vertx/VertxMqttClient.class */
/**
 * {@link MqttClient} implementation backed by a Vert.x MQTT client.
 *
 * <p>Incoming publishes are pushed into a shared processor; each call to
 * {@link #subscribe(List)} returns a filtered view of that processor. A
 * per-topic counter tracks how many active subscribers reference a topic so
 * the broker subscription is only created for the first subscriber and only
 * removed once the last subscriber cancels. When a new underlying client is
 * installed through {@link #setClient(io.vertx.mqtt.MqttClient)}, every topic
 * whose counter is still positive is subscribed again.
 */
public class VertxMqttClient implements MqttClient {
    private static final Logger log = LoggerFactory.getLogger(VertxMqttClient.class);

    /** Underlying Vert.x client; replaced through {@link #setClient}. Volatile: written and read on different threads. */
    private volatile io.vertx.mqtt.MqttClient client;

    /** Milliseconds after {@link #connectTime} at which {@link #isConnecting()} clears the connecting flag. */
    long connectTimeout;

    private volatile Throwable lastError;

    /** Number of active subscribers per topic filter. */
    private final Map<String, AtomicInteger> topicsSubscribeCounter = new ConcurrentHashMap<>();

    /**
     * Stays {@code true} until {@link #subscribe(List)} is called for the first time.
     * Volatile because it is written by the caller of {@code subscribe} and read
     * inside the publish handler.
     */
    private volatile boolean neverSubscribe = true;

    volatile AtomicBoolean connecting = new AtomicBoolean();

    long connectTime = System.currentTimeMillis();

    private final FluxProcessor<MqttMessage, MqttMessage> messageProcessor = EmitterProcessor.create(false);

    /**
     * Reports whether a connect attempt is flagged as in progress.
     *
     * <p>Side effect: when at least {@link #connectTimeout} milliseconds have
     * elapsed since {@link #connectTime}, the flag is reset to {@code false}
     * before it is returned.
     *
     * @return the current value of the connecting flag
     */
    public boolean isConnecting() {
        if (System.currentTimeMillis() - this.connectTime >= this.connectTimeout) {
            this.connecting.set(false);
        }
        return this.connecting.get();
    }

    /**
     * Installs the underlying Vert.x client.
     *
     * <p>If the client is connected, a publish handler is registered that
     * forwards incoming messages to the internal processor, and every topic
     * that still has at least one active subscriber is subscribed again.
     *
     * @param mqttClient the Vert.x client to use from now on
     */
    public void setClient(io.vertx.mqtt.MqttClient mqttClient) {
        this.client = mqttClient;
        if (!isAlive()) {
            return;
        }
        mqttClient.publishHandler(mqttPublishMessage -> {
            // Forward while nobody has ever subscribed, or while someone is listening.
            if (this.neverSubscribe || this.messageProcessor.hasDownstreams()) {
                this.messageProcessor.onNext(SimpleMqttMessage.builder()
                        .topic(mqttPublishMessage.topicName())
                        .clientId(mqttClient.clientId())
                        .qosLevel(mqttPublishMessage.qosLevel().value())
                        .retain(mqttPublishMessage.isRetain())
                        .dup(mqttPublishMessage.isDup())
                        .payload(mqttPublishMessage.payload().getByteBuf())
                        .messageId(mqttPublishMessage.messageId())
                        .build());
            }
        });

        // Topics that still have active subscribers; the map value 0 is what is handed to the client per topic.
        Map<String, Integer> activeTopics = this.topicsSubscribeCounter.entrySet().stream()
                .filter(entry -> entry.getValue().get() > 0)
                .collect(Collectors.toMap(Map.Entry::getKey, entry -> 0));
        if (activeTopics.isEmpty()) {
            return;
        }
        log.info("re subscribe [{}] topic {}", mqttClient.clientId(), activeTopics.keySet());
        mqttClient.subscribe(activeTopics);
    }

    /** Returns the subscriber counter for a topic, creating it on first use. */
    private AtomicInteger getTopicCounter(String topic) {
        return this.topicsSubscribeCounter.computeIfAbsent(topic, key -> new AtomicInteger());
    }

    /**
     * Returns a stream of incoming messages whose topic matches any of the
     * given topic filters.
     *
     * <p>Work happens per subscriber of the returned {@link Flux}: on
     * subscription each topic counter is incremented and topics that were
     * previously unused are subscribed on the underlying client (if alive);
     * on cancellation each counter is decremented and topics that drop to
     * zero are unsubscribed (if alive).
     *
     * @param list topic filters to listen on
     * @return messages matching at least one of the filters
     */
    @Override // org.jetlinks.rule.engine.executor.node.mqtt.MqttClient
    public Flux<MqttMessage> subscribe(List<String> list) {
        this.neverSubscribe = false;
        return Flux.defer(() -> {
            // One guard per subscriber: a guard shared by all subscribers of this Flux would
            // let only the first cancellation decrement the counters and leak the rest.
            AtomicBoolean released = new AtomicBoolean();

            Map<String, Integer> newTopics = list.stream()
                    .filter(topic -> getTopicCounter(topic).getAndIncrement() == 0)
                    .collect(Collectors.toMap(Function.identity(), topic -> 0));
            if (isAlive() && !newTopics.isEmpty()) {
                log.info("subscribe mqtt [{}] topic : {}", this.client.clientId(), newTopics);
                this.client.subscribe(newTopics);
            }

            return this.messageProcessor
                    .filter(message -> list.stream()
                            .anyMatch(topic -> MqttTopicUtils.match(topic, message.getTopic())))
                    .doOnCancel(() -> {
                        if (released.compareAndSet(false, true)) {
                            releaseTopics(list);
                        }
                    });
        });
    }

    /** Decrements the counter of each topic and unsubscribes those no longer referenced. */
    private void releaseTopics(List<String> topics) {
        for (String topic : topics) {
            if (getTopicCounter(topic).decrementAndGet() <= 0 && isAlive()) {
                log.info("unsubscribe mqtt [{}] topic : {}", this.client.clientId(), topic);
                this.client.unsubscribe(topic);
            }
        }
    }

    /**
     * Publishes a message through the underlying client.
     *
     * @param mqttMessage the message to send
     * @return a {@link Mono} emitting {@code true} once the client reports
     *         success; it errors with the reported cause on failure, or with
     *         an {@link IOException} when the client is not alive
     */
    @Override // org.jetlinks.rule.engine.executor.node.mqtt.MqttClient
    public Mono<Boolean> publish(MqttMessage mqttMessage) {
        return Mono.create(monoSink -> {
            if (!isAlive()) {
                monoSink.error(new IOException("mqtt client not alive"));
                return;
            }
            // Snapshot so the callback logs the id of the client that actually sent the message.
            io.vertx.mqtt.MqttClient current = this.client;
            current.publish(
                    mqttMessage.getTopic(),
                    Buffer.buffer(mqttMessage.getPayload()),
                    MqttQoS.valueOf(mqttMessage.getQosLevel()),
                    mqttMessage.isDup(),
                    mqttMessage.isRetain(),
                    asyncResult -> {
                        if (asyncResult.succeeded()) {
                            log.info("publish mqtt [{}] message success: {}", current.clientId(), mqttMessage);
                            monoSink.success(true);
                        } else {
                            // Trailing Throwable is logged by SLF4J as the exception.
                            log.info("publish mqtt [{}] message error : {}",
                                    current.clientId(), mqttMessage, asyncResult.cause());
                            monoSink.error(asyncResult.cause());
                        }
                    });
        });
    }

    /**
     * @return {@code true} when a client has been set and it reports being connected
     */
    @Override // org.jetlinks.rule.engine.executor.node.mqtt.MqttClient
    public boolean isAlive() {
        io.vertx.mqtt.MqttClient current = this.client;
        return current != null && current.isConnected();
    }

    /**
     * @return the underlying Vert.x client, or {@code null} if none has been set
     */
    public io.vertx.mqtt.MqttClient getClient() {
        return this.client;
    }

    /**
     * @return the error most recently stored via {@link #setLastError(Throwable)}, possibly {@code null}
     */
    @Override // org.jetlinks.rule.engine.executor.node.mqtt.MqttClient
    public Throwable getLastError() {
        return this.lastError;
    }

    /**
     * Records the most recent error associated with this client.
     *
     * @param th the error to store
     */
    public void setLastError(Throwable th) {
        this.lastError = th;
    }
}
