/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.elasticsearch;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import okhttp3.HttpUrl;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.ResponseBody;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor;
import org.apache.nifi.util.StringUtils;

@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@EventDriven
@SupportsBatching
@Tags(value={"elasticsearch", "insert", "update", "upsert", "delete", "write", "put", "http"})
@CapabilityDescription(value="Writes the contents of a FlowFile to Elasticsearch, using the specified parameters such as the index to insert into and the type of the document.")
@DynamicProperty(name="A URL query parameter", value="The value to set it to", expressionLanguageScope=ExpressionLanguageScope.VARIABLE_REGISTRY, description="Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing")
@SystemResourceConsideration(resource=SystemResource.MEMORY)
public class PutElasticsearchHttp
extends AbstractElasticsearchHttpProcessor {
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are written to Elasticsearch are routed to this relationship").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("All FlowFiles that cannot be written to Elasticsearch are routed to this relationship").build();
    public static final Relationship REL_RETRY = new Relationship.Builder().name("retry").description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed").build();
    public static final PropertyDescriptor ID_ATTRIBUTE = new PropertyDescriptor.Builder().name("put-es-id-attr").displayName("Identifier Attribute").description("The name of the FlowFile attribute containing the identifier for the document. If the Index Operation is \"index\", this property may be left empty or evaluate to an empty value, in which case the document's identifier will be auto-generated by Elasticsearch. For all other Index Operations, the attribute must evaluate to a non-empty value.").required(false).expressionLanguageSupported(ExpressionLanguageScope.NONE).addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR).build();
    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder().name("put-es-index").displayName("Index").description("The name of the index to insert into").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.createAttributeExpressionLanguageValidator((AttributeExpression.ResultType)AttributeExpression.ResultType.STRING, (boolean)true)).build();
    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder().name("put-es-type").displayName("Type").description("The type of this document (required by Elasticsearch versions < 7.0 for indexing and searching). This must be unset or '_doc' for Elasticsearch 7.0+.").required(false).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).build();
    public static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder().name("put-es-index-op").displayName("Index Operation").description("The type of the operation used to index (create, index, update, upsert, delete)").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).defaultValue("index").build();
    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder().name("put-es-batch-size").displayName("Batch Size").description("The preferred number of flow files to put to the database in a single transaction. Note that the contents of the flow files will be stored in memory until the bulk operation is performed. Also the results should be returned in the same order the flow files were received.").required(true).addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).defaultValue("100").expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
    private static final Set<Relationship> relationships;
    private static final List<PropertyDescriptor> propertyDescriptors;
    private static final ObjectMapper mapper;

    public Set<Relationship> getRelationships() {
        return relationships;
    }

    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    @Override
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        ArrayList<ValidationResult> problems = new ArrayList<ValidationResult>(super.customValidate(validationContext));
        String idAttribute = validationContext.getProperty(ID_ATTRIBUTE).getValue();
        String indexOp = validationContext.getProperty(INDEX_OP).getValue();
        if (StringUtils.isEmpty((String)idAttribute)) {
            switch (indexOp.toLowerCase()) {
                case "update": 
                case "upsert": 
                case "delete": 
                case "": {
                    problems.add(new ValidationResult.Builder().valid(false).subject(INDEX_OP.getDisplayName()).explanation("If Identifier Attribute is not set, Index Operation must evaluate to one of \"index\" or \"create\"").build());
                    break;
                }
            }
        }
        return problems;
    }

    @Override
    @OnScheduled
    public void setup(ProcessContext context) {
        super.setup(context);
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
        List flowFiles = session.get(batchSize);
        if (flowFiles.isEmpty()) {
            return;
        }
        String id_attribute = context.getProperty(ID_ATTRIBUTE).getValue();
        String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
        String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
        OkHttpClient okHttpClient = this.getClient();
        ComponentLog logger = this.getLogger();
        LinkedList flowFilesToTransfer = new LinkedList(flowFiles);
        StringBuilder sb = new StringBuilder();
        String baseUrl = context.getProperty(ES_URL).evaluateAttributeExpressions().getValue().trim();
        if (StringUtils.isEmpty((String)baseUrl)) {
            throw new ProcessException("Elasticsearch URL is empty or null, this indicates an invalid Expression (missing variables, e.g.)");
        }
        HttpUrl.Builder urlBuilder = HttpUrl.parse((String)baseUrl).newBuilder().addPathSegment("_bulk");
        for (Map.Entry property : context.getProperties().entrySet()) {
            PropertyDescriptor pd = (PropertyDescriptor)property.getKey();
            if (!pd.isDynamic() || property.getValue() == null) continue;
            urlBuilder = urlBuilder.addQueryParameter(pd.getName(), context.getProperty(pd).evaluateAttributeExpressions().getValue());
        }
        URL url = urlBuilder.build().url();
        block17: for (FlowFile file2 : flowFiles) {
            String id;
            String index = context.getProperty(INDEX).evaluateAttributeExpressions(file2).getValue();
            Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(file2).getValue());
            if (StringUtils.isEmpty((String)index)) {
                logger.error("No value for index in for {}, transferring to failure", new Object[]{id_attribute, file2});
                flowFilesToTransfer.remove(file2);
                session.transfer(file2, REL_FAILURE);
                continue;
            }
            String docType = context.getProperty(TYPE).evaluateAttributeExpressions(file2).getValue();
            String indexOp = context.getProperty(INDEX_OP).evaluateAttributeExpressions(file2).getValue();
            if (StringUtils.isEmpty((String)indexOp)) {
                logger.error("No Index operation specified for {}, transferring to failure.", new Object[]{file2});
                flowFilesToTransfer.remove(file2);
                session.transfer(file2, REL_FAILURE);
                continue;
            }
            switch (indexOp.toLowerCase()) {
                case "create": 
                case "index": 
                case "update": 
                case "upsert": 
                case "delete": {
                    break;
                }
                default: {
                    logger.error("Index operation {} not supported for {}, transferring to failure.", new Object[]{indexOp, file2});
                    flowFilesToTransfer.remove(file2);
                    session.transfer(file2, REL_FAILURE);
                    continue block17;
                }
            }
            String string = id = id_attribute != null ? file2.getAttribute(id_attribute) : null;
            if (id == null && !indexOp.equalsIgnoreCase("index") && !indexOp.equalsIgnoreCase("create")) {
                logger.error("Index operation {} requires a valid identifier value from a flow file attribute, transferring to failure.", new Object[]{indexOp, file2});
                flowFilesToTransfer.remove(file2);
                session.transfer(file2, REL_FAILURE);
                continue;
            }
            StringBuilder json = new StringBuilder();
            session.read(file2, in -> json.append(IOUtils.toString((InputStream)in, (Charset)charset).replace("\r\n", " ").replace('\n', ' ').replace('\r', ' ')));
            String jsonString = json.toString();
            try {
                mapper.readTree(jsonString);
            }
            catch (IOException e) {
                logger.error("Flow file content is not valid JSON, penalizing and transferring to failure.", new Object[]{indexOp, file2});
                flowFilesToTransfer.remove(file2);
                file2 = session.penalize(file2);
                session.transfer(file2, REL_FAILURE);
                continue;
            }
            this.buildBulkCommand(sb, index, docType, indexOp, id, jsonString);
        }
        if (!flowFilesToTransfer.isEmpty()) {
            Response getResponse;
            RequestBody requestBody = RequestBody.create((MediaType)MediaType.parse((String)"application/json"), (String)sb.toString());
            try {
                getResponse = this.sendRequestToElasticsearch(okHttpClient, url, username, password, "PUT", requestBody);
            }
            catch (Exception e) {
                logger.error("Routing to {} due to exception: {}", new Object[]{REL_FAILURE.getName(), e}, (Throwable)e);
                flowFilesToTransfer.forEach(flowFileToTransfer -> {
                    flowFileToTransfer = session.penalize(flowFileToTransfer);
                    session.transfer(flowFileToTransfer, REL_FAILURE);
                });
                flowFilesToTransfer.clear();
                return;
            }
            int statusCode = getResponse.code();
            if (this.isSuccess(statusCode)) {
                ResponseBody responseBody = getResponse.body();
                try {
                    ArrayNode itemNodeArray;
                    byte[] bodyBytes = responseBody.bytes();
                    JsonNode responseJson = this.parseJsonResponse(new ByteArrayInputStream(bodyBytes));
                    boolean errors = responseJson.get("errors").asBoolean(false);
                    if (errors && (itemNodeArray = (ArrayNode)responseJson.get("items")).size() > 0) {
                        String errorReason = null;
                        for (int i = itemNodeArray.size() - 1; i >= 0; --i) {
                            JsonNode itemNode = itemNodeArray.get(i);
                            if (flowFilesToTransfer.size() <= i) continue;
                            FlowFile flowFile = (FlowFile)flowFilesToTransfer.remove(i);
                            int status = itemNode.findPath("status").asInt();
                            if (!this.isSuccess(status)) {
                                if (errorReason == null) {
                                    String reason = itemNode.findPath("result").asText();
                                    if (StringUtils.isEmpty((String)reason)) {
                                        reason = itemNode.findPath("reason").asText();
                                    }
                                    errorReason = reason;
                                    logger.error("Failed to process {} due to {}, transferring to failure", new Object[]{flowFile, errorReason});
                                }
                                flowFile = session.penalize(flowFile);
                                flowFile = session.putAttribute(flowFile, "reason", errorReason);
                                session.transfer(flowFile, REL_FAILURE);
                                continue;
                            }
                            session.transfer(flowFile, REL_SUCCESS);
                            session.getProvenanceReporter().send(flowFile, url.toString());
                        }
                    }
                    flowFilesToTransfer.forEach(file -> {
                        session.transfer(file, REL_SUCCESS);
                        session.getProvenanceReporter().send(file, url.toString());
                    });
                }
                catch (IOException ioe) {
                    logger.error("Error parsing Bulk API response: {}", new Object[]{ioe.getMessage()}, (Throwable)ioe);
                    session.transfer(flowFilesToTransfer, REL_FAILURE);
                    context.yield();
                }
            } else if (statusCode / 100 == 5) {
                logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to retry. This is likely a server problem, yielding...", new Object[]{statusCode, getResponse.message()});
                session.transfer(flowFilesToTransfer, REL_RETRY);
                context.yield();
            } else {
                logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to failure", new Object[]{statusCode, getResponse.message()});
                session.transfer(flowFilesToTransfer, REL_FAILURE);
            }
            getResponse.close();
        }
    }

    static {
        mapper = new ObjectMapper();
        HashSet<Relationship> _rels = new HashSet<Relationship>();
        _rels.add(REL_SUCCESS);
        _rels.add(REL_FAILURE);
        _rels.add(REL_RETRY);
        relationships = Collections.unmodifiableSet(_rels);
        ArrayList<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>(COMMON_PROPERTY_DESCRIPTORS);
        descriptors.add(ID_ATTRIBUTE);
        descriptors.add(INDEX);
        descriptors.add(TYPE);
        descriptors.add(BATCH_SIZE);
        descriptors.add(INDEX_OP);
        propertyDescriptors = Collections.unmodifiableList(descriptors);
    }
}

