package org.apache.nifi.kafka.connect;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.stateless.flow.DataflowTrigger;
import org.apache.nifi.stateless.flow.StatelessDataflow;
import org.apache.nifi.stateless.flow.TriggerResult;
import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/kafka/connect/StatelessNiFiSourceTask.class */
public class StatelessNiFiSourceTask extends SourceTask {
    private static final Logger logger = LoggerFactory.getLogger(StatelessNiFiSourceTask.class);
    private static final long FAILURE_YIELD_MILLIS = 1000;
    private StatelessDataflow dataflow;
    private String outputPortName;
    private String topicName;
    private String topicNameAttribute;
    private TriggerResult triggerResult;
    private String keyAttributeName;
    private Pattern headerAttributeNamePattern;
    private long timeoutMillis;
    private String dataflowName;
    private long failureYieldExpiration = 0;
    private final Map<String, String> clusterStatePartitionMap = Collections.singletonMap(StatelessNiFiSourceConfig.STATE_MAP_KEY, "CLUSTER");
    private Map<String, String> localStatePartitionMap = new HashMap();
    private final AtomicLong unacknowledgedRecords = new AtomicLong(0);

    public String version() {
        return StatelessKafkaConnectorUtil.getVersion();
    }

    public void start(Map<String, String> map) {
        logger.info("Starting Source Task");
        StatelessNiFiSourceConfig createConfig = createConfig(map);
        this.timeoutMillis = (long) FormatUtils.getPreciseTimeDuration(createConfig.getDataflowTimeout(), TimeUnit.MILLISECONDS);
        this.topicName = createConfig.getTopicName();
        this.topicNameAttribute = createConfig.getTopicNameAttribute();
        this.keyAttributeName = createConfig.getKeyAttribute();
        if (this.topicName == null && this.topicNameAttribute == null) {
            throw new ConfigException("Either the topic.name or topic.name.attribute configuration must be specified");
        }
        String headerRegex = createConfig.getHeaderRegex();
        this.headerAttributeNamePattern = headerRegex == null ? null : Pattern.compile(headerRegex);
        this.dataflow = StatelessKafkaConnectorUtil.createDataflow(createConfig);
        this.dataflow.initialize();
        this.dataflowName = createConfig.getDataflowName();
        this.outputPortName = createConfig.getOutputPortName();
        if (this.outputPortName == null) {
            Set outputPortNames = this.dataflow.getOutputPortNames();
            if (outputPortNames.isEmpty()) {
                throw new ConfigException("The dataflow specified for <" + this.dataflowName + "> does not have an Output Port at the root level. Dataflows used for a Kafka Connect Source Task must have at least one Output Port at the root level.");
            }
            if (outputPortNames.size() > 1) {
                throw new ConfigException("The dataflow specified for <" + this.dataflowName + "> has multiple Output Ports at the root level (" + String.valueOf(outputPortNames) + "). The output.port property must be set to indicate which of these Ports Kafka records should be retrieved from.");
            }
            this.outputPortName = (String) outputPortNames.iterator().next();
        }
        this.localStatePartitionMap.put(StatelessNiFiSourceConfig.STATE_MAP_KEY, createConfig.getStateMapKey());
        Map offset = this.context.offsetStorageReader().offset(this.localStatePartitionMap);
        Map offset2 = this.context.offsetStorageReader().offset(this.clusterStatePartitionMap);
        this.dataflow.setComponentStates(offset, Scope.LOCAL);
        this.dataflow.setComponentStates(offset2, Scope.CLUSTER);
    }

    public List<SourceRecord> poll() throws InterruptedException {
        Map<String, String> map;
        long max = Math.max(this.failureYieldExpiration, this.dataflow.getSourceYieldExpiration()) - System.currentTimeMillis();
        if (max > 0) {
            logger.debug("Source of NiFi flow has opted to yield for {} milliseconds. Will pause dataflow until that time period has elapsed.", Long.valueOf(max));
            Thread.sleep(max);
            return null;
        }
        if (this.unacknowledgedRecords.get() > 0) {
            return null;
        }
        logger.debug("Triggering dataflow");
        long nanoTime = System.nanoTime();
        DataflowTrigger trigger = this.dataflow.trigger();
        Optional result = trigger.getResult(this.timeoutMillis, TimeUnit.MILLISECONDS);
        if (!result.isPresent()) {
            logger.warn("Dataflow timed out after waiting {} milliseconds. Will cancel the execution.", Long.valueOf(this.timeoutMillis));
            trigger.cancel();
            return null;
        }
        this.triggerResult = (TriggerResult) result.get();
        if (!this.triggerResult.isSuccessful()) {
            logger.error("Dataflow {} failed to execute properly", this.dataflowName, this.triggerResult.getFailureCause().orElse(null));
            trigger.cancel();
            this.failureYieldExpiration = System.currentTimeMillis() + FAILURE_YIELD_MILLIS;
            return null;
        }
        verifyFlowFilesTransferredToProperPort(this.triggerResult, this.outputPortName, trigger);
        long nanoTime2 = System.nanoTime() - nanoTime;
        List<FlowFile> outputFlowFiles = this.triggerResult.getOutputFlowFiles(this.outputPortName);
        ArrayList arrayList = new ArrayList(outputFlowFiles.size());
        Map<String, ?> componentStates = this.dataflow.getComponentStates(Scope.CLUSTER);
        if (componentStates == null || componentStates.isEmpty()) {
            componentStates = this.dataflow.getComponentStates(Scope.LOCAL);
            map = this.localStatePartitionMap;
        } else {
            map = this.clusterStatePartitionMap;
        }
        try {
            for (FlowFile flowFile : outputFlowFiles) {
                arrayList.add(createSourceRecord(flowFile, this.triggerResult.readContentAsByteArray(flowFile), componentStates, map));
            }
            logger.debug("Returning {} records from poll() method (took {} nanos to run dataflow)", Integer.valueOf(arrayList.size()), Long.valueOf(nanoTime2));
            if (arrayList.size() > 0) {
                this.unacknowledgedRecords.addAndGet(arrayList.size());
            } else {
                this.triggerResult.acknowledge();
            }
            return arrayList;
        } catch (Exception e) {
            logger.error("Failed to obtain contents of Output FlowFiles in order to form Kafka Record", e);
            this.triggerResult.abort(e);
            this.failureYieldExpiration = System.currentTimeMillis() + FAILURE_YIELD_MILLIS;
            return null;
        }
    }

    protected StatelessNiFiSourceConfig createConfig(Map<String, String> map) {
        return new StatelessNiFiSourceConfig(map);
    }

    private void verifyFlowFilesTransferredToProperPort(TriggerResult triggerResult, String str, DataflowTrigger dataflowTrigger) {
        for (Map.Entry entry : triggerResult.getOutputFlowFiles().entrySet()) {
            String str2 = (String) entry.getKey();
            if (!((List) entry.getValue()).isEmpty() && !str.equals(str2)) {
                logger.error("Dataflow transferred FlowFiles to Port {} but was expecting data to be transferred to {}. Rolling back session.", str2, str);
                dataflowTrigger.cancel();
                throw new RetriableException("Data was transferred to unexpected port. Expected: " + str + ". Actual: " + str2);
            }
        }
    }

    private SourceRecord createSourceRecord(FlowFile flowFile, byte[] bArr, Map<String, ?> map, Map<String, ?> map2) {
        String str;
        Schema schema = (bArr == null || bArr.length == 0) ? null : Schema.BYTES_SCHEMA;
        if (this.topicNameAttribute == null) {
            str = this.topicName;
        } else {
            String attribute = flowFile.getAttribute(this.topicNameAttribute);
            str = attribute == null ? this.topicName : attribute;
        }
        ConnectHeaders connectHeaders = new ConnectHeaders();
        if (this.headerAttributeNamePattern != null) {
            for (Map.Entry entry : flowFile.getAttributes().entrySet()) {
                if (this.headerAttributeNamePattern.matcher((CharSequence) entry.getKey()).matches()) {
                    connectHeaders.add((String) entry.getKey(), (String) entry.getValue(), Schema.STRING_SCHEMA);
                }
            }
        }
        String attribute2 = this.keyAttributeName == null ? null : flowFile.getAttribute(this.keyAttributeName);
        return new SourceRecord(map2, map, str, (Integer) null, attribute2 == null ? null : Schema.STRING_SCHEMA, attribute2, schema, bArr, Long.valueOf(System.currentTimeMillis()), connectHeaders);
    }

    public void commitRecord(SourceRecord sourceRecord, RecordMetadata recordMetadata) throws InterruptedException {
        super.commitRecord(sourceRecord, recordMetadata);
        long decrementAndGet = this.unacknowledgedRecords.decrementAndGet();
        logger.debug("SourceRecord {} committed; number of unacknowledged FlowFiles is now {}", sourceRecord, Long.valueOf(decrementAndGet));
        if (decrementAndGet < 1) {
            logger.debug("Acknowledging trigger result");
            this.triggerResult.acknowledge();
        }
    }

    public void stop() {
        logger.info("Shutting down Source Task for " + this.dataflowName);
        if (this.dataflow != null) {
            this.dataflow.shutdown();
        }
    }

    protected StatelessDataflow getDataflow() {
        return this.dataflow;
    }
}
