package org.apache.nifi.processors.azure.cosmos.document;

import com.azure.cosmos.CosmosContainer;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.ConflictException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;

@CapabilityDescription("This processor is a record-aware processor for inserting data into Cosmos DB with Core SQL API. It uses a configured record reader and schema to read an incoming record set from the body of a Flowfile and then inserts those records into a configured Cosmos DB Container.")
@SystemResourceConsideration(resource = SystemResource.MEMORY)
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"azure", "cosmos", "insert", "record", "put"})
/* loaded from: input_file:org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecord.class */
public class PutAzureCosmosDBRecord extends AbstractAzureCosmosDBProcessor {
    private String conflictHandlingStrategy;
    static final AllowableValue IGNORE_CONFLICT = new AllowableValue("IGNORE", "Ignore", "Conflicting records will not be inserted, and FlowFile will not be routed to failure");
    static final AllowableValue UPSERT_CONFLICT = new AllowableValue("UPSERT", "Upsert", "Conflicting records will be upserted, and FlowFile will not be routed to failure");
    static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder().name("record-reader").displayName("Record Reader").description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema").identifiesControllerService(RecordReaderFactory.class).required(true).build();
    static final PropertyDescriptor INSERT_BATCH_SIZE = new PropertyDescriptor.Builder().name("insert-batch-size").displayName("Insert Batch Size").description("The number of records to group together for one single insert operation against Cosmos DB").defaultValue("20").required(false).addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
    static final PropertyDescriptor CONFLICT_HANDLE_STRATEGY = new PropertyDescriptor.Builder().name("azure-cosmos-db-conflict-handling-strategy").displayName("Cosmos DB Conflict Handling Strategy").description("Choose whether to ignore or upsert when conflict error occurs during insertion").required(false).allowableValues(new AllowableValue[]{IGNORE_CONFLICT, UPSERT_CONFLICT}).defaultValue(IGNORE_CONFLICT.getValue()).addValidator(StandardValidators.NON_BLANK_VALIDATOR).build();
    private static final Set<Relationship> relationships;
    private static final List<PropertyDescriptor> propertyDescriptors;

    public Set<Relationship> getRelationships() {
        return relationships;
    }

    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    protected void bulkInsert(List<Map<String, Object>> list) throws CosmosException {
        ComponentLog logger = getLogger();
        CosmosContainer container = getContainer();
        for (Map<String, Object> map : list) {
            try {
                container.createItem(map);
            } catch (ConflictException e) {
                if (this.conflictHandlingStrategy != null && this.conflictHandlingStrategy.equals(UPSERT_CONFLICT.getValue())) {
                    container.upsertItem(map);
                } else if (logger.isDebugEnabled()) {
                    logger.debug("Ignoring duplicate based on selected conflict resolution strategy");
                }
            }
        }
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            return;
        }
        ComponentLog logger = getLogger();
        RecordReaderFactory asControllerService = processContext.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
        String value = processContext.getProperty(PARTITION_KEY).getValue();
        ArrayList arrayList = new ArrayList();
        int intValue = processContext.getProperty(INSERT_BATCH_SIZE).asInteger().intValue();
        boolean z = false;
        try {
            try {
                InputStream read = processSession.read(flowFile);
                try {
                    RecordReader createRecordReader = asControllerService.createRecordReader(flowFile, read, getLogger());
                    try {
                        RecordSchema schema = createRecordReader.getSchema();
                        while (true) {
                            Record nextRecord = createRecordReader.nextRecord();
                            if (nextRecord == null) {
                                break;
                            }
                            Map map = (Map) DataTypeUtils.convertRecordFieldtoObject(nextRecord, RecordFieldType.RECORD.getRecordDataType(schema));
                            if (map.containsKey("id")) {
                                Object obj = map.get("id");
                                String valueOf = obj == null ? "" : String.valueOf(obj);
                                if (obj == null || StringUtils.isBlank(valueOf)) {
                                    map.put("id", UUID.randomUUID().toString());
                                } else {
                                    map.put("id", valueOf);
                                }
                            } else {
                                map.put("id", UUID.randomUUID().toString());
                            }
                            if (!map.containsKey(value)) {
                                logger.error(String.format("PutAzureCosmoDBRecord failed with missing partitionKeyField (%s)", value));
                                z = true;
                                break;
                            } else {
                                arrayList.add(map);
                                if (arrayList.size() == intValue) {
                                    bulkInsert(arrayList);
                                    arrayList = new ArrayList();
                                }
                            }
                        }
                        if (!z && arrayList.size() > 0) {
                            bulkInsert(arrayList);
                        }
                        if (createRecordReader != null) {
                            createRecordReader.close();
                        }
                        if (read != null) {
                            read.close();
                        }
                        z = z;
                    } catch (Throwable th) {
                        if (createRecordReader != null) {
                            try {
                                createRecordReader.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                } catch (Throwable th3) {
                    if (read != null) {
                        try {
                            read.close();
                        } catch (Throwable th4) {
                            th3.addSuppressed(th4);
                        }
                    }
                    throw th3;
                }
            } catch (SchemaNotFoundException | MalformedRecordException | IOException | CosmosException e) {
                logger.error("PutAzureCosmoDBRecord failed with error: {}", new Object[]{e.getMessage(), e});
                if (1 != 0) {
                    processSession.transfer(flowFile, REL_FAILURE);
                } else {
                    processSession.getProvenanceReporter().send(flowFile, getURI(processContext));
                    processSession.transfer(flowFile, REL_SUCCESS);
                }
            }
        } finally {
            if (0 == 0) {
                processSession.getProvenanceReporter().send(flowFile, getURI(processContext));
                processSession.transfer(flowFile, REL_SUCCESS);
            } else {
                processSession.transfer(flowFile, REL_FAILURE);
            }
        }
    }

    @Override // org.apache.nifi.processors.azure.cosmos.document.AbstractAzureCosmosDBProcessor
    protected void doPostActionOnSchedule(ProcessContext processContext) {
        this.conflictHandlingStrategy = processContext.getProperty(CONFLICT_HANDLE_STRATEGY).getValue();
        if (this.conflictHandlingStrategy == null) {
            this.conflictHandlingStrategy = IGNORE_CONFLICT.getValue();
        }
    }

    static {
        ArrayList arrayList = new ArrayList();
        arrayList.addAll(descriptors);
        arrayList.add(RECORD_READER_FACTORY);
        arrayList.add(INSERT_BATCH_SIZE);
        arrayList.add(CONFLICT_HANDLE_STRATEGY);
        propertyDescriptors = Collections.unmodifiableList(arrayList);
        HashSet hashSet = new HashSet();
        hashSet.add(REL_SUCCESS);
        hashSet.add(REL_FAILURE);
        relationships = Collections.unmodifiableSet(hashSet);
    }
}
