package org.apache.nifi.processors.mqtt;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
import org.apache.nifi.processors.mqtt.common.MqttConstants;
import org.apache.nifi.processors.mqtt.common.MqttException;
import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SchemaValidationException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;

@CapabilityDescription("Subscribes to a topic and receives messages from an MQTT broker")
@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "The 'Max Queue Size' specifies the maximum number of messages that can be hold in memory by NiFi by a single instance of this processor. A high value for this property could represent a lot of data being stored in memory.")
@WritesAttributes({@WritesAttribute(attribute = ConsumeMQTT.RECORD_COUNT_KEY, description = "The number of records received"), @WritesAttribute(attribute = ConsumeMQTT.BROKER_ATTRIBUTE_KEY, description = "MQTT broker that was the message source"), @WritesAttribute(attribute = ConsumeMQTT.TOPIC_ATTRIBUTE_KEY, description = "MQTT topic on which message was received"), @WritesAttribute(attribute = ConsumeMQTT.QOS_ATTRIBUTE_KEY, description = "The quality of service for this message."), @WritesAttribute(attribute = ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY, description = "Whether or not this message might be a duplicate of one which has already been received."), @WritesAttribute(attribute = ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY, description = "Whether or not this message was from a current publisher, or was \"retained\" by the server as the last message published on the topic.")})
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@TriggerSerially
@Tags({"subscribe", "MQTT", "IOT", "consume", "listen"})
@SeeAlso({PublishMQTT.class})
/* loaded from: input_file:org/apache/nifi/processors/mqtt/ConsumeMQTT.class */
public class ConsumeMQTT extends AbstractMQTTProcessor {
    public static final String RECORD_COUNT_KEY = "record.count";
    public static final String BROKER_ATTRIBUTE_KEY = "mqtt.broker";
    public static final String TOPIC_ATTRIBUTE_KEY = "mqtt.topic";
    public static final String QOS_ATTRIBUTE_KEY = "mqtt.qos";
    public static final String IS_DUPLICATE_ATTRIBUTE_KEY = "mqtt.isDuplicate";
    public static final String IS_RETAINED_ATTRIBUTE_KEY = "mqtt.isRetained";
    public static final String TOPIC_FIELD_KEY = "_topic";
    public static final String QOS_FIELD_KEY = "_qos";
    public static final String IS_DUPLICATE_FIELD_KEY = "_isDuplicate";
    public static final String IS_RETAINED_FIELD_KEY = "_isRetained";
    private static final String COUNTER_PARSE_FAILURES = "Parse Failures";
    private static final String COUNTER_RECORDS_RECEIVED = "Records Received";
    private static final String COUNTER_RECORDS_PROCESSED = "Records Processed";
    private static final int MAX_MESSAGES_PER_FLOW_FILE = 10000;
    private volatile int qos;
    private volatile String topicFilter;
    private volatile LinkedBlockingQueue<ReceivedMqttMessage> mqttQueue;
    public static final PropertyDescriptor PROP_GROUPID = new PropertyDescriptor.Builder().name("Group ID").description("MQTT consumer group ID to use. If group ID not set, client will connect as individual consumer.").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor PROP_TOPIC_FILTER = new PropertyDescriptor.Builder().name("Topic Filter").description("The MQTT topic filter to designate the topics to subscribe to.").required(true).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).addValidator(StandardValidators.NON_BLANK_VALIDATOR).build();
    public static final PropertyDescriptor PROP_QOS = new PropertyDescriptor.Builder().name("Quality of Service(QoS)").displayName("Quality of Service (QoS)").description("The Quality of Service (QoS) to receive the message with. Accepts values '0', '1' or '2'; '0' for 'at most once', '1' for 'at least once', '2' for 'exactly once'.").required(true).defaultValue(MqttConstants.ALLOWABLE_VALUE_QOS_0.getValue()).allowableValues(new AllowableValue[]{MqttConstants.ALLOWABLE_VALUE_QOS_0, MqttConstants.ALLOWABLE_VALUE_QOS_1, MqttConstants.ALLOWABLE_VALUE_QOS_2}).build();
    public static final PropertyDescriptor PROP_MAX_QUEUE_SIZE = new PropertyDescriptor.Builder().name("Max Queue Size").description("The MQTT messages are always being sent to subscribers on a topic regardless of how frequently the processor is scheduled to run. If the 'Run Schedule' is significantly behind the rate at which the messages are arriving to this processor, then a back up can occur in the internal queue of this processor. This property specifies the maximum number of messages this processor will hold in memory at one time in the internal queue. This data would be lost in case of a NiFi restart.").required(true).addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().fromPropertyDescriptor(BASE_RECORD_READER).description("The Record Reader to use for parsing received MQTT Messages into Records.").build();
    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder().fromPropertyDescriptor(BASE_RECORD_WRITER).description("The Record Writer to use for serializing Records before writing them to a FlowFile.").build();
    public static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder().fromPropertyDescriptor(BASE_MESSAGE_DEMARCATOR).description("With this property, you have an option to output FlowFiles which contains multiple messages. This property allows you to provide a string (interpreted as UTF-8) to use for demarcating apart multiple messages. This is an optional property ; if not provided, and if not defining a Record Reader/Writer, each message received will result in a single FlowFile. To enter special character such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS.").build();
    public static final PropertyDescriptor ADD_ATTRIBUTES_AS_FIELDS = new PropertyDescriptor.Builder().name("add-attributes-as-fields").displayName("Add attributes as fields").description("If setting this property to true, default fields are going to be added in each record: _topic, _qos, _isDuplicate, _isRetained.").required(true).defaultValue("true").allowableValues(new String[]{"true", "false"}).addValidator(StandardValidators.BOOLEAN_VALIDATOR).dependsOn(RECORD_READER, new AllowableValue[0]).build();
    public static final Relationship REL_MESSAGE = new Relationship.Builder().name("Message").description("The MQTT message output").build();
    public static final Relationship REL_PARSE_FAILURE = new Relationship.Builder().name("parse.failure").description("If a message cannot be parsed using the configured Record Reader, the contents of the message will be routed to this Relationship as its own individual FlowFile.").autoTerminateDefault(true).build();
    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(PROP_BROKER_URI, PROP_MQTT_VERSION, PROP_USERNAME, PROP_PASSWORD, PROP_SSL_CONTEXT_SERVICE, PROP_CLEAN_SESSION, PROP_SESSION_EXPIRY_INTERVAL, PROP_CLIENTID, PROP_GROUPID, PROP_TOPIC_FILTER, PROP_QOS, RECORD_READER, RECORD_WRITER, ADD_ATTRIBUTES_AS_FIELDS, MESSAGE_DEMARCATOR, PROP_CONN_TIMEOUT, PROP_KEEP_ALIVE_INTERVAL, PROP_LAST_WILL_MESSAGE, PROP_LAST_WILL_TOPIC, PROP_LAST_WILL_RETAIN, PROP_LAST_WILL_QOS, PROP_MAX_QUEUE_SIZE));
    private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet(Arrays.asList(REL_MESSAGE, REL_PARSE_FAILURE)));
    private volatile String topicPrefix = "";
    private final AtomicBoolean scheduled = new AtomicBoolean(false);

    public void onPropertyModified(PropertyDescriptor propertyDescriptor, String str, String str2) {
        if (propertyDescriptor == PROP_MAX_QUEUE_SIZE) {
            int parseInt = Integer.parseInt(str2);
            if (this.mqttQueue != null) {
                int size = this.mqttQueue.size();
                if (size > parseInt) {
                    this.logger.warn("New receive buffer size ({}) is smaller than the number of messages pending ({}), ignoring resize request. Processor will be invalid.", new Object[]{Integer.valueOf(parseInt), Integer.valueOf(size)});
                    return;
                }
                LinkedBlockingQueue<ReceivedMqttMessage> linkedBlockingQueue = new LinkedBlockingQueue<>(parseInt);
                this.mqttQueue.drainTo(linkedBlockingQueue);
                this.mqttQueue = linkedBlockingQueue;
            }
        }
    }

    @Override // org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor
    public Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        Collection<ValidationResult> customValidate = super.customValidate(validationContext);
        int intValue = validationContext.getProperty(PROP_MAX_QUEUE_SIZE).asInteger().intValue();
        if (this.mqttQueue == null) {
            this.mqttQueue = new LinkedBlockingQueue<>(validationContext.getProperty(PROP_MAX_QUEUE_SIZE).asInteger().intValue());
        }
        int size = this.mqttQueue.size();
        if (size > intValue) {
            customValidate.add(new ValidationResult.Builder().valid(false).subject("ConsumeMQTT Configuration").explanation(String.format("%s (%d) is smaller than the number of messages pending (%d).", PROP_MAX_QUEUE_SIZE.getDisplayName(), Integer.valueOf(intValue), Integer.valueOf(size))).build());
        }
        boolean isSet = validationContext.getProperty(PROP_CLIENTID).isSet();
        boolean isExpressionLanguagePresent = validationContext.getProperty(PROP_CLIENTID).isExpressionLanguagePresent();
        boolean isSet2 = validationContext.getProperty(PROP_GROUPID).isSet();
        if (!isExpressionLanguagePresent && isSet && isSet2) {
            customValidate.add(new ValidationResult.Builder().subject("Client ID and Group ID").valid(false).explanation("if client ID is not unique, multiple nodes cannot join the consumer group (if you want to set the client ID, please use expression language to make sure each node in the NiFi cluster gets a unique client ID with something like ${hostname()}).").build());
        }
        return customValidate;
    }

    protected void init(ProcessorInitializationContext processorInitializationContext) {
        this.logger = getLogger();
    }

    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    @Override // org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor
    @OnScheduled
    public void onScheduled(ProcessContext processContext) {
        super.onScheduled(processContext);
        this.qos = processContext.getProperty(PROP_QOS).asInteger().intValue();
        this.topicFilter = processContext.getProperty(PROP_TOPIC_FILTER).evaluateAttributeExpressions().getValue();
        if (processContext.getProperty(PROP_GROUPID).isSet()) {
            this.topicPrefix = "$share/" + processContext.getProperty(PROP_GROUPID).getValue() + "/";
        } else {
            this.topicPrefix = "";
        }
        this.scheduled.set(true);
    }

    @OnUnscheduled
    public void onUnscheduled(ProcessContext processContext) {
        this.scheduled.set(false);
        synchronized (this) {
            stopClient();
        }
    }

    @OnStopped
    public void onStopped(ProcessContext processContext) {
        if (this.mqttQueue == null || this.mqttQueue.isEmpty() || this.processSessionFactory == null) {
            if (this.mqttQueue != null && !this.mqttQueue.isEmpty()) {
                throw new ProcessException("Stopping the processor but there is no ProcessSessionFactory stored and there are messages in the MQTT internal queue. Removing the processor now will clear the queue but will result in DATA LOSS. This is normally due to starting the processor, receiving messages and stopping before the onTrigger happens. The messages in the MQTT internal queue cannot finish processing until until the processor is triggered to run.");
            }
            return;
        }
        this.logger.info("Finishing processing leftover messages");
        ProcessSession createSession = this.processSessionFactory.createSession();
        if (processContext.getProperty(RECORD_READER).isSet()) {
            transferQueueRecord(processContext, createSession);
        } else if (processContext.getProperty(MESSAGE_DEMARCATOR).isSet()) {
            transferQueueDemarcator(processContext, createSession);
        } else {
            transferQueue(createSession);
        }
    }

    @Override // org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        boolean z = this.scheduled.get();
        if (!isConnected() && z) {
            synchronized (this) {
                if (!isConnected()) {
                    initializeClient(processContext);
                }
            }
        }
        if (this.mqttQueue.isEmpty()) {
            processContext.yield();
            return;
        }
        if (processContext.getProperty(RECORD_READER).isSet()) {
            transferQueueRecord(processContext, processSession);
        } else if (processContext.getProperty(MESSAGE_DEMARCATOR).isSet()) {
            transferQueueDemarcator(processContext, processSession);
        } else {
            transferQueue(processSession);
        }
    }

    private void initializeClient(ProcessContext processContext) {
        try {
            this.mqttClient = createMqttClient();
            this.mqttClient.connect();
            this.mqttClient.subscribe(this.topicPrefix + this.topicFilter, this.qos, this::handleReceivedMessage);
        } catch (Exception e) {
            this.logger.error("Connection failed to {}. Yielding processor", new Object[]{this.clientProperties.getRawBrokerUris(), e});
            this.mqttClient = null;
            processContext.yield();
        }
    }

    private void transferQueue(ProcessSession processSession) {
        while (!this.mqttQueue.isEmpty()) {
            ReceivedMqttMessage peek = this.mqttQueue.peek();
            FlowFile write = processSession.write(createFlowFileAndPopulateAttributes(processSession, peek), outputStream -> {
                outputStream.write(peek.getPayload() == null ? new byte[0] : peek.getPayload());
            });
            processSession.getProvenanceReporter().receive(write, getTransitUri(peek.getTopic()));
            processSession.transfer(write, REL_MESSAGE);
            processSession.commitAsync();
            this.mqttQueue.remove(peek);
        }
    }

    private void transferQueueDemarcator(ProcessContext processContext, ProcessSession processSession) {
        byte[] bytes = processContext.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8);
        FlowFile create = processSession.create();
        processSession.putAttribute(create, BROKER_ATTRIBUTE_KEY, this.clientProperties.getRawBrokerUris());
        FlowFile append = processSession.append(create, outputStream -> {
            for (int i = 0; !this.mqttQueue.isEmpty() && i < MAX_MESSAGES_PER_FLOW_FILE; i++) {
                ReceivedMqttMessage poll = this.mqttQueue.poll();
                if (i > 0) {
                    outputStream.write(bytes);
                }
                outputStream.write(poll.getPayload() == null ? new byte[0] : poll.getPayload());
                processSession.adjustCounter(COUNTER_RECORDS_RECEIVED, 1L, false);
            }
        });
        processSession.getProvenanceReporter().receive(append, getTransitUri(this.topicPrefix, this.topicFilter));
        processSession.transfer(append, REL_MESSAGE);
        processSession.commitAsync();
    }

    private void transferFailure(ProcessSession processSession, ReceivedMqttMessage receivedMqttMessage) {
        FlowFile write = processSession.write(createFlowFileAndPopulateAttributes(processSession, receivedMqttMessage), outputStream -> {
            outputStream.write(receivedMqttMessage.getPayload());
        });
        processSession.getProvenanceReporter().receive(write, getTransitUri(receivedMqttMessage.getTopic()));
        processSession.transfer(write, REL_PARSE_FAILURE);
        processSession.adjustCounter(COUNTER_PARSE_FAILURES, 1L, false);
    }

    private FlowFile createFlowFileAndPopulateAttributes(ProcessSession processSession, ReceivedMqttMessage receivedMqttMessage) {
        FlowFile create = processSession.create();
        HashMap hashMap = new HashMap();
        hashMap.put(BROKER_ATTRIBUTE_KEY, this.clientProperties.getRawBrokerUris());
        hashMap.put(TOPIC_ATTRIBUTE_KEY, receivedMqttMessage.getTopic());
        hashMap.put(QOS_ATTRIBUTE_KEY, String.valueOf(receivedMqttMessage.getQos()));
        hashMap.put(IS_DUPLICATE_ATTRIBUTE_KEY, String.valueOf(receivedMqttMessage.isDuplicate()));
        hashMap.put(IS_RETAINED_ATTRIBUTE_KEY, String.valueOf(receivedMqttMessage.isRetained()));
        return processSession.putAllAttributes(create, hashMap);
    }

    private void transferQueueRecord(ProcessContext processContext, ProcessSession processSession) {
        ReceivedMqttMessage poll;
        ByteArrayInputStream byteArrayInputStream;
        RecordReaderFactory asControllerService = processContext.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        RecordSetWriterFactory asControllerService2 = processContext.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        FlowFile create = processSession.create();
        processSession.putAttribute(create, BROKER_ATTRIBUTE_KEY, this.clientProperties.getRawBrokerUris());
        HashMap hashMap = new HashMap();
        AtomicInteger atomicInteger = new AtomicInteger();
        ArrayList arrayList = new ArrayList();
        RecordSetWriter recordSetWriter = null;
        boolean z = false;
        int i = 0;
        while (!this.mqttQueue.isEmpty() && i < MAX_MESSAGES_PER_FLOW_FILE && (poll = this.mqttQueue.poll()) != null) {
            try {
                try {
                    try {
                        byteArrayInputStream = new ByteArrayInputStream(poll.getPayload() == null ? new byte[0] : poll.getPayload());
                    } catch (Exception e) {
                        this.logger.error("Failed to write message, sending to the parse failure relationship", e);
                        transferFailure(processSession, poll);
                    }
                    try {
                        try {
                            RecordReader createRecordReader = asControllerService.createRecordReader(hashMap, byteArrayInputStream, r21.length, this.logger);
                            while (true) {
                                try {
                                    Record nextRecord = createRecordReader.nextRecord();
                                    if (nextRecord == null) {
                                        break;
                                    }
                                    if (!z) {
                                        RecordSchema schema = nextRecord.getSchema();
                                        OutputStream write = processSession.write(create);
                                        try {
                                            RecordSchema schema2 = asControllerService2.getSchema(create.getAttributes(), schema);
                                            if (processContext.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean().booleanValue()) {
                                                ArrayList arrayList2 = new ArrayList(schema2.getFields());
                                                arrayList2.add(new RecordField(TOPIC_FIELD_KEY, RecordFieldType.STRING.getDataType()));
                                                arrayList2.add(new RecordField(QOS_FIELD_KEY, RecordFieldType.INT.getDataType()));
                                                arrayList2.add(new RecordField(IS_DUPLICATE_FIELD_KEY, RecordFieldType.BOOLEAN.getDataType()));
                                                arrayList2.add(new RecordField(IS_RETAINED_FIELD_KEY, RecordFieldType.BOOLEAN.getDataType()));
                                                schema2 = new SimpleRecordSchema(arrayList2);
                                            }
                                            recordSetWriter = asControllerService2.createWriter(this.logger, schema2, write, create);
                                            recordSetWriter.beginRecordSet();
                                        } catch (Exception e2) {
                                            this.logger.error("Failed to obtain Schema for FlowFile, sending to the parse failure relationship", e2);
                                            transferFailure(processSession, poll);
                                        }
                                    }
                                    try {
                                        if (processContext.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean().booleanValue()) {
                                            nextRecord.setValue(TOPIC_FIELD_KEY, poll.getTopic());
                                            nextRecord.setValue(QOS_FIELD_KEY, Integer.valueOf(poll.getQos()));
                                            nextRecord.setValue(IS_RETAINED_FIELD_KEY, Boolean.valueOf(poll.isRetained()));
                                            nextRecord.setValue(IS_DUPLICATE_FIELD_KEY, Boolean.valueOf(poll.isDuplicate()));
                                        }
                                        recordSetWriter.write(nextRecord);
                                        z = true;
                                        arrayList.add(poll);
                                        processSession.adjustCounter(COUNTER_RECORDS_RECEIVED, 1L, false);
                                        i++;
                                    } catch (RuntimeException e3) {
                                        this.logger.error("Failed to write message using the configured Record Writer, sending to the parse failure relationship", e3);
                                        transferFailure(processSession, poll);
                                    }
                                } catch (IOException | MalformedRecordException | SchemaValidationException e4) {
                                    this.logger.error("Failed to write message, sending to the parse failure relationship", e4);
                                    transferFailure(processSession, poll);
                                }
                            }
                            byteArrayInputStream.close();
                        } catch (Throwable th) {
                            try {
                                byteArrayInputStream.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                            throw th;
                            break;
                        }
                    } catch (Exception e5) {
                        this.logger.error("Failed to parse the message from the internal queue, sending to the parse failure relationship", e5);
                        transferFailure(processSession, poll);
                        byteArrayInputStream.close();
                    }
                } finally {
                    closeWriter(recordSetWriter);
                }
            } catch (Exception e6) {
                processContext.yield();
                int i2 = 0;
                Iterator it = arrayList.iterator();
                while (it.hasNext()) {
                    try {
                        this.mqttQueue.offer((ReceivedMqttMessage) it.next(), 1L, TimeUnit.SECONDS);
                    } catch (InterruptedException e7) {
                        i2++;
                        if (getLogger().isDebugEnabled()) {
                            this.logger.debug("Could not add message back into the internal queue, this could lead to data loss", e7);
                        }
                    }
                }
                if (i2 > 0) {
                    this.logger.error("Could not add {} message(s) back into the internal queue, this could mean data loss", new Object[]{Integer.valueOf(i2)});
                }
                throw new ProcessException("Could not process data received from the MQTT broker(s): " + this.clientProperties.getRawBrokerUris(), e6);
            }
        }
        if (recordSetWriter != null) {
            WriteResult finishRecordSet = recordSetWriter.finishRecordSet();
            hashMap.put(RECORD_COUNT_KEY, String.valueOf(finishRecordSet.getRecordCount()));
            hashMap.put(CoreAttributes.MIME_TYPE.key(), recordSetWriter.getMimeType());
            hashMap.putAll(finishRecordSet.getAttributes());
            atomicInteger.set(finishRecordSet.getRecordCount());
        }
        if (atomicInteger.get() == 0) {
            processSession.remove(create);
            return;
        }
        processSession.putAllAttributes(create, hashMap);
        processSession.getProvenanceReporter().receive(create, getTransitUri(this.topicPrefix, this.topicFilter));
        processSession.transfer(create, REL_MESSAGE);
        int i3 = atomicInteger.get();
        processSession.adjustCounter(COUNTER_RECORDS_PROCESSED, i3, false);
        this.logger.info("Successfully processed {} records for {}", new Object[]{Integer.valueOf(i3), create});
    }

    private void closeWriter(RecordSetWriter recordSetWriter) {
        if (recordSetWriter != null) {
            try {
                recordSetWriter.close();
            } catch (Exception e) {
                this.logger.warn("Failed to close Record Writer", e);
            }
        }
    }

    private String getTransitUri(String... strArr) {
        StringBuilder append = new StringBuilder(this.clientProperties.getProvenanceFormattedBrokerUris()).append("/");
        for (String str : strArr) {
            append.append(str);
        }
        return append.toString();
    }

    private void handleReceivedMessage(ReceivedMqttMessage receivedMqttMessage) {
        if (this.logger.isDebugEnabled()) {
            byte[] payload = receivedMqttMessage.getPayload();
            String str = new String(payload, StandardCharsets.UTF_8);
            if (StringUtils.isAsciiPrintable(str)) {
                this.logger.debug("Message arrived from topic {}. Payload: {}", new Object[]{receivedMqttMessage.getTopic(), str});
            } else {
                this.logger.debug("Message arrived from topic {}. Binary value of size {}", new Object[]{receivedMqttMessage.getTopic(), Integer.valueOf(payload.length)});
            }
        }
        try {
            if (this.mqttQueue.offer(receivedMqttMessage, 1L, TimeUnit.SECONDS)) {
            } else {
                throw new IllegalStateException("The subscriber queue is full, cannot receive another message until the processor is scheduled to run.");
            }
        } catch (InterruptedException e) {
            throw new MqttException("Failed to process message arrived from topic " + receivedMqttMessage.getTopic());
        }
    }
}
