package org.apache.nifi.processors.mongodb;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.mongodb.BasicDBObject;
import com.mongodb.client.AggregateIterable;
import com.mongodb.client.MongoCursor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.JsonValidator;
import org.apache.nifi.processor.util.StandardValidators;
import org.bson.Document;
import org.bson.conversions.Bson;

/**
 * Processor that executes a MongoDB aggregation pipeline and emits the resulting documents as JSON.
 *
 * <p>The pipeline is read from the {@link #QUERY} property (a JSON array of stage objects). Result
 * documents are grouped into batches of {@code RESULTS_PER_FLOWFILE} documents and handed to
 * {@code writeBatch} for the {@link #REL_RESULTS} relationship. When an incoming flowfile is present
 * it is routed to {@link #REL_ORIGINAL} on success or {@link #REL_FAILURE} when any exception occurs.
 */
@CapabilityDescription("A processor that runs an aggregation query whenever a flowfile is received.")
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@Tags({"mongo", "aggregation", "aggregate"})
public class RunMongoAggregation extends AbstractMongoProcessor {
    static final Relationship REL_ORIGINAL = new Relationship.Builder()
            .description("The input flowfile gets sent to this relationship when the query succeeds.")
            .name("original")
            .build();
    static final Relationship REL_FAILURE = new Relationship.Builder()
            .description("The input flowfile gets sent to this relationship when the query fails.")
            .name("failure")
            .build();
    static final Relationship REL_RESULTS = new Relationship.Builder()
            .description("The result set of the aggregation will be sent to this relationship.")
            .name("results")
            .build();

    static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
            .name("mongo-agg-query")
            .displayName("Query")
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .description("The aggregation query to be executed.")
            .required(true)
            .addValidator(JsonValidator.INSTANCE)
            .build();
    static final PropertyDescriptor ALLOW_DISK_USE = new PropertyDescriptor.Builder()
            .name("allow-disk-use")
            .displayName("Allow Disk Use")
            .description("Set this to true to enable writing data to temporary files to prevent exceeding the maximum memory use limit during aggregation pipeline staged when handling large datasets.")
            .required(true)
            .allowableValues("true", "false")
            .defaultValue("false")
            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
            .build();

    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_RESULTS, REL_ORIGINAL, REL_FAILURE);

    // Common descriptors first, then the descriptors specific to this processor.
    // Plain varargs keep the stream typed as Stream<PropertyDescriptor>; an Object[] cast here
    // would widen the element type and make the assignment below ill-typed.
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Stream.concat(
            getCommonPropertyDescriptors().stream(),
            Stream.of(CHARSET, QUERY, ALLOW_DISK_USE, JSON_TYPE, QUERY_ATTRIBUTE, BATCH_SIZE, RESULTS_PER_FLOWFILE, DATE_FORMAT)
    ).toList();

    // Shared, never-reconfigured mapper used only to split the query text into pipeline stages.
    private static final ObjectMapper QUERY_MAPPER = new ObjectMapper();

    /**
     * Converts the JSON text of an aggregation pipeline into a list of BSON stages.
     *
     * <p>The text is parsed as a JSON array; every element is re-serialized and parsed with
     * {@link BasicDBObject#parse(String)} so that each stage becomes its own BSON object.
     *
     * @param query JSON array whose elements are the pipeline stage objects
     * @return the stages in the order they appear in {@code query}
     * @throws IOException if {@code query} cannot be parsed, is not a JSON array, or contains an
     *                     element that is not a JSON object
     */
    static final List<Bson> buildAggregationQuery(String query) throws IOException {
        final List<?> stages = QUERY_MAPPER.readValue(query, List.class);
        if (stages == null) {
            // A literal JSON "null" parses successfully but yields no list to iterate.
            throw new IOException("The aggregation query must be a JSON array of pipeline stages.");
        }

        final List<Bson> pipeline = new ArrayList<>(stages.size());
        for (final Object stage : stages) {
            if (!(stage instanceof Map)) {
                throw new IOException("Each aggregation pipeline stage must be a JSON object, but found: " + stage);
            }
            pipeline.add(BasicDBObject.parse(QUERY_MAPPER.writeValueAsString(stage)));
        }
        return pipeline;
    }

    /**
     * @return the relationships supported by this processor: results, original and failure
     */
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    /**
     * @return the common property descriptors followed by the ones specific to this processor
     */
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    /**
     * Serializes a batch of result documents to JSON using the configured object mapper.
     *
     * <p>A batch holding more than one document is written as a JSON array; otherwise only the
     * first document is written, as a single JSON value.
     *
     * @param documents the documents of one batch
     * @return the JSON text, or {@code null} when serialization fails (the failure is logged)
     */
    private String buildBatch(List<Document> documents) {
        try {
            return this.objectMapper.writeValueAsString(documents.size() > 1 ? documents : documents.getFirst());
        } catch (Exception e) {
            // Keep the best-effort contract (null result) but do not lose the root cause.
            getLogger().error("Error serializing aggregation results to JSON.", e);
            return null;
        }
    }

    /**
     * Runs the configured aggregation and writes its results.
     *
     * <p>If the processor has an incoming connection but no flowfile is available, the method
     * returns without doing anything when a non-loop connection exists. Otherwise the query is
     * evaluated (against the flowfile attributes when a flowfile is present), executed, and the
     * result documents are written in batches. When the aggregation yields no documents at all a
     * single empty payload is written instead.
     *
     * @param processContext provides the property values and connection information
     * @param processSession session used to obtain and transfer flowfiles
     * @throws ProcessException declared by the processor contract
     */
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        FlowFile flowFile = null;
        if (processContext.hasIncomingConnection()) {
            flowFile = processSession.get();
            if (flowFile == null && processContext.hasNonLoopConnection()) {
                return;
            }
        }

        final String query = processContext.getProperty(QUERY).evaluateAttributeExpressions(flowFile).getValue();
        final Boolean allowDiskUse = processContext.getProperty(ALLOW_DISK_USE).asBoolean();
        final String queryAttribute = processContext.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue();
        final Integer batchSize = processContext.getProperty(BATCH_SIZE).asInteger();
        final Integer resultsPerFlowFile = processContext.getProperty(RESULTS_PER_FLOWFILE).asInteger();
        configureMapper(
                processContext.getProperty(JSON_TYPE).getValue(),
                processContext.getProperty(DATE_FORMAT).evaluateAttributeExpressions(flowFile).getValue());

        // Optionally record the executed query under the configured attribute name.
        final Map<String, String> attributes = new HashMap<>();
        if (queryAttribute != null && !queryAttribute.trim().isEmpty()) {
            attributes.put(queryAttribute, query);
        }

        try {
            final AggregateIterable<Document> aggregation = getCollection(processContext, flowFile)
                    .aggregate(buildAggregationQuery(query))
                    .allowDiskUse(allowDiskUse);
            aggregation.batchSize(batchSize != null ? batchSize.intValue() : 1);

            // try-with-resources closes the cursor on every exit path, including errors.
            try (MongoCursor<Document> cursor = aggregation.iterator()) {
                List<Document> batch = new ArrayList<>();
                boolean wroteFullBatch = false;

                while (cursor.hasNext()) {
                    batch.add(cursor.next());
                    if (batch.size() == resultsPerFlowFile.intValue()) {
                        writeBatch(buildBatch(batch), flowFile, processContext, processSession, attributes, REL_RESULTS);
                        batch = new ArrayList<>();
                        wroteFullBatch = true;
                    }
                }

                if (!batch.isEmpty()) {
                    // A partially filled batch remains; write it out.
                    writeBatch(buildBatch(batch), flowFile, processContext, processSession, attributes, REL_RESULTS);
                } else if (!wroteFullBatch) {
                    // Nothing was returned at all; emit one empty payload.
                    writeBatch("", flowFile, processContext, processSession, attributes, REL_RESULTS);
                }
            }

            // Transfer only after the cursor has been closed, so a failing close() cannot lead to
            // the same flowfile being routed to both "original" and "failure".
            if (flowFile != null) {
                processSession.transfer(flowFile, REL_ORIGINAL);
            }
        } catch (Exception e) {
            getLogger().error("Error running MongoDB aggregation query.", e);
            if (flowFile != null) {
                processSession.transfer(flowFile, REL_FAILURE);
            }
        }
    }
}
