/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.azure.eventhub;

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventhubs.IllegalConnectionStringFormatException;
import com.microsoft.azure.eventhubs.impl.EventHubClientImpl;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
import org.apache.nifi.processors.azure.storage.utils.FlowFileResultCarrier;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;

@SupportsBatching
@Tags(value={"microsoft", "azure", "cloud", "eventhub", "events", "streams", "streaming"})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription(value="Sends the contents of a FlowFile to Windows Azure Event Hubs. Note: the content of the FlowFile will be buffered into memory before being sent, so care should be taken to avoid sending FlowFiles to this Processor that exceed the amount of Java Heap Space available. Also please be aware that this processor creates a thread pool of 4 threads for Event Hub Client. They will be extra threads other than the concurrent tasks scheduled for this processor.")
@SystemResourceConsideration(resource=SystemResource.MEMORY)
public class PutAzureEventHub
extends AbstractProcessor {
    static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder().name("Event Hub Name").description("The name of the event hub to send to").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).required(true).build();
    static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder().name("Event Hub Namespace").description("The namespace that the event hub is assigned to. This is generally equal to <Event Hubs Name>-ns").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).required(true).build();
    static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder().name("Shared Access Policy Name").description("The name of the shared access policy. This policy must have Send claims.").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).required(false).build();
    static final PropertyDescriptor POLICY_PRIMARY_KEY = AzureEventHubUtils.POLICY_PRIMARY_KEY;
    static final PropertyDescriptor USE_MANAGED_IDENTITY = AzureEventHubUtils.USE_MANAGED_IDENTITY;
    static final PropertyDescriptor PARTITIONING_KEY_ATTRIBUTE_NAME = new PropertyDescriptor.Builder().name("partitioning-key-attribute-name").displayName("Partitioning Key Attribute Name").description("If specified, the value from argument named by this field will be used as a partitioning key to be used by event hub.").required(false).expressionLanguageSupported(ExpressionLanguageScope.NONE).addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR).defaultValue(null).build();
    static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder().name("max-batch-size").displayName("Maximum batch size").description("Maximum count of flow files being processed in one batch.").required(true).expressionLanguageSupported(ExpressionLanguageScope.NONE).addValidator(StandardValidators.NUMBER_VALIDATOR).defaultValue("100").build();
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Any FlowFile that is successfully sent to the event hubs will be transferred to this Relationship.").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Any FlowFile that could not be sent to the event hub will be transferred to this Relationship.").build();
    private volatile BlockingQueue<EventHubClient> senderQueue = new LinkedBlockingQueue<EventHubClient>();
    private static final List<PropertyDescriptor> propertyDescriptors;
    private static final Set<Relationship> relationships;
    private ScheduledExecutorService executor;

    public Set<Relationship> getRelationships() {
        return relationships;
    }

    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    @OnScheduled
    public final void setupClient(ProcessContext context) throws ProcessException {
    }

    @OnStopped
    public void tearDown() {
        EventHubClient sender;
        while ((sender = (EventHubClient)this.senderQueue.poll()) != null) {
            sender.close();
        }
    }

    protected Collection<ValidationResult> customValidate(ValidationContext context) {
        List<ValidationResult> retVal = AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY, context);
        return retVal;
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        try {
            this.populateSenderQueue(context);
        }
        catch (ProcessException e) {
            context.yield();
            throw e;
        }
        StopWatch stopWatch = new StopWatch(true);
        String partitioningKeyAttributeName = context.getProperty(PARTITIONING_KEY_ATTRIBUTE_NAME).getValue();
        int maxBatchSize = NumberUtils.toInt((String)context.getProperty(MAX_BATCH_SIZE).getValue(), (int)100);
        List flowFileList = session.get(maxBatchSize);
        LinkedBlockingQueue<CompletableFuture<FlowFileResultCarrier<Relationship>>> futureQueue = new LinkedBlockingQueue<CompletableFuture<FlowFileResultCarrier<Relationship>>>();
        for (FlowFile flowFile : flowFileList) {
            if (flowFile == null) continue;
            futureQueue.offer(this.handleFlowFile(flowFile, partitioningKeyAttributeName, session));
        }
        this.waitForAllFutures(context, session, stopWatch, futureQueue);
    }

    protected void waitForAllFutures(ProcessContext context, ProcessSession session, StopWatch stopWatch, BlockingQueue<CompletableFuture<FlowFileResultCarrier<Relationship>>> futureQueue) {
        try {
            for (CompletableFuture completableFuture : futureQueue) {
                completableFuture.join();
                FlowFileResultCarrier flowFileResult = (FlowFileResultCarrier)completableFuture.get();
                if (flowFileResult == null) continue;
                FlowFile flowFile = flowFileResult.getFlowFile();
                if (flowFileResult.getResult() == REL_SUCCESS) {
                    String namespace = context.getProperty(NAMESPACE).getValue();
                    String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
                    session.getProvenanceReporter().send(flowFile, "amqps://" + namespace + ".servicebus.windows.net/" + eventHubName, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
                    session.transfer(flowFile, REL_SUCCESS);
                    continue;
                }
                Throwable processException = flowFileResult.getException();
                this.getLogger().error("Failed to send {} to EventHub due to {}; routing to failure", new Object[]{flowFile, processException}, processException);
                session.transfer(session.penalize(flowFile), REL_FAILURE);
            }
        }
        catch (InterruptedException | CancellationException | CompletionException | ExecutionException e) {
            this.getLogger().error("Batch processing failed", (Throwable)e);
            session.rollback();
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new ProcessException("Batch processing failed", (Throwable)e);
        }
    }

    protected CompletableFuture<FlowFileResultCarrier<Relationship>> handleFlowFile(FlowFile flowFile, String partitioningKeyAttributeName, ProcessSession session) {
        byte[] buffer = new byte[(int)flowFile.getSize()];
        session.read(flowFile, in -> StreamUtils.fillBuffer((InputStream)in, (byte[])buffer));
        String partitioningKey = StringUtils.isNotBlank((CharSequence)partitioningKeyAttributeName) ? flowFile.getAttribute(partitioningKeyAttributeName) : null;
        Map attributes = flowFile.getAttributes();
        Map<String, Object> userProperties = attributes == null ? Collections.emptyMap() : new HashMap(attributes);
        try {
            return ((CompletableFuture)this.sendMessage(buffer, partitioningKey, userProperties).thenApplyAsync(param -> new FlowFileResultCarrier<Relationship>(flowFile, REL_SUCCESS))).exceptionally(processException -> new FlowFileResultCarrier<Relationship>(flowFile, REL_FAILURE, (Throwable)processException));
        }
        catch (ProcessException processException2) {
            return CompletableFuture.completedFuture(new FlowFileResultCarrier<Relationship>(flowFile, REL_FAILURE, processException2));
        }
    }

    protected void populateSenderQueue(ProcessContext context) {
        if (this.senderQueue.size() == 0) {
            String policyKey;
            String policyName;
            int numThreads = context.getMaxConcurrentTasks();
            this.senderQueue = new LinkedBlockingQueue<EventHubClient>(numThreads);
            this.executor = Executors.newScheduledThreadPool(4);
            boolean useManagedIdentiy = context.getProperty(USE_MANAGED_IDENTITY).asBoolean();
            if (useManagedIdentiy) {
                policyName = "Managed Identity";
                policyKey = null;
            } else {
                policyName = context.getProperty(ACCESS_POLICY).getValue();
                policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue();
            }
            String namespace = context.getProperty(NAMESPACE).getValue();
            String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
            for (int i = 0; i < numThreads; ++i) {
                EventHubClient client = this.createEventHubClient(namespace, eventHubName, policyName, policyKey, this.executor);
                if (null == client) continue;
                this.senderQueue.offer(client);
            }
        }
    }

    protected EventHubClient createEventHubClient(String namespace, String eventHubName, String policyName, String policyKey, ScheduledExecutorService executor) throws ProcessException {
        try {
            EventHubClientImpl.USER_AGENT = "ApacheNiFi-azureeventhub/3.1.1";
            String connectionString = policyName == "Managed Identity" ? AzureEventHubUtils.getManagedIdentityConnectionString(namespace, eventHubName) : this.getConnectionString(namespace, eventHubName, policyName, policyKey);
            return EventHubClient.createFromConnectionStringSync((String)connectionString, (ScheduledExecutorService)executor);
        }
        catch (EventHubException | IllegalConnectionStringFormatException | IOException e) {
            this.getLogger().error("Failed to create EventHubClient due to {}", new Object[]{e.getMessage()}, e);
            throw new ProcessException(e);
        }
    }

    protected String getConnectionString(String namespace, String eventHubName, String policyName, String policyKey) {
        return AzureEventHubUtils.getSharedAccessSignatureConnectionString(namespace, eventHubName, policyName, policyKey);
    }

    protected CompletableFuture<Void> sendMessage(byte[] buffer, String partitioningKey, Map<String, Object> userProperties) throws ProcessException {
        EventHubClient sender = (EventHubClient)this.senderQueue.poll();
        if (sender == null) {
            throw new ProcessException("No EventHubClients are configured for sending");
        }
        EventData eventData = EventData.create((byte[])buffer);
        Map properties = eventData.getProperties();
        if (userProperties != null && properties != null) {
            properties.putAll(userProperties);
        }
        CompletableFuture eventFuture = StringUtils.isNotBlank((CharSequence)partitioningKey) ? sender.send(eventData, partitioningKey) : sender.send(eventData);
        this.senderQueue.offer(sender);
        return eventFuture;
    }

    static {
        ArrayList<PropertyDescriptor> _propertyDescriptors = new ArrayList<PropertyDescriptor>();
        _propertyDescriptors.add(EVENT_HUB_NAME);
        _propertyDescriptors.add(NAMESPACE);
        _propertyDescriptors.add(ACCESS_POLICY);
        _propertyDescriptors.add(POLICY_PRIMARY_KEY);
        _propertyDescriptors.add(USE_MANAGED_IDENTITY);
        _propertyDescriptors.add(PARTITIONING_KEY_ATTRIBUTE_NAME);
        _propertyDescriptors.add(MAX_BATCH_SIZE);
        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
        HashSet<Relationship> _relationships = new HashSet<Relationship>();
        _relationships.add(REL_SUCCESS);
        _relationships.add(REL_FAILURE);
        relationships = Collections.unmodifiableSet(_relationships);
    }
}

