package org.apache.nifi.processor.util.listen;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.antlr.runtime.debug.Profiler;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.listen.event.Event;

/* loaded from: input_file:WEB-INF/lib/nifi-processor-utils-1.8.0.jar:org/apache/nifi/processor/util/listen/AbstractListenEventBatchingProcessor.class */
public abstract class AbstractListenEventBatchingProcessor<E extends Event> extends AbstractListenEventProcessor<E> {
    public static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder().name("Max Batch Size").description("The maximum number of messages to add to a single FlowFile. If multiple messages are available, they will be concatenated along with the <Message Delimiter> up to this configured maximum number of messages").addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).defaultValue("1").required(true).build();
    public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder().name("Message Delimiter").displayName("Batching Message Delimiter").description("Specifies the delimiter to place between messages when multiple messages are bundled together (see <Max Batch Size> property).").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).defaultValue("\\n").required(true).build();
    protected volatile byte[] messageDemarcatorBytes;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:WEB-INF/lib/nifi-processor-utils-1.8.0.jar:org/apache/nifi/processor/util/listen/AbstractListenEventBatchingProcessor$FlowFileEventBatch.class */
    public final class FlowFileEventBatch {
        private FlowFile flowFile;
        private List<E> events;

        public FlowFileEventBatch(FlowFile flowFile, List<E> list) {
            this.flowFile = flowFile;
            this.events = list;
        }

        public FlowFile getFlowFile() {
            return this.flowFile;
        }

        public List<E> getEvents() {
            return this.events;
        }

        public void setFlowFile(FlowFile flowFile) {
            this.flowFile = flowFile;
        }
    }

    @Override // org.apache.nifi.processor.util.listen.AbstractListenEventProcessor
    protected void init(ProcessorInitializationContext processorInitializationContext) {
        ArrayList arrayList = new ArrayList();
        arrayList.add(ListenerProperties.NETWORK_INTF_NAME);
        arrayList.add(PORT);
        arrayList.add(RECV_BUFFER_SIZE);
        arrayList.add(MAX_MESSAGE_QUEUE_SIZE);
        arrayList.add(MAX_SOCKET_BUFFER_SIZE);
        arrayList.add(CHARSET);
        arrayList.add(MAX_BATCH_SIZE);
        arrayList.add(MESSAGE_DELIMITER);
        arrayList.addAll(getAdditionalProperties());
        this.descriptors = Collections.unmodifiableList(arrayList);
        HashSet hashSet = new HashSet();
        hashSet.add(REL_SUCCESS);
        hashSet.addAll(getAdditionalRelationships());
        this.relationships = Collections.unmodifiableSet(hashSet);
    }

    @Override // org.apache.nifi.processor.util.listen.AbstractListenEventProcessor
    @OnScheduled
    public void onScheduled(ProcessContext processContext) throws IOException {
        super.onScheduled(processContext);
        this.messageDemarcatorBytes = processContext.getProperty(MESSAGE_DELIMITER).getValue().replace("\\n", "\n").replace("\\r", StringUtils.CR).replace("\\t", Profiler.DATA_SEP).getBytes(this.charset);
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        Map<String, AbstractListenEventBatchingProcessor<E>.FlowFileEventBatch> batches = getBatches(processSession, processContext.getProperty(MAX_BATCH_SIZE).asInteger().intValue(), this.messageDemarcatorBytes);
        if (batches.size() == 0) {
            return;
        }
        List<E> arrayList = new ArrayList<>();
        for (Map.Entry<String, AbstractListenEventBatchingProcessor<E>.FlowFileEventBatch> entry : batches.entrySet()) {
            FlowFile flowFile = entry.getValue().getFlowFile();
            List<E> events = entry.getValue().getEvents();
            if (flowFile.getSize() == 0 || events.size() == 0) {
                processSession.remove(flowFile);
                getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", new Object[]{entry.getKey()});
            } else {
                FlowFile putAllAttributes = processSession.putAllAttributes(flowFile, getAttributes(entry.getValue()));
                getLogger().debug("Transferring {} to success", new Object[]{putAllAttributes});
                processSession.transfer(putAllAttributes, REL_SUCCESS);
                processSession.adjustCounter("FlowFiles Transferred to Success", 1L, false);
                processSession.getProvenanceReporter().receive(putAllAttributes, getTransitUri(entry.getValue()));
                arrayList.addAll(events);
            }
        }
        postProcess(processContext, processSession, arrayList);
    }

    protected abstract Map<String, String> getAttributes(AbstractListenEventBatchingProcessor<E>.FlowFileEventBatch flowFileEventBatch);

    protected abstract String getTransitUri(AbstractListenEventBatchingProcessor<E>.FlowFileEventBatch flowFileEventBatch);

    protected void postProcess(ProcessContext processContext, ProcessSession processSession, List<E> list) {
    }

    protected Map<String, AbstractListenEventBatchingProcessor<E>.FlowFileEventBatch> getBatches(ProcessSession processSession, int i, final byte[] bArr) {
        E message;
        HashMap hashMap = new HashMap();
        int i2 = 0;
        while (i2 < i && (message = getMessage(true, true, processSession)) != null) {
            String batchKey = getBatchKey(message);
            FlowFileEventBatch flowFileEventBatch = (FlowFileEventBatch) hashMap.get(batchKey);
            if (flowFileEventBatch == null) {
                flowFileEventBatch = new FlowFileEventBatch(processSession.create(), new ArrayList());
                hashMap.put(batchKey, flowFileEventBatch);
            }
            flowFileEventBatch.getEvents().add(message);
            final boolean z = i2 > 0;
            try {
                final byte[] data = message.getData();
                flowFileEventBatch.setFlowFile(processSession.append(flowFileEventBatch.getFlowFile(), new OutputStreamCallback() { // from class: org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor.1
                    public void process(OutputStream outputStream) throws IOException {
                        if (z) {
                            outputStream.write(bArr);
                        }
                        outputStream.write(data);
                    }
                }));
                i2++;
            } catch (Exception e) {
                getLogger().error("Failed to write contents of the message to FlowFile due to {}; will re-queue message and try again", new Object[]{e.getMessage()}, e);
                this.errorEvents.offer(message);
            }
        }
        return hashMap;
    }

    protected String getBatchKey(E e) {
        return e.getSender();
    }
}
