package org.apache.nifi.services.azure.eventhub;

import com.azure.core.amqp.AmqpTransportType;
import com.azure.core.credential.AzureNamedKeyCredential;
import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventDataBatch;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubProducerClient;
import com.azure.messaging.eventhubs.models.CreateBatchOptions;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.record.sink.RecordSinkService;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType;

/**
 * Record sink that serializes each incoming record with the configured
 * {@link RecordSetWriterFactory} and publishes it as an individual event to Azure Event Hubs.
 *
 * <p>Events are accumulated in an {@link EventDataBatch}; when a batch cannot accept another
 * event it is sent and a new batch is started.</p>
 */
@CapabilityDescription("Format and send Records to Azure Event Hubs")
@Tags({"azure", "record", "sink"})
public class AzureEventHubRecordSink extends AbstractControllerService implements RecordSinkService, AzureEventHubComponent {
    static final AllowableValue AZURE_ENDPOINT = new AllowableValue(".servicebus.windows.net", "Azure", "Default Service Bus Endpoint");
    static final AllowableValue AZURE_CHINA_ENDPOINT = new AllowableValue(".servicebus.chinacloudapi.cn", "Azure China", "China Service Bus Endpoint");
    static final AllowableValue AZURE_GERMANY_ENDPOINT = new AllowableValue(".servicebus.cloudapi.de", "Azure Germany", "Germany Service Bus Endpoint");
    static final AllowableValue AZURE_US_GOV_ENDPOINT = new AllowableValue(".servicebus.usgovcloudapi.net", "Azure US Government", "United States Government Endpoint");

    /** Domain suffix appended to the namespace to form the fully qualified host name. */
    static final PropertyDescriptor SERVICE_BUS_ENDPOINT = new PropertyDescriptor.Builder()
            .name("Service Bus Endpoint")
            .description("Provides the domain for connecting to Azure Event Hubs")
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .allowableValues(AZURE_ENDPOINT, AZURE_CHINA_ENDPOINT, AZURE_GERMANY_ENDPOINT, AZURE_US_GOV_ENDPOINT)
            .defaultValue(AZURE_ENDPOINT)
            .required(true)
            .build();

    /** Namespace portion of the host name; combined with {@link #SERVICE_BUS_ENDPOINT}. */
    static final PropertyDescriptor EVENT_HUB_NAMESPACE = new PropertyDescriptor.Builder()
            .name("Event Hub Namespace")
            .description("Provides the host for connecting to Azure Event Hubs")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .required(true)
            .build();

    static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder()
            .name("Event Hub Name")
            .description("Provides the Event Hub Name for connections")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .required(true)
            .build();

    static final PropertyDescriptor AUTHENTICATION_STRATEGY = new PropertyDescriptor.Builder()
            .name("Authentication Strategy")
            .description("Strategy for authenticating to Azure Event Hubs")
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .allowableValues(AzureAuthenticationStrategy.class)
            .required(true)
            .defaultValue(AzureAuthenticationStrategy.DEFAULT_AZURE_CREDENTIAL)
            .build();

    /** Only applicable when the authentication strategy is SHARED_ACCESS_KEY. */
    static final PropertyDescriptor SHARED_ACCESS_POLICY = new PropertyDescriptor.Builder()
            .name("Shared Access Policy")
            .description("The name of the shared access policy. This policy must have Send claims")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .required(false)
            .dependsOn(AUTHENTICATION_STRATEGY, AzureAuthenticationStrategy.SHARED_ACCESS_KEY)
            .build();

    /** Only applicable when the authentication strategy is SHARED_ACCESS_KEY. */
    static final PropertyDescriptor SHARED_ACCESS_POLICY_KEY = new PropertyDescriptor.Builder()
            .name("Shared Access Policy Key")
            .description("The primary or secondary key of the shared access policy")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .sensitive(true)
            .required(false)
            .dependsOn(AUTHENTICATION_STRATEGY, AzureAuthenticationStrategy.SHARED_ACCESS_KEY)
            .build();

    static final PropertyDescriptor PARTITION_KEY = new PropertyDescriptor.Builder()
            .name("Partition Key")
            .description("A hint for Azure Event Hub message broker how to distribute messages across one or more partitions")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .required(false)
            .build();

    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
            SERVICE_BUS_ENDPOINT,
            EVENT_HUB_NAMESPACE,
            EVENT_HUB_NAME,
            TRANSPORT_TYPE,
            RECORD_WRITER_FACTORY,
            AUTHENTICATION_STRATEGY,
            SHARED_ACCESS_POLICY,
            SHARED_ACCESS_POLICY_KEY,
            PARTITION_KEY
    );

    /** Configuration captured in {@link #onEnabled}; read again on every send for the partition key. */
    private volatile ConfigurationContext context;
    private RecordSetWriterFactory writerFactory;
    private EventHubProducerClient client;

    /**
     * @return the fixed list of properties supported by this service
     */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    /**
     * Builds the producer client used to publish events.
     *
     * @param namespace Event Hub namespace (host name prefix)
     * @param serviceBusEndpoint domain suffix appended directly to {@code namespace}
     * @param eventHubName name of the Event Hub to publish to
     * @param policyName shared access policy name; only used for SHARED_ACCESS_KEY authentication
     * @param policyKey shared access policy key; only used for SHARED_ACCESS_KEY authentication
     * @param authenticationStrategy selects named-key credentials or the default Azure credential
     * @param transportType AMQP transport passed to the client builder
     * @return a newly built producer client
     */
    protected EventHubProducerClient createEventHubClient(
            String namespace,
            String serviceBusEndpoint,
            String eventHubName,
            String policyName,
            String policyKey,
            AzureAuthenticationStrategy authenticationStrategy,
            AmqpTransportType transportType) {
        final String fullyQualifiedNamespace = String.format("%s%s", namespace, serviceBusEndpoint);
        final EventHubClientBuilder clientBuilder = new EventHubClientBuilder();
        clientBuilder.transportType(transportType);

        if (AzureAuthenticationStrategy.SHARED_ACCESS_KEY == authenticationStrategy) {
            final AzureNamedKeyCredential namedKeyCredential = new AzureNamedKeyCredential(policyName, policyKey);
            clientBuilder.credential(fullyQualifiedNamespace, eventHubName, namedKeyCredential);
        } else {
            // Every strategy other than SHARED_ACCESS_KEY falls back to the default Azure credential.
            clientBuilder.credential(fullyQualifiedNamespace, eventHubName, new DefaultAzureCredentialBuilder().build());
        }
        return clientBuilder.buildProducerClient();
    }

    /**
     * Captures the configuration, resolves the record writer factory and creates the producer client.
     *
     * @param configurationContext configuration supplied when the service is enabled
     */
    @OnEnabled
    public void onEnabled(ConfigurationContext configurationContext) {
        this.context = configurationContext;
        this.writerFactory = configurationContext.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);

        final String namespace = configurationContext.getProperty(EVENT_HUB_NAMESPACE).evaluateAttributeExpressions().getValue();
        final String serviceBusEndpoint = configurationContext.getProperty(SERVICE_BUS_ENDPOINT).evaluateAttributeExpressions().getValue();
        final String eventHubName = configurationContext.getProperty(EVENT_HUB_NAME).evaluateAttributeExpressions().getValue();
        final String policyName = configurationContext.getProperty(SHARED_ACCESS_POLICY).getValue();
        final String policyKey = configurationContext.getProperty(SHARED_ACCESS_POLICY_KEY).getValue();
        final AzureAuthenticationStrategy authenticationStrategy =
                configurationContext.getProperty(AUTHENTICATION_STRATEGY).asAllowableValue(AzureAuthenticationStrategy.class);
        final AmqpTransportType transportType =
                configurationContext.getProperty(TRANSPORT_TYPE).asAllowableValue(AzureEventHubTransportType.class).asAmqpTransportType();

        this.client = createEventHubClient(
                namespace, serviceBusEndpoint, eventHubName, policyName, policyKey, authenticationStrategy, transportType);
    }

    /**
     * Closes the producer client if one was created; otherwise only logs at debug level.
     */
    @OnDisabled
    public void onDisabled() {
        if (this.client == null) {
            getLogger().debug("Event Hub Client not configured");
        } else {
            this.client.close();
        }
    }

    /**
     * Serializes every record of the set and publishes each one as a separate event.
     *
     * <p>Each event carries the supplied attributes as application properties, the writer's MIME
     * type as content type, the {@code uuid} attribute as correlation id, and a message id of the
     * form {@code <uuid>-<1-based record index>}.</p>
     *
     * @param recordSet records to publish
     * @param attributes attributes used for expression evaluation, writer creation and event properties
     * @param sendZeroResults not consulted by this implementation
     * @return the number of records written together with a copy of the supplied attributes
     * @throws IOException if the record schema cannot be found or writing a record fails
     * @throws ProcessException if a single serialized record does not fit into an empty batch
     */
    public WriteResult sendData(RecordSet recordSet, Map<String, String> attributes, boolean sendZeroResults) throws IOException {
        final Map<String, String> writeAttributes = new LinkedHashMap<>(attributes);
        final String partitionKey = this.context.getProperty(PARTITION_KEY).evaluateAttributeExpressions(attributes).getValue();
        final CreateBatchOptions createBatchOptions = new CreateBatchOptions();
        createBatchOptions.setPartitionKey(partitionKey);
        EventDataBatch eventDataBatch = this.client.createBatch(createBatchOptions);
        final String flowFileUuid = writeAttributes.get(CoreAttributes.UUID.key());

        int recordCount = 0;
        try (
                ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
                RecordSetWriter writer = this.writerFactory.createWriter(getLogger(), recordSet.getSchema(), outputStream, attributes)
        ) {
            final String mimeType = writer.getMimeType();

            Record record;
            while ((record = recordSet.next()) != null) {
                writer.write(record);
                writer.flush();
                recordCount++;

                // Take the bytes written for this record and reset the buffer for the next one.
                final byte[] serialized = outputStream.toByteArray();
                outputStream.reset();

                final EventData eventData = new EventData(serialized);
                eventData.getProperties().putAll(writeAttributes);
                eventData.setContentType(mimeType);
                eventData.setCorrelationId(flowFileUuid);
                eventData.setMessageId(String.format("%s-%d", flowFileUuid, recordCount));

                if (!eventDataBatch.tryAdd(eventData)) {
                    if (eventDataBatch.getCount() > 0) {
                        // Current batch is full: send it and retry the event against a fresh batch.
                        this.client.send(eventDataBatch);
                        eventDataBatch = this.client.createBatch(createBatchOptions);
                        if (eventDataBatch.tryAdd(eventData)) {
                            continue;
                        }
                    }
                    // The event was rejected by an empty batch, so it can never be sent.
                    throw new ProcessException("Record " + recordCount + " exceeds maximum event data size: " + eventDataBatch.getMaxSizeInBytes());
                }
            }

            // Send whatever remains before the writer and buffer are closed.
            if (eventDataBatch.getCount() > 0) {
                this.client.send(eventDataBatch);
            }
        } catch (SchemaNotFoundException e) {
            throw new IOException("Record Schema not found", e);
        }

        return WriteResult.of(recordCount, writeAttributes);
    }
}
