package org.apache.nifi.processors.mongodb;

import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import java.io.OutputStream;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.mongodb.MongoDBClientService;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.RecordSchema;
import org.bson.Document;
import org.bson.types.ObjectId;

/**
 * Runs a MongoDB query and writes the resulting documents to a single FlowFile through the
 * configured {@link RecordSetWriterFactory}.
 *
 * <p>The query may be triggered with or without an incoming FlowFile. On success the result
 * FlowFile is routed to {@code REL_SUCCESS} (and the incoming FlowFile, if any, to
 * {@code REL_ORIGINAL}); if writing the record set fails the result FlowFile is removed and the
 * incoming FlowFile, if any, is routed to {@code REL_FAILURE}.
 */
@CapabilityDescription("A record-based version of GetMongo that uses the Record writers to write the MongoDB result set.")
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@Tags({"mongo", "mongodb", "get", "fetch", "record", "json"})
@WritesAttributes({
        @WritesAttribute(attribute = AbstractMongoQueryProcessor.DB_NAME, description = "The database where the results came from."),
        @WritesAttribute(attribute = AbstractMongoQueryProcessor.COL_NAME, description = "The collection where the results came from.")
})
public class GetMongoRecord extends AbstractMongoQueryProcessor {

    /** Controller service used to create the writer that serializes the query results. */
    public static final PropertyDescriptor WRITER_FACTORY = new PropertyDescriptor.Builder()
            .name("get-mongo-record-writer-factory")
            .displayName("Record Writer")
            .description("The record writer to use to write the result sets.")
            .identifiesControllerService(RecordSetWriterFactory.class)
            .required(true)
            .build();

    /** Name of the schema handed to the record writer; defaults to the {@code schema.name} attribute. */
    public static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder()
            .name("mongodb-schema-name")
            .displayName("Schema Name")
            .description("The name of the schema in the configured schema registry to use for the query results.")
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .defaultValue("${schema.name}")
            .required(true)
            .build();

    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
            CLIENT_SERVICE,
            WRITER_FACTORY,
            DATABASE_NAME,
            COLLECTION_NAME,
            SCHEMA_NAME,
            QUERY_ATTRIBUTE,
            QUERY,
            PROJECTION,
            SORT,
            LIMIT,
            BATCH_SIZE);

    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE, REL_ORIGINAL);

    // Both services are resolved once per scheduling in onEnabled and read from onTrigger.
    private volatile MongoDBClientService clientService;
    private volatile RecordSetWriterFactory writerFactory;

    /**
     * @return the fixed, immutable list of properties this processor supports
     */
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    /**
     * @return the fixed, immutable set of relationships this processor can route to
     */
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    /**
     * Looks up the MongoDB client service and the record writer factory from the processor
     * configuration when the processor is scheduled.
     *
     * @param processContext the context providing the configured property values
     */
    @OnScheduled
    public void onEnabled(ProcessContext processContext) {
        this.clientService = processContext.getProperty(CLIENT_SERVICE).asControllerService(MongoDBClientService.class);
        this.writerFactory = processContext.getProperty(WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
    }

    /**
     * Executes the configured query and writes every returned document as a record into a newly
     * created FlowFile, setting the {@code record.count} attribute to the number of records written.
     *
     * <p>Failures that occur before the result FlowFile is created (property evaluation, query
     * construction, opening the cursor) propagate to the caller. Failures after that point are
     * logged, the result FlowFile is removed and the incoming FlowFile, if any, is routed to
     * {@code REL_FAILURE}.
     *
     * @param processContext the context providing the configured property values
     * @param processSession the session used to read, create and transfer FlowFiles
     * @throws ProcessException declared by the processor contract
     */
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        FlowFile input = null;
        if (processContext.hasIncomingConnection()) {
            input = processSession.get();
            // With a real (non-loop) upstream connection there is nothing to do until a FlowFile arrives.
            if (input == null && processContext.hasNonLoopConnection()) {
                return;
            }
        }

        final String databaseName = processContext.getProperty(DATABASE_NAME).evaluateAttributeExpressions(input).getValue();
        final String collectionName = processContext.getProperty(COLLECTION_NAME).evaluateAttributeExpressions(input).getValue();
        final String schemaName = processContext.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(input).getValue();
        final Document query = getQuery(processContext, processSession, input);

        final MongoCollection<Document> collection = this.clientService.getDatabase(databaseName).getCollection(collectionName);
        FindIterable<Document> find = collection.find(query);
        if (processContext.getProperty(SORT).isSet()) {
            find = find.sort(Document.parse(processContext.getProperty(SORT).evaluateAttributeExpressions(input).getValue()));
        }
        if (processContext.getProperty(PROJECTION).isSet()) {
            find = find.projection(Document.parse(processContext.getProperty(PROJECTION).evaluateAttributeExpressions(input).getValue()));
        }
        if (processContext.getProperty(LIMIT).isSet()) {
            find = find.limit(processContext.getProperty(LIMIT).evaluateAttributeExpressions(input).asInteger());
        }

        // The cursor is released on every exit path, including failures while writing.
        try (MongoCursor<Document> cursor = find.iterator()) {
            FlowFile output = input != null ? processSession.create(input) : processSession.create();
            try {
                final Map<String, String> attributes = getAttributes(processContext, input, query, collection);
                // The evaluated Schema Name is only used when there is no incoming FlowFile;
                // otherwise the incoming FlowFile's own attributes are handed to the writer factory.
                final Map<String, String> schemaAttributes =
                        input != null ? input.getAttributes() : Map.of("schema.name", schemaName);

                try (OutputStream out = processSession.write(output)) {
                    final RecordSchema schema = this.writerFactory.getSchema(schemaAttributes, null);
                    long recordCount = 0;
                    try (RecordSetWriter writer = this.writerFactory.createWriter(getLogger(), schema, out, schemaAttributes)) {
                        writer.beginRecordSet();
                        while (cursor.hasNext()) {
                            final Document document = cursor.next();
                            // Store ObjectId keys as their string form before handing the document to the writer.
                            final Object id = document.get("_id");
                            if (id instanceof ObjectId) {
                                document.put("_id", id.toString());
                            }
                            writer.write(new MapRecord(schema, document));
                            recordCount++;
                        }
                        writer.finishRecordSet();
                    }
                    attributes.put("record.count", String.valueOf(recordCount));
                } catch (SchemaNotFoundException e) {
                    throw new RuntimeException(e);
                }

                // Session bookkeeping happens only after the content stream has been closed.
                output = processSession.putAllAttributes(output, attributes);
                processSession.getProvenanceReporter().fetch(output, getURI(processContext));
                processSession.transfer(output, REL_SUCCESS);
                if (input != null) {
                    processSession.transfer(input, REL_ORIGINAL);
                }
            } catch (Exception e) {
                getLogger().error("Error writing record set from Mongo query.", e);
                processSession.remove(output);
                if (input != null) {
                    processSession.transfer(input, REL_FAILURE);
                }
            }
        }
    }
}
