package org.apache.nifi.processors.azure.eventhub;

import com.azure.core.amqp.AmqpClientOptions;
import com.azure.core.amqp.AmqpTransportType;
import com.azure.core.credential.AzureNamedKeyCredential;
import com.azure.identity.ManagedIdentityCredentialBuilder;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubConsumerClient;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.messaging.eventhubs.models.PartitionEvent;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.NodeTypeProvider;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent;
import org.apache.nifi.util.StopWatch;

/**
 * Processor that pulls events from the partitions of an Azure Event Hub and emits one FlowFile per event.
 *
 * <p>Partition identifiers are kept in a queue: each {@link #onTrigger} invocation takes one partition,
 * receives a batch of events from it, and puts the partition back. The position to read from is tracked
 * per partition in memory only and is discarded whenever the client is closed.</p>
 */
@CapabilityDescription("Receives messages from Microsoft Azure Event Hubs without reliable checkpoint tracking. In clustered environment, GetAzureEventHub processor instances work independently and all cluster nodes process all messages (unless running the processor in Primary Only mode). ConsumeAzureEventHub offers the recommended approach to receiving messages from Azure Event Hubs. This processor creates a thread pool for connections to Azure Event Hubs.")
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"azure", "microsoft", "cloud", "eventhub", "events", "streaming", "streams"})
@WritesAttributes({
        // The value written is String.valueOf(EventData.getEnqueuedTime()), i.e. an ISO-8601 instant, not epoch milliseconds
        @WritesAttribute(attribute = "eventhub.enqueued.timestamp", description = "The time (ISO-8601 instant, UTC) at which the message was enqueued in the event hub"),
        @WritesAttribute(attribute = "eventhub.offset", description = "The offset into the partition at which the message was stored"),
        @WritesAttribute(attribute = "eventhub.sequence", description = "The Azure sequence number associated with the message"),
        @WritesAttribute(attribute = "eventhub.name", description = "The name of the event hub from which the message was pulled"),
        @WritesAttribute(attribute = "eventhub.partition", description = "The name of the event hub partition from which the message was pulled"),
        @WritesAttribute(attribute = "eventhub.property.*", description = "The application properties of this message. IE: 'application' would be 'eventhub.property.application'")
})
@SeeAlso({ConsumeAzureEventHub.class})
public class GetAzureEventHub extends AbstractProcessor implements AzureEventHubComponent {
    private static final String TRANSIT_URI_FORMAT_STRING = "amqps://%s/%s/ConsumerGroups/%s/Partitions/%s";
    private static final int DEFAULT_FETCH_SIZE = 100;
    private static final String NODE_CLIENT_IDENTIFIER_FORMAT = "%s-%s";

    // Declared before RECEIVER_FETCH_TIMEOUT because that descriptor's description reads it during static initialization
    private static final Duration DEFAULT_FETCH_TIMEOUT = Duration.ofSeconds(60);

    static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder()
            .name("Event Hub Name")
            .description("Name of Azure Event Hubs source")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .required(true)
            .build();

    static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder()
            .name("Event Hub Namespace")
            .description("Namespace of Azure Event Hubs prefixed to Service Bus Endpoint domain")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .required(true)
            .build();

    static final PropertyDescriptor SERVICE_BUS_ENDPOINT = AzureEventHubUtils.SERVICE_BUS_ENDPOINT;

    static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder()
            .name("Shared Access Policy Name")
            .description("The name of the shared access policy. This policy must have Listen claims.")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .required(false)
            .build();

    static final PropertyDescriptor POLICY_PRIMARY_KEY = AzureEventHubUtils.POLICY_PRIMARY_KEY;
    static final PropertyDescriptor USE_MANAGED_IDENTITY = AzureEventHubUtils.USE_MANAGED_IDENTITY;

    /**
     * @deprecated This property is no longer read by the processor; it is still listed among the supported
     * properties so that existing configurations that set it remain valid.
     */
    @Deprecated
    static final PropertyDescriptor NUM_PARTITIONS = new PropertyDescriptor.Builder()
            .name("Number of Event Hub Partitions")
            .description("This property is deprecated and no longer used.")
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .required(false)
            .build();

    static final PropertyDescriptor CONSUMER_GROUP = new PropertyDescriptor.Builder()
            .name("Event Hub Consumer Group")
            .displayName("Consumer Group")
            .description("The name of the consumer group to use when pulling events")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .defaultValue("$Default")
            .required(true)
            .build();

    static final PropertyDescriptor ENQUEUE_TIME = new PropertyDescriptor.Builder()
            .name("Event Hub Message Enqueue Time")
            .displayName("Message Enqueue Time")
            .description("A timestamp (ISO-8601 Instant) formatted as YYYY-MM-DDThhmmss.sssZ (2016-01-01T01:01:01.000Z) from which messages should have been enqueued in the Event Hub to start reading from")
            .addValidator(StandardValidators.ISO8601_INSTANT_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .required(false)
            .build();

    // The misspelled name is the persisted property key and must stay as is; displayName carries the corrected label
    static final PropertyDescriptor RECEIVER_FETCH_SIZE = new PropertyDescriptor.Builder()
            .name("Partition Recivier Fetch Size")
            .displayName("Partition Receiver Fetch Size")
            .description("The number of events that a receiver should fetch from an Event Hubs partition before returning. The default is 100")
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .required(false)
            .build();

    static final PropertyDescriptor RECEIVER_FETCH_TIMEOUT = new PropertyDescriptor.Builder()
            .name("Partition Receiver Timeout (millseconds)")
            .displayName("Partition Receiver Timeout")
            .description("The amount of time in milliseconds a Partition Receiver should wait to receive the Fetch Size before returning. The default is " + DEFAULT_FETCH_TIMEOUT.toMillis())
            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .required(false)
            .build();

    static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("Any FlowFile that is successfully received from the event hub will be transferred to this Relationship.")
            .build();

    private static final List<PropertyDescriptor> propertyDescriptors = Collections.unmodifiableList(Arrays.asList(
            NAMESPACE,
            EVENT_HUB_NAME,
            SERVICE_BUS_ENDPOINT,
            TRANSPORT_TYPE,
            ACCESS_POLICY,
            POLICY_PRIMARY_KEY,
            USE_MANAGED_IDENTITY,
            NUM_PARTITIONS,
            CONSUMER_GROUP,
            ENQUEUE_TIME,
            RECEIVER_FETCH_SIZE,
            RECEIVER_FETCH_TIMEOUT
    ));

    private static final Set<Relationship> relationships = Collections.singleton(REL_SUCCESS);

    /** Position to read from next, keyed by partition identifier. */
    private final Map<String, EventPosition> partitionEventPositions = new ConcurrentHashMap<>();

    /** Partitions currently available for an {@link #onTrigger} invocation to take. */
    private final BlockingQueue<String> partitionIds = new LinkedBlockingQueue<>();

    private final AtomicReference<ExecutionNode> configuredExecutionNode = new AtomicReference<>(ExecutionNode.ALL);

    private volatile int receiverFetchSize;
    private volatile Duration receiverFetchTimeout;

    private EventHubClientBuilder configuredClientBuilder;
    private EventHubConsumerClient eventHubConsumerClient;

    /**
     * @return the single {@code success} relationship
     */
    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * @return the unmodifiable list of properties supported by this processor
     */
    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    /**
     * Delegates cross-property validation of the access policy and primary key to {@link AzureEventHubUtils}.
     *
     * @param validationContext context holding the configured property values
     * @return validation results produced by the shared validation routine
     */
    @Override
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        return AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY, validationContext);
    }

    /**
     * Closes or recreates the consumer client when the primary node role changes.
     * Nothing happens unless the processor was scheduled with {@link ExecutionNode#PRIMARY}.
     *
     * @param primaryNodeState new primary node state reported by the framework
     */
    @OnPrimaryNodeStateChange
    public void onPrimaryNodeStateChange(PrimaryNodeState primaryNodeState) {
        final ExecutionNode executionNode = configuredExecutionNode.get();
        if (executionNode != ExecutionNode.PRIMARY) {
            getLogger().debug("Consumer Client not changed based on Execution Node [{}]", new Object[]{executionNode});
            return;
        }

        if (PrimaryNodeState.PRIMARY_NODE_REVOKED == primaryNodeState) {
            closeClient();
            getLogger().info("Consumer Client closed based on Execution Node [{}] and Primary Node State [{}]", new Object[]{executionNode, primaryNodeState});
        } else {
            createClient();
            getLogger().info("Consumer Client created based on Execution Node [{}] and Primary Node State [{}]", new Object[]{executionNode, primaryNodeState});
        }
    }

    /**
     * Discards the known partitions and their tracked positions, then closes the consumer client if one exists.
     */
    @OnStopped
    public void closeClient() {
        partitionIds.clear();
        partitionEventPositions.clear();

        if (eventHubConsumerClient == null) {
            getLogger().debug("Consumer Client not configured");
        } else {
            eventHubConsumerClient.close();
            getLogger().info("Consumer Client for Event Hub [{}] closed", new Object[]{eventHubConsumerClient.getEventHubName()});
        }
    }

    /**
     * Captures the configuration, creates the consumer client when enabled for this node, and seeds the
     * starting position of every known partition.
     *
     * @param processContext context holding the configured property values
     */
    @OnScheduled
    public void onScheduled(ProcessContext processContext) {
        configuredExecutionNode.set(processContext.getExecutionNode());
        configuredClientBuilder = createEventHubClientBuilder(processContext);
        createClient();

        final PropertyValue fetchSizeProperty = processContext.getProperty(RECEIVER_FETCH_SIZE);
        receiverFetchSize = fetchSizeProperty.isSet() ? fetchSizeProperty.asInteger() : DEFAULT_FETCH_SIZE;

        final PropertyValue fetchTimeoutProperty = processContext.getProperty(RECEIVER_FETCH_TIMEOUT);
        receiverFetchTimeout = fetchTimeoutProperty.isSet() ? Duration.ofMillis(fetchTimeoutProperty.asLong()) : DEFAULT_FETCH_TIMEOUT;

        final PropertyValue enqueueTimeProperty = processContext.getProperty(ENQUEUE_TIME);
        final Instant initialEnqueuedTime = enqueueTimeProperty.isSet() ? Instant.parse(enqueueTimeProperty.getValue()) : Instant.now();
        final EventPosition initialEventPosition = EventPosition.fromEnqueuedTime(initialEnqueuedTime);

        // Positions are seeded only here. closeClient() clears them, so a client recreated later by
        // onPrimaryNodeStateChange() starts from the fallback position used in receiveEvents().
        for (final String partitionId : partitionIds) {
            partitionEventPositions.put(partitionId, initialEventPosition);
        }
    }

    /**
     * Takes one partition from the queue, receives a batch of events from it, and transfers one FlowFile
     * per event to {@link #REL_SUCCESS}. The partition is always returned to the queue afterwards.
     *
     * @param processContext context of the processor
     * @param processSession session used to create and transfer FlowFiles
     * @throws ProcessException declared by the processor contract
     */
    @Override
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        final String partitionId = partitionIds.poll();
        if (partitionId == null) {
            getLogger().debug("No partitions available");
            return;
        }

        Long lastSequenceNumber = null;
        final StopWatch stopWatch = new StopWatch(true);
        try {
            for (final PartitionEvent partitionEvent : receiveEvents(partitionId)) {
                final Map<String, String> attributes = getAttributes(partitionEvent);
                FlowFile flowFile = processSession.create();
                flowFile = processSession.putAllAttributes(flowFile, attributes);

                final EventData eventData = partitionEvent.getData();
                final byte[] body = eventData.getBody();
                flowFile = processSession.write(flowFile, outputStream -> outputStream.write(body));

                processSession.transfer(flowFile, REL_SUCCESS);
                processSession.getProvenanceReporter().receive(flowFile, getTransitUri(partitionId), stopWatch.getElapsed(TimeUnit.MILLISECONDS));

                lastSequenceNumber = eventData.getSequenceNumber();
            }

            if (lastSequenceNumber == null) {
                getLogger().debug("Partition [{}] Event Position not updated: Last Sequence Number not found", new Object[]{partitionId});
            } else {
                // Remember the last sequence number handled so the next receive for this partition is positioned from it
                partitionEventPositions.put(partitionId, EventPosition.fromSequenceNumber(lastSequenceNumber));
                getLogger().debug("Partition [{}] Event Position updated: Sequence Number [{}]", new Object[]{partitionId, lastSequenceNumber});
            }
        } finally {
            // Make the partition available again even when receiving or writing failed
            partitionIds.offer(partitionId);
        }
    }

    /**
     * Reads the partition identifiers from the current consumer client.
     *
     * @return a new queue containing the partition identifiers reported by the client
     */
    protected BlockingQueue<String> getPartitionIds() {
        final BlockingQueue<String> configuredPartitionIds = new LinkedBlockingQueue<>();
        for (final String partitionId : eventHubConsumerClient.getPartitionIds()) {
            configuredPartitionIds.add(partitionId);
        }
        return configuredPartitionIds;
    }

    /**
     * Receives events from one partition using the configured fetch size and timeout.
     *
     * @param str partition identifier to receive from
     * @return events returned by the consumer client for the partition
     */
    protected synchronized Iterable<PartitionEvent> receiveEvents(String str) {
        // Without a tracked position for the partition, reading starts from events enqueued as of now
        final EventPosition eventPosition = partitionEventPositions.getOrDefault(str, EventPosition.fromEnqueuedTime(Instant.now()));
        getLogger().debug("Receiving Events for Partition [{}] from Position [{}]", new Object[]{str, eventPosition});
        return eventHubConsumerClient.receiveFromPartition(str, receiverFetchSize, eventPosition, receiverFetchTimeout);
    }

    /**
     * Replaces the consumer client with a new one built from the configured builder and reloads the
     * partition identifiers, when client creation is enabled for this node.
     */
    private void createClient() {
        if (!isCreateClientEnabled()) {
            return;
        }

        closeClient();
        eventHubConsumerClient = configuredClientBuilder.buildConsumerClient();
        partitionIds.addAll(getPartitionIds());
        getLogger().info("Consumer Client created for Event Hub [{}] Partitions {}", new Object[]{eventHubConsumerClient.getEventHubName(), partitionIds});
    }

    /**
     * @return {@code true} unless the processor is configured for primary-node execution and this node is not primary
     */
    private boolean isCreateClientEnabled() {
        return ExecutionNode.PRIMARY != configuredExecutionNode.get() || getNodeTypeProvider().isPrimary();
    }

    /**
     * Builds a client builder from the configured namespace, endpoint, transport, consumer group and credentials.
     *
     * @param processContext context holding the configured property values
     * @return configured builder from which consumer clients can be created
     */
    private EventHubClientBuilder createEventHubClientBuilder(ProcessContext processContext) {
        final String namespace = processContext.getProperty(NAMESPACE).getValue();
        final String eventHubName = processContext.getProperty(EVENT_HUB_NAME).getValue();
        final String serviceBusEndpoint = processContext.getProperty(SERVICE_BUS_ENDPOINT).getValue();
        final boolean useManagedIdentity = processContext.getProperty(USE_MANAGED_IDENTITY).asBoolean();
        final String fullyQualifiedNamespace = String.format("%s%s", namespace, serviceBusEndpoint);
        final AmqpTransportType transportType = AmqpTransportType.fromString(processContext.getProperty(TRANSPORT_TYPE).getValue());

        final EventHubClientBuilder eventHubClientBuilder = new EventHubClientBuilder();
        eventHubClientBuilder.transportType(transportType);
        eventHubClientBuilder.consumerGroup(processContext.getProperty(CONSUMER_GROUP).getValue());

        if (useManagedIdentity) {
            eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, new ManagedIdentityCredentialBuilder().build());
        } else {
            final String policyName = processContext.getProperty(ACCESS_POLICY).getValue();
            final String policyKey = processContext.getProperty(POLICY_PRIMARY_KEY).getValue();
            eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, new AzureNamedKeyCredential(policyName, policyKey));
        }

        final AmqpClientOptions clientOptions = new AmqpClientOptions();
        clientOptions.setIdentifier(getClientIdentifier());
        eventHubClientBuilder.clientOptions(clientOptions);
        return eventHubClientBuilder;
    }

    /**
     * @param str partition identifier
     * @return transit URI built from the client's namespace, event hub name and consumer group plus the partition
     */
    private String getTransitUri(String str) {
        return String.format(
                TRANSIT_URI_FORMAT_STRING,
                eventHubConsumerClient.getFullyQualifiedNamespace(),
                eventHubConsumerClient.getEventHubName(),
                eventHubConsumerClient.getConsumerGroup(),
                str
        );
    }

    /**
     * @return the component identifier, prefixed with the current node when clustered and the node is known
     */
    private String getClientIdentifier() {
        final String componentId = getIdentifier();
        final NodeTypeProvider nodeTypeProvider = getNodeTypeProvider();
        if (!nodeTypeProvider.isClustered()) {
            return componentId;
        }

        final Optional<String> currentNode = nodeTypeProvider.getCurrentNode();
        return currentNode
                .map(node -> String.format(NODE_CLIENT_IDENTIFIER_FORMAT, node, componentId))
                .orElse(componentId);
    }

    /**
     * Builds the FlowFile attributes for a received event: enqueued time, offset, sequence number,
     * event hub name, partition identifier, and the event's application properties.
     *
     * @param partitionEvent event received from a partition
     * @return attributes in insertion order
     */
    private Map<String, String> getAttributes(PartitionEvent partitionEvent) {
        final Map<String, String> attributes = new LinkedHashMap<>();

        final EventData eventData = partitionEvent.getData();
        attributes.put("eventhub.enqueued.timestamp", String.valueOf(eventData.getEnqueuedTime()));
        attributes.put("eventhub.offset", String.valueOf(eventData.getOffset()));
        attributes.put("eventhub.sequence", String.valueOf(eventData.getSequenceNumber()));

        final PartitionContext partitionContext = partitionEvent.getPartitionContext();
        attributes.put("eventhub.name", partitionContext.getEventHubName());
        attributes.put("eventhub.partition", partitionContext.getPartitionId());

        attributes.putAll(AzureEventHubUtils.getApplicationProperties(eventData.getProperties()));
        return attributes;
    }
}
