/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.elasticsearch;

import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Response;
import okhttp3.ResponseBody;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor;
import org.apache.nifi.stream.io.ByteArrayInputStream;
import org.codehaus.jackson.JsonNode;

/**
 * Processor that fetches a single document from Elasticsearch by issuing an HTTP GET
 * against {@code <base url>/<index>[/<type>]/<document id>}.
 *
 * <p>On a successful fetch, the document's {@code _source} is written as the FlowFile content
 * and the {@code filename}, {@code es.index} and {@code es.type} attributes are set from the
 * response. The whole response body is read into memory before it is written out.
 */
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@EventDriven
@SupportsBatching
@Tags({"elasticsearch", "fetch", "read", "get", "http"})
@CapabilityDescription("Retrieves a document from Elasticsearch using the specified connection properties and the identifier of the document to retrieve. Note that the full body of the document will be read into memory before being written to a Flow File for transfer.")
@WritesAttributes({@WritesAttribute(attribute="filename", description="The filename attribute is set to the document identifier"), @WritesAttribute(attribute="es.index", description="The Elasticsearch index containing the document"), @WritesAttribute(attribute="es.type", description="The Elasticsearch document type")})
public class FetchElasticsearchHttp
extends AbstractElasticsearchHttpProcessor {
    /** Query parameter used to restrict which fields of the document source are returned. */
    private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include";

    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are read from Elasticsearch are routed to this relationship.").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("All FlowFiles that cannot be read from Elasticsearch are routed to this relationship. Note that only incoming flow files will be routed to failure.").build();
    public static final Relationship REL_RETRY = new Relationship.Builder().name("retry").description("A FlowFile is routed to this relationship if the document cannot be fetched but attempting the operation again may succeed. Note that if the processor has no incoming connections, flow files may still be sent to this relationship based on the processor properties and the results of the fetch operation.").build();
    public static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not found").description("A FlowFile is routed to this relationship if the specified document does not exist in the Elasticsearch cluster. Note that if the processor has no incoming connections, flow files may still be sent to this relationship based on the processor properties and the results of the fetch operation.").build();

    public static final PropertyDescriptor DOC_ID = new PropertyDescriptor.Builder().name("fetch-es-doc-id").displayName("Document Identifier").description("The identifier of the document to be fetched").required(true).expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder().name("fetch-es-index").displayName("Index").description("The name of the index to read from. If the property is set to _all, the query will match across all indexes.").required(true).expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder().name("fetch-es-type").displayName("Type").description("The (optional) type of this document, used by Elasticsearch for indexing and searching. If the property is empty, the first document matching the identifier across all types will be retrieved.").required(false).expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder().name("fetch-es-fields").displayName("Fields").description("A comma-separated list of fields to retrieve from the document. If the Fields property is left blank, then the entire document's source will be retrieved.").required(false).expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();

    /**
     * Returns the relationships this processor can route to.
     *
     * @return an unmodifiable set containing success, failure, retry and not found
     */
    public Set<Relationship> getRelationships() {
        Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        relationships.add(REL_RETRY);
        relationships.add(REL_NOT_FOUND);
        return Collections.unmodifiableSet(relationships);
    }

    /**
     * Returns the properties supported by this processor: the connection properties
     * inherited from the parent class followed by the fetch-specific ones declared here.
     *
     * @return an unmodifiable, ordered list of property descriptors
     */
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(ES_URL);
        descriptors.add(PROP_SSL_CONTEXT_SERVICE);
        descriptors.add(USERNAME);
        descriptors.add(PASSWORD);
        descriptors.add(CONNECT_TIMEOUT);
        descriptors.add(RESPONSE_TIMEOUT);
        descriptors.add(DOC_ID);
        descriptors.add(INDEX);
        descriptors.add(TYPE);
        descriptors.add(FIELDS);
        return Collections.unmodifiableList(descriptors);
    }

    /**
     * Delegates scheduling-time setup to the parent class.
     *
     * @param context the process context passed on to the parent's setup
     */
    @Override
    @OnScheduled
    public void setup(ProcessContext context) {
        super.setup(context);
    }

    /**
     * Fetches one document and routes the FlowFile according to the outcome.
     *
     * <ul>
     *   <li>Status accepted by {@code isSuccess} and {@code found == true}: attributes and content
     *       are written, a provenance event is reported, and the FlowFile goes to success.</li>
     *   <li>Status accepted by {@code isSuccess} with {@code found == false}, or status 404:
     *       not found.</li>
     *   <li>Status 5xx: retry, and the processor yields.</li>
     *   <li>Any other status: failure if the processor has an incoming connection, otherwise the
     *       FlowFile is removed.</li>
     *   <li>{@link IOException}: retry if there is an incoming connection, otherwise removed;
     *       the processor yields.</li>
     *   <li>Any other exception: failure if there is an incoming connection, otherwise removed;
     *       the processor yields.</li>
     * </ul>
     *
     * @param context provides property values and connection information
     * @param session used to obtain or create the FlowFile and to route it
     * @throws ProcessException declared by the processor contract
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowFile = null;
        if (context.hasIncomingConnection()) {
            flowFile = session.get();
            // Nothing queued and at least one non-loop upstream connection: wait for real input.
            if (flowFile == null && context.hasNonLoopConnection()) {
                return;
            }
        }
        OkHttpClient okHttpClient = this.getClient();
        if (flowFile == null) {
            flowFile = session.create();
        }
        String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
        String docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue();
        String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
        String fields = context.getProperty(FIELDS).isSet() ? context.getProperty(FIELDS).evaluateAttributeExpressions(flowFile).getValue() : null;
        String username = context.getProperty(USERNAME).getValue();
        String password = context.getProperty(PASSWORD).getValue();
        ComponentLog logger = this.getLogger();
        try {
            logger.debug("Fetching {}/{}/{} from Elasticsearch", new Object[]{index, docType, docId});
            String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).getValue());
            URL url = this.buildRequestURL(urlstr, docId, index, docType, fields);
            long startNanos = System.nanoTime();
            Response getResponse = this.sendRequestToElasticsearch(okHttpClient, url, username, password, "GET", null);
            try {
                int statusCode = getResponse.code();
                if (this.isSuccess(statusCode)) {
                    ResponseBody body = getResponse.body();
                    byte[] bodyBytes = body.bytes();
                    JsonNode responseJson = this.parseJsonResponse(new ByteArrayInputStream(bodyBytes));
                    boolean found = responseJson.get("found").asBoolean(false);
                    String retrievedIndex = responseJson.get("_index").asText();
                    String retrievedType = responseJson.get("_type").asText();
                    String retrievedId = responseJson.get("_id").asText();
                    if (found) {
                        final JsonNode source = responseJson.get("_source");
                        flowFile = session.putAttribute(flowFile, "filename", retrievedId);
                        flowFile = session.putAttribute(flowFile, "es.index", retrievedIndex);
                        flowFile = session.putAttribute(flowFile, "es.type", retrievedType);
                        if (source != null) {
                            // Encode explicitly as UTF-8 rather than the platform default charset.
                            flowFile = session.write(flowFile, out -> out.write(source.toString().getBytes(StandardCharsets.UTF_8)));
                        }
                        logger.debug("Elasticsearch document {} fetched, routing to success", new Object[]{retrievedId});
                        long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
                        if (context.hasNonLoopConnection()) {
                            session.getProvenanceReporter().fetch(flowFile, url.toExternalForm(), millis);
                        } else {
                            session.getProvenanceReporter().receive(flowFile, url.toExternalForm(), millis);
                        }
                        session.transfer(flowFile, REL_SUCCESS);
                    } else {
                        logger.warn("Failed to read {}/{}/{} from Elasticsearch: Document not found", new Object[]{index, docType, docId});
                        session.transfer(flowFile, REL_NOT_FOUND);
                    }
                } else if (statusCode == 404) {
                    logger.warn("Failed to read {}/{}/{} from Elasticsearch: Document not found", new Object[]{index, docType, docId});
                    session.transfer(flowFile, REL_NOT_FOUND);
                } else if (statusCode / 100 == 5) {
                    logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to retry. This is likely a server problem, yielding...", new Object[]{statusCode, getResponse.message()});
                    session.transfer(flowFile, REL_RETRY);
                    context.yield();
                } else if (context.hasIncomingConnection()) {
                    logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to failure", new Object[]{statusCode, getResponse.message()});
                    session.transfer(flowFile, REL_FAILURE);
                } else {
                    logger.warn("Elasticsearch returned code {} with message {}", new Object[]{statusCode, getResponse.message()});
                    session.remove(flowFile);
                }
            } finally {
                // The body is only consumed on the success path; close it on every path so the
                // underlying connection is released. Closing an already-consumed body is harmless.
                if (getResponse != null) {
                    ResponseBody responseBody = getResponse.body();
                    if (responseBody != null) {
                        responseBody.close();
                    }
                }
            }
        }
        catch (IOException ioe) {
            logger.error("Failed to read from Elasticsearch due to {}, this may indicate an error in configuration (hosts, username/password, etc.). Routing to retry", new Object[]{ioe.getLocalizedMessage()}, ioe);
            if (context.hasIncomingConnection()) {
                session.transfer(flowFile, REL_RETRY);
            } else {
                session.remove(flowFile);
            }
            context.yield();
        }
        catch (Exception e) {
            logger.error("Failed to read {} from Elasticsearch due to {}", new Object[]{flowFile, e.getLocalizedMessage()}, e);
            if (context.hasIncomingConnection()) {
                session.transfer(flowFile, REL_FAILURE);
            } else {
                session.remove(flowFile);
            }
            context.yield();
        }
    }

    /**
     * Builds the GET URL for a document: {@code baseUrl/index[/type]/docId}, optionally with a
     * {@code _source_include} query parameter.
     *
     * @param baseUrl the Elasticsearch base URL; must be non-empty and parseable
     * @param docId the document identifier, appended as the last path segment
     * @param index the index name; {@code _all} is used when empty or null
     * @param type the document type; the segment is omitted when empty or null
     * @param fields comma-separated field names; each entry is trimmed, and the parameter is
     *               omitted when empty or null
     * @return the request URL
     * @throws MalformedURLException if {@code baseUrl} is empty or cannot be parsed
     */
    private URL buildRequestURL(String baseUrl, String docId, String index, String type, String fields) throws MalformedURLException {
        if (StringUtils.isEmpty(baseUrl)) {
            throw new MalformedURLException("Base URL cannot be null");
        }
        HttpUrl parsedBase = HttpUrl.parse(baseUrl);
        if (parsedBase == null) {
            // HttpUrl.parse signals an unparseable URL with null rather than an exception.
            throw new MalformedURLException("Base URL is not a valid HTTP(S) URL: " + baseUrl);
        }
        HttpUrl.Builder builder = parsedBase.newBuilder();
        builder.addPathSegment(StringUtils.isEmpty(index) ? "_all" : index);
        if (!StringUtils.isEmpty(type)) {
            builder.addPathSegment(type);
        }
        builder.addPathSegment(docId);
        if (!StringUtils.isEmpty(fields)) {
            String trimmedFields = Stream.of(fields.split(",")).map(String::trim).collect(Collectors.joining(","));
            builder.addQueryParameter(FIELD_INCLUDE_QUERY_PARAM, trimmedFields);
        }
        return builder.build().url();
    }
}

