package org.jetlinks.rule.engine.executor.node.mqtt.vertx;

import io.vertx.core.Vertx;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.jetlinks.rule.engine.executor.node.mqtt.MqttClientManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/rule/engine/executor/node/mqtt/vertx/VertxMqttClientManager.class */
public abstract class VertxMqttClientManager implements MqttClientManager {
    private static final Logger log = LoggerFactory.getLogger(VertxMqttClientManager.class);
    private final Map<String, VertxMqttClient> clients = new ConcurrentHashMap();

    /* loaded from: input_file:org/jetlinks/rule/engine/executor/node/mqtt/vertx/VertxMqttClientManager$VertxMqttConfig.class */
    protected static class VertxMqttConfig {
        private String id;
        private String host;
        private int port;
        private MqttClientOptions options;
        private boolean enabled;

        /* loaded from: input_file:org/jetlinks/rule/engine/executor/node/mqtt/vertx/VertxMqttClientManager$VertxMqttConfig$VertxMqttConfigBuilder.class */
        public static class VertxMqttConfigBuilder {
            private String id;
            private String host;
            private int port;
            private MqttClientOptions options;
            private boolean enabled;

            VertxMqttConfigBuilder() {
            }

            public VertxMqttConfigBuilder id(String str) {
                this.id = str;
                return this;
            }

            public VertxMqttConfigBuilder host(String str) {
                this.host = str;
                return this;
            }

            public VertxMqttConfigBuilder port(int i) {
                this.port = i;
                return this;
            }

            public VertxMqttConfigBuilder options(MqttClientOptions mqttClientOptions) {
                this.options = mqttClientOptions;
                return this;
            }

            public VertxMqttConfigBuilder enabled(boolean z) {
                this.enabled = z;
                return this;
            }

            public VertxMqttConfig build() {
                return new VertxMqttConfig(this.id, this.host, this.port, this.options, this.enabled);
            }

            public String toString() {
                return "VertxMqttClientManager.VertxMqttConfig.VertxMqttConfigBuilder(id=" + this.id + ", host=" + this.host + ", port=" + this.port + ", options=" + this.options + ", enabled=" + this.enabled + ")";
            }
        }

        VertxMqttConfig(String str, String str2, int i, MqttClientOptions mqttClientOptions, boolean z) {
            this.id = str;
            this.host = str2;
            this.port = i;
            this.options = mqttClientOptions;
            this.enabled = z;
        }

        public static VertxMqttConfigBuilder builder() {
            return new VertxMqttConfigBuilder();
        }

        public String getId() {
            return this.id;
        }

        public String getHost() {
            return this.host;
        }

        public int getPort() {
            return this.port;
        }

        public MqttClientOptions getOptions() {
            return this.options;
        }

        public boolean isEnabled() {
            return this.enabled;
        }

        public void setId(String str) {
            this.id = str;
        }

        public void setHost(String str) {
            this.host = str;
        }

        public void setPort(int i) {
            this.port = i;
        }

        public void setOptions(MqttClientOptions mqttClientOptions) {
            this.options = mqttClientOptions;
        }

        public void setEnabled(boolean z) {
            this.enabled = z;
        }
    }

    protected abstract Vertx getVertx();

    protected abstract Mono<VertxMqttConfig> getConfig(String str);

    protected void doClientKeepAlive() {
        for (Map.Entry<String, VertxMqttClient> entry : this.clients.entrySet()) {
            VertxMqttClient value = entry.getValue();
            if (!value.isAlive() && value.getClient() != null && !value.isConnecting()) {
                log.warn("mqtt client [{}] is disconnected", value.getClient().clientId());
                getConfig(entry.getKey()).filter((v0) -> {
                    return v0.isEnabled();
                }).flatMap(this::createMqttClient).doOnError(th -> {
                    log.warn("reconnect mqtt client [{}] failed ", entry.getKey(), th);
                }).subscribe();
            }
        }
    }

    protected void stopClient(String str) {
        try {
            Optional.ofNullable(this.clients.get(str)).ifPresent(this::doClose);
        } catch (Exception e) {
        }
    }

    protected void doClose(VertxMqttClient vertxMqttClient) {
        try {
            vertxMqttClient.connecting.set(false);
            vertxMqttClient.getClient().disconnect();
        } catch (Exception e) {
        }
    }

    protected Mono<VertxMqttClient> createMqttClient(VertxMqttConfig vertxMqttConfig) {
        return Mono.create(monoSink -> {
            synchronized (this.clients) {
                VertxMqttClient vertxMqttClient = this.clients.get(vertxMqttConfig.getId());
                if (vertxMqttClient != null && (vertxMqttClient.isAlive() || vertxMqttClient.isConnecting())) {
                    monoSink.success(vertxMqttClient);
                    return;
                }
                if (vertxMqttClient == null) {
                    vertxMqttClient = new VertxMqttClient();
                    this.clients.put(vertxMqttConfig.getId(), vertxMqttClient);
                }
                vertxMqttClient.connecting.set(true);
                if (!vertxMqttConfig.enabled) {
                    vertxMqttClient.connecting.set(false);
                    monoSink.success(vertxMqttClient);
                    return;
                }
                vertxMqttClient.connectTimeout = vertxMqttConfig.getOptions().getConnectTimeout();
                vertxMqttClient.connectTime = System.currentTimeMillis();
                VertxMqttClient vertxMqttClient2 = vertxMqttClient;
                MqttClient create = MqttClient.create(getVertx(), vertxMqttConfig.options);
                create.exceptionHandler(th -> {
                    vertxMqttClient2.setLastError(th);
                    vertxMqttClient2.connecting.set(false);
                    log.error("mqtt client error", th);
                });
                create.closeHandler(r6 -> {
                    vertxMqttClient2.connecting.set(false);
                    vertxMqttClient2.setLastError(new IOException("mqtt connection closed"));
                    log.warn("mqtt connection closed");
                });
                create.connect(vertxMqttConfig.port, vertxMqttConfig.host, asyncResult -> {
                    vertxMqttClient2.connecting.set(false);
                    if (vertxMqttClient2.getClient() != create) {
                        doClose(vertxMqttClient2);
                    }
                    vertxMqttClient2.setClient(create);
                    monoSink.success(vertxMqttClient2);
                    if (asyncResult.succeeded()) {
                        log.info("connect mqtt[{} {}:{}] success", new Object[]{vertxMqttConfig.getId(), vertxMqttConfig.getHost(), Integer.valueOf(vertxMqttConfig.getPort())});
                    } else {
                        vertxMqttClient2.setLastError(asyncResult.cause());
                        log.info("connect mqtt[{} {}:{}] error", new Object[]{vertxMqttConfig.getId(), vertxMqttConfig.getHost(), Integer.valueOf(vertxMqttConfig.getPort()), asyncResult.cause()});
                    }
                });
            }
        });
    }

    @Override // org.jetlinks.rule.engine.executor.node.mqtt.MqttClientManager
    public Mono<org.jetlinks.rule.engine.executor.node.mqtt.MqttClient> getMqttClient(String str) {
        return Mono.justOrEmpty(this.clients.get(str)).filter((v0) -> {
            return v0.isAlive();
        }).switchIfEmpty(Mono.defer(() -> {
            return getConfig(str).flatMap(this::createMqttClient);
        })).cast(org.jetlinks.rule.engine.executor.node.mqtt.MqttClient.class);
    }
}
