package org.apache.nifi.processors.azure.eventhub;

import com.microsoft.eventhubs.client.ConnectionStringBuilder;
import com.microsoft.eventhubs.client.EventHubEnqueueTimeFilter;
import com.microsoft.eventhubs.client.EventHubException;
import com.microsoft.eventhubs.client.EventHubMessage;
import com.microsoft.eventhubs.client.ResilientEventHubReceiver;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;

@CapabilityDescription("Receives messages from a Microsoft Azure Event Hub, writing the contents of the Azure message to the content of the FlowFile")
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"azure", "microsoft", "cloud", "eventhub", "events", "streaming", "streams"})
@WritesAttributes({@WritesAttribute(attribute = "eventhub.enqueued.timestamp", description = "The time (in milliseconds since epoch, UTC) at which the message was enqueued in the Azure Event Hub"), @WritesAttribute(attribute = "eventhub.offset", description = "The offset into the partition at which the message was stored"), @WritesAttribute(attribute = "eventhub.sequence", description = "The Azure Sequence number associated with the message"), @WritesAttribute(attribute = "eventhub.name", description = "The name of the Event Hub from which the message was pulled"), @WritesAttribute(attribute = "eventhub.partition", description = "The name of the Azure Partition from which the message was pulled")})
/* loaded from: input_file:org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.class */
public class GetAzureEventHub extends AbstractProcessor {
    static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder().name("Event Hub Name").description("The name of the Azure Event Hub to pull messages from").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).required(true).build();
    static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder().name("Event Hub Namespace").description("The Azure Namespace that the Event Hub is assigned to. This is generally equal to <Event Hub Name>-ns").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(false).required(true).build();
    static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder().name("Shared Access Policy Name").description("The name of the Event Hub Shared Access Policy. This Policy must have Listen permissions.").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(false).required(true).build();
    static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder().name("Shared Access Policy Primary Key").description("The primary key of the Event Hub Shared Access Policy").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(false).sensitive(true).required(true).build();
    static final PropertyDescriptor NUM_PARTITIONS = new PropertyDescriptor.Builder().name("Number of Event Hub Partitions").description("The number of partitions that the Event Hub has. Only this number of partitions will be used, so it is important to ensure that if the number of partitions changes that this value be updated. Otherwise, some messages may not be consumed.").addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).expressionLanguageSupported(false).required(true).build();
    static final PropertyDescriptor CONSUMER_GROUP = new PropertyDescriptor.Builder().name("Event Hub Consumer Group").description("The name of the Event Hub Consumer Group to use when pulling events").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(false).defaultValue("$Default").required(true).build();
    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder().name("Batch Size").description("The number of FlowFiles to pull in a single JMS session").addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).expressionLanguageSupported(false).defaultValue("10").required(true).build();
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Any FlowFile that is successfully received from the Azure Event Hub will be transferred to this Relationship.").build();
    private final ConcurrentMap<String, ResilientEventHubReceiver> partitionToReceiverMap = new ConcurrentHashMap();
    private volatile BlockingQueue<String> partitionNames = new LinkedBlockingQueue();

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        ArrayList arrayList = new ArrayList(super.getSupportedPropertyDescriptors());
        arrayList.add(EVENT_HUB_NAME);
        arrayList.add(NAMESPACE);
        arrayList.add(ACCESS_POLICY);
        arrayList.add(POLICY_PRIMARY_KEY);
        arrayList.add(NUM_PARTITIONS);
        arrayList.add(CONSUMER_GROUP);
        return arrayList;
    }

    public Set<Relationship> getRelationships() {
        return Collections.singleton(REL_SUCCESS);
    }

    private ResilientEventHubReceiver getReceiver(ProcessContext processContext, String str) throws EventHubException {
        ResilientEventHubReceiver resilientEventHubReceiver = this.partitionToReceiverMap.get(str);
        if (resilientEventHubReceiver != null) {
            return resilientEventHubReceiver;
        }
        synchronized (this) {
            ResilientEventHubReceiver resilientEventHubReceiver2 = this.partitionToReceiverMap.get(str);
            if (resilientEventHubReceiver2 != null) {
                return resilientEventHubReceiver2;
            }
            String value = processContext.getProperty(ACCESS_POLICY).getValue();
            String value2 = processContext.getProperty(POLICY_PRIMARY_KEY).getValue();
            String value3 = processContext.getProperty(NAMESPACE).getValue();
            ResilientEventHubReceiver resilientEventHubReceiver3 = new ResilientEventHubReceiver(new ConnectionStringBuilder(value, value2, value3).getConnectionString(), processContext.getProperty(EVENT_HUB_NAME).getValue(), str, processContext.getProperty(CONSUMER_GROUP).getValue(), -1, new EventHubEnqueueTimeFilter(System.currentTimeMillis()));
            resilientEventHubReceiver3.initialize();
            this.partitionToReceiverMap.put(str, resilientEventHubReceiver3);
            return resilientEventHubReceiver3;
        }
    }

    @OnStopped
    public void tearDown() {
        Iterator<ResilientEventHubReceiver> it = this.partitionToReceiverMap.values().iterator();
        while (it.hasNext()) {
            it.next().close();
        }
        this.partitionToReceiverMap.clear();
    }

    @OnScheduled
    public void setupPartitions(ProcessContext processContext) {
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        for (int i = 0; i < processContext.getProperty(NUM_PARTITIONS).asInteger().intValue(); i++) {
            linkedBlockingQueue.add(String.valueOf(i));
        }
        this.partitionNames = linkedBlockingQueue;
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        BlockingQueue<String> blockingQueue = this.partitionNames;
        String poll = blockingQueue.poll();
        if (poll == null) {
            getLogger().debug("No partitions available");
            return;
        }
        StopWatch stopWatch = new StopWatch(true);
        try {
            try {
                final EventHubMessage parseAmqpMessage = EventHubMessage.parseAmqpMessage(getReceiver(processContext, poll).receive(100L));
                if (parseAmqpMessage == null) {
                    return;
                }
                HashMap hashMap = new HashMap();
                hashMap.put("eventhub.enqueued.timestamp", String.valueOf(parseAmqpMessage.getEnqueuedTimestamp()));
                hashMap.put("eventhub.offset", parseAmqpMessage.getOffset());
                hashMap.put("eventhub.sequence", String.valueOf(parseAmqpMessage.getSequence()));
                hashMap.put("eventhub.name", processContext.getProperty(EVENT_HUB_NAME).getValue());
                hashMap.put("eventhub.partition", poll);
                FlowFile write = processSession.write(processSession.putAllAttributes(processSession.create(), hashMap), new OutputStreamCallback() { // from class: org.apache.nifi.processors.azure.eventhub.GetAzureEventHub.1
                    public void process(OutputStream outputStream) throws IOException {
                        outputStream.write(parseAmqpMessage.getData());
                    }
                });
                processSession.transfer(write, REL_SUCCESS);
                processSession.getProvenanceReporter().receive(write, "amqps://" + processContext.getProperty(NAMESPACE).getValue() + ".servicebus.windows.net/" + processContext.getProperty(EVENT_HUB_NAME).getValue() + "/ConsumerGroups/" + processContext.getProperty(CONSUMER_GROUP).getValue() + "/Partitions/" + poll, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
                blockingQueue.offer(poll);
            } catch (EventHubException e) {
                throw new ProcessException(e);
            }
        } finally {
            blockingQueue.offer(poll);
        }
    }
}
