/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.azure.eventhub;

import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventhubs.EventPosition;
import com.microsoft.azure.eventhubs.PartitionReceiver;
import com.microsoft.azure.eventhubs.impl.EventHubClientImpl;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
import org.apache.nifi.util.StopWatch;

/**
 * NiFi source processor that pulls events from an Azure Event Hub and emits one
 * FlowFile per event on the {@code success} relationship.
 *
 * <p>Partition ids {@code "0" .. "N-1"} (N = "Number of Event Hub Partitions") are kept
 * in a queue. Each {@link #onTrigger} invocation takes one id from the queue, receives a
 * batch of events from that partition, and puts the id back when it is done, so a given
 * partition is read by at most one trigger at a time. One {@link PartitionReceiver} is
 * created lazily per partition and cached until the processor is stopped.
 */
@Tags(value={"azure", "microsoft", "cloud", "eventhub", "events", "streaming", "streams"})
@CapabilityDescription(value="Receives messages from Microsoft Azure Event Hubs, writing the contents of the Azure message to the content of the FlowFile. Note: Please be aware that this processor creates a thread pool of 4 threads for Event Hub Client. They will be extra threads other than the concurrent tasks scheduled for this processor.")
@InputRequirement(value=InputRequirement.Requirement.INPUT_FORBIDDEN)
@WritesAttributes(value={
    @WritesAttribute(attribute="eventhub.enqueued.timestamp", description="The time (in milliseconds since epoch, UTC) at which the message was enqueued in the event hub"),
    @WritesAttribute(attribute="eventhub.offset", description="The offset into the partition at which the message was stored"),
    @WritesAttribute(attribute="eventhub.sequence", description="The Azure sequence number associated with the message"),
    @WritesAttribute(attribute="eventhub.name", description="The name of the event hub from which the message was pulled"),
    @WritesAttribute(attribute="eventhub.partition", description="The name of the event hub partition from which the message was pulled")
})
public class GetAzureEventHub
extends AbstractProcessor {
    /** Size of the thread pool handed to the Event Hub client (matches the capability description). */
    private static final int CLIENT_THREAD_POOL_SIZE = 4;
    /** Batch size used when "Partition Receiver Fetch Size" is not set. */
    private static final int DEFAULT_RECEIVER_FETCH_SIZE = 100;
    /** Receive timeout used when the partition receiver timeout property is not set. */
    private static final Duration DEFAULT_RECEIVER_FETCH_TIMEOUT = Duration.ofMillis(60000L);

    static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder()
            .name("Event Hub Name")
            .description("The name of the event hub to pull messages from")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .required(true)
            .build();
    static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder()
            .name("Event Hub Namespace")
            .description("The namespace that the event hub is assigned to. This is generally equal to <Event Hubs Name>-ns")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .required(true)
            .build();
    static final PropertyDescriptor SERVICE_BUS_ENDPOINT = new PropertyDescriptor.Builder()
            .name("Service Bus Endpoint")
            .description("To support namespaces in non-standard Host URIs ( not .servicebus.windows.net,  ie .servicebus.chinacloudapi.cn) select from the drop down acceptable options ")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .allowableValues(".servicebus.windows.net", ".servicebus.chinacloudapi.cn")
            .defaultValue(".servicebus.windows.net")
            .required(true)
            .build();
    static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder()
            .name("Shared Access Policy Name")
            .description("The name of the shared access policy. This policy must have Listen claims.")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .required(false)
            .build();
    static final PropertyDescriptor POLICY_PRIMARY_KEY = AzureEventHubUtils.POLICY_PRIMARY_KEY;
    static final PropertyDescriptor USE_MANAGED_IDENTITY = AzureEventHubUtils.USE_MANAGED_IDENTITY;
    static final PropertyDescriptor NUM_PARTITIONS = new PropertyDescriptor.Builder()
            .name("Number of Event Hub Partitions")
            .description("The number of partitions that the event hub has. Only this number of partitions will be used, so it is important to ensure that if the number of partitions changes that this value be updated. Otherwise, some messages may not be consumed.")
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .required(true)
            .build();
    static final PropertyDescriptor CONSUMER_GROUP = new PropertyDescriptor.Builder()
            .name("Event Hub Consumer Group")
            .displayName("Consumer Group")
            .description("The name of the consumer group to use when pulling events")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .defaultValue("$Default")
            .required(true)
            .build();
    static final PropertyDescriptor ENQUEUE_TIME = new PropertyDescriptor.Builder()
            .name("Event Hub Message Enqueue Time")
            .displayName("Message Enqueue Time")
            .description("A timestamp (ISO-8601 Instant) formatted as YYYY-MM-DDThhmmss.sssZ (2016-01-01T01:01:01.000Z) from which messages should have been enqueued in the Event Hub to start reading from")
            .addValidator(StandardValidators.ISO8601_INSTANT_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .required(false)
            .build();
    // The misspelled name is kept on purpose: it is the identifier this descriptor was
    // already built with, so changing it would rename the property.
    static final PropertyDescriptor RECEIVER_FETCH_SIZE = new PropertyDescriptor.Builder()
            .name("Partition Recivier Fetch Size")
            .displayName("Partition Receiver Fetch Size")
            .description("The number of events that a receiver should fetch from an Event Hubs partition before returning. Default(100)")
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .required(false)
            .build();
    // name(...) used to be called twice here; only the last call took effect, so the
    // overwritten first call has been dropped and the effective name is unchanged.
    static final PropertyDescriptor RECEIVER_FETCH_TIMEOUT = new PropertyDescriptor.Builder()
            .name("Partition Receiver Timeout (millseconds)")
            .description("The amount of time a Partition Receiver should wait to receive the Fetch Size before returning. Default(60000)")
            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .required(false)
            .build();
    static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("Any FlowFile that is successfully received from the event hub will be transferred to this Relationship.")
            .build();

    /** Receivers created so far, keyed by partition id; emptied in {@link #tearDown()}. */
    private final ConcurrentMap<String, PartitionReceiver> partitionToReceiverMap = new ConcurrentHashMap<String, PartitionReceiver>();
    /** Partition ids not currently being read by an {@link #onTrigger} call. */
    private volatile BlockingQueue<String> partitionNames = new LinkedBlockingQueue<String>();
    /** Start position for new receivers; {@code null} means "now" at receiver creation time. */
    private volatile Instant configuredEnqueueTime;
    private volatile int receiverFetchSize;
    /** Receive timeout for new receivers; {@code null} means the 60000 ms default. */
    private volatile Duration receiverFetchTimeout;
    private EventHubClient eventHubClient;
    private static final List<PropertyDescriptor> propertyDescriptors;
    private static final Set<Relationship> relationships;
    private ScheduledExecutorService executor;

    /**
     * @return the single-element, unmodifiable set containing {@link #REL_SUCCESS}
     */
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * @return the unmodifiable list of properties this processor supports
     */
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    /**
     * Delegates credential validation to {@link AzureEventHubUtils}, passing the access
     * policy and primary key descriptors.
     *
     * @param context the validation context supplied by the framework
     * @return the validation results produced by the utility
     */
    protected Collection<ValidationResult> customValidate(ValidationContext context) {
        return AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY, context);
    }

    /**
     * Creates the Event Hub client used by all partition receivers.
     *
     * @param connectionString connection string for the event hub
     * @param executor executor passed to the client
     * @throws ProcessException if the client cannot be created
     */
    protected void setupReceiver(String connectionString, ScheduledExecutorService executor) throws ProcessException {
        try {
            EventHubClientImpl.USER_AGENT = "ApacheNiFi-azureeventhub/3.1.1";
            this.eventHubClient = EventHubClient.createFromConnectionStringSync(connectionString, executor);
        }
        catch (EventHubException | IOException e) {
            throw new ProcessException(e);
        }
    }

    /**
     * Returns the cached receiver for a partition, creating and caching it on first use.
     *
     * <p>A new receiver starts at the configured enqueue time, or at the current instant
     * when none is configured, and uses the configured receive timeout or 60000 ms.
     *
     * @param context used to read the consumer group name
     * @param partitionId id of the partition to read
     * @return the receiver for {@code partitionId}
     * @throws IOException declared for callers; see the Event Hubs client API
     * @throws EventHubException if the client fails to start creating the receiver
     * @throws ExecutionException if the asynchronous receiver creation fails
     * @throws InterruptedException if the thread is interrupted while waiting for creation
     */
    PartitionReceiver getReceiver(ProcessContext context, String partitionId) throws IOException, EventHubException, ExecutionException, InterruptedException {
        final PartitionReceiver cached = this.partitionToReceiverMap.get(partitionId);
        if (cached != null) {
            return cached;
        }
        synchronized (this) {
            // Re-check under the lock: another thread may have created the receiver meanwhile.
            final PartitionReceiver raced = this.partitionToReceiverMap.get(partitionId);
            if (raced != null) {
                return raced;
            }
            final String consumerGroupName = context.getProperty(CONSUMER_GROUP).getValue();
            final Instant startTime = this.configuredEnqueueTime == null ? Instant.now() : this.configuredEnqueueTime;
            final Duration timeout = this.receiverFetchTimeout == null ? DEFAULT_RECEIVER_FETCH_TIMEOUT : this.receiverFetchTimeout;

            final PartitionReceiver receiver = this.eventHubClient
                    .createReceiver(consumerGroupName, partitionId, EventPosition.fromEnqueuedTime(startTime))
                    .get();
            receiver.setReceiveTimeout(timeout);
            this.partitionToReceiverMap.put(partitionId, receiver);
            return receiver;
        }
    }

    /**
     * Receives up to the configured fetch size of events from one partition.
     *
     * @param context used to look up or create the partition receiver
     * @param partitionId id of the partition to read
     * @return the received events as returned by the receiver (may be {@code null};
     *         {@link #onTrigger} checks for that)
     * @throws ProcessException wrapping any failure; if the thread was interrupted, its
     *         interrupt flag is restored before the exception is thrown
     */
    protected Iterable<EventData> receiveEvents(ProcessContext context, String partitionId) throws ProcessException {
        try {
            final PartitionReceiver receiver = this.getReceiver(context, partitionId);
            return receiver.receive(this.receiverFetchSize).get();
        }
        catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new ProcessException(e);
        }
        catch (EventHubException | IOException | ExecutionException e) {
            throw new ProcessException(e);
        }
    }

    /**
     * Closes all cached receivers and the client, then shuts down the client executor.
     *
     * <p>The executor is shut down even when closing the client fails, and the method
     * tolerates an executor that was never created (scheduling failed early).
     *
     * @throws ProcessException if closing the Event Hub client fails
     */
    @OnStopped
    public void tearDown() throws ProcessException {
        for (final PartitionReceiver receiver : this.partitionToReceiverMap.values()) {
            if (receiver != null) {
                receiver.close();
            }
        }
        this.partitionToReceiverMap.clear();
        try {
            if (this.eventHubClient != null) {
                this.eventHubClient.closeSync();
            }
        }
        catch (EventHubException e) {
            throw new ProcessException(e);
        }
        finally {
            if (this.executor != null) {
                this.executor.shutdown();
            }
        }
    }

    /**
     * Reads the processor configuration, rebuilds the partition queue and creates the
     * Event Hub client.
     *
     * @param context the process context holding the property values
     * @throws ProcessException if the client cannot be created
     * @throws URISyntaxException if namespace plus endpoint do not form a valid URI
     */
    @OnScheduled
    public void onScheduled(ProcessContext context) throws ProcessException, URISyntaxException {
        final int partitionCount = context.getProperty(NUM_PARTITIONS).asInteger();
        final BlockingQueue<String> partitionQueue = new LinkedBlockingQueue<String>();
        for (int i = 0; i < partitionCount; ++i) {
            partitionQueue.add(String.valueOf(i));
        }
        this.partitionNames = partitionQueue;

        final String namespace = context.getProperty(NAMESPACE).getValue();
        final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
        final String serviceBusEndpoint = context.getProperty(SERVICE_BUS_ENDPOINT).getValue();

        final String connectionString;
        // Null-safe: an unset property is treated as "do not use managed identity".
        if (Boolean.TRUE.equals(context.getProperty(USE_MANAGED_IDENTITY).asBoolean())) {
            connectionString = AzureEventHubUtils.getManagedIdentityConnectionString(namespace, eventHubName);
        } else {
            final String policyName = context.getProperty(ACCESS_POLICY).getValue();
            final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue();
            connectionString = new ConnectionStringBuilder()
                    .setEndpoint(new URI("amqps://" + namespace + serviceBusEndpoint))
                    .setEventHubName(eventHubName)
                    .setSasKeyName(policyName)
                    .setSasKey(policyKey)
                    .toString();
        }

        this.configuredEnqueueTime = context.getProperty(ENQUEUE_TIME).isSet()
                ? Instant.parse(context.getProperty(ENQUEUE_TIME).getValue())
                : null;
        this.receiverFetchSize = context.getProperty(RECEIVER_FETCH_SIZE).isSet()
                ? context.getProperty(RECEIVER_FETCH_SIZE).asInteger()
                : DEFAULT_RECEIVER_FETCH_SIZE;
        this.receiverFetchTimeout = context.getProperty(RECEIVER_FETCH_TIMEOUT).isSet()
                ? Duration.ofMillis(context.getProperty(RECEIVER_FETCH_TIMEOUT).asLong())
                : null;

        final ScheduledExecutorService clientExecutor = Executors.newScheduledThreadPool(CLIENT_THREAD_POOL_SIZE);
        this.executor = clientExecutor;
        try {
            this.setupReceiver(connectionString, clientExecutor);
        }
        catch (RuntimeException e) {
            // Do not leave the pool's threads running when the client could not be created.
            clientExecutor.shutdown();
            throw e;
        }
    }

    /**
     * Takes one free partition, receives a batch of events from it and turns every
     * non-null event into a FlowFile routed to {@link #REL_SUCCESS}, with a provenance
     * RECEIVE event per FlowFile. The partition id is always returned to the queue.
     *
     * @param context the process context holding the property values
     * @param session the session used to create, write and transfer FlowFiles
     * @throws ProcessException if receiving events fails
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        final BlockingQueue<String> partitionIds = this.partitionNames;
        final String partitionId = partitionIds.poll();
        if (partitionId == null) {
            this.getLogger().debug("No partitions available");
            return;
        }
        final StopWatch stopWatch = new StopWatch(true);
        try {
            final Iterable<EventData> receivedEvents = this.receiveEvents(context, partitionId);
            if (receivedEvents == null) {
                return;
            }

            // These values are the same for every event in the batch, so compute them once.
            final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
            final String transitUri = "amqps://" + context.getProperty(NAMESPACE).getValue()
                    + context.getProperty(SERVICE_BUS_ENDPOINT).getValue()
                    + "/" + eventHubName
                    + "/ConsumerGroups/" + context.getProperty(CONSUMER_GROUP).getValue()
                    + "/Partitions/" + partitionId;

            for (final EventData eventData : receivedEvents) {
                if (eventData == null) {
                    continue;
                }
                final HashMap<String, String> attributes = new HashMap<String, String>();
                final EventData.SystemProperties systemProperties = eventData.getSystemProperties();
                if (systemProperties != null) {
                    attributes.put("eventhub.enqueued.timestamp", String.valueOf(systemProperties.getEnqueuedTime()));
                    attributes.put("eventhub.offset", systemProperties.getOffset());
                    attributes.put("eventhub.sequence", String.valueOf(systemProperties.getSequenceNumber()));
                }
                attributes.put("eventhub.name", eventHubName);
                attributes.put("eventhub.partition", partitionId);

                FlowFile flowFile = session.create();
                flowFile = session.putAllAttributes(flowFile, attributes);
                flowFile = session.write(flowFile, out -> out.write(eventData.getBytes()));
                session.transfer(flowFile, REL_SUCCESS);
                session.getProvenanceReporter().receive(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
            }
        }
        finally {
            // Hand the partition back so a later trigger can read it, even after a failure.
            partitionIds.offer(partitionId);
        }
    }

    static {
        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        descriptors.add(EVENT_HUB_NAME);
        descriptors.add(SERVICE_BUS_ENDPOINT);
        descriptors.add(NAMESPACE);
        descriptors.add(ACCESS_POLICY);
        descriptors.add(POLICY_PRIMARY_KEY);
        descriptors.add(USE_MANAGED_IDENTITY);
        descriptors.add(NUM_PARTITIONS);
        descriptors.add(CONSUMER_GROUP);
        descriptors.add(ENQUEUE_TIME);
        descriptors.add(RECEIVER_FETCH_SIZE);
        descriptors.add(RECEIVER_FETCH_TIMEOUT);
        propertyDescriptors = Collections.unmodifiableList(descriptors);
        relationships = Collections.singleton(REL_SUCCESS);
    }
}

