package org.apache.nifi.processors.mqtt.adapters;

import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import com.hivemq.client.mqtt.mqtt5.Mqtt5ClientBuilder;
import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5Connect;
import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5ConnectBuilder;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.mqtt.common.MqttCallback;
import org.apache.nifi.processors.mqtt.common.MqttClient;
import org.apache.nifi.processors.mqtt.common.MqttClientProperties;
import org.apache.nifi.processors.mqtt.common.MqttException;
import org.apache.nifi.processors.mqtt.common.MqttProtocolScheme;
import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage;
import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
import org.apache.nifi.security.util.KeyStoreUtils;
import org.apache.nifi.security.util.TlsException;

/* loaded from: input_file:org/apache/nifi/processors/mqtt/adapters/HiveMqV5ClientAdapter.class */
public class HiveMqV5ClientAdapter implements MqttClient {
    private final Mqtt5BlockingClient mqtt5BlockingClient;
    private final MqttClientProperties clientProperties;
    private final ComponentLog logger;
    private MqttCallback callback;

    public HiveMqV5ClientAdapter(MqttClientProperties mqttClientProperties, ComponentLog componentLog) throws TlsException {
        this.mqtt5BlockingClient = createClient(mqttClientProperties, componentLog);
        this.clientProperties = mqttClientProperties;
        this.logger = componentLog;
    }

    @Override // org.apache.nifi.processors.mqtt.common.MqttClient
    public boolean isConnected() {
        return this.mqtt5BlockingClient.getState().isConnected();
    }

    @Override // org.apache.nifi.processors.mqtt.common.MqttClient
    public void connect() {
        this.logger.debug("Connecting to broker");
        Mqtt5ConnectBuilder keepAlive = Mqtt5Connect.builder().keepAlive(this.clientProperties.getKeepAliveInterval());
        boolean isCleanSession = this.clientProperties.isCleanSession();
        keepAlive.cleanStart(isCleanSession);
        if (!isCleanSession) {
            keepAlive.sessionExpiryInterval(this.clientProperties.getSessionExpiryInterval().longValue());
        }
        String lastWillTopic = this.clientProperties.getLastWillTopic();
        if (lastWillTopic != null) {
            keepAlive.willPublish().topic(lastWillTopic).payload(this.clientProperties.getLastWillMessage().getBytes()).retain(this.clientProperties.getLastWillRetain().booleanValue()).qos(MqttQos.fromCode(this.clientProperties.getLastWillQos().intValue())).applyWillPublish();
        }
        String username = this.clientProperties.getUsername();
        String password = this.clientProperties.getPassword();
        if (username != null && password != null) {
            keepAlive.simpleAuth().username(this.clientProperties.getUsername()).password(password.getBytes(StandardCharsets.UTF_8)).applySimpleAuth();
        }
        this.mqtt5BlockingClient.connect(keepAlive.build());
    }

    @Override // org.apache.nifi.processors.mqtt.common.MqttClient
    public void disconnect() {
        this.logger.debug("Disconnecting client");
        this.mqtt5BlockingClient.disconnect();
    }

    @Override // org.apache.nifi.processors.mqtt.common.MqttClient
    public void close() {
    }

    @Override // org.apache.nifi.processors.mqtt.common.MqttClient
    public void publish(String str, StandardMqttMessage standardMqttMessage) {
        this.logger.debug("Publishing message to {} with QoS: {}", new Object[]{str, Integer.valueOf(standardMqttMessage.getQos())});
        this.mqtt5BlockingClient.publishWith().topic(str).payload(standardMqttMessage.getPayload()).retain(standardMqttMessage.isRetained()).qos((MqttQos) Objects.requireNonNull(MqttQos.fromCode(standardMqttMessage.getQos()))).send();
    }

    @Override // org.apache.nifi.processors.mqtt.common.MqttClient
    public void subscribe(String str, int i) {
        Objects.requireNonNull(this.callback, "callback should be set");
        this.logger.debug("Subscribing to {} with QoS: {}", new Object[]{str, Integer.valueOf(i)});
        try {
            this.logger.debug("Received mqtt5 subscribe ack: {}", new Object[]{(Mqtt5SubAck) this.mqtt5BlockingClient.toAsync().subscribeWith().topicFilter(str).qos((MqttQos) Objects.requireNonNull(MqttQos.fromCode(i))).callback(mqtt5Publish -> {
                this.callback.messageArrived(new ReceivedMqttMessage(mqtt5Publish.getPayloadAsBytes(), mqtt5Publish.getQos().getCode(), mqtt5Publish.isRetain(), mqtt5Publish.getTopic().toString()));
            }).send().get(this.clientProperties.getConnectionTimeout(), TimeUnit.SECONDS)});
        } catch (Exception e) {
            throw new MqttException("An error has occurred during sending subscribe message to broker", e);
        }
    }

    @Override // org.apache.nifi.processors.mqtt.common.MqttClient
    public void setCallback(MqttCallback mqttCallback) {
        this.callback = mqttCallback;
    }

    private static Mqtt5BlockingClient createClient(MqttClientProperties mqttClientProperties, ComponentLog componentLog) throws TlsException {
        componentLog.debug("Creating Mqtt v5 client");
        Mqtt5ClientBuilder serverHost = Mqtt5Client.builder().identifier(mqttClientProperties.getClientId()).serverHost(mqttClientProperties.getBrokerUri().getHost());
        int port = mqttClientProperties.getBrokerUri().getPort();
        if (port != -1) {
            serverHost.serverPort(port);
        }
        if (MqttProtocolScheme.WS.equals(mqttClientProperties.getScheme()) || MqttProtocolScheme.WSS.equals(mqttClientProperties.getScheme())) {
            serverHost.webSocketConfig().applyWebSocketConfig();
        }
        if (MqttProtocolScheme.SSL.equals(mqttClientProperties.getScheme())) {
            if (mqttClientProperties.getTlsConfiguration().getTruststorePath() != null) {
                serverHost.sslConfig().trustManagerFactory(KeyStoreUtils.loadTrustManagerFactory(mqttClientProperties.getTlsConfiguration().getTruststorePath(), mqttClientProperties.getTlsConfiguration().getTruststorePassword(), mqttClientProperties.getTlsConfiguration().getTruststoreType().getType())).applySslConfig();
            }
            if (mqttClientProperties.getTlsConfiguration().getKeystorePath() != null) {
                serverHost.sslConfig().keyManagerFactory(KeyStoreUtils.loadKeyManagerFactory(mqttClientProperties.getTlsConfiguration().getKeystorePath(), mqttClientProperties.getTlsConfiguration().getKeystorePassword(), (String) null, mqttClientProperties.getTlsConfiguration().getKeystoreType().getType())).applySslConfig();
            }
        }
        return serverHost.buildBlocking();
    }
}
