package org.apache.nifi.processors.azure.eventhub;

import com.azure.core.amqp.AmqpTransportType;
import com.azure.core.amqp.ProxyOptions;
import com.azure.core.credential.AzureNamedKeyCredential;
import com.azure.identity.ManagedIdentityCredentialBuilder;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubProducerClient;
import com.azure.messaging.eventhubs.models.SendOptions;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
import org.apache.nifi.processors.azure.storage.utils.FlowFileResultCarrier;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;

@CapabilityDescription("Send FlowFile contents to Azure Event Hubs")
@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "The Processor buffers FlowFile contents in memory before sending")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@SupportsBatching
@Tags({"microsoft", "azure", "cloud", "eventhub", "events", "streams", "streaming"})
/* loaded from: input_file:org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.class */
public class PutAzureEventHub extends AbstractProcessor implements AzureEventHubComponent {
    private static final String TRANSIT_URI_FORMAT_STRING = "amqps://%s%s/%s";
    static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder().name("Event Hub Name").description("Name of Azure Event Hubs destination").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).required(true).build();
    static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder().name("Event Hub Namespace").description("Namespace of Azure Event Hubs prefixed to Service Bus Endpoint domain").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).required(true).build();
    static final PropertyDescriptor SERVICE_BUS_ENDPOINT = AzureEventHubUtils.SERVICE_BUS_ENDPOINT;
    static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder().name("Shared Access Policy Name").description("The name of the shared access policy. This policy must have Send claims.").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).required(false).build();
    static final PropertyDescriptor POLICY_PRIMARY_KEY = AzureEventHubUtils.POLICY_PRIMARY_KEY;
    static final PropertyDescriptor USE_MANAGED_IDENTITY = AzureEventHubUtils.USE_MANAGED_IDENTITY;
    static final PropertyDescriptor PARTITIONING_KEY_ATTRIBUTE_NAME = new PropertyDescriptor.Builder().name("partitioning-key-attribute-name").displayName("Partitioning Key Attribute Name").description("If specified, the value from argument named by this field will be used as a partitioning key to be used by event hub.").required(false).expressionLanguageSupported(ExpressionLanguageScope.NONE).addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR).build();
    static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder().name("max-batch-size").displayName("Maximum Batch Size").description("Maximum number of FlowFiles processed for each Processor invocation").required(true).expressionLanguageSupported(ExpressionLanguageScope.NONE).addValidator(StandardValidators.NUMBER_VALIDATOR).defaultValue("100").build();
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Any FlowFile that is successfully sent to the event hubs will be transferred to this Relationship.").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Any FlowFile that could not be sent to the event hub will be transferred to this Relationship.").build();
    private static final List<PropertyDescriptor> PROPERTIES = List.of(NAMESPACE, EVENT_HUB_NAME, SERVICE_BUS_ENDPOINT, TRANSPORT_TYPE, ACCESS_POLICY, POLICY_PRIMARY_KEY, USE_MANAGED_IDENTITY, PARTITIONING_KEY_ATTRIBUTE_NAME, MAX_BATCH_SIZE, PROXY_CONFIGURATION_SERVICE);
    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE);
    private EventHubProducerClient eventHubProducerClient;

    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    @OnScheduled
    public final void createClient(ProcessContext processContext) {
        this.eventHubProducerClient = createEventHubProducerClient(processContext);
    }

    @OnStopped
    public void closeClient() {
        if (this.eventHubProducerClient == null) {
            getLogger().info("Azure Event Hub Producer Client not configured");
        } else {
            this.eventHubProducerClient.close();
        }
    }

    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        return AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY, validationContext);
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        StopWatch stopWatch = new StopWatch(true);
        String value = processContext.getProperty(PARTITIONING_KEY_ATTRIBUTE_NAME).getValue();
        List list = processSession.get(processContext.getProperty(MAX_BATCH_SIZE).asInteger().intValue());
        ArrayList arrayList = new ArrayList();
        Iterator it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(handleFlowFile((FlowFile) it.next(), value, processSession));
        }
        processFlowFileResults(processContext, processSession, stopWatch, arrayList);
    }

    protected EventHubProducerClient createEventHubProducerClient(ProcessContext processContext) throws ProcessException {
        boolean booleanValue = processContext.getProperty(USE_MANAGED_IDENTITY).asBoolean().booleanValue();
        String value = processContext.getProperty(NAMESPACE).getValue();
        String value2 = processContext.getProperty(SERVICE_BUS_ENDPOINT).getValue();
        String value3 = processContext.getProperty(EVENT_HUB_NAME).getValue();
        AmqpTransportType asAmqpTransportType = ((AzureEventHubTransportType) processContext.getProperty(TRANSPORT_TYPE).asAllowableValue(AzureEventHubTransportType.class)).asAmqpTransportType();
        try {
            EventHubClientBuilder eventHubClientBuilder = new EventHubClientBuilder();
            eventHubClientBuilder.transportType(asAmqpTransportType);
            String format = String.format("%s%s", value, value2);
            if (booleanValue) {
                eventHubClientBuilder.credential(format, value3, new ManagedIdentityCredentialBuilder().build());
            } else {
                eventHubClientBuilder.credential(format, value3, new AzureNamedKeyCredential(processContext.getProperty(ACCESS_POLICY).getValue(), processContext.getProperty(POLICY_PRIMARY_KEY).getValue()));
            }
            Optional<ProxyOptions> proxyOptions = AzureEventHubUtils.getProxyOptions(processContext);
            Objects.requireNonNull(eventHubClientBuilder);
            proxyOptions.ifPresent(eventHubClientBuilder::proxyOptions);
            return eventHubClientBuilder.buildProducerClient();
        } catch (Exception e) {
            throw new ProcessException("EventHubClient creation failed", e);
        }
    }

    private void processFlowFileResults(ProcessContext processContext, ProcessSession processSession, StopWatch stopWatch, List<FlowFileResultCarrier<Relationship>> list) {
        try {
            for (FlowFileResultCarrier<Relationship> flowFileResultCarrier : list) {
                FlowFile flowFile = flowFileResultCarrier.flowFile();
                if (flowFileResultCarrier.result() == REL_SUCCESS) {
                    processSession.getProvenanceReporter().send(flowFile, String.format(TRANSIT_URI_FORMAT_STRING, processContext.getProperty(NAMESPACE).getValue(), processContext.getProperty(SERVICE_BUS_ENDPOINT).getValue(), processContext.getProperty(EVENT_HUB_NAME).getValue()), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
                    processSession.transfer(flowFile, REL_SUCCESS);
                } else {
                    getLogger().error("Send failed {}", new Object[]{flowFile, flowFileResultCarrier.exception()});
                    processSession.transfer(processSession.penalize(flowFile), REL_FAILURE);
                }
            }
        } catch (Exception e) {
            processSession.rollback();
            getLogger().error("FlowFile Batch Size [{}] processing failed", new Object[]{Integer.valueOf(list.size())});
        }
    }

    private FlowFileResultCarrier<Relationship> handleFlowFile(FlowFile flowFile, String str, ProcessSession processSession) {
        byte[] bArr = new byte[(int) flowFile.getSize()];
        processSession.read(flowFile, inputStream -> {
            StreamUtils.fillBuffer(inputStream, bArr);
        });
        String attribute = StringUtils.isNotBlank(str) ? flowFile.getAttribute(str) : null;
        Map<String, ?> attributes = flowFile.getAttributes();
        try {
            sendMessage(bArr, attribute, attributes == null ? Collections.emptyMap() : attributes);
            return new FlowFileResultCarrier<>(flowFile, REL_SUCCESS);
        } catch (Exception e) {
            return new FlowFileResultCarrier<>(flowFile, REL_FAILURE, e);
        }
    }

    private void sendMessage(byte[] bArr, String str, Map<String, ?> map) {
        EventData eventData = new EventData(bArr);
        eventData.getProperties().putAll(map);
        SendOptions sendOptions = new SendOptions();
        if (StringUtils.isNotBlank(str)) {
            sendOptions.setPartitionKey(str);
        }
        this.eventHubProducerClient.send(Collections.singleton(eventData), sendOptions);
    }
}
