package org.apache.nifi.processors.azure.data.explorer;

import java.io.InputStream;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.storage.utils.BlobAttributes;
import org.apache.nifi.services.azure.data.explorer.KustoQueryResponse;
import org.apache.nifi.services.azure.data.explorer.KustoQueryService;

@CapabilityDescription("Query Azure Data Explorer and stream JSON results to output FlowFiles")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"Azure", "Data", "Explorer", "ADX", "Kusto"})
@WritesAttributes({@WritesAttribute(attribute = QueryAzureDataExplorer.QUERY_ERROR_MESSAGE, description = "Azure Data Explorer query error message on failures"), @WritesAttribute(attribute = QueryAzureDataExplorer.QUERY_EXECUTED, description = "Azure Data Explorer query executed"), @WritesAttribute(attribute = BlobAttributes.ATTR_NAME_MIME_TYPE, description = "Content Type set to application/json")})
/* loaded from: input_file:org/apache/nifi/processors/azure/data/explorer/QueryAzureDataExplorer.class */
public class QueryAzureDataExplorer extends AbstractProcessor {
    public static final String QUERY_ERROR_MESSAGE = "query.error.message";
    public static final String QUERY_EXECUTED = "query.executed";
    protected static final String APPLICATION_JSON = "application/json";
    private volatile KustoQueryService service;
    public static final Relationship SUCCESS = new Relationship.Builder().name("success").description("FlowFiles containing results of a successful Query").build();
    public static final Relationship FAILURE = new Relationship.Builder().name("failure").description("FlowFiles containing original input associated with a failed Query").build();
    public static final PropertyDescriptor KUSTO_QUERY_SERVICE = new PropertyDescriptor.Builder().name("Kusto Query Service").displayName("Kusto Query Service").description("Azure Data Explorer Kusto Query Service").required(true).identifiesControllerService(KustoQueryService.class).build();
    public static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder().name("Database Name").displayName("Database Name").description("Azure Data Explorer Database Name for querying").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder().name("Query").displayName("Query").description("Query to be run against Azure Data Explorer").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    private static final Set<Relationship> RELATIONSHIPS = new LinkedHashSet(Arrays.asList(SUCCESS, FAILURE));
    private static final List<PropertyDescriptor> DESCRIPTORS = Arrays.asList(KUSTO_QUERY_SERVICE, DATABASE_NAME, QUERY);

    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return DESCRIPTORS;
    }

    @OnScheduled
    public void onScheduled(ProcessContext processContext) {
        this.service = processContext.getProperty(KUSTO_QUERY_SERVICE).asControllerService(KustoQueryService.class);
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            return;
        }
        String value = processContext.getProperty(QUERY).evaluateAttributeExpressions(flowFile).getValue();
        String value2 = processContext.getProperty(DATABASE_NAME).evaluateAttributeExpressions(flowFile).getValue();
        try {
            flowFile = processSession.putAttribute(flowFile, QUERY_EXECUTED, value);
            KustoQueryResponse executeQuery = executeQuery(value2, value);
            if (executeQuery.isError()) {
                getLogger().error("Query failed: {}", new Object[]{executeQuery.getErrorMessage()});
                processSession.transfer(processSession.putAttribute(flowFile, QUERY_ERROR_MESSAGE, executeQuery.getErrorMessage()), FAILURE);
            } else {
                InputStream responseStream = executeQuery.getResponseStream();
                try {
                    flowFile = processSession.putAttribute(processSession.importFrom(responseStream, flowFile), CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
                    processSession.transfer(flowFile, SUCCESS);
                    if (responseStream != null) {
                        responseStream.close();
                    }
                } finally {
                }
            }
        } catch (Exception e) {
            getLogger().error("Query failed", e);
            processSession.transfer(processSession.putAttribute(flowFile, QUERY_ERROR_MESSAGE, e.getMessage()), FAILURE);
        }
    }

    protected KustoQueryResponse executeQuery(String str, String str2) {
        return this.service.executeQuery(str, str2);
    }
}
