/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.elasticsearch;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchTransportClientProcessor;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.get.GetRequestBuilder;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.transport.ReceiveTimeoutTransportException;

@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@EventDriven
@SupportsBatching
@Tags(value={"elasticsearch", "fetch", "read", "get"})
@CapabilityDescription(value="Retrieves a document from Elasticsearch using the specified connection properties and the identifier of the document to retrieve. If the cluster has been configured for authorization and/or secure transport (SSL/TLS) and the Shield plugin is available, secure connections can be made. This processor supports Elasticsearch 2.x clusters.")
@WritesAttributes(value={@WritesAttribute(attribute="filename", description="The filename attributes is set to the document identifier"), @WritesAttribute(attribute="es.index", description="The Elasticsearch index containing the document"), @WritesAttribute(attribute="es.type", description="The Elasticsearch document type")})
public class FetchElasticsearch
extends AbstractElasticsearchTransportClientProcessor {
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are read from Elasticsearch are routed to this relationship").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("All FlowFiles that cannot be read from Elasticsearch are routed to this relationship").build();
    public static final Relationship REL_RETRY = new Relationship.Builder().name("retry").description("A FlowFile is routed to this relationship if the document cannot be fetched but attempting the operation again may succeed").build();
    public static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not found").description("A FlowFile is routed to this relationship if the specified document does not exist in the Elasticsearch cluster").build();
    public static final PropertyDescriptor DOC_ID = new PropertyDescriptor.Builder().name("Document Identifier").description("The identifier for the document to be fetched").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder().name("Index").description("The name of the index to read from").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder().name("Type").description("The type of this document (used by Elasticsearch for indexing and searching)").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    private static final Set<Relationship> relationships;
    private static final List<PropertyDescriptor> propertyDescriptors;

    public Set<Relationship> getRelationships() {
        return relationships;
    }

    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    @Override
    @OnScheduled
    public void setup(ProcessContext context) {
        super.setup(context);
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
        String docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue();
        String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
        final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
        ComponentLog logger = this.getLogger();
        try {
            GetResponse getResponse;
            logger.debug("Fetching {}/{}/{} from Elasticsearch", new Object[]{index, docType, docId});
            long startNanos = System.nanoTime();
            GetRequestBuilder getRequestBuilder = ((Client)this.esClient.get()).prepareGet(index, docType, docId);
            if (this.authToken != null) {
                getRequestBuilder.putHeader("Authorization", (Object)this.authToken);
            }
            if ((getResponse = (GetResponse)getRequestBuilder.execute().actionGet()) == null || !getResponse.isExists()) {
                logger.debug("Failed to read {}/{}/{} from Elasticsearch: Document not found", new Object[]{index, docType, docId});
                flowFile = session.penalize(flowFile);
                session.transfer(flowFile, REL_NOT_FOUND);
            } else {
                flowFile = session.putAttribute(flowFile, "filename", docId);
                flowFile = session.putAttribute(flowFile, "es.index", index);
                flowFile = session.putAttribute(flowFile, "es.type", docType);
                flowFile = session.write(flowFile, new OutputStreamCallback(){

                    public void process(OutputStream out) throws IOException {
                        out.write(getResponse.getSourceAsString().getBytes(charset));
                    }
                });
                logger.debug("Elasticsearch document " + docId + " fetched, routing to success");
                long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
                String uri = context.getProperty(HOSTS).evaluateAttributeExpressions().getValue() + "/" + index + "/" + docType + "/" + docId;
                session.getProvenanceReporter().fetch(flowFile, uri, millis);
                session.transfer(flowFile, REL_SUCCESS);
            }
        }
        catch (ElasticsearchTimeoutException | NoNodeAvailableException | NodeClosedException | ReceiveTimeoutTransportException exceptionToRetry) {
            logger.error("Failed to read into Elasticsearch due to {}, this may indicate an error in configuration (hosts, username/password, etc.). Routing to retry", new Object[]{exceptionToRetry.getLocalizedMessage()}, exceptionToRetry);
            session.transfer(flowFile, REL_RETRY);
            context.yield();
        }
        catch (Exception e) {
            logger.error("Failed to read {} from Elasticsearch due to {}", new Object[]{flowFile, e.getLocalizedMessage()}, (Throwable)e);
            session.transfer(flowFile, REL_FAILURE);
            context.yield();
        }
    }

    @Override
    @OnStopped
    public void closeClient() {
        super.closeClient();
    }

    static {
        HashSet<Relationship> _rels = new HashSet<Relationship>();
        _rels.add(REL_SUCCESS);
        _rels.add(REL_FAILURE);
        _rels.add(REL_RETRY);
        _rels.add(REL_NOT_FOUND);
        relationships = Collections.unmodifiableSet(_rels);
        ArrayList<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        descriptors.add(CLUSTER_NAME);
        descriptors.add(HOSTS);
        descriptors.add(PROP_SSL_CONTEXT_SERVICE);
        descriptors.add(PROP_SHIELD_LOCATION);
        descriptors.add(USERNAME);
        descriptors.add(PASSWORD);
        descriptors.add(PING_TIMEOUT);
        descriptors.add(SAMPLER_INTERVAL);
        descriptors.add(DOC_ID);
        descriptors.add(INDEX);
        descriptors.add(TYPE);
        descriptors.add(CHARSET);
        propertyDescriptors = Collections.unmodifiableList(descriptors);
    }
}

