package org.apache.nifi.processors.livy;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.StringEscapeUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.api.livy.LivySessionService;
import org.apache.nifi.controller.api.livy.exception.SessionManagerException;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;

@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription("Execute Spark Code over a Livy-managed HTTP session to a live Spark context. Supports cached RDD sharing.")
@Tags({"spark", "livy", "http", "execute"})
/* loaded from: input_file:org/apache/nifi/processors/livy/ExecuteSparkInteractive.class */
public class ExecuteSparkInteractive extends AbstractProcessor {
    public static final PropertyDescriptor LIVY_CONTROLLER_SERVICE = new PropertyDescriptor.Builder().name("exec-spark-iactive-livy-controller-service").displayName("Livy Controller Service").description("The controller service to use for Livy-managed session(s).").required(true).identifiesControllerService(LivySessionService.class).build();
    public static final PropertyDescriptor CODE = new PropertyDescriptor.Builder().name("exec-spark-iactive-code").displayName("Code").description("The code to execute in the session. This property can be empty, a constant value, or built from attributes using Expression Language. If this property is specified, it will be used regardless of the content of incoming flowfiles. If this property is empty, the content of the incoming flow file is expected to contain valid code to be issued by the processor to the session. Note that Expression Language is not evaluated for flow file contents.").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder().name("exec-spark-iactive-charset").displayName("Character Set").description("The character set encoding for the incoming flow file.").required(true).defaultValue("UTF-8").addValidator(StandardValidators.CHARACTER_SET_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final PropertyDescriptor STATUS_CHECK_INTERVAL = new PropertyDescriptor.Builder().name("exec-spark-iactive-status-check-interval").displayName("Status Check Interval").description("The amount of time to wait between checking the status of an operation.").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).defaultValue("1 sec").build();
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles that are successfully processed are sent to this relationship").build();
    public static final Relationship REL_WAIT = new Relationship.Builder().name("wait").description("FlowFiles that are waiting on an available Spark session will be sent to this relationship").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("FlowFiles are routed to this relationship when they cannot be parsed").build();
    private volatile List<PropertyDescriptor> properties;
    private volatile Set<Relationship> relationships;

    public void init(ProcessorInitializationContext processorInitializationContext) {
        ArrayList arrayList = new ArrayList();
        arrayList.add(LIVY_CONTROLLER_SERVICE);
        arrayList.add(CODE);
        arrayList.add(CHARSET);
        arrayList.add(STATUS_CHECK_INTERVAL);
        this.properties = Collections.unmodifiableList(arrayList);
        HashSet hashSet = new HashSet();
        hashSet.add(REL_SUCCESS);
        hashSet.add(REL_WAIT);
        hashSet.add(REL_FAILURE);
        this.relationships = Collections.unmodifiableSet(hashSet);
    }

    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.properties;
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        Charset charset;
        FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            return;
        }
        ComponentLog logger = getLogger();
        LivySessionService livySessionService = (LivySessionService) processContext.getProperty(LIVY_CONTROLLER_SERVICE).asControllerService(LivySessionService.class);
        try {
            Map session = livySessionService.getSession();
            if (session == null || session.isEmpty()) {
                logger.debug("No Spark session available (yet), routing flowfile to wait");
                processSession.transfer(flowFile, REL_WAIT);
                processContext.yield();
                return;
            }
            long longValue = processContext.getProperty(STATUS_CHECK_INTERVAL).evaluateAttributeExpressions(flowFile).asTimePeriod(TimeUnit.MILLISECONDS).longValue();
            try {
                charset = Charset.forName(processContext.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
            } catch (Exception e) {
                logger.warn("Illegal character set name specified, defaulting to UTF-8");
                charset = StandardCharsets.UTF_8;
            }
            String str = (String) session.get("sessionId");
            String str2 = (String) session.get("livyUrl");
            String value = processContext.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue();
            if (StringUtils.isEmpty(value)) {
                try {
                    InputStream read = processSession.read(flowFile);
                    Throwable th = null;
                    try {
                        value = IOUtils.toString(read, charset);
                        if (read != null) {
                            if (0 != 0) {
                                try {
                                    read.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                read.close();
                            }
                        }
                    } finally {
                    }
                } catch (IOException e2) {
                    logger.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, e2.getMessage()}, e2);
                    processSession.transfer(processSession.penalize(flowFile), REL_FAILURE);
                    return;
                }
            }
            try {
                JSONObject submitAndHandleJob = submitAndHandleJob(str2, livySessionService, str, "{\"code\":\"" + StringEscapeUtils.escapeJson(value) + "\"}", longValue);
                logger.debug("ExecuteSparkInteractive Result of Job Submit: " + submitAndHandleJob);
                if (submitAndHandleJob == null) {
                    processSession.transfer(flowFile, REL_FAILURE);
                } else {
                    try {
                        JSONObject jSONObject = submitAndHandleJob.getJSONObject("data");
                        flowFile = processSession.putAttribute(processSession.write(flowFile, outputStream -> {
                            outputStream.write(jSONObject.toString().getBytes());
                        }), CoreAttributes.MIME_TYPE.key(), "application/json");
                        processSession.transfer(flowFile, REL_SUCCESS);
                    } catch (JSONException e3) {
                        logger.error("Spark Session returned an error, sending the output JSON object as the flow file content to failure (after penalizing)");
                        processSession.transfer(processSession.penalize(processSession.putAttribute(processSession.write(flowFile, outputStream2 -> {
                            outputStream2.write(submitAndHandleJob.toString().getBytes());
                        }), CoreAttributes.MIME_TYPE.key(), "application/json")), REL_FAILURE);
                    }
                }
            } catch (IOException | SessionManagerException e4) {
                logger.error("Failure processing flowfile {} due to {}, penalizing and routing to failure", new Object[]{flowFile, e4.getMessage()}, e4);
                processSession.transfer(processSession.penalize(flowFile), REL_FAILURE);
            }
        } catch (SessionManagerException e5) {
            logger.error("Error opening spark session, routing flowfile to wait", e5);
            processSession.transfer(flowFile, REL_WAIT);
            processContext.yield();
        }
    }

    private JSONObject submitAndHandleJob(String str, LivySessionService livySessionService, String str2, String str3, long j) throws IOException, SessionManagerException {
        ComponentLog logger = getLogger();
        String str4 = str + "/sessions/" + str2 + "/statements";
        JSONObject jSONObject = null;
        HashMap hashMap = new HashMap();
        hashMap.put("Content-Type", "application/json");
        hashMap.put("X-Requested-By", "nifi");
        hashMap.put("Accept", "application/json");
        logger.debug("submitAndHandleJob() Submitting Job to Spark via: " + str4);
        try {
            JSONObject readJSONObjectFromUrlPOST = readJSONObjectFromUrlPOST(str4, livySessionService, hashMap, str3);
            logger.debug("submitAndHandleJob() Job Info: " + readJSONObjectFromUrlPOST);
            String str5 = str4 + "/" + String.valueOf(readJSONObjectFromUrlPOST.getInt("id"));
            JSONObject readJSONObjectFromUrl = readJSONObjectFromUrl(str5, livySessionService, hashMap);
            String string = readJSONObjectFromUrl.getString("state");
            logger.debug("submitAndHandleJob() New Job Info: " + readJSONObjectFromUrl);
            Thread.sleep(j);
            if (string.equalsIgnoreCase("available")) {
                logger.debug("submitAndHandleJob() Job status is: " + string + ". returning output...");
                jSONObject = readJSONObjectFromUrl.getJSONObject("output");
            } else if (string.equalsIgnoreCase("running") || string.equalsIgnoreCase("waiting")) {
                while (!string.equalsIgnoreCase("available")) {
                    logger.debug("submitAndHandleJob() Job status is: " + string + ". Waiting for job to complete...");
                    Thread.sleep(j);
                    readJSONObjectFromUrl = readJSONObjectFromUrl(str5, livySessionService, hashMap);
                    string = readJSONObjectFromUrl.getString("state");
                }
                jSONObject = readJSONObjectFromUrl.getJSONObject("output");
            } else if (string.equalsIgnoreCase("error") || string.equalsIgnoreCase("cancelled") || string.equalsIgnoreCase("cancelling")) {
                logger.debug("Job status is: " + string + ". Job did not complete due to error or has been cancelled. Check SparkUI for details.");
                throw new IOException(string);
            }
            return jSONObject;
        } catch (JSONException | InterruptedException e) {
            throw new IOException((Throwable) e);
        }
    }

    private JSONObject readJSONObjectFromUrlPOST(String str, LivySessionService livySessionService, Map<String, String> map, String str2) throws IOException, JSONException, SessionManagerException {
        HttpClient connection = livySessionService.getConnection();
        HttpPost httpPost = new HttpPost(str);
        for (Map.Entry<String, String> entry : map.entrySet()) {
            httpPost.addHeader(entry.getKey(), entry.getValue());
        }
        httpPost.setEntity(new StringEntity(str2));
        HttpResponse execute = connection.execute(httpPost);
        if (execute.getStatusLine().getStatusCode() == 200 || execute.getStatusLine().getStatusCode() == 201) {
            return readAllIntoJSONObject(execute.getEntity().getContent());
        }
        throw new RuntimeException("Failed : HTTP error code : " + execute.getStatusLine().getStatusCode() + " : " + execute.getStatusLine().getReasonPhrase());
    }

    private JSONObject readJSONObjectFromUrl(String str, LivySessionService livySessionService, Map<String, String> map) throws IOException, JSONException, SessionManagerException {
        HttpClient connection = livySessionService.getConnection();
        HttpGet httpGet = new HttpGet(str);
        for (Map.Entry<String, String> entry : map.entrySet()) {
            httpGet.addHeader(entry.getKey(), entry.getValue());
        }
        return readAllIntoJSONObject(connection.execute(httpGet).getEntity().getContent());
    }

    private JSONObject readAllIntoJSONObject(InputStream inputStream) throws IOException, JSONException {
        return new JSONObject(IOUtils.toString(new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))));
    }
}
