/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.elasticsearch;

import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.ResponseBody;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor;
import org.apache.nifi.stream.io.ByteArrayInputStream;
import org.apache.nifi.util.StringUtils;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.node.ArrayNode;

@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@EventDriven
@SupportsBatching
@Tags(value={"elasticsearch", "insert", "update", "upsert", "delete", "write", "put", "http"})
@CapabilityDescription(value="Writes the contents of a FlowFile to Elasticsearch, using the specified parameters such as the index to insert into and the type of the document.")
public class PutElasticsearchHttp
extends AbstractElasticsearchHttpProcessor {
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are written to Elasticsearch are routed to this relationship").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("All FlowFiles that cannot be written to Elasticsearch are routed to this relationship").build();
    public static final Relationship REL_RETRY = new Relationship.Builder().name("retry").description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed").build();
    public static final PropertyDescriptor ID_ATTRIBUTE = new PropertyDescriptor.Builder().name("put-es-id-attr").displayName("Identifier Attribute").description("The name of the FlowFile attribute containing the identifier for the document. If the Index Operation is \"index\", this property may be left empty or evaluate to an empty value, in which case the document's identifier will be auto-generated by Elasticsearch. For all other Index Operations, the attribute must evaluate to a non-empty value.").required(false).expressionLanguageSupported(false).addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR).build();
    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder().name("put-es-index").displayName("Index").description("The name of the index to insert into").required(true).expressionLanguageSupported(true).addValidator(StandardValidators.createAttributeExpressionLanguageValidator((AttributeExpression.ResultType)AttributeExpression.ResultType.STRING, (boolean)true)).build();
    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder().name("put-es-type").displayName("Type").description("The type of this document (used by Elasticsearch for indexing and searching)").required(true).expressionLanguageSupported(true).addValidator(StandardValidators.createAttributeExpressionLanguageValidator((AttributeExpression.ResultType)AttributeExpression.ResultType.STRING, (boolean)true)).build();
    public static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder().name("put-es-index-op").displayName("Index Operation").description("The type of the operation used to index (index, update, upsert, delete)").required(true).expressionLanguageSupported(true).addValidator(StandardValidators.createAttributeExpressionLanguageValidator((AttributeExpression.ResultType)AttributeExpression.ResultType.STRING, (boolean)true)).defaultValue("index").build();
    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder().name("put-es-batch-size").displayName("Batch Size").description("The preferred number of flow files to put to the database in a single transaction. Note that the contents of the flow files will be stored in memory until the bulk operation is performed. Also the results should be returned in the same order the flow files were received.").required(true).addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).defaultValue("100").build();

    public Set<Relationship> getRelationships() {
        HashSet<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        relationships.add(REL_RETRY);
        return Collections.unmodifiableSet(relationships);
    }

    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        ArrayList<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        descriptors.add(ES_URL);
        descriptors.add(PROP_SSL_CONTEXT_SERVICE);
        descriptors.add(USERNAME);
        descriptors.add(PASSWORD);
        descriptors.add(CONNECT_TIMEOUT);
        descriptors.add(RESPONSE_TIMEOUT);
        descriptors.add(ID_ATTRIBUTE);
        descriptors.add(INDEX);
        descriptors.add(TYPE);
        descriptors.add(CHARSET);
        descriptors.add(BATCH_SIZE);
        descriptors.add(INDEX_OP);
        return Collections.unmodifiableList(descriptors);
    }

    @Override
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        ArrayList<ValidationResult> problems = new ArrayList<ValidationResult>(super.customValidate(validationContext));
        String idAttribute = validationContext.getProperty(ID_ATTRIBUTE).getValue();
        String indexOp = validationContext.getProperty(INDEX_OP).getValue();
        if (StringUtils.isEmpty((String)idAttribute)) {
            switch (indexOp.toLowerCase()) {
                case "update": 
                case "upsert": 
                case "delete": 
                case "": {
                    problems.add(new ValidationResult.Builder().valid(false).subject(INDEX_OP.getDisplayName()).explanation("If Identifier Attribute is not set, Index Operation must evaluate to \"index\"").build());
                    break;
                }
            }
        }
        return problems;
    }

    @Override
    @OnScheduled
    public void setup(ProcessContext context) {
        super.setup(context);
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        URL url;
        int batchSize = context.getProperty(BATCH_SIZE).asInteger();
        List flowFiles = session.get(batchSize);
        if (flowFiles.isEmpty()) {
            return;
        }
        String id_attribute = context.getProperty(ID_ATTRIBUTE).getValue();
        Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
        String username = context.getProperty(USERNAME).getValue();
        String password = context.getProperty(PASSWORD).getValue();
        OkHttpClient okHttpClient = this.getClient();
        ComponentLog logger = this.getLogger();
        LinkedList flowFilesToTransfer = new LinkedList(flowFiles);
        StringBuilder sb = new StringBuilder();
        String baseUrl = org.apache.commons.lang3.StringUtils.trimToEmpty((String)context.getProperty(ES_URL).getValue());
        try {
            url = new URL((baseUrl.endsWith("/") ? baseUrl : baseUrl + "/") + "_bulk");
        }
        catch (MalformedURLException mue) {
            context.yield();
            throw new ProcessException((Throwable)mue);
        }
        block15: for (FlowFile file2 : flowFiles) {
            String id;
            String index = context.getProperty(INDEX).evaluateAttributeExpressions(file2).getValue();
            if (StringUtils.isEmpty((String)index)) {
                logger.error("No value for index in for {}, transferring to failure", new Object[]{id_attribute, file2});
                flowFilesToTransfer.remove(file2);
                session.transfer(file2, REL_FAILURE);
                continue;
            }
            String docType = context.getProperty(TYPE).evaluateAttributeExpressions(file2).getValue();
            String indexOp = context.getProperty(INDEX_OP).evaluateAttributeExpressions(file2).getValue();
            if (StringUtils.isEmpty((String)indexOp)) {
                logger.error("No Index operation specified for {}, transferring to failure.", new Object[]{file2});
                flowFilesToTransfer.remove(file2);
                session.transfer(file2, REL_FAILURE);
                continue;
            }
            switch (indexOp.toLowerCase()) {
                case "index": 
                case "update": 
                case "upsert": 
                case "delete": {
                    break;
                }
                default: {
                    logger.error("Index operation {} not supported for {}, transferring to failure.", new Object[]{indexOp, file2});
                    flowFilesToTransfer.remove(file2);
                    session.transfer(file2, REL_FAILURE);
                    continue block15;
                }
            }
            String string = id = id_attribute != null ? file2.getAttribute(id_attribute) : null;
            if (id == null && !indexOp.equalsIgnoreCase("index")) {
                logger.error("Index operation {} requires a valid identifier value from a flow file attribute, transferring to failure.", new Object[]{indexOp, file2});
                flowFilesToTransfer.remove(file2);
                session.transfer(file2, REL_FAILURE);
                continue;
            }
            StringBuilder json = new StringBuilder();
            session.read(file2, in -> json.append(IOUtils.toString((InputStream)in, (Charset)charset).replace("\r\n", " ").replace('\n', ' ').replace('\r', ' ')));
            if (indexOp.equalsIgnoreCase("index")) {
                sb.append("{\"index\": { \"_index\": \"");
                sb.append(index);
                sb.append("\", \"_type\": \"");
                sb.append(docType);
                sb.append("\"");
                if (!StringUtils.isEmpty((String)id)) {
                    sb.append(", \"_id\": \"");
                    sb.append(id);
                    sb.append("\"");
                }
                sb.append("}}\n");
                sb.append((CharSequence)json);
                sb.append("\n");
                continue;
            }
            if (indexOp.equalsIgnoreCase("upsert") || indexOp.equalsIgnoreCase("update")) {
                sb.append("{\"update\": { \"_index\": \"");
                sb.append(index);
                sb.append("\", \"_type\": \"");
                sb.append(docType);
                sb.append("\", \"_id\": \"");
                sb.append(id);
                sb.append("\" }\n");
                sb.append("{\"doc\": ");
                sb.append((CharSequence)json);
                sb.append(", \"doc_as_upsert\": ");
                sb.append(indexOp.equalsIgnoreCase("upsert"));
                sb.append(" }\n");
                continue;
            }
            if (!indexOp.equalsIgnoreCase("delete")) continue;
            sb.append("{\"delete\": { \"_index\": \"");
            sb.append(index);
            sb.append("\", \"_type\": \"");
            sb.append(docType);
            sb.append("\", \"_id\": \"");
            sb.append(id);
            sb.append("\" }\n");
        }
        if (!flowFilesToTransfer.isEmpty()) {
            Response getResponse;
            RequestBody requestBody = RequestBody.create((MediaType)MediaType.parse((String)"application/json"), (String)sb.toString());
            try {
                getResponse = this.sendRequestToElasticsearch(okHttpClient, url, username, password, "PUT", requestBody);
            }
            catch (Exception e) {
                logger.error("Routing to {} due to exception: {}", new Object[]{REL_FAILURE.getName(), e}, (Throwable)e);
                flowFilesToTransfer.forEach(flowFileToTransfer -> {
                    flowFileToTransfer = session.penalize(flowFileToTransfer);
                    session.transfer(flowFileToTransfer, REL_FAILURE);
                });
                flowFilesToTransfer.clear();
                return;
            }
            int statusCode = getResponse.code();
            if (this.isSuccess(statusCode)) {
                ResponseBody responseBody = getResponse.body();
                try {
                    ArrayNode itemNodeArray;
                    byte[] bodyBytes = responseBody.bytes();
                    JsonNode responseJson = this.parseJsonResponse((InputStream)new ByteArrayInputStream(bodyBytes));
                    boolean errors = responseJson.get("errors").asBoolean(false);
                    if (errors && (itemNodeArray = (ArrayNode)responseJson.get("items")).size() > 0) {
                        for (int i = itemNodeArray.size() - 1; i >= 0; --i) {
                            JsonNode itemNode = itemNodeArray.get(i);
                            FlowFile flowFile = (FlowFile)flowFilesToTransfer.remove(i);
                            int status = itemNode.findPath("status").asInt();
                            if (!this.isSuccess(status)) {
                                String reason = itemNode.findPath("//error/reason").asText();
                                logger.error("Failed to insert {} into Elasticsearch due to {}, transferring to failure", new Object[]{flowFile, reason});
                                session.transfer(flowFile, REL_FAILURE);
                                continue;
                            }
                            session.transfer(flowFile, REL_SUCCESS);
                            session.getProvenanceReporter().send(flowFile, url.toString());
                        }
                    }
                    flowFilesToTransfer.forEach(file -> {
                        session.transfer(file, REL_SUCCESS);
                        session.getProvenanceReporter().send(file, url.toString());
                    });
                }
                catch (IOException ioe) {
                    logger.error("Error parsing Bulk API response: {}", new Object[]{ioe.getMessage()}, (Throwable)ioe);
                    session.transfer(flowFilesToTransfer, REL_FAILURE);
                    context.yield();
                }
            } else if (statusCode / 100 == 5) {
                logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to retry. This is likely a server problem, yielding...", new Object[]{statusCode, getResponse.message()});
                session.transfer(flowFilesToTransfer, REL_RETRY);
                context.yield();
            } else {
                logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to failure", new Object[]{statusCode, getResponse.message()});
                session.transfer(flowFilesToTransfer, REL_FAILURE);
            }
        }
    }
}

