package org.apache.nifi.processors.mqtt;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage;
import org.apache.nifi.processors.mqtt.common.MqttConstants;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

@CapabilityDescription("Subscribes to a topic and receives messages from an MQTT broker")
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@TriggerSerially
@Tags({"subscribe", "MQTT", "IOT", "consume", "listen"})
@SeeAlso({PublishMQTT.class})
@WritesAttributes({@WritesAttribute(attribute = ConsumeMQTT.BROKER_ATTRIBUTE_KEY, description = "MQTT broker that was the message source"), @WritesAttribute(attribute = ConsumeMQTT.TOPIC_ATTRIBUTE_KEY, description = "MQTT topic on which message was received"), @WritesAttribute(attribute = ConsumeMQTT.QOS_ATTRIBUTE_KEY, description = "The quality of service for this message."), @WritesAttribute(attribute = ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY, description = "Whether or not this message might be a duplicate of one which has already been received."), @WritesAttribute(attribute = ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY, description = "Whether or not this message was from a current publisher, or was \"retained\" by the server as the last message published on the topic.")})
/* loaded from: input_file:org/apache/nifi/processors/mqtt/ConsumeMQTT.class */
public class ConsumeMQTT extends AbstractMQTTProcessor {
    /** FlowFile attribute key under which {@code transferQueue} stores the broker value. */
    public static final String BROKER_ATTRIBUTE_KEY = "mqtt.broker";
    /** FlowFile attribute key for the topic the message was received on. */
    public static final String TOPIC_ATTRIBUTE_KEY = "mqtt.topic";
    /** FlowFile attribute key for the message's quality of service. */
    public static final String QOS_ATTRIBUTE_KEY = "mqtt.qos";
    /** FlowFile attribute key for the message's duplicate flag. */
    public static final String IS_DUPLICATE_ATTRIBUTE_KEY = "mqtt.isDuplicate";
    /** FlowFile attribute key for the message's retained flag. */
    public static final String IS_RETAINED_ATTRIBUTE_KEY = "mqtt.isRetained";

    // Snapshot of the processor configuration, taken in onScheduled.
    private volatile long maxQueueSize;
    private volatile int qos;
    private volatile String topicFilter;

    /** Set to true at the end of onScheduled and back to false in onUnscheduled. */
    private final AtomicBoolean scheduled = new AtomicBoolean(false);

    /**
     * Buffer between the MQTT callback (producer) and transferQueue (consumer).
     * Lazily created in customValidate and replaced by onPropertyModified on resize.
     */
    private volatile LinkedBlockingQueue<MQTTQueueMessage> mqttQueue;

    // Both are assigned exactly once in the static initializer of this class.
    private static final List<PropertyDescriptor> descriptors;
    private static final Set<Relationship> relationships;

    public static final PropertyDescriptor PROP_TOPIC_FILTER = new PropertyDescriptor.Builder()
            .name("Topic Filter")
            .description("The MQTT topic filter to designate the topics to subscribe to.")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .build();

    public static final PropertyDescriptor PROP_QOS = new PropertyDescriptor.Builder()
            .name("Quality of Service(QoS)")
            .description("The Quality of Service(QoS) to receive the message with. "
                    + "Accepts values '0', '1' or '2'; "
                    + "'0' for 'at most once', '1' for 'at least once', '2' for 'exactly once'.")
            .required(true)
            .defaultValue(MqttConstants.ALLOWABLE_VALUE_QOS_0.getValue())
            .allowableValues(
                    MqttConstants.ALLOWABLE_VALUE_QOS_0,
                    MqttConstants.ALLOWABLE_VALUE_QOS_1,
                    MqttConstants.ALLOWABLE_VALUE_QOS_2)
            .build();

    // Required and deliberately without a default value: the user must choose a bound.
    public static final PropertyDescriptor PROP_MAX_QUEUE_SIZE = new PropertyDescriptor.Builder()
            .name("Max Queue Size")
            .description("The MQTT messages are always being sent to subscribers on a topic. "
                    + "If the 'Run Schedule' is significantly behind the rate at which the messages are arriving to this processor "
                    + "then a back up can occur. "
                    + "This property specifies the maximum number of messages this processor will hold in memory at one time.")
            .required(true)
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .build();

    /**
     * Timeout handed to the MQTT client's disconnect call in onUnscheduled
     * (milliseconds per the Paho API). Never reassigned, hence final.
     */
    private static final int DISCONNECT_TIMEOUT = 5000;

    public static final Relationship REL_MESSAGE = new Relationship.Builder()
            .name("Message")
            .description("The MQTT message output")
            .build();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nifi/processors/mqtt/ConsumeMQTT$ConsumeMQTTCallback.class */
    public class ConsumeMQTTCallback implements MqttCallback {
        private ConsumeMQTTCallback() {
        }

        public void connectionLost(Throwable th) {
            ConsumeMQTT.this.logger.warn("Connection to " + ConsumeMQTT.this.broker + " lost", th);
            try {
                ConsumeMQTT.this.reconnect();
            } catch (MqttException e) {
                ConsumeMQTT.this.logger.error("Connection to " + ConsumeMQTT.this.broker + " lost and callback re-connect failed.");
            }
        }

        public void messageArrived(String str, MqttMessage mqttMessage) throws Exception {
            if (ConsumeMQTT.this.logger.isDebugEnabled()) {
                byte[] payload = mqttMessage.getPayload();
                String str2 = new String(payload, "UTF-8");
                if (StringUtils.isAsciiPrintable(str2)) {
                    ConsumeMQTT.this.logger.debug("Message arrived from topic {}. Payload: {}", new Object[]{str, str2});
                } else {
                    ConsumeMQTT.this.logger.debug("Message arrived from topic {}. Binary value of size {}", new Object[]{str, Integer.valueOf(payload.length)});
                }
            }
            if (ConsumeMQTT.this.mqttQueue.size() >= ConsumeMQTT.this.maxQueueSize) {
                throw new IllegalStateException("The subscriber queue is full, cannot receive another message until the processor is scheduled to run.");
            }
            ConsumeMQTT.this.mqttQueue.add(new MQTTQueueMessage(str, mqttMessage));
        }

        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
            ConsumeMQTT.this.logger.warn("Received MQTT 'delivery complete' message to subscriber:" + iMqttDeliveryToken);
        }
    }

    /**
     * Resizes the in-memory receive queue when {@link #PROP_MAX_QUEUE_SIZE} changes,
     * keeping the messages that are already buffered.
     *
     * <p>This hook can run before the new value has been validated, so a removed
     * (null), non-numeric or non-positive value is ignored here and left for
     * property validation to report.
     *
     * @param propertyDescriptor the property that changed
     * @param str the previous value
     * @param str2 the new value; may be null
     */
    public void onPropertyModified(PropertyDescriptor propertyDescriptor, String str, String str2) {
        if (!PROP_MAX_QUEUE_SIZE.equals(propertyDescriptor)) {
            return;
        }
        if (this.mqttQueue == null || str2 == null) {
            // Nothing buffered yet, or the property was cleared: nothing to resize.
            return;
        }

        final int newCapacity;
        try {
            newCapacity = Integer.parseInt(str2);
        } catch (NumberFormatException ignored) {
            // Not a number; the property validator flags this, so keep the current queue.
            return;
        }
        if (newCapacity <= 0) {
            // LinkedBlockingQueue rejects non-positive capacities; validation flags the value.
            return;
        }

        final int pending = this.mqttQueue.size();
        if (pending > newCapacity) {
            this.logger.warn("New receive buffer size ({}) is smaller than the number of messages pending ({}), ignoring resize request. Processor will be invalid.", new Object[]{newCapacity, pending});
            return;
        }

        final LinkedBlockingQueue<MQTTQueueMessage> resized = new LinkedBlockingQueue<>(newCapacity);
        this.mqttQueue.drainTo(resized);
        this.mqttQueue = resized;
    }

    /**
     * Adds a check, on top of the inherited validation, that the configured
     * {@link #PROP_MAX_QUEUE_SIZE} is not smaller than the number of messages
     * currently buffered. Also creates the receive queue on first use.
     *
     * @param validationContext the context providing the current property values
     * @return the inherited validation results plus, possibly, the queue-size failure
     */
    @Override // org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor
    public Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        final Collection<ValidationResult> results = super.customValidate(validationContext);
        final int configuredMax = validationContext.getProperty(PROP_MAX_QUEUE_SIZE).asInteger();

        // The queue is created lazily here, sized by the configured maximum.
        if (this.mqttQueue == null) {
            this.mqttQueue = new LinkedBlockingQueue<>(configuredMax);
        }

        final int pending = this.mqttQueue.size();
        if (pending <= configuredMax) {
            return results;
        }

        final String explanation = String.format(
                "%s (%d) is smaller than the number of messages pending (%d).",
                PROP_MAX_QUEUE_SIZE.getDisplayName(), configuredMax, pending);
        final ValidationResult tooSmall = new ValidationResult.Builder()
                .valid(false)
                .subject("ConsumeMQTT Configuration")
                .explanation(explanation)
                .build();
        results.add(tooSmall);
        return results;
    }

    /**
     * Stores the component logger in the {@code logger} field.
     *
     * @param processorInitializationContext the initialization context (not used here)
     */
    protected void init(ProcessorInitializationContext processorInitializationContext) {
        logger = getLogger();
    }

    /**
     * @return the unmodifiable relationship set built in the static initializer
     */
    public Set<Relationship> getRelationships() {
        return ConsumeMQTT.relationships;
    }

    /**
     * @return the unmodifiable property descriptor list built in the static initializer
     */
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return ConsumeMQTT.descriptors;
    }

    /**
     * Snapshots the subscriber configuration, builds the MQTT client and marks
     * the processor as scheduled.
     *
     * @param processContext the context providing the configured property values
     * @throws IOException declared for the lifecycle callback
     * @throws ClassNotFoundException declared for the lifecycle callback
     */
    @OnScheduled
    public void onScheduled(ProcessContext processContext) throws IOException, ClassNotFoundException {
        final int configuredQos = processContext.getProperty(PROP_QOS).asInteger();
        final long configuredMaxQueueSize = processContext.getProperty(PROP_MAX_QUEUE_SIZE).asLong();
        final String configuredTopicFilter = processContext.getProperty(PROP_TOPIC_FILTER).getValue();

        this.qos = configuredQos;
        this.maxQueueSize = configuredMaxQueueSize;
        this.topicFilter = configuredTopicFilter;

        buildClient(processContext);

        // Flip the flag last so onTrigger only tries to reconnect once setup is complete.
        this.scheduled.set(true);
    }

    /**
     * Clears the scheduled flag and disconnects the MQTT client, if connected,
     * while holding the client connect write lock.
     *
     * @param processContext the process context (not used here)
     */
    @OnUnscheduled
    public void onUnscheduled(ProcessContext processContext) {
        this.scheduled.set(false);

        this.mqttClientConnectLock.writeLock().lock();
        try {
            if (!isConnected()) {
                // Nothing to tear down; the finally block still releases the lock.
                return;
            }
            this.mqttClient.disconnect(DISCONNECT_TIMEOUT);
            this.logger.info("Disconnected the MQTT client.");
        } catch (MqttException e) {
            this.logger.error("Failed when disconnecting the MQTT client.", e);
        } finally {
            this.mqttClientConnectLock.writeLock().unlock();
        }
    }

    /**
     * Flushes messages still sitting in the internal queue when the processor stops.
     *
     * @param processContext the process context (not used here)
     * @throws IOException declared for the lifecycle callback
     * @throws ProcessException if messages are pending but no session factory has been
     *         stored, so they cannot be turned into FlowFiles
     */
    @OnStopped
    public void onStopped(ProcessContext processContext) throws IOException {
        final boolean hasLeftovers = this.mqttQueue != null && !this.mqttQueue.isEmpty();
        if (!hasLeftovers) {
            return;
        }

        if (this.processSessionFactory == null) {
            throw new ProcessException("Stopping the processor but there is no ProcessSessionFactory stored and there are messages in the MQTT internal queue. Removing the processor now will clear the queue but will result in DATA LOSS. This is normally due to starting the processor, receiving messages and stopping before the onTrigger happens. The messages in the MQTT internal queue cannot finish processing until until the processor is triggered to run.");
        }

        this.logger.info("Finishing processing leftover messages");
        transferQueue(this.processSessionFactory.createSession());
    }

    /**
     * Reconnects if the client dropped while nothing is buffered, then turns any
     * buffered messages into FlowFiles.
     *
     * @param processContext used to yield the processor when reconnecting fails
     * @param processSession the session the FlowFiles are created in
     * @throws ProcessException declared by the overridden method
     */
    @Override // org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        final boolean needsReconnect = this.mqttQueue.isEmpty() && !isConnected() && this.scheduled.get();
        if (needsReconnect) {
            this.logger.info("Queue is empty and client is not connected. Attempting to reconnect.");
            try {
                reconnect();
            } catch (MqttException e) {
                this.logger.error("Connection to " + this.broker + " lost (or was never connected) and ontrigger connect failed. Yielding processor", e);
                processContext.yield();
            }
        }

        // Messages may have arrived during the reconnect, so look at the queue again.
        if (!this.mqttQueue.isEmpty()) {
            transferQueue(processSession);
        }
    }

    /**
     * Drains the internal queue, emitting one FlowFile per MQTT message to
     * {@link #REL_MESSAGE}.
     *
     * <p>Each message is only removed from the queue after the session commit
     * returns, so a failed commit leaves the message queued for the next attempt.
     *
     * @param processSession the session used to create, write and transfer the FlowFiles
     */
    private void transferQueue(ProcessSession processSession) {
        while (true) {
            // Peek before creating anything so an empty queue never leaves a dangling FlowFile.
            final MQTTQueueMessage mqttMessage = this.mqttQueue.peek();
            if (mqttMessage == null) {
                return;
            }

            final HashMap<String, String> attributes = new HashMap<>();
            attributes.put(BROKER_ATTRIBUTE_KEY, this.broker);
            attributes.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic());
            attributes.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos()));
            attributes.put(IS_DUPLICATE_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isDuplicate()));
            attributes.put(IS_RETAINED_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isRetained()));

            FlowFile flowFile = processSession.create();
            flowFile = processSession.putAllAttributes(flowFile, attributes);
            flowFile = processSession.write(flowFile, new OutputStreamCallback() {
                public void process(OutputStream outputStream) throws IOException {
                    outputStream.write(mqttMessage.getPayload());
                }
            });

            processSession.getProvenanceReporter().receive(flowFile, buildTransitUri(this.broker, mqttMessage.getTopic()));
            processSession.transfer(flowFile, REL_MESSAGE);
            processSession.commit();

            // Remove only after the commit; a miss means something else already took the message.
            if (!this.mqttQueue.remove(mqttMessage) && this.logger.isWarnEnabled()) {
                this.logger.warn("FlowFile " + flowFile.getAttribute(CoreAttributes.UUID.key()) + " for Mqtt message " + mqttMessage + " had already been removed from queue, possible duplication of flow files");
            }
        }
    }

    /**
     * Joins the broker value and the topic with exactly one separating slash,
     * so the provenance transit URI does not run the two together.
     */
    private static String buildTransitUri(String brokerUri, String topic) {
        final String base = String.valueOf(brokerUri);
        return base.endsWith("/") ? base + topic : base + "/" + topic;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Connects the MQTT client with a fresh {@link ConsumeMQTTCallback} and
     * subscribes to the configured topic filter, unless it is already connected.
     * Runs under the client connect write lock.
     *
     * <p>Uses the null-safe {@code isConnected()} check because {@code onTrigger}
     * calls this method exactly when that check is false, which includes the case
     * where no client object exists yet.
     * NOTE(review): this assumes {@code setAndConnectClient} assigns {@code mqttClient} - confirm.
     *
     * @throws MqttException if connecting or subscribing fails
     */
    public void reconnect() throws MqttException {
        this.mqttClientConnectLock.writeLock().lock();
        try {
            if (isConnected()) {
                return;
            }
            setAndConnectClient(new ConsumeMQTTCallback());
            this.mqttClient.subscribe(this.topicFilter, this.qos);
        } finally {
            this.mqttClientConnectLock.writeLock().unlock();
        }
    }

    /**
     * @return true only when a client object exists and reports itself connected
     */
    private boolean isConnected() {
        if (this.mqttClient == null) {
            return false;
        }
        return this.mqttClient.isConnected();
    }

    // Builds the immutable descriptor list and relationship set exposed by
    // getSupportedPropertyDescriptors() and getRelationships().
    static {
        // Start from the inherited descriptors and append the consumer-specific ones.
        // NOTE(review): relies on getAbstractPropertyDescriptors() returning a mutable list - confirm.
        final List<PropertyDescriptor> supported = getAbstractPropertyDescriptors();
        supported.add(PROP_TOPIC_FILTER);
        supported.add(PROP_QOS);
        supported.add(PROP_MAX_QUEUE_SIZE);
        descriptors = Collections.unmodifiableList(supported);

        final Set<Relationship> outgoing = new HashSet<>();
        outgoing.add(REL_MESSAGE);
        relationships = Collections.unmodifiableSet(outgoing);
    }
}
