package org.apache.nifi.processors.elasticsearch;

import com.fasterxml.jackson.databind.JsonNode;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import okhttp3.HttpUrl;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.RequestBody;
import okhttp3.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

/**
 * Runs a Lucene-style query against Elasticsearch and pages through the results with the scroll API.
 * Each page of hits is emitted as one FlowFile of the form {@code { "hits" : [ <source1>, <source2>, ... ] }}.
 *
 * <p>Progress is kept in LOCAL state: the latest {@code scrollId} after each page, and {@code finishedQuery=true}
 * once an empty page is returned. While {@code finishedQuery} is set, the processor does nothing until the state
 * is cleared.
 */
@CapabilityDescription("Scrolls through an Elasticsearch query using the specified connection properties. This processor is intended to be run on the primary node, and is designed for scrolling through huge result sets, as in the case of a reindex.  The state must be cleared before another query can be run.  Each page of results is returned, wrapped in a JSON object like so: { \"hits\" : [ <doc1>, <doc2>, <docn> ] }.  Note that the full body of each page of documents will be read into memory before being written to a Flow File for transfer.")
@DynamicProperty(name = "A URL query parameter", value = "The value to set it to", expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY, description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing")
@SupportsBatching
@WritesAttributes({
        @WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
        @WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type")})
@Stateful(description = "After each successful scroll page, the latest scroll_id is persisted in scrollId as input for the next scroll call.  Once the entire query is complete, finishedQuery state will be set to true, and the processor will not execute unless this is cleared.", scopes = {Scope.LOCAL})
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@EventDriven
@Tags({"elasticsearch", "query", "scroll", "read", "get", "http"})
public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
    private static final String FINISHED_QUERY_STATE = "finishedQuery";
    private static final String SCROLL_ID_STATE = "scrollId";
    private static final String SCROLL_QUERY_PARAM = "scroll";

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("All FlowFiles that are read from Elasticsearch are routed to this relationship.")
            .build();

    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("All FlowFiles that cannot be read from Elasticsearch are routed to this relationship. Note that only incoming flow files will be routed to failure.")
            .build();

    public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
            .name("scroll-es-query")
            .displayName("Query")
            .description("The Lucene-style query to run against ElasticSearch (e.g., genre:blues AND -artist:muddy)")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final PropertyDescriptor SCROLL_DURATION = new PropertyDescriptor.Builder()
            .name("scroll-es-scroll")
            .displayName("Scroll Duration")
            .description("The scroll duration is how long each search context is kept in memory.")
            .defaultValue("1m")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("[0-9]+(m|h)")))
            .build();

    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
            .name("scroll-es-index")
            .displayName("Index")
            .description("The name of the index to read from. If the property is set to _all, the query will match across all indexes.")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
            .name("scroll-es-type")
            .displayName("Type")
            .description("The type of document (if unset, the query will be against all types in the _index). This should be unset or '_doc' for Elasticsearch 7.0+.")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
            .build();

    public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
            .name("scroll-es-fields")
            .displayName("Fields")
            .description("A comma-separated list of fields to retrieve from the document. If the Fields property is left blank, then the entire document's source will be retrieved.")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final PropertyDescriptor SORT = new PropertyDescriptor.Builder()
            .name("scroll-es-sort")
            .displayName("Sort")
            .description("A sort parameter (e.g., timestamp:asc). If the Sort property is left blank, then the results will be retrieved in document order.")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder()
            .name("scroll-es-size")
            .displayName("Page Size")
            .defaultValue("20")
            .description("Determines how many documents to return per page during scrolling.")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .build();

    private static final Set<Relationship> relationships;
    private static final List<PropertyDescriptor> propertyDescriptors;

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    @Override
    @OnScheduled
    public void setup(ProcessContext processContext) {
        super.setup(processContext);
    }

    /**
     * Fetches the next page of the scroll. The first page is fetched with a GET search request; later pages are
     * fetched by POSTing the stored scroll id.
     *
     * @param processContext the processor context supplying property values
     * @param processSession the session used for state, FlowFiles and provenance
     * @throws ProcessException if the LOCAL state cannot be read to decide whether the query is finished
     */
    @Override
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        try {
            if (isQueryFinished(processSession)) {
                getLogger().trace("Query has been marked finished in the state manager.  To run another query, clear the state.");
                return;
            }
        } catch (final IOException e) {
            throw new ProcessException("Could not retrieve state", e);
        }

        final OkHttpClient okHttpClient = getClient();
        final FlowFile flowFile = processSession.create();

        final String index = processContext.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
        final String query = processContext.getProperty(QUERY).evaluateAttributeExpressions(flowFile).getValue();
        final String docType = processContext.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
        final int pageSize = processContext.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile).asInteger();
        final String fields = processContext.getProperty(FIELDS).isSet()
                ? processContext.getProperty(FIELDS).evaluateAttributeExpressions(flowFile).getValue() : null;
        final String sort = processContext.getProperty(SORT).isSet()
                ? processContext.getProperty(SORT).evaluateAttributeExpressions(flowFile).getValue() : null;
        final String scrollDuration = processContext.getProperty(SCROLL_DURATION).isSet()
                ? processContext.getProperty(SCROLL_DURATION).evaluateAttributeExpressions(flowFile).getValue() : null;
        final String username = processContext.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
        final String password = processContext.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
        final Charset charset = Charset.forName(processContext.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
        final ComponentLog logger = getLogger();

        try {
            final String scrollId = loadScrollId(processSession);
            final String baseUrl = StringUtils.trimToEmpty(processContext.getProperty(ES_URL).evaluateAttributeExpressions().getValue());
            final boolean firstPage = scrollId == null;

            if (firstPage) {
                logger.debug("Querying {}/{} from Elasticsearch: {}", new Object[]{index, docType, query});
            }
            final URL url = buildRequestURL(baseUrl, query, index, docType, fields, sort, scrollId, pageSize, scrollDuration, processContext);
            final long startNanos = System.nanoTime();

            // Continuation pages must POST the scroll id; the initial search is a plain GET with no body.
            final RequestBody requestBody = firstPage
                    ? null
                    : RequestBody.create(MediaType.parse("application/json"),
                            String.format("{ \"scroll\": \"%s\", \"scroll_id\": \"%s\" }", scrollDuration, scrollId));

            // try-with-resources so the HTTP response is released even if page handling throws.
            try (Response response = sendRequestToElasticsearch(okHttpClient, url, username, password,
                    firstPage ? "GET" : "POST", requestBody)) {
                getPage(response, url, processContext, processSession, flowFile, logger, startNanos, charset);
            }
        } catch (final IOException e) {
            logger.error("Failed to read from Elasticsearch due to {}, this may indicate an error in configuration (hosts, username/password, etc.).", new Object[]{e.getLocalizedMessage()}, e);
            processSession.remove(flowFile);
            processContext.yield();
        } catch (final Exception e) {
            logger.error("Failed to read {} from Elasticsearch due to {}", new Object[]{flowFile, e.getLocalizedMessage()}, e);
            processSession.transfer(flowFile, REL_FAILURE);
            processContext.yield();
        }
    }

    /**
     * Handles one Elasticsearch response.
     *
     * <p>On a non-success status the FlowFile is removed, and the processor also yields for 5xx codes. On an empty
     * hits array the query is marked finished and the FlowFile is removed. Otherwise the page is written to the
     * FlowFile, the new scroll id is saved, and the FlowFile is routed to success.
     *
     * <p>The whole response is parsed before the session is touched, so a malformed hit leaves the caller's FlowFile
     * reference current for failure routing.
     *
     * @throws IOException if the body cannot be read/parsed or state cannot be updated
     */
    private void getPage(Response response, URL url, ProcessContext processContext, ProcessSession processSession,
                         FlowFile flowFile, ComponentLog componentLog, long startNanos, Charset charset) throws IOException {
        final int statusCode = response.code();
        if (!isSuccess(statusCode)) {
            if (statusCode / 100 == 5) {
                componentLog.warn("Elasticsearch returned code {} with message {}, removing the flow file. This is likely a server problem, yielding...", new Object[]{statusCode, response.message()});
                processSession.remove(flowFile);
                processContext.yield();
            } else {
                componentLog.warn("Elasticsearch returned code {} with message {}", new Object[]{statusCode, response.message()});
                processSession.remove(flowFile);
            }
            return;
        }

        final JsonNode responseJson = parseJsonResponse(new ByteArrayInputStream(response.body().bytes()));
        final JsonNode hits = responseJson.get("hits").get("hits");
        if (hits.size() == 0) {
            // An empty page means the scroll is exhausted.
            finishQuery(processSession);
            processSession.remove(flowFile);
            return;
        }
        final String nextScrollId = responseJson.get("_scroll_id").asText();

        final StringBuilder page = new StringBuilder("{ \"hits\" : [");
        for (int i = 0; i < hits.size(); i++) {
            if (i > 0) {
                page.append(", ");
            }
            page.append(hits.get(i).get("_source").toString());
        }
        page.append("] }");

        // The FlowFile carries the index/type of the last hit on the page (the same values the original per-hit
        // overwrite produced). _type is absent from hits on newer Elasticsearch versions, so es.type is optional.
        final JsonNode lastHit = hits.get(hits.size() - 1);
        final Map<String, String> attributes = new HashMap<>();
        attributes.put("es.index", lastHit.get("_index").asText());
        final JsonNode typeNode = lastHit.get("_type");
        if (typeNode != null) {
            attributes.put("es.type", typeNode.asText());
        }
        attributes.put("mime.type", "application/json");

        componentLog.debug("Elasticsearch retrieved {} documents, routing to success", new Object[]{hits.size()});

        final byte[] content = page.toString().getBytes(charset);
        FlowFile output = processSession.putAllAttributes(flowFile, attributes);
        output = processSession.write(output, out -> out.write(content));
        processSession.setState(Collections.singletonMap(SCROLL_ID_STATE, nextScrollId), Scope.LOCAL);
        processSession.transfer(output, REL_SUCCESS);
        processSession.getProvenanceReporter().receive(output, url.toExternalForm(),
                TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
    }

    /** Returns {@code true} when LOCAL state exists and its finishedQuery entry equals "true". */
    private boolean isQueryFinished(ProcessSession processSession) throws IOException {
        final StateMap state = processSession.getState(Scope.LOCAL);
        if (state.getVersion() < 0) {
            getLogger().debug("No previous state found");
            return false;
        }
        final String finished = state.get(FINISHED_QUERY_STATE);
        getLogger().debug("Loaded state with finishedQuery = {}", new Object[]{finished});
        return "true".equals(finished);
    }

    /** Returns the stored scroll id, or {@code null} when no LOCAL state exists or no id has been saved. */
    private String loadScrollId(ProcessSession processSession) throws IOException {
        final StateMap state = processSession.getState(Scope.LOCAL);
        if (state.getVersion() < 0) {
            getLogger().debug("No previous state found");
            return null;
        }
        final String scrollId = state.get(SCROLL_ID_STATE);
        getLogger().debug("Loaded state with scrollId {}", new Object[]{scrollId});
        return scrollId;
    }

    /**
     * Replaces LOCAL state with {@code finishedQuery=true} (dropping the scroll id). This goes through the session,
     * like the scroll-id update, so it commits together with the removal of the empty-page FlowFile.
     */
    private void finishQuery(ProcessSession processSession) throws IOException {
        getLogger().debug("Saving state with finishedQuery = true");
        processSession.setState(Collections.singletonMap(FINISHED_QUERY_STATE, "true"), Scope.LOCAL);
    }

    /**
     * Builds the request URL.
     *
     * <p>Without a scroll id, the URL is the initial search {@code <base>/<index|_all>[/<type>]/_search?q=..&size=..
     * [&_source=..][&sort=..]&scroll=..}. With a scroll id, it is {@code <base>/_search/scroll}. In both cases, every
     * dynamic property is appended as a query parameter.
     *
     * @throws MalformedURLException if the base URL is empty or not a parseable HTTP(S) URL
     */
    private URL buildRequestURL(String baseUrl, String query, String index, String type, String fields, String sort,
                                String scrollId, int pageSize, String scrollDuration, ProcessContext processContext)
            throws MalformedURLException {
        if (StringUtils.isEmpty(baseUrl)) {
            throw new MalformedURLException("Base URL cannot be null");
        }
        final HttpUrl parsedBase = HttpUrl.parse(baseUrl);
        if (parsedBase == null) {
            throw new MalformedURLException("Base URL is not a valid HTTP or HTTPS URL: " + baseUrl);
        }
        final HttpUrl.Builder builder = parsedBase.newBuilder();

        if (StringUtils.isEmpty(scrollId)) {
            builder.addPathSegment(StringUtils.isEmpty(index) ? "_all" : index);
            if (StringUtils.isNotBlank(type)) {
                builder.addPathSegment(type);
            }
            builder.addPathSegment("_search");
            builder.addQueryParameter("q", query);
            builder.addQueryParameter("size", String.valueOf(pageSize));
            if (!StringUtils.isEmpty(fields)) {
                builder.addQueryParameter("_source", trimCommaSeparated(fields));
            }
            if (!StringUtils.isEmpty(sort)) {
                builder.addQueryParameter("sort", trimCommaSeparated(sort));
            }
            builder.addQueryParameter(SCROLL_QUERY_PARAM, scrollDuration);
        } else {
            // Continuation endpoint: the scroll id itself travels in the POST body, not the URL.
            builder.addPathSegment("_search");
            builder.addPathSegment("scroll");
        }

        for (final Map.Entry<PropertyDescriptor, String> entry : processContext.getProperties().entrySet()) {
            final PropertyDescriptor descriptor = entry.getKey();
            if (descriptor.isDynamic() && entry.getValue() != null) {
                builder.addQueryParameter(descriptor.getName(),
                        processContext.getProperty(descriptor).evaluateAttributeExpressions().getValue());
            }
        }
        return builder.build().url();
    }

    /** Trims whitespace around each element of a comma-separated list and re-joins it with bare commas. */
    private static String trimCommaSeparated(String value) {
        return Stream.of(value.split(",")).map(String::trim).collect(Collectors.joining(","));
    }

    static {
        final Set<Relationship> rels = new HashSet<>();
        rels.add(REL_SUCCESS);
        rels.add(REL_FAILURE);
        relationships = Collections.unmodifiableSet(rels);

        final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
        descriptors.add(QUERY);
        descriptors.add(SCROLL_DURATION);
        descriptors.add(PAGE_SIZE);
        descriptors.add(INDEX);
        descriptors.add(TYPE);
        descriptors.add(FIELDS);
        descriptors.add(SORT);
        propertyDescriptors = Collections.unmodifiableList(descriptors);
    }
}
