/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.kite;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.commons.text.StringEscapeUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.kite.AbstractKiteConvertProcessor;
import org.apache.nifi.processors.kite.AbstractKiteProcessor;
import org.apache.nifi.processors.kite.AvroUtil;
import org.apache.nifi.processors.kite.FailureTracker;
import org.kitesdk.data.DatasetException;
import org.kitesdk.data.DatasetIOException;
import org.kitesdk.data.DatasetRecordException;
import org.kitesdk.data.SchemaNotFoundException;
import org.kitesdk.data.spi.DefaultConfiguration;
import org.kitesdk.data.spi.filesystem.CSVFileReader;
import org.kitesdk.data.spi.filesystem.CSVProperties;

@Tags(value={"kite", "csv", "avro"})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription(value="Converts CSV files to Avro according to an Avro Schema")
public class ConvertCSVToAvro
extends AbstractKiteConvertProcessor {
    private static final CSVProperties DEFAULTS = new CSVProperties.Builder().build();
    private static final Validator CHAR_VALIDATOR = new Validator(){

        public ValidationResult validate(String subject, String input, ValidationContext context) {
            input = ConvertCSVToAvro.unescapeString(input);
            return new ValidationResult.Builder().subject(subject).input(input).explanation("Only non-null single characters are supported").valid(input.length() == 1 && input.charAt(0) != '\u0000' || context.isExpressionLanguagePresent(input)).build();
        }
    };
    private static final Relationship SUCCESS = new Relationship.Builder().name("success").description("Avro content that was converted successfully from CSV").build();
    private static final Relationship FAILURE = new Relationship.Builder().name("failure").description("CSV content that could not be processed").build();
    private static final Relationship INCOMPATIBLE = new Relationship.Builder().name("incompatible").description("CSV content that could not be converted").build();
    @VisibleForTesting
    static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder().name("Record schema").description("Outgoing Avro schema for each record created from a CSV row").addValidator(SCHEMA_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).required(true).build();
    @VisibleForTesting
    static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder().name("CSV charset").description("Character set for CSV files").addValidator(StandardValidators.CHARACTER_SET_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).defaultValue(ConvertCSVToAvro.DEFAULTS.charset).build();
    @VisibleForTesting
    static final PropertyDescriptor DELIMITER = new PropertyDescriptor.Builder().name("CSV delimiter").description("Delimiter character for CSV records").addValidator(CHAR_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).defaultValue(ConvertCSVToAvro.DEFAULTS.delimiter).build();
    @VisibleForTesting
    static final PropertyDescriptor QUOTE = new PropertyDescriptor.Builder().name("CSV quote character").description("Quote character for CSV values").addValidator(CHAR_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).defaultValue(ConvertCSVToAvro.DEFAULTS.quote).build();
    @VisibleForTesting
    static final PropertyDescriptor ESCAPE = new PropertyDescriptor.Builder().name("CSV escape character").description("Escape character for CSV values").addValidator(CHAR_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).defaultValue(ConvertCSVToAvro.DEFAULTS.escape).build();
    @VisibleForTesting
    static final PropertyDescriptor HAS_HEADER = new PropertyDescriptor.Builder().name("Use CSV header line").description("Whether to use the first line as a header").addValidator(StandardValidators.BOOLEAN_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).defaultValue(String.valueOf(ConvertCSVToAvro.DEFAULTS.useHeader)).build();
    @VisibleForTesting
    static final PropertyDescriptor LINES_TO_SKIP = new PropertyDescriptor.Builder().name("Lines to skip").description("Number of lines to skip before reading header or data").addValidator(StandardValidators.createLongValidator((long)0L, (long)Integer.MAX_VALUE, (boolean)true)).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).defaultValue(String.valueOf(ConvertCSVToAvro.DEFAULTS.linesToSkip)).build();
    private static final List<PropertyDescriptor> PROPERTIES = ImmutableList.builder().addAll(AbstractKiteProcessor.getProperties()).add((Object)SCHEMA).add((Object)CHARSET).add((Object)DELIMITER).add((Object)QUOTE).add((Object)ESCAPE).add((Object)HAS_HEADER).add((Object)LINES_TO_SKIP).add((Object)COMPRESSION_TYPE).build();
    private static final Set<Relationship> RELATIONSHIPS = ImmutableSet.builder().add((Object)SUCCESS).add((Object)FAILURE).add((Object)INCOMPATIBLE).build();

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    @OnScheduled
    public void createCSVProperties(ProcessContext context) throws IOException {
        super.setDefaultConfiguration(context);
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        block25: {
            Schema schema;
            FlowFile incomingCSV = session.get();
            if (incomingCSV == null) {
                return;
            }
            final CSVProperties props = new CSVProperties.Builder().charset(context.getProperty(CHARSET).evaluateAttributeExpressions(incomingCSV).getValue()).delimiter(context.getProperty(DELIMITER).evaluateAttributeExpressions(incomingCSV).getValue()).quote(context.getProperty(QUOTE).evaluateAttributeExpressions(incomingCSV).getValue()).escape(context.getProperty(ESCAPE).evaluateAttributeExpressions(incomingCSV).getValue()).hasHeader(context.getProperty(HAS_HEADER).evaluateAttributeExpressions(incomingCSV).asBoolean().booleanValue()).linesToSkip(context.getProperty(LINES_TO_SKIP).evaluateAttributeExpressions(incomingCSV).asInteger().intValue()).build();
            String schemaProperty = context.getProperty(SCHEMA).evaluateAttributeExpressions(incomingCSV).getValue();
            try {
                schema = ConvertCSVToAvro.getSchema(schemaProperty, DefaultConfiguration.get());
            }
            catch (SchemaNotFoundException e) {
                this.getLogger().error("Cannot find schema: " + schemaProperty);
                session.transfer(incomingCSV, FAILURE);
                return;
            }
            try (final DataFileWriter writer = new DataFileWriter(AvroUtil.newDatumWriter(schema, GenericData.Record.class));){
                writer.setCodec(this.getCodecFactory(context.getProperty(COMPRESSION_TYPE).getValue()));
                try {
                    final AtomicLong written = new AtomicLong(0L);
                    final FailureTracker failures = new FailureTracker();
                    FlowFile badRecords = session.clone(incomingCSV);
                    FlowFile outgoingAvro = session.write(incomingCSV, new StreamCallback(){

                        public void process(InputStream in, OutputStream out) throws IOException {
                            try (CSVFileReader reader = new CSVFileReader(in, props, schema, GenericData.Record.class);){
                                reader.initialize();
                                try (DataFileWriter w = writer.create(schema, out);){
                                    while (reader.hasNext()) {
                                        try {
                                            GenericData.Record record = (GenericData.Record)reader.next();
                                            w.append((Object)record);
                                            written.incrementAndGet();
                                        }
                                        catch (DatasetRecordException e) {
                                            failures.add(e);
                                        }
                                    }
                                }
                            }
                        }
                    });
                    long errors = failures.count();
                    session.adjustCounter("Converted records", written.get(), false);
                    session.adjustCounter("Conversion errors", errors, false);
                    if (written.get() > 0L) {
                        outgoingAvro = session.putAttribute(outgoingAvro, CoreAttributes.MIME_TYPE.key(), "application/avro-binary");
                        session.transfer(outgoingAvro, SUCCESS);
                        if (errors > 0L) {
                            this.getLogger().warn("Failed to convert {}/{} records from CSV to Avro", new Object[]{errors, errors + written.get()});
                            badRecords = session.putAttribute(badRecords, "errors", failures.summary());
                            session.transfer(badRecords, INCOMPATIBLE);
                        } else {
                            session.remove(badRecords);
                        }
                        break block25;
                    }
                    session.remove(outgoingAvro);
                    if (errors > 0L) {
                        this.getLogger().warn("Failed to convert {}/{} records from CSV to Avro", new Object[]{errors, errors});
                        badRecords = session.putAttribute(badRecords, "errors", failures.summary());
                    } else {
                        badRecords = session.putAttribute(badRecords, "errors", "No incoming records");
                    }
                    session.transfer(badRecords, FAILURE);
                }
                catch (ProcessException | DatasetIOException e) {
                    this.getLogger().error("Failed reading or writing", e);
                    session.transfer(incomingCSV, FAILURE);
                }
                catch (DatasetException e) {
                    this.getLogger().error("Failed to read FlowFile", (Throwable)e);
                    session.transfer(incomingCSV, FAILURE);
                }
            }
            catch (IOException ioe) {
                throw new RuntimeException("Unable to close Avro Writer", ioe);
            }
        }
    }

    private static String unescapeString(String input) {
        if (input.length() > 1) {
            input = StringEscapeUtils.unescapeJava((String)input);
        }
        return input;
    }
}

