package org.apache.nifi.hbase;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.hbase.io.JsonFullRowSerializer;
import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer;
import org.apache.nifi.hbase.io.RowSerializer;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultCell;
import org.apache.nifi.hbase.scan.ResultHandler;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

@CapabilityDescription("Fetches a row from an HBase table. The Destination property controls whether the cells are added as flow file attributes, or the row is written to the flow file content as JSON. This processor may be used to fetch a fixed row on a interval by specifying the table and row id directly in the processor, or it may be used to dynamically fetch rows by referencing the table and row id from incoming flow files.")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"hbase", "scan", "fetch", "get", "enrich"})
@WritesAttributes({@WritesAttribute(attribute = FetchHBaseRow.HBASE_TABLE_ATTR, description = "The name of the HBase table that the row was fetched from"), @WritesAttribute(attribute = FetchHBaseRow.HBASE_ROW_ATTR, description = "A JSON document representing the row. This property is only written when a Destination of flowfile-attributes is selected."), @WritesAttribute(attribute = "mime.type", description = "Set to application/json when using a Destination of flowfile-content, not set or modified otherwise")})
/* loaded from: input_file:org/apache/nifi/hbase/FetchHBaseRow.class */
public class FetchHBaseRow extends AbstractProcessor implements VisibilityFetchSupport {
    /** Shape accepted by the Columns validator: comma-separated tokens of word characters, each optionally followed by ":" and more word characters. */
    static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:\\w+)?(?:,\\w+(:\\w+)?)*");
    /** Required controller service through which the processor talks to HBase. */
    static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder().name("HBase Client Service").description("Specifies the Controller Service to use for accessing HBase.").required(true).identifiesControllerService(HBaseClientService.class).build();
    /** Required table name; evaluated against FlowFile attributes. */
    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder().name("Table Name").description("The name of the HBase Table to fetch from.").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    /** Required row identifier; evaluated against FlowFile attributes. */
    static final PropertyDescriptor ROW_ID = new PropertyDescriptor.Builder().name("Row Identifier").description("The identifier of the row to fetch.").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    /** Optional column restriction, validated with a regex validator built from {@link #COLUMNS_PATTERN}; evaluated against FlowFile attributes. */
    static final PropertyDescriptor COLUMNS = new PropertyDescriptor.Builder().name("Columns").description("An optional comma-separated list of \"<colFamily>:<colQualifier>\" pairs to fetch. To return all columns for a given family, leave off the qualifier such as \"<colFamily1>,<colFamily2>\".").required(false).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.createRegexMatchingValidator(COLUMNS_PATTERN)).build();
    /** Destination choice: the serialized row is stored in a FlowFile attribute. */
    static final AllowableValue DESTINATION_ATTRIBUTES = new AllowableValue("flowfile-attributes", "flowfile-attributes", "Adds the JSON document representing the row that was fetched as an attribute named hbase.row. The format of the JSON document is determined by the JSON Format property. NOTE: Fetching many large rows into attributes may have a negative impact on performance.");
    /** Destination choice: the serialized row replaces the FlowFile content. */
    static final AllowableValue DESTINATION_CONTENT = new AllowableValue("flowfile-content", "flowfile-content", "Overwrites the FlowFile content with a JSON document representing the row that was fetched. The format of the JSON document is determined by the JSON Format property.");
    /** Required selector between the two destination choices; defaults to attributes. */
    static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder().name("Destination").description("Indicates whether the row fetched from HBase is written to FlowFile content or FlowFile Attributes.").required(true).allowableValues(new DescribedValue[]{DESTINATION_ATTRIBUTES, DESTINATION_CONTENT}).defaultValue(DESTINATION_ATTRIBUTES.getValue()).build();
    /** JSON layout that includes the row id plus every cell's family, qualifier, value and timestamp. */
    static final AllowableValue JSON_FORMAT_FULL_ROW = new AllowableValue("full-row", "full-row", "Creates a JSON document with the format: {\"row\":<row-id>, \"cells\":[{\"fam\":<col-fam>, \"qual\":<col-qual>, \"val\":<value>, \"ts\":<timestamp>}]}.");
    /** JSON layout that maps each column qualifier directly to its value. */
    static final AllowableValue JSON_FORMAT_QUALIFIER_AND_VALUE = new AllowableValue("col-qual-and-val", "col-qual-and-val", "Creates a JSON document with the format: {\"<col-qual>\":\"<value>\", \"<col-qual>\":\"<value>\"}.");
    /** Required selector between the two JSON layouts; defaults to the full-row layout. */
    static final PropertyDescriptor JSON_FORMAT = new PropertyDescriptor.Builder().name("JSON Format").description("Specifies how to represent the HBase row as a JSON document.").required(true).allowableValues(new DescribedValue[]{JSON_FORMAT_FULL_ROW, JSON_FORMAT_QUALIFIER_AND_VALUE}).defaultValue(JSON_FORMAT_FULL_ROW.getValue()).build();
    /** Encoding choice: values are rendered as plain strings. */
    static final AllowableValue ENCODING_NONE = new AllowableValue("none", "none", "Creates a String using the bytes of given data and the given Character Set.");
    /** Encoding choice: values are rendered as Base64 strings. */
    static final AllowableValue ENCODING_BASE64 = new AllowableValue("base64", "base64", "Creates a Base64 encoded String of the given data.");
    /** Required selector between the two encoding choices; defaults to "none". */
    static final PropertyDescriptor JSON_VALUE_ENCODING = new PropertyDescriptor.Builder().name("JSON Value Encoding").description("Specifies how to represent row ids, column families, column qualifiers, and values when stored in FlowFile attributes, or written to JSON.").required(true).allowableValues(new DescribedValue[]{ENCODING_NONE, ENCODING_BASE64}).defaultValue(ENCODING_NONE.getValue()).build();
    /** Required character set name for decoding HBase data; defaults to UTF-8. */
    static final PropertyDescriptor DECODE_CHARSET = new PropertyDescriptor.Builder().name("Decode Character Set").description("The character set used to decode data from HBase.").required(true).defaultValue("UTF-8").addValidator(StandardValidators.CHARACTER_SET_VALIDATOR).build();
    /** Required character set name for encoding the JSON output; defaults to UTF-8. */
    static final PropertyDescriptor ENCODE_CHARSET = new PropertyDescriptor.Builder().name("Encode Character Set").description("The character set used to encode the JSON representation of the row.").required(true).defaultValue("UTF-8").addValidator(StandardValidators.CHARACTER_SET_VALIDATOR).build();
    /** Route for FlowFiles whose row was fetched. */
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All successful fetches are routed to this relationship.").build();
    /** Route for FlowFiles whose fetch failed or whose table name / row id evaluated to blank. */
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("All failed fetches are routed to this relationship.").build();
    /** Route for FlowFiles whose scan delivered no row. */
    static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not found").description("All fetches where the row id is not found are routed to this relationship.").build();
    /** Attribute that receives the table name on success. */
    static final String HBASE_TABLE_ATTR = "hbase.table";
    /** Attribute that receives the serialized row when the destination is attributes. */
    static final String HBASE_ROW_ATTR = "hbase.row";
    /** Unmodifiable property list, populated by the static initializer. */
    static final List<PropertyDescriptor> properties;
    /** Unmodifiable relationship set, populated by the static initializer. */
    static final Set<Relationship> relationships;
    // The four fields below are assigned in onScheduled; the serializers are read in onTrigger.
    private volatile Charset decodeCharset;
    private volatile Charset encodeCharset;
    // Serializer used when JSON Value Encoding is not base64.
    private volatile RowSerializer regularRowSerializer;
    // Serializer used when JSON Value Encoding is base64.
    private volatile RowSerializer base64RowSerializer;

    /* loaded from: input_file:org/apache/nifi/hbase/FetchHBaseRow$FetchHBaseRowHandler.class */
    /**
     * Result handler that additionally exposes the FlowFile it has been updating and
     * whether any row was delivered to it.
     */
    private interface FetchHBaseRowHandler extends ResultHandler {
        /** Returns the handler's current FlowFile reference. */
        FlowFile getFlowFile();

        /** Returns whether the handler has received a row. */
        boolean handledRow();
    }

    /* loaded from: input_file:org/apache/nifi/hbase/FetchHBaseRow$FlowFileAttributeHandler.class */
    /**
     * Handler that stores the serialized row in the {@code hbase.row} attribute of
     * the FlowFile and remembers that a row was seen.
     */
    private static class FlowFileAttributeHandler implements FetchHBaseRowHandler {
        private final ProcessSession session;
        private final RowSerializer rowSerializer;
        // Replaced on every handled row with the reference returned by the session.
        private FlowFile flowFile;
        private boolean handledRow;

        public FlowFileAttributeHandler(FlowFile flowFile, ProcessSession processSession, RowSerializer rowSerializer) {
            this.session = processSession;
            this.rowSerializer = rowSerializer;
            this.flowFile = flowFile;
            this.handledRow = false;
        }

        /** Serializes the given row and writes it to the row attribute. */
        public void handle(byte[] rowKey, ResultCell[] cells) {
            final String serializedRow = rowSerializer.serialize(rowKey, cells);
            flowFile = session.putAttribute(flowFile, FetchHBaseRow.HBASE_ROW_ATTR, serializedRow);
            handledRow = true;
        }

        @Override // org.apache.nifi.hbase.FetchHBaseRow.FetchHBaseRowHandler
        public FlowFile getFlowFile() {
            return flowFile;
        }

        @Override // org.apache.nifi.hbase.FetchHBaseRow.FetchHBaseRowHandler
        public boolean handledRow() {
            return handledRow;
        }
    }

    /* loaded from: input_file:org/apache/nifi/hbase/FetchHBaseRow$FlowFileContentHandler.class */
    /**
     * Handler that writes the serialized row into the FlowFile content and
     * remembers that a row was seen.
     */
    private static class FlowFileContentHandler implements FetchHBaseRowHandler {
        private final ProcessSession session;
        private final RowSerializer serializer;
        // Replaced on every handled row with the reference returned by the session.
        private FlowFile flowFile;
        private boolean handledRow;

        public FlowFileContentHandler(FlowFile flowFile, ProcessSession processSession, RowSerializer rowSerializer) {
            this.session = processSession;
            this.serializer = rowSerializer;
            this.flowFile = flowFile;
            this.handledRow = false;
        }

        /** Streams the serialized form of the given row into the FlowFile content. */
        public void handle(byte[] rowKey, ResultCell[] cells) {
            flowFile = session.write(flowFile, out -> serializer.serialize(rowKey, cells, out));
            handledRow = true;
        }

        @Override // org.apache.nifi.hbase.FetchHBaseRow.FetchHBaseRowHandler
        public FlowFile getFlowFile() {
            return flowFile;
        }

        @Override // org.apache.nifi.hbase.FetchHBaseRow.FetchHBaseRowHandler
        public boolean handledRow() {
            return handledRow;
        }
    }

    /**
     * Returns the unmodifiable property list built once in the static initializer.
     */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    /**
     * Returns the unmodifiable relationship set built once in the static initializer.
     */
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * Resolves the configured character sets and builds the plain and Base64 row
     * serializers matching the selected JSON Format.
     *
     * @param processContext supplies the configured property values
     */
    @OnScheduled
    public void onScheduled(ProcessContext processContext) {
        final Charset decoder = Charset.forName(processContext.getProperty(DECODE_CHARSET).getValue());
        this.decodeCharset = decoder;
        final Charset encoder = Charset.forName(processContext.getProperty(ENCODE_CHARSET).getValue());
        this.encodeCharset = encoder;

        final String jsonFormat = processContext.getProperty(JSON_FORMAT).getValue();
        final boolean fullRow = jsonFormat.equals(JSON_FORMAT_FULL_ROW.getValue());
        if (!fullRow) {
            // Any format other than full-row uses the qualifier/value layout.
            this.regularRowSerializer = new JsonQualifierAndValueRowSerializer(decoder, encoder);
            this.base64RowSerializer = new JsonQualifierAndValueRowSerializer(decoder, encoder, true);
            return;
        }
        this.regularRowSerializer = new JsonFullRowSerializer(decoder, encoder);
        this.base64RowSerializer = new JsonFullRowSerializer(decoder, encoder, true);
    }

    /**
     * Fetches one HBase row for the next incoming FlowFile and routes the result.
     *
     * <ul>
     *   <li>Blank table name or row identifier: the FlowFile is penalized and sent to failure.</li>
     *   <li>The scan delivers no row: the FlowFile is sent to "not found".</li>
     *   <li>A row is delivered: the table name attribute is added (plus the JSON mime type when
     *       the destination is content), provenance events are reported, and the FlowFile is
     *       sent to success.</li>
     *   <li>Any exception raised from the scan onwards: the FlowFile is sent to failure.</li>
     * </ul>
     *
     * @param processContext supplies property values and the HBase client service
     * @param processSession session used to obtain, update and transfer the FlowFile
     * @throws ProcessException declared by the processor contract
     */
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        final FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            return;
        }

        final String tableName = processContext.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
        if (StringUtils.isBlank(tableName)) {
            getLogger().error("Table Name is blank or null for {}, transferring to failure", new Object[]{flowFile});
            processSession.transfer(processSession.penalize(flowFile), REL_FAILURE);
            return;
        }

        final String rowId = processContext.getProperty(ROW_ID).evaluateAttributeExpressions(flowFile).getValue();
        if (StringUtils.isBlank(rowId)) {
            getLogger().error("Row Identifier is blank or null for {}, transferring to failure", new Object[]{flowFile});
            processSession.transfer(processSession.penalize(flowFile), REL_FAILURE);
            return;
        }

        final List<Column> columns = getColumns(processContext.getProperty(COLUMNS).evaluateAttributeExpressions(flowFile).getValue());
        final HBaseClientService hBaseClientService = processContext.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
        final String destination = processContext.getProperty(DESTINATION).getValue();
        final boolean base64Encode = processContext.getProperty(JSON_VALUE_ENCODING).getValue().equals(ENCODING_BASE64.getValue());
        final List<String> authorizations = getAuthorizations(processContext, flowFile);
        final RowSerializer rowSerializer = base64Encode ? this.base64RowSerializer : this.regularRowSerializer;

        final boolean writeToContent = destination.equals(DESTINATION_CONTENT.getValue());
        final FetchHBaseRowHandler handler = writeToContent
                ? new FlowFileContentHandler(flowFile, processSession, rowSerializer)
                : new FlowFileAttributeHandler(flowFile, processSession, rowSerializer);

        // Start and end row are the same bytes, so the scan targets exactly one row key.
        final byte[] rowIdBytes = rowId.getBytes(StandardCharsets.UTF_8);

        // Newest FlowFile reference produced after the handler finished; stays null until
        // putAllAttributes succeeds so the failure path knows which reference is current.
        FlowFile updated = null;
        try {
            hBaseClientService.scan(tableName, rowIdBytes, rowIdBytes, columns, authorizations, handler);

            final FlowFile fetched = handler.getFlowFile();
            if (!handler.handledRow()) {
                getLogger().debug("Row {} not found in {}, transferring to not found", new Object[]{rowId, tableName});
                processSession.transfer(fetched, REL_NOT_FOUND);
                return;
            }

            if (getLogger().isDebugEnabled()) {
                getLogger().debug("Fetched {} from {} with row id {}", new Object[]{fetched, tableName, rowId});
            }

            final HashMap<String, String> attributes = new HashMap<>();
            attributes.put(HBASE_TABLE_ATTR, tableName);
            if (writeToContent) {
                attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
            }
            updated = processSession.putAllAttributes(fetched, attributes);

            final String transitUri = hBaseClientService.toTransitUri(tableName, rowId);
            processSession.getProvenanceReporter().fetch(updated, transitUri);
            if (!writeToContent) {
                // Content was untouched in this mode, so additionally record the attribute change.
                processSession.getProvenanceReporter().modifyAttributes(updated, "Added attributes to FlowFile from " + transitUri);
            }
            processSession.transfer(updated, REL_SUCCESS);
        } catch (Exception e) {
            getLogger().error("Unable to fetch row {} from {}", new Object[]{rowId, tableName, e});
            // Route the most recent reference: once putAllAttributes has run, the handler's
            // copy is no longer the latest version of the FlowFile.
            processSession.transfer(updated != null ? updated : handler.getFlowFile(), REL_FAILURE);
        }
    }

    /**
     * Parses the evaluated Columns property into column descriptors.
     *
     * <p>The input is split on commas. Each token is split on ":"; the first part is
     * the column family and the second part, when present, is the qualifier. A token
     * with no qualifier part (for example {@code "fam"} or {@code "fam:"}) yields a
     * family-only column with a {@code null} qualifier. Because the property accepts
     * expression language, tokens may reach this method without matching the
     * configured validation pattern, so a missing part must not cause an index error.
     *
     * @param str the comma-separated column specification; may be {@code null} or empty
     * @return a new list with one column per token; empty when {@code str} is null or empty
     */
    private List<Column> getColumns(String str) {
        if (str == null || str.isEmpty()) {
            return new ArrayList<>(0);
        }
        final String[] specs = str.split(",");
        final List<Column> columns = new ArrayList<>(specs.length);
        for (final String spec : specs) {
            // split drops trailing empty parts: "fam:" -> ["fam"], ":" -> [].
            final String[] parts = spec.split(":");
            final String family = parts.length > 0 ? parts[0] : "";
            final byte[] qualifier = parts.length > 1 ? parts[1].getBytes(StandardCharsets.UTF_8) : null;
            columns.add(new Column(family.getBytes(StandardCharsets.UTF_8), qualifier));
        }
        return columns;
    }

    // Builds the immutable property list (in display order) and relationship set once per class load.
    static {
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        Collections.addAll(descriptors,
                HBASE_CLIENT_SERVICE,
                TABLE_NAME,
                ROW_ID,
                COLUMNS,
                AUTHORIZATIONS, // not declared among this class's fields; presumably supplied by VisibilityFetchSupport
                DESTINATION,
                JSON_FORMAT,
                JSON_VALUE_ENCODING,
                ENCODE_CHARSET,
                DECODE_CHARSET);
        properties = Collections.unmodifiableList(descriptors);

        final Set<Relationship> routes = new HashSet<>();
        Collections.addAll(routes, REL_SUCCESS, REL_FAILURE, REL_NOT_FOUND);
        relationships = Collections.unmodifiableSet(routes);
    }
}
