package pl.touk.nifi.processors;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.RecordSchema;
import pl.touk.nifi.ignite.DataStreamerResult;
import pl.touk.nifi.ignite.IgniteRecordStreamerService;

@CapabilityDescription("Puts records from FlowFile into Ignite Cache using DataStreamer.")
@SystemResourceConsideration(resource = SystemResource.MEMORY)
@EventDriven
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@SupportsBatching
@Tags({"Ignite", "insert", "update", "stream", "write", "put", "cache", "key", "record"})
/* loaded from: input_file:pl/touk/nifi/processors/PutIgniteRecord.class */
public class PutIgniteRecord extends AbstractProcessor {
    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().displayName("Record Reader").name("record-reader").description("Specifies the Controller Service to use for reading incoming data").identifiesControllerService(RecordReaderFactory.class).required(true).build();
    public static final PropertyDescriptor DATA_STREAMER_SERVICE = new PropertyDescriptor.Builder().name("ignite-data-streamer-service").displayName("IgniteDataStreamer service").required(true).identifiesControllerService(IgniteRecordStreamerService.class).build();
    protected static final List<PropertyDescriptor> descriptors = Arrays.asList(DATA_STREAMER_SERVICE, RECORD_READER);
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are written to Ignite cache are routed to this relationship").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("All FlowFiles that cannot be written to Ignite cache are routed to this relationship").build();
    public static final Relationship REL_CLIENT_CONNECTION_FAILURE = new Relationship.Builder().name("connection failure").build();
    protected static Set<Relationship> relationships;

    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    public Set<Relationship> getRelationships() {
        return relationships;
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            return;
        }
        IgniteRecordStreamerService asControllerService = processContext.getProperty(DATA_STREAMER_SERVICE).asControllerService(IgniteRecordStreamerService.class);
        RecordReaderFactory asControllerService2 = processContext.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        DataStreamerResult dataStreamerResult = new DataStreamerResult();
        processSession.read(flowFile, inputStream -> {
            try {
                RecordReader createRecordReader = asControllerService2.createRecordReader(flowFile.getAttributes(), inputStream, flowFile.getSize(), getLogger());
                try {
                    RecordSchema schema = createRecordReader.getSchema();
                    ArrayList arrayList = new ArrayList();
                    while (true) {
                        MapRecord nextRecord = createRecordReader.nextRecord();
                        if (nextRecord == null) {
                            break;
                        } else {
                            arrayList.add(nextRecord);
                        }
                    }
                    dataStreamerResult.setError(asControllerService.addDataSync(schema, arrayList).getError());
                    if (createRecordReader != null) {
                        createRecordReader.close();
                    }
                } finally {
                }
            } catch (SchemaNotFoundException | MalformedRecordException e) {
                throw new ProcessException("Could not parse incoming data", e);
            }
        });
        if (dataStreamerResult.isSuccess()) {
            processSession.transfer(flowFile, REL_SUCCESS);
        } else if (dataStreamerResult.getError().getCause() instanceof IgniteClientDisconnectedException) {
            processSession.transfer(flowFile, REL_CLIENT_CONNECTION_FAILURE);
        } else {
            processSession.transfer(flowFile, REL_FAILURE);
        }
    }

    static {
        HashSet hashSet = new HashSet();
        hashSet.add(REL_SUCCESS);
        hashSet.add(REL_FAILURE);
        hashSet.add(REL_CLIENT_CONNECTION_FAILURE);
        relationships = Collections.unmodifiableSet(hashSet);
    }
}
