/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.elasticsearch;

import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Response;
import okhttp3.ResponseBody;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor;
import org.apache.nifi.processors.elasticsearch.RetryableException;
import org.apache.nifi.processors.elasticsearch.UnretryableException;
import org.apache.nifi.stream.io.ByteArrayInputStream;
import org.codehaus.jackson.JsonNode;

/**
 * Processor that runs a Lucene-style query against Elasticsearch over HTTP and emits one
 * FlowFile per hit.
 *
 * <p>Results are fetched page by page using the {@code from}/{@code size} query parameters.
 * Each hit becomes either a FlowFile whose content is the hit's {@code _source} JSON
 * ("Flow file content") or a FlowFile carrying the {@code _source} fields as
 * {@code es.result.*} attributes ("Flow file attributes"), depending on the Target property.
 */
@InputRequirement(value=InputRequirement.Requirement.INPUT_ALLOWED)
@EventDriven
@SupportsBatching
@Tags(value={"elasticsearch", "query", "read", "get", "http"})
@CapabilityDescription(value="Queries Elasticsearch using the specified connection properties. Note that the full body of each page of documents will be read into memory before being written to Flow Files for transfer.  Also note that the Elasticsearch max_result_window index setting is the upper bound on the number of records that can be retrieved using this query.  To retrieve more records, use the ScrollElasticsearchHttp processor.")
@WritesAttributes(value={@WritesAttribute(attribute="filename", description="The filename attribute is set to the document identifier"), @WritesAttribute(attribute="es.id", description="The Elasticsearch document identifier"), @WritesAttribute(attribute="es.index", description="The Elasticsearch index containing the document"), @WritesAttribute(attribute="es.type", description="The Elasticsearch document type"), @WritesAttribute(attribute="es.result.*", description="If Target is 'Flow file attributes', the JSON attributes of each result will be placed into corresponding attributes with this prefix.")})
public class QueryElasticsearchHttp
extends AbstractElasticsearchHttpProcessor {
    // Names of the query-string parameters sent to the _search endpoint.
    private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include";
    private static final String QUERY_QUERY_PARAM = "q";
    private static final String SORT_QUERY_PARAM = "sort";
    private static final String FROM_QUERY_PARAM = "from";
    private static final String SIZE_QUERY_PARAM = "size";

    // Allowable values of the Target property.
    public static final String TARGET_FLOW_FILE_CONTENT = "Flow file content";
    public static final String TARGET_FLOW_FILE_ATTRIBUTES = "Flow file attributes";

    // Prefix applied to every _source field copied into a FlowFile attribute.
    private static final String ATTRIBUTE_PREFIX = "es.result.";

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("All FlowFiles that are read from Elasticsearch are routed to this relationship.")
            .build();
    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("All FlowFiles that cannot be read from Elasticsearch are routed to this relationship. Note that only incoming flow files will be routed to failure.")
            .build();
    public static final Relationship REL_RETRY = new Relationship.Builder()
            .name("retry")
            .description("A FlowFile is routed to this relationship if the document cannot be fetched but attempting the operation again may succeed. Note that if the processor has no incoming connections, flow files may still be sent to this relationship based on the processor properties and the results of the fetch operation.")
            .build();

    public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
            .name("query-es-query")
            .displayName("Query")
            .description("The Lucene-style query to run against ElasticSearch (e.g., genre:blues AND -artist:muddy)")
            .required(true)
            .expressionLanguageSupported(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();
    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
            .name("query-es-index")
            .displayName("Index")
            .description("The name of the index to read from. If the property is set to _all, the query will match across all indexes.")
            .required(true)
            .expressionLanguageSupported(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();
    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
            .name("query-es-type")
            .displayName("Type")
            .description("The (optional) type of this query, used by Elasticsearch for indexing and searching. If the property is empty, the the query will match across all types.")
            .required(false)
            .expressionLanguageSupported(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();
    public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
            .name("query-es-fields")
            .displayName("Fields")
            .description("A comma-separated list of fields to retrieve from the document. If the Fields property is left blank, then the entire document's source will be retrieved.")
            .required(false)
            .expressionLanguageSupported(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();
    public static final PropertyDescriptor SORT = new PropertyDescriptor.Builder()
            .name("query-es-sort")
            .displayName("Sort")
            .description("A sort parameter (e.g., timestamp:asc). If the Sort property is left blank, then the results will be retrieved in document order.")
            .required(false)
            .expressionLanguageSupported(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();
    public static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder()
            .name("query-es-size")
            .displayName("Page Size")
            .defaultValue("20")
            .description("Determines how many documents to return per page during scrolling.")
            .required(true)
            .expressionLanguageSupported(true)
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .build();
    public static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder()
            .name("query-es-limit")
            .displayName("Limit")
            .description("If set, limits the number of results that will be returned.")
            .required(false)
            .expressionLanguageSupported(true)
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .build();
    public static final PropertyDescriptor TARGET = new PropertyDescriptor.Builder()
            .name("query-es-target")
            .displayName("Target")
            .description("Indicates where the results should be placed.  In the case of 'Flow file content', the JSON response will be written as the content of the flow file.  In the case of 'Flow file attributes', the original flow file (if applicable) will be cloned for each result, and all return fields will be placed in a flow file attribute of the same name, but prefixed by 'es.result.'")
            .required(true)
            .expressionLanguageSupported(false)
            .defaultValue(TARGET_FLOW_FILE_CONTENT)
            .allowableValues(new String[]{TARGET_FLOW_FILE_CONTENT, TARGET_FLOW_FILE_ATTRIBUTES})
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    /**
     * Returns the relationships this processor can route to.
     *
     * @return an unmodifiable set containing success, failure and retry
     */
    public Set<Relationship> getRelationships() {
        Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        relationships.add(REL_RETRY);
        return Collections.unmodifiableSet(relationships);
    }

    /**
     * Returns the configurable properties: the connection properties inherited from the
     * parent processor followed by the query-specific ones declared in this class.
     *
     * @return an unmodifiable, ordered list of property descriptors
     */
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(ES_URL);
        descriptors.add(PROP_SSL_CONTEXT_SERVICE);
        descriptors.add(USERNAME);
        descriptors.add(PASSWORD);
        descriptors.add(CONNECT_TIMEOUT);
        descriptors.add(RESPONSE_TIMEOUT);
        descriptors.add(QUERY);
        descriptors.add(PAGE_SIZE);
        descriptors.add(INDEX);
        descriptors.add(TYPE);
        descriptors.add(FIELDS);
        descriptors.add(SORT);
        descriptors.add(LIMIT);
        descriptors.add(TARGET);
        return Collections.unmodifiableList(descriptors);
    }

    /**
     * Delegates scheduling-time setup to the parent processor.
     *
     * @param context the process context supplied by the framework
     */
    @Override
    @OnScheduled
    public void setup(ProcessContext context) {
        super.setup(context);
    }

    /**
     * Runs the configured query, paging through the results until a page comes back empty
     * or the configured limit is reached.
     *
     * <p>On success the incoming FlowFile (if any) is removed from the session. An
     * {@link IOException} or {@link RetryableException} routes the incoming FlowFile to
     * retry; any other exception routes it to failure. The processor yields in all three
     * error cases.
     *
     * @param context the process context used to read property values
     * @param session the session used to obtain, create and transfer FlowFiles
     * @throws ProcessException declared by the processor contract
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowFile = null;
        if (context.hasIncomingConnection()) {
            flowFile = session.get();
            // Nothing queued on a real upstream connection: there is no work to do this round.
            if (flowFile == null && context.hasNonLoopConnection()) {
                return;
            }
        }

        OkHttpClient okHttpClient = this.getClient();
        String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
        String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile).getValue();
        String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
        int pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile).asInteger();
        // Kept boxed so that "no limit" is represented by null without a redundant unbox/re-box.
        Integer limit = context.getProperty(LIMIT).isSet()
                ? context.getProperty(LIMIT).evaluateAttributeExpressions(flowFile).asInteger()
                : null;
        String fields = context.getProperty(FIELDS).isSet()
                ? context.getProperty(FIELDS).evaluateAttributeExpressions(flowFile).getValue()
                : null;
        String sort = context.getProperty(SORT).isSet()
                ? context.getProperty(SORT).evaluateAttributeExpressions(flowFile).getValue()
                : null;
        boolean targetIsContent = context.getProperty(TARGET).getValue().equals(TARGET_FLOW_FILE_CONTENT);
        String username = context.getProperty(USERNAME).getValue();
        String password = context.getProperty(PASSWORD).getValue();
        ComponentLog logger = this.getLogger();

        int fromIndex = 0;
        try {
            logger.debug("Querying {}/{} from Elasticsearch: {}", new Object[]{index, docType, query});
            long startNanos = System.nanoTime();
            String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).getValue());
            boolean hitLimit = false;
            int numResults;
            do {
                int mPageSize = pageSize;
                // Shrink the final page so that no more than 'limit' documents are requested in total.
                if (limit != null && limit <= fromIndex + pageSize) {
                    mPageSize = limit - fromIndex;
                    hitLimit = true;
                }
                URL queryUrl = this.buildRequestURL(urlstr, query, index, docType, fields, sort, mPageSize, fromIndex);
                Response getResponse = this.sendRequestToElasticsearch(okHttpClient, queryUrl, username, password, "GET", null);
                numResults = this.getPage(getResponse, queryUrl, context, session, flowFile, logger, startNanos, targetIsContent);
                fromIndex += pageSize;
            } while (numResults > 0 && !hitLimit);

            if (flowFile != null) {
                session.remove(flowFile);
            }
        }
        catch (IOException ioe) {
            logger.error("Failed to read from Elasticsearch due to {}, this may indicate an error in configuration (hosts, username/password, etc.). Routing to retry", new Object[]{ioe.getLocalizedMessage()}, ioe);
            if (flowFile != null) {
                session.transfer(flowFile, REL_RETRY);
            }
            context.yield();
        }
        catch (RetryableException e) {
            logger.error(e.getMessage(), new Object[]{e.getLocalizedMessage()}, e);
            if (flowFile != null) {
                session.transfer(flowFile, REL_RETRY);
            }
            context.yield();
        }
        catch (Exception e) {
            logger.error("Failed to read {} from Elasticsearch due to {}", new Object[]{flowFile, e.getLocalizedMessage()}, e);
            if (flowFile != null) {
                session.transfer(flowFile, REL_FAILURE);
            }
            context.yield();
        }
    }

    /**
     * Turns one search response into FlowFiles and transfers them to success.
     *
     * @param getResponse     the HTTP response returned for {@code url}
     * @param url             the request URL, recorded as the provenance transit URI
     * @param context         the process context, used to inspect the processor's connections
     * @param session         the session used to create and transfer FlowFiles
     * @param flowFile        the incoming FlowFile, or {@code null} if there is none
     * @param logger          the component logger
     * @param startNanos      {@link System#nanoTime()} value taken when the query started
     * @param targetIsContent {@code true} to write {@code _source} as content, {@code false} to
     *                        copy its fields into attributes
     * @return the number of FlowFiles produced from this page; 0 for a non-success status
     * @throws IOException if the response body cannot be read
     */
    private int getPage(Response getResponse, URL url, ProcessContext context, ProcessSession session, FlowFile flowFile, ComponentLog logger, long startNanos, boolean targetIsContent) throws IOException {
        int statusCode = getResponse.code();
        if (!this.isSuccess(statusCode)) {
            this.handleErrorStatus(getResponse, statusCode, context, logger);
            return 0;
        }

        byte[] bodyBytes = getResponse.body().bytes();
        JsonNode responseJson = this.parseJsonResponse(new ByteArrayInputStream(bodyBytes));
        JsonNode hits = responseJson.get("hits").get("hits");

        List<FlowFile> page = new ArrayList<>(hits.size());
        for (int i = 0; i < hits.size(); ++i) {
            page.add(this.createDocumentFlowFile(hits.get(i), session, flowFile, targetIsContent));
        }
        // Report the number of hits on this page (not the number of top-level response fields).
        logger.debug("Elasticsearch retrieved " + hits.size() + " documents, routing to success");
        session.transfer(page, REL_SUCCESS);

        long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        if (!page.isEmpty()) {
            if (context.hasNonLoopConnection()) {
                page.forEach(f -> session.getProvenanceReporter().fetch(f, url.toExternalForm(), millis));
            } else {
                page.forEach(f -> session.getProvenanceReporter().receive(f, url.toExternalForm(), millis));
            }
        }
        return page.size();
    }

    /**
     * Reacts to a non-success HTTP status: throws for 5xx responses and for any error when the
     * processor has an incoming connection, otherwise only logs a warning.
     *
     * <p>The response body is closed before returning or throwing because this path never
     * reads it; OkHttp requires response bodies to be closed by the caller.
     *
     * @param getResponse the failed HTTP response
     * @param statusCode  the status code of {@code getResponse}
     * @param context     the process context, used to check for an incoming connection
     * @param logger      the component logger
     */
    private void handleErrorStatus(Response getResponse, int statusCode, ProcessContext context, ComponentLog logger) {
        try {
            if (statusCode / 100 == 5) {
                throw new RetryableException(String.format("Elasticsearch returned code %s with message %s, transferring flow file to retry. This is likely a server problem, yielding...", statusCode, getResponse.message()));
            }
            if (context.hasIncomingConnection()) {
                throw new UnretryableException(String.format("Elasticsearch returned code %s with message %s, transferring flow file to failure", statusCode, getResponse.message()));
            }
            logger.warn("Elasticsearch returned code {} with message {}", new Object[]{statusCode, getResponse.message()});
        }
        finally {
            ResponseBody errorBody = getResponse.body();
            if (errorBody != null) {
                errorBody.close();
            }
        }
    }

    /**
     * Builds the FlowFile for a single search hit.
     *
     * <p>With a parent FlowFile the result is a child ({@code session.create(parent)}) in
     * content mode or a clone in attribute mode; without one a new FlowFile is created.
     *
     * @param hit             one element of the response's {@code hits.hits} array
     * @param session         the session used to create and modify the FlowFile
     * @param parent          the incoming FlowFile, or {@code null} if there is none
     * @param targetIsContent {@code true} to write {@code _source} as content, {@code false} to
     *                        copy its fields into {@code es.result.*} attributes
     * @return the populated FlowFile, not yet transferred
     */
    private FlowFile createDocumentFlowFile(JsonNode hit, ProcessSession session, FlowFile parent, boolean targetIsContent) {
        String retrievedId = hit.get("_id").asText();
        String retrievedIndex = hit.get("_index").asText();
        String retrievedType = hit.get("_type").asText();

        FlowFile documentFlowFile;
        if (parent == null) {
            documentFlowFile = session.create();
        } else if (targetIsContent) {
            documentFlowFile = session.create(parent);
        } else {
            documentFlowFile = session.clone(parent);
        }

        JsonNode source = hit.get("_source");
        documentFlowFile = session.putAttribute(documentFlowFile, "es.id", retrievedId);
        documentFlowFile = session.putAttribute(documentFlowFile, "es.index", retrievedIndex);
        documentFlowFile = session.putAttribute(documentFlowFile, "es.type", retrievedType);

        if (targetIsContent) {
            documentFlowFile = session.putAttribute(documentFlowFile, "filename", retrievedId);
            documentFlowFile = session.putAttribute(documentFlowFile, "mime.type", "application/json");
            // Encode explicitly as UTF-8 so the content does not depend on the JVM's default charset.
            documentFlowFile = session.write(documentFlowFile, out -> out.write(source.toString().getBytes(StandardCharsets.UTF_8)));
        } else {
            Map<String, String> attributes = new HashMap<>();
            Iterator<Map.Entry<String, JsonNode>> it = source.getFields();
            while (it.hasNext()) {
                Map.Entry<String, JsonNode> entry = it.next();
                attributes.put(ATTRIBUTE_PREFIX + entry.getKey(), entry.getValue().asText());
            }
            documentFlowFile = session.putAllAttributes(documentFlowFile, attributes);
        }
        return documentFlowFile;
    }

    /**
     * Builds the {@code _search} URL for one page of results.
     *
     * @param baseUrl   the Elasticsearch base URL
     * @param query     the value of the {@code q} parameter
     * @param index     the index to search; {@code _all} is used when empty
     * @param type      the document type path segment; omitted when empty
     * @param fields    comma-separated fields to include; omitted when empty
     * @param sort      comma-separated sort clauses; omitted when empty
     * @param pageSize  the value of the {@code size} parameter
     * @param fromIndex the value of the {@code from} parameter
     * @return the fully built request URL
     * @throws MalformedURLException if {@code baseUrl} is empty or cannot be parsed as an HTTP(S) URL
     */
    private URL buildRequestURL(String baseUrl, String query, String index, String type, String fields, String sort, int pageSize, int fromIndex) throws MalformedURLException {
        if (StringUtils.isEmpty(baseUrl)) {
            throw new MalformedURLException("Base URL cannot be null");
        }
        HttpUrl parsedBase = HttpUrl.parse(baseUrl);
        // HttpUrl.parse signals an invalid URL by returning null; surface that as the declared
        // exception instead of a NullPointerException on the next line.
        if (parsedBase == null) {
            throw new MalformedURLException("Base URL is not a valid HTTP(S) URL: " + baseUrl);
        }

        HttpUrl.Builder builder = parsedBase.newBuilder();
        builder.addPathSegment(StringUtils.isEmpty(index) ? "_all" : index);
        if (!StringUtils.isEmpty(type)) {
            builder.addPathSegment(type);
        }
        builder.addPathSegment("_search");
        builder.addQueryParameter(QUERY_QUERY_PARAM, query);
        builder.addQueryParameter(SIZE_QUERY_PARAM, String.valueOf(pageSize));
        builder.addQueryParameter(FROM_QUERY_PARAM, String.valueOf(fromIndex));
        if (!StringUtils.isEmpty(fields)) {
            builder.addQueryParameter(FIELD_INCLUDE_QUERY_PARAM, trimCommaSeparated(fields));
        }
        if (!StringUtils.isEmpty(sort)) {
            builder.addQueryParameter(SORT_QUERY_PARAM, trimCommaSeparated(sort));
        }
        return builder.build().url();
    }

    /**
     * Trims whitespace around each element of a comma-separated list and re-joins with commas.
     *
     * @param csv a comma-separated list
     * @return the same elements with surrounding whitespace removed
     */
    private static String trimCommaSeparated(String csv) {
        return Stream.of(csv.split(",")).map(String::trim).collect(Collectors.joining(","));
    }
}

