package org.apache.nifi.processors.azure.eventhub;

import com.azure.core.amqp.AmqpTransportType;
import com.azure.core.amqp.ProxyOptions;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.credential.AzureNamedKeyCredential;
import com.azure.identity.ManagedIdentityCredentialBuilder;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.EventBatchContext;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.LastEnqueuedEventProperties;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.regex.Pattern;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.eventhub.position.EarliestEventPositionProvider;
import org.apache.nifi.processors.azure.eventhub.position.LegacyBlobStorageEventPositionProvider;
import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;

@CapabilityDescription("Receives messages from Microsoft Azure Event Hubs with checkpointing to ensure consistent event processing. Checkpoint tracking avoids consuming a message multiple times and enables reliable resumption of processing in the event of intermittent network failures. Checkpoint tracking requires external storage and provides the preferred approach to consuming messages from Azure Event Hubs. In clustered environment, ConsumeAzureEventHub processor instances form a consumer group and the messages are distributed among the cluster nodes (each message is processed on one cluster node only).")
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@TriggerSerially
@Tags({"azure", "microsoft", "cloud", "eventhub", "events", "streaming", "streams"})
@WritesAttributes({@WritesAttribute(attribute = "eventhub.enqueued.timestamp", description = "The time (in milliseconds since epoch, UTC) at which the message was enqueued in the event hub"), @WritesAttribute(attribute = "eventhub.offset", description = "The offset into the partition at which the message was stored"), @WritesAttribute(attribute = "eventhub.sequence", description = "The sequence number associated with the message"), @WritesAttribute(attribute = "eventhub.name", description = "The name of the event hub from which the message was pulled"), @WritesAttribute(attribute = "eventhub.partition", description = "The name of the partition from which the message was pulled"), @WritesAttribute(attribute = "eventhub.property.*", description = "The application properties of this message. IE: 'application' would be 'eventhub.property.application'")})
/* loaded from: input_file:org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.class */
public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implements AzureEventHubComponent {
    // ---- Static constants ----------------------------------------------------------------

    /** Connection string template: account name, account key, endpoint domain suffix. */
    private static final String FORMAT_STORAGE_CONNECTION_STRING_FOR_ACCOUNT_KEY =
            "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s;EndpointSuffix=core.%s";

    /** Connection string template: account name, endpoint domain suffix, SAS token. */
    private static final String FORMAT_STORAGE_CONNECTION_STRING_FOR_SAS_TOKEN =
            "BlobEndpoint=https://%s.blob.core.%s/;SharedAccessSignature=%s";

    /** Pattern used by the Storage SAS Token validator: the value has to begin with '?'. */
    private static final Pattern SAS_TOKEN_PATTERN = Pattern.compile("^\\?.*$");

    // Both relationship sets are assigned in the static initializer of this class.
    private static final Set<Relationship> RELATIONSHIPS;
    private static final Set<Relationship> RECORD_RELATIONSHIPS;

    // ---- Per-run state -------------------------------------------------------------------
    // Session factory, client and record factories are populated by onTrigger and reset by stopClient.

    private volatile ProcessSessionFactory processSessionFactory;
    private volatile EventProcessorClient eventProcessorClient;
    private volatile RecordReaderFactory readerFactory;
    private volatile RecordSetWriterFactory writerFactory;

    /** Assigned by createClient; read when building the transit URI. */
    private volatile String namespaceName;

    /** Assigned by createStorageConnectionString; read by createClient and getTransitUri. */
    private volatile String serviceBusEndpoint;
    // ---- Event Hub connection properties -------------------------------------------------

    static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder()
            .name("event-hub-namespace")
            .displayName("Event Hub Namespace")
            .description("The namespace that the Azure Event Hubs is assigned to. This is generally equal to <Event Hub Names>-ns.")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .required(true)
            .build();

    static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder()
            .name("event-hub-name")
            .displayName("Event Hub Name")
            .description("The name of the event hub to pull messages from.")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .required(true)
            .build();

    // Shared descriptor reused as-is from the Event Hub utility class.
    static final PropertyDescriptor SERVICE_BUS_ENDPOINT = AzureEventHubUtils.SERVICE_BUS_ENDPOINT;

    static final PropertyDescriptor ACCESS_POLICY_NAME = new PropertyDescriptor.Builder()
            .name("event-hub-shared-access-policy-name")
            .displayName("Shared Access Policy Name")
            .description("The name of the shared access policy. This policy must have Listen claims.")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .required(false)
            .build();

    // Copy of the shared descriptor, published under this processor's own property name.
    static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder()
            .fromPropertyDescriptor(AzureEventHubUtils.POLICY_PRIMARY_KEY)
            .name("event-hub-shared-access-policy-primary-key")
            .build();

    // Shared descriptor reused as-is from the Event Hub utility class.
    static final PropertyDescriptor USE_MANAGED_IDENTITY = AzureEventHubUtils.USE_MANAGED_IDENTITY;

    static final PropertyDescriptor CONSUMER_GROUP = new PropertyDescriptor.Builder()
            .name("event-hub-consumer-group")
            .displayName("Consumer Group")
            .description("The name of the consumer group to use.")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .defaultValue("$Default")
            .required(true)
            .build();

    // ---- Record handling (optional; customValidate requires both or neither) -------------

    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
            .name("record-reader")
            .displayName("Record Reader")
            .description("The Record Reader to use for reading received messages. The event hub name can be referred by Expression Language '${eventhub.name}' to access a schema.")
            .identifiesControllerService(RecordReaderFactory.class)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .required(false)
            .build();

    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
            .name("record-writer")
            .displayName("Record Writer")
            .description("The Record Writer to use for serializing Records to an output FlowFile. The event hub name can be referred by Expression Language '${eventhub.name}' to access a schema. If not specified, each message will create a FlowFile.")
            .identifiesControllerService(RecordSetWriterFactory.class)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .required(false)
            .build();

    // ---- Initial offset ------------------------------------------------------------------

    static final AllowableValue INITIAL_OFFSET_START_OF_STREAM = new AllowableValue(
            "start-of-stream",
            "Start of stream",
            "Read from the oldest message retained in the stream.");

    static final AllowableValue INITIAL_OFFSET_END_OF_STREAM = new AllowableValue(
            "end-of-stream",
            "End of stream",
            "Ignore old retained messages even if exist, start reading new ones from now.");

    static final PropertyDescriptor INITIAL_OFFSET = new PropertyDescriptor.Builder()
            .name("event-hub-initial-offset")
            .displayName("Initial Offset")
            .description("Specify where to start receiving messages if offset is not stored in Azure Storage.")
            .required(true)
            .allowableValues(INITIAL_OFFSET_START_OF_STREAM, INITIAL_OFFSET_END_OF_STREAM)
            .defaultValue(INITIAL_OFFSET_END_OF_STREAM.getValue())
            .build();
    /**
     * Number of messages the client should prefetch; defaults to 300.
     * The help text is supplied through description(...) so that it is shown as the property
     * documentation rather than being set as a default value that the later defaultValue("300")
     * call immediately overwrites.
     */
    static final PropertyDescriptor PREFETCH_COUNT = new PropertyDescriptor.Builder()
            .name("event-hub-prefetch-count")
            .displayName("Prefetch Count")
            .description("The number of messages to fetch from the event hub before processing. This parameter affects throughput. The more prefetch count, the better throughput in general, but consumes more resources (RAM). NOTE: Even though the event hub client API provides this option, actual number of messages can be pre-fetched is depend on the Event Hubs server implementation. It is reported that only one event is received at a time in certain situation. https://github.com/Azure/azure-event-hubs-java/issues/125")
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .defaultValue("300")
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .required(true)
            .build();
    // ---- Receive tuning ------------------------------------------------------------------

    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
            .name("event-hub-batch-size")
            .displayName("Batch Size")
            .description("The number of messages to process within a NiFi session. This parameter affects throughput and consistency. NiFi commits its session and Event Hubs checkpoints after processing this number of messages. If NiFi session is committed, but fails to create an Event Hubs checkpoint, then it is possible that the same messages will be received again. The higher number, the higher throughput, but possibly less consistent.")
            .defaultValue("10")
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .required(true)
            .build();

    static final PropertyDescriptor RECEIVE_TIMEOUT = new PropertyDescriptor.Builder()
            .name("event-hub-message-receive-timeout")
            .displayName("Message Receive Timeout")
            .description("The amount of time this consumer should wait to receive the Prefetch Count before returning.")
            .defaultValue("1 min")
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .required(true)
            .build();

    // ---- Checkpoint storage (Azure Blob Storage) -----------------------------------------

    static final PropertyDescriptor STORAGE_ACCOUNT_NAME = new PropertyDescriptor.Builder()
            .name(AzureStorageUtils.STORAGE_ACCOUNT_NAME_PROPERTY_DESCRIPTOR_NAME)
            .displayName("Storage Account Name")
            .description("Name of the Azure Storage account to store event hub consumer group state.")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .required(true)
            .build();

    static final PropertyDescriptor STORAGE_ACCOUNT_KEY = new PropertyDescriptor.Builder()
            .name(AzureStorageUtils.STORAGE_ACCOUNT_KEY_PROPERTY_DESCRIPTOR_NAME)
            .displayName("Storage Account Key")
            .description("The Azure Storage account key to store event hub consumer group state.")
            .sensitive(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .required(false)
            .build();

    static final PropertyDescriptor STORAGE_SAS_TOKEN = new PropertyDescriptor.Builder()
            .name(AzureStorageUtils.STORAGE_SAS_TOKEN_PROPERTY_DESCRIPTOR_NAME)
            .displayName("Storage SAS Token")
            .description("The Azure Storage SAS token to store Event Hub consumer group state. Always starts with a ? character.")
            .sensitive(true)
            .addValidator(StandardValidators.createRegexMatchingValidator(SAS_TOKEN_PATTERN, true, "Token must start with a ? character."))
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .required(false)
            .build();

    static final PropertyDescriptor STORAGE_CONTAINER_NAME = new PropertyDescriptor.Builder()
            .name("storage-container-name")
            .displayName("Storage Container Name")
            .description("Name of the Azure Storage container to store the event hub consumer group state. If not specified, event hub name is used.")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .required(false)
            .build();
    // ---- Relationships -------------------------------------------------------------------

    static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("FlowFiles received from Event Hub.")
            .build();

    static final Relationship REL_PARSE_FAILURE = new Relationship.Builder()
            .name("parse.failure")
            .description("If a message from event hub cannot be parsed using the configured Record Reader or failed to be written by the configured Record Writer, the contents of the message will be routed to this Relationship as its own individual FlowFile.")
            .build();

    /** Supported properties, in the order they are returned by getSupportedPropertyDescriptors. */
    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
            NAMESPACE,
            EVENT_HUB_NAME,
            SERVICE_BUS_ENDPOINT,
            TRANSPORT_TYPE,
            ACCESS_POLICY_NAME,
            POLICY_PRIMARY_KEY,
            USE_MANAGED_IDENTITY,
            CONSUMER_GROUP,
            RECORD_READER,
            RECORD_WRITER,
            INITIAL_OFFSET,
            PREFETCH_COUNT,
            BATCH_SIZE,
            RECEIVE_TIMEOUT,
            STORAGE_ACCOUNT_NAME,
            STORAGE_ACCOUNT_KEY,
            STORAGE_SAS_TOKEN,
            STORAGE_CONTAINER_NAME,
            PROXY_CONFIGURATION_SERVICE));

    // Updated by onPropertyModified; consulted by getRelationships. Both start out false.
    private volatile boolean isRecordReaderSet;
    private volatile boolean isRecordWriterSet;
    /**
     * Callback handed to the Event Processor Client for every received batch.
     * A new session is created per batch. The batch is written as records when both a reader and
     * a writer factory are available, otherwise as one FlowFile per event. The checkpoint update
     * is passed to commitAsync as its callback. Any exception is logged together with the
     * partition details and the session is rolled back.
     */
    protected final Consumer<EventBatchContext> eventBatchProcessor = eventBatchContext -> {
        final ProcessSession session = processSessionFactory.createSession();
        try {
            final StopWatch stopWatch = new StopWatch(true);

            final boolean recordOriented = readerFactory != null && writerFactory != null;
            if (recordOriented) {
                writeRecords(eventBatchContext, session, stopWatch);
            } else {
                writeFlowFiles(eventBatchContext, session, stopWatch);
            }

            session.commitAsync(eventBatchContext::updateCheckpoint);
        } catch (final Exception e) {
            final PartitionContext partition = eventBatchContext.getPartitionContext();
            getLogger().error(
                    "Event Batch processing failed Namespace [{}] Event Hub [{}] Consumer Group [{}] Partition [{}]",
                    partition.getFullyQualifiedNamespace(),
                    partition.getEventHubName(),
                    partition.getConsumerGroup(),
                    partition.getPartitionId(),
                    e);
            session.rollback();
        }
    };
    /**
     * Callback handed to the Event Processor Client for receive errors.
     * An AMQP error whose condition is LINK_STOLEN is logged at info level only;
     * every other error is logged at error level together with the throwable.
     */
    private final Consumer<ErrorContext> errorProcessor = errorContext -> {
        final PartitionContext partitionContext = errorContext.getPartitionContext();
        // Hold the error as a Throwable and cast only after the instanceof check below;
        // declaring it as AmqpException directly is not type-safe.
        final Throwable throwable = errorContext.getThrowable();

        if (throwable instanceof AmqpException) {
            final AmqpException amqpException = (AmqpException) throwable;
            if (amqpException.getErrorCondition() == AmqpErrorCondition.LINK_STOLEN) {
                getLogger().info(
                        "Partition was stolen by another consumer instance from the consumer group. Namespace [{}] Event Hub [{}] Consumer Group [{}] Partition [{}]. {}",
                        partitionContext.getFullyQualifiedNamespace(),
                        partitionContext.getEventHubName(),
                        partitionContext.getConsumerGroup(),
                        partitionContext.getPartitionId(),
                        amqpException.getMessage());
                return;
            }
        }

        getLogger().error(
                "Receive Events failed Namespace [{}] Event Hub [{}] Consumer Group [{}] Partition [{}]",
                partitionContext.getFullyQualifiedNamespace(),
                partitionContext.getEventHubName(),
                partitionContext.getConsumerGroup(),
                partitionContext.getPartitionId(),
                throwable);
    };

    /**
     * @return the fixed, unmodifiable list of properties supported by this processor
     */
    @Override
    protected final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> supported = PROPERTIES;
        return supported;
    }

    /**
     * @return RECORD_RELATIONSHIPS when both the record reader and record writer flags are set,
     *         RELATIONSHIPS otherwise
     */
    @Override
    public Set<Relationship> getRelationships() {
        if (isRecordReaderSet && isRecordWriterSet) {
            return RECORD_RELATIONSHIPS;
        }
        return RELATIONSHIPS;
    }

    /**
     * Cross-property validation.
     * <ul>
     *   <li>Record Reader and Record Writer have to be configured together or not at all.</li>
     *   <li>Exactly one of Storage Account Key and Storage SAS Token has to be non-blank.</li>
     *   <li>The access policy checks of AzureEventHubUtils are appended.</li>
     * </ul>
     *
     * @param validationContext context giving access to the configured property values
     * @return the collected validation failures; empty when none of the checks fail
     */
    @Override
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        final List<ValidationResult> results = new ArrayList<>();

        final ControllerService recordReader = validationContext.getProperty(RECORD_READER).asControllerService();
        final ControllerService recordWriter = validationContext.getProperty(RECORD_WRITER).asControllerService();
        final String storageAccountKey = validationContext.getProperty(STORAGE_ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
        final String storageSasToken = validationContext.getProperty(STORAGE_SAS_TOKEN).evaluateAttributeExpressions().getValue();

        // Exactly one of the two record services configured is invalid.
        final boolean readerConfigured = recordReader != null;
        final boolean writerConfigured = recordWriter != null;
        if (readerConfigured != writerConfigured) {
            final String explanation = String.format(
                    "Both %s and %s should be set in order to write FlowFiles as Records.",
                    RECORD_READER.getDisplayName(),
                    RECORD_WRITER.getDisplayName());
            results.add(new ValidationResult.Builder()
                    .subject("Record Reader and Writer")
                    .explanation(explanation)
                    .valid(false)
                    .build());
        }

        final String keyDisplayName = STORAGE_ACCOUNT_KEY.getDisplayName();
        final String sasDisplayName = STORAGE_SAS_TOKEN.getDisplayName();

        // Neither storage credential given.
        if (StringUtils.isBlank(storageAccountKey) && StringUtils.isBlank(storageSasToken)) {
            results.add(new ValidationResult.Builder()
                    .subject(String.format("%s or %s", keyDisplayName, sasDisplayName))
                    .explanation(String.format("either %s or %s should be set.", keyDisplayName, sasDisplayName))
                    .valid(false)
                    .build());
        }

        // Both storage credentials given.
        if (StringUtils.isNotBlank(storageAccountKey) && StringUtils.isNotBlank(storageSasToken)) {
            results.add(new ValidationResult.Builder()
                    .subject(String.format("%s or %s", keyDisplayName, sasDisplayName))
                    .explanation(String.format("%s and %s should not be set at the same time.", keyDisplayName, sasDisplayName))
                    .valid(false)
                    .build());
        }

        results.addAll(AzureEventHubUtils.customValidate(ACCESS_POLICY_NAME, POLICY_PRIMARY_KEY, validationContext));
        return results;
    }

    /**
     * Tracks whether the Record Reader and Record Writer properties currently hold a value,
     * which getRelationships uses to pick the relationship set.
     *
     * @param propertyDescriptor the property that changed
     * @param str the previous value (not used here)
     * @param str2 the new value; a null or empty value clears the corresponding flag
     */
    @Override
    public void onPropertyModified(PropertyDescriptor propertyDescriptor, String str, String str2) {
        final boolean hasValue = StringUtils.isNotEmpty(str2);

        if (RECORD_READER.equals(propertyDescriptor)) {
            isRecordReaderSet = hasValue;
            return;
        }
        if (RECORD_WRITER.equals(propertyDescriptor)) {
            isRecordWriterSet = hasValue;
        }
    }

    /**
     * Creates and starts the Event Processor Client the first time it is invoked without an
     * existing client, then yields. Events are handled by the client's callbacks, not here.
     *
     * @param processContext context giving access to the configured property values
     * @param processSessionFactory factory stored for use by the event batch callback
     */
    @Override
    public void onTrigger(ProcessContext processContext, ProcessSessionFactory processSessionFactory) {
        final boolean clientMissing = this.eventProcessorClient == null;
        if (clientMissing) {
            // The callbacks read these fields, so populate them before the client is started.
            this.processSessionFactory = processSessionFactory;
            this.readerFactory = processContext.getProperty(RECORD_READER)
                    .asControllerService(RecordReaderFactory.class);
            this.writerFactory = processContext.getProperty(RECORD_WRITER)
                    .asControllerService(RecordSetWriterFactory.class);

            this.eventProcessorClient = createClient(processContext);
            this.eventProcessorClient.start();
        }

        processContext.yield();
    }

    /**
     * Stops the Event Processor Client, if one exists, and clears the per-run state.
     * A failure while stopping is logged as a warning and does not prevent the reset.
     */
    @OnStopped
    public void stopClient() {
        if (this.eventProcessorClient == null) {
            return;
        }

        try {
            this.eventProcessorClient.stop();
        } catch (final Exception e) {
            getLogger().warn("Event Processor Client stop failed", e);
        }

        this.eventProcessorClient = null;
        this.processSessionFactory = null;
        this.readerFactory = null;
        this.writerFactory = null;
    }

    /**
     * Builds an Event Processor Client from the configured properties: blob-backed checkpoint
     * store, transport type, consumer group, callbacks, credentials, prefetch count, initial
     * event positions and optional proxy settings.
     *
     * @param processContext context giving access to the configured property values
     * @return the configured client; it is not started here
     */
    protected EventProcessorClient createClient(ProcessContext processContext) {
        this.namespaceName = processContext.getProperty(NAMESPACE).evaluateAttributeExpressions().getValue();
        final String eventHubName = processContext.getProperty(EVENT_HUB_NAME).evaluateAttributeExpressions().getValue();
        final String consumerGroup = processContext.getProperty(CONSUMER_GROUP).evaluateAttributeExpressions().getValue();

        // createStorageConnectionString also assigns serviceBusEndpoint, which is read further
        // down when the fully qualified namespace is built, so it has to run first.
        final String storageConnectionString = createStorageConnectionString(processContext);
        final String configuredContainerName = processContext.getProperty(STORAGE_CONTAINER_NAME).evaluateAttributeExpressions().getValue();
        // Fall back to the event hub name when no container name is configured.
        final String containerName = org.apache.commons.lang3.StringUtils.defaultIfBlank(configuredContainerName, eventHubName);

        final BlobContainerAsyncClient containerClient = new BlobContainerClientBuilder()
                .connectionString(storageConnectionString)
                .containerName(containerName)
                .buildAsyncClient();
        final BlobCheckpointStore checkpointStore = new BlobCheckpointStore(containerClient);

        final EventProcessorClientBuilder builder = new EventProcessorClientBuilder()
                .transportType(AmqpTransportType.fromString(processContext.getProperty(TRANSPORT_TYPE).getValue()))
                .consumerGroup(consumerGroup)
                .trackLastEnqueuedEventProperties(true)
                .checkpointStore(checkpointStore)
                .processError(this.errorProcessor)
                .processEventBatch(
                        this.eventBatchProcessor,
                        processContext.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger().intValue(),
                        Duration.ofMillis(processContext.getProperty(RECEIVE_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).longValue()));

        final String fullyQualifiedNamespace = String.format("%s%s", this.namespaceName, this.serviceBusEndpoint);
        final boolean useManagedIdentity = processContext.getProperty(USE_MANAGED_IDENTITY).asBoolean().booleanValue();
        if (useManagedIdentity) {
            builder.credential(fullyQualifiedNamespace, eventHubName, new ManagedIdentityCredentialBuilder().build());
        } else {
            final String policyName = processContext.getProperty(ACCESS_POLICY_NAME).evaluateAttributeExpressions().getValue();
            final String policyKey = processContext.getProperty(POLICY_PRIMARY_KEY).evaluateAttributeExpressions().getValue();
            builder.credential(fullyQualifiedNamespace, eventHubName, new AzureNamedKeyCredential(policyName, policyKey));
        }

        final Integer prefetchCount = processContext.getProperty(PREFETCH_COUNT).evaluateAttributeExpressions().asInteger();
        if (prefetchCount != null && prefetchCount.intValue() > 0) {
            builder.prefetchCount(prefetchCount.intValue());
        }

        // Positions loaded from legacy checkpoint storage win over the Initial Offset property.
        final Map<String, EventPosition> legacyPositions = getLegacyPartitionEventPosition(containerClient, consumerGroup);
        if (!legacyPositions.isEmpty()) {
            builder.initialPartitionEventPosition(legacyPositions);
        } else {
            final String initialOffset = processContext.getProperty(INITIAL_OFFSET).getValue();
            if (INITIAL_OFFSET_START_OF_STREAM.getValue().equals(initialOffset)) {
                builder.initialPartitionEventPosition(new EarliestEventPositionProvider().getInitialPartitionEventPosition());
            }
        }

        AzureEventHubUtils.getProxyOptions(processContext).ifPresent(builder::proxyOptions);

        return builder.buildEventProcessorClient();
    }

    /**
     * Builds the transit URI reported in provenance events for the given partition.
     *
     * @param partitionContext source of the event hub name, consumer group and partition id
     * @return a URI of the form amqps://{namespace}{endpoint}/{hub}/ConsumerGroups/{group}/Partitions/{id}
     */
    protected String getTransitUri(PartitionContext partitionContext) {
        final String eventHubName = partitionContext.getEventHubName();
        final String consumerGroup = partitionContext.getConsumerGroup();
        final String partitionId = partitionContext.getPartitionId();

        return String.format(
                "amqps://%s%s/%s/ConsumerGroups/%s/Partitions/%s",
                this.namespaceName,
                this.serviceBusEndpoint,
                eventHubName,
                consumerGroup,
                partitionId);
    }

    /**
     * Adds the Event Hub related attributes to the given map.
     *
     * @param map attribute map to populate (modified in place)
     * @param partitionContext source of the event hub name and partition id
     * @param eventData event whose application properties are copied
     * @param lastEnqueuedEventProperties source of the timestamp, offset and sequence number
     *        attributes; when null these three attributes are not written
     */
    private void putEventHubAttributes(Map<String, String> map, PartitionContext partitionContext, EventData eventData, LastEnqueuedEventProperties lastEnqueuedEventProperties) {
        // NOTE(review): these three values are read from the last-enqueued properties rather than
        // from eventData itself - confirm this is what the documented attributes are meant to carry.
        if (lastEnqueuedEventProperties != null) {
            final String enqueuedTime = String.valueOf(lastEnqueuedEventProperties.getEnqueuedTime());
            final String offset = String.valueOf(lastEnqueuedEventProperties.getOffset());
            final String sequenceNumber = String.valueOf(lastEnqueuedEventProperties.getSequenceNumber());

            map.put("eventhub.enqueued.timestamp", enqueuedTime);
            map.put("eventhub.offset", offset);
            map.put("eventhub.sequence", sequenceNumber);
        }

        final Map<String, String> applicationProperties = AzureEventHubUtils.getApplicationProperties(eventData.getProperties());
        map.putAll(applicationProperties);

        map.put("eventhub.name", partitionContext.getEventHubName());
        map.put("eventhub.partition", partitionContext.getPartitionId());
    }

    /**
     * Creates one FlowFile per event in the batch, carrying the Event Hub attributes and the raw
     * event body, and transfers each to the success relationship.
     *
     * @param eventBatchContext the received batch
     * @param processSession session used to create, write and transfer the FlowFiles
     * @param stopWatch timer used for the provenance receive duration
     */
    private void writeFlowFiles(EventBatchContext eventBatchContext, ProcessSession processSession, StopWatch stopWatch) {
        final PartitionContext partitionContext = eventBatchContext.getPartitionContext();

        for (final EventData eventData : eventBatchContext.getEvents()) {
            final Map<String, String> attributes = new HashMap<>();
            putEventHubAttributes(attributes, partitionContext, eventData, eventBatchContext.getLastEnqueuedEventProperties());

            FlowFile flowFile = processSession.create();
            flowFile = processSession.putAllAttributes(flowFile, attributes);

            final byte[] body = eventData.getBody();
            flowFile = processSession.write(flowFile, out -> out.write(body));

            transferTo(REL_SUCCESS, processSession, stopWatch, partitionContext, flowFile);
        }
    }

    /**
     * Parses every event of the batch with the configured Record Reader and appends the
     * resulting records to a single FlowFile through the configured Record Writer.
     * <p>
     * An event that fails to parse or write is logged and routed, with its raw body, to the
     * parse failure relationship as its own FlowFile. If at least one event was processed
     * without error, the record FlowFile is transferred to success with the attributes of the
     * last such event plus the record count, MIME type and writer attributes; otherwise the
     * record FlowFile is removed.
     *
     * @param eventBatchContext the received batch
     * @param processSession session used to create, write and transfer the FlowFiles
     * @param stopWatch timer used for the provenance receive duration
     * @throws IOException if finishing the record set or closing the FlowFile output stream fails
     */
    private void writeRecords(EventBatchContext eventBatchContext, ProcessSession processSession, StopWatch stopWatch) throws IOException {
        final PartitionContext partitionContext = eventBatchContext.getPartitionContext();
        final LastEnqueuedEventProperties lastEnqueuedEventProperties = eventBatchContext.getLastEnqueuedEventProperties();
        final ComponentLog logger = getLogger();

        // Variables made available to the reader/writer factories for schema lookup.
        final Map<String, String> schemaRetrievalVariables = new HashMap<>();
        schemaRetrievalVariables.put("eventhub.name", partitionContext.getEventHubName());

        final Map<String, String> attributes = new HashMap<>();
        final FlowFile flowFile = processSession.create();

        RecordSetWriter writer = null;
        EventData lastEventData = null;
        WriteResult lastWriteResult = null;
        int recordCount = 0;

        try (OutputStream out = processSession.write(flowFile)) {
            for (final EventData eventData : eventBatchContext.getEvents()) {
                try {
                    final byte[] body = eventData.getBody();
                    try (ByteArrayInputStream in = new ByteArrayInputStream(body)) {
                        final RecordReader reader = this.readerFactory.createRecordReader(schemaRetrievalVariables, in, body.length, logger);

                        Record currentRecord;
                        while ((currentRecord = reader.nextRecord()) != null) {
                            if (writer == null) {
                                // The writer is created lazily because its schema is derived from the first record.
                                writer = this.writerFactory.createWriter(
                                        logger,
                                        this.writerFactory.getSchema(schemaRetrievalVariables, currentRecord.getSchema()),
                                        out,
                                        flowFile);
                                writer.beginRecordSet();
                            }
                            lastWriteResult = writer.write(currentRecord);
                            recordCount += lastWriteResult.getRecordCount();
                        }

                        lastEventData = eventData;
                    }
                } catch (final Exception e) {
                    logger.error("Failed to parse message from Azure Event Hub using configured Record Reader and Writer", e);

                    // Use a map of its own for each failed event so that its application
                    // properties cannot leak into other failure FlowFiles or the success FlowFile.
                    final Map<String, String> failureAttributes = new HashMap<>();
                    putEventHubAttributes(failureAttributes, partitionContext, eventData, lastEnqueuedEventProperties);

                    FlowFile failed = processSession.create();
                    failed = processSession.write(failed, failedOut -> failedOut.write(eventData.getBody()));
                    failed = processSession.putAllAttributes(failed, failureAttributes);
                    transferTo(REL_PARSE_FAILURE, processSession, stopWatch, partitionContext, failed);
                }
            }

            if (lastEventData != null) {
                putEventHubAttributes(attributes, partitionContext, lastEventData, lastEnqueuedEventProperties);
                attributes.put("record.count", String.valueOf(recordCount));

                if (writer != null) {
                    writer.finishRecordSet();
                    attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
                    if (lastWriteResult != null) {
                        attributes.putAll(lastWriteResult.getAttributes());
                    }

                    try {
                        writer.close();
                    } catch (final IOException e) {
                        logger.warn("Failed to close Record Writer", e);
                    }
                }
            }
        }

        if (lastEventData == null) {
            // No event was processed without error: nothing worth emitting on success.
            processSession.remove(flowFile);
        } else {
            transferTo(REL_SUCCESS, processSession, stopWatch, partitionContext, processSession.putAllAttributes(flowFile, attributes));
        }
    }

    /**
     * Transfers the FlowFile to the given relationship and then reports a provenance receive
     * event with the partition's transit URI and the elapsed time of the stop watch.
     *
     * @param relationship destination relationship
     * @param processSession session owning the FlowFile
     * @param stopWatch timer read, in milliseconds, for the provenance event
     * @param partitionContext partition used to build the transit URI
     * @param flowFile the FlowFile to transfer
     */
    private void transferTo(Relationship relationship, ProcessSession processSession, StopWatch stopWatch, PartitionContext partitionContext, FlowFile flowFile) {
        processSession.transfer(flowFile, relationship);

        final String transitUri = getTransitUri(partitionContext);
        final long elapsedMillis = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
        processSession.getProvenanceReporter().receive(flowFile, transitUri, elapsedMillis);
    }

    /**
     * Builds the Azure Storage connection string used for the checkpoint container.
     * <p>
     * Side effect: stores the configured Service Bus endpoint in {@code serviceBusEndpoint}.
     * The storage domain suffix is derived from that endpoint by removing ".servicebus."
     * (an endpoint of the form ".servicebus.windows.net" yields "windows.net").
     * <p>
     * The account key form is used when the Storage Account Key is not blank, matching the
     * blank checks in customValidate; otherwise the SAS token form is used.
     *
     * @param processContext context giving access to the configured property values
     * @return the connection string for the configured credential type
     */
    private String createStorageConnectionString(ProcessContext processContext) {
        final String storageAccountName = processContext.getProperty(STORAGE_ACCOUNT_NAME).evaluateAttributeExpressions().getValue();

        this.serviceBusEndpoint = processContext.getProperty(SERVICE_BUS_ENDPOINT).getValue();
        final String domainName = this.serviceBusEndpoint.replace(".servicebus.", "");

        final String storageAccountKey = processContext.getProperty(STORAGE_ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
        // A non-null but blank key (for example an expression that resolves to an empty value)
        // must not select the account key form, since validation treats it as "not set".
        if (StringUtils.isNotBlank(storageAccountKey)) {
            return String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_ACCOUNT_KEY, storageAccountName, storageAccountKey, domainName);
        }

        final String storageSasToken = processContext.getProperty(STORAGE_SAS_TOKEN).evaluateAttributeExpressions().getValue();
        return String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_SAS_TOKEN, storageAccountName, domainName, storageSasToken);
    }

    /**
     * Loads the initial event positions from the legacy blob storage provider and logs each one.
     *
     * @param blobContainerAsyncClient container client handed to the legacy provider
     * @param str value handed to the legacy provider (createClient passes the consumer group)
     * @return the map returned by the legacy provider, keyed by partition
     */
    private Map<String, EventPosition> getLegacyPartitionEventPosition(BlobContainerAsyncClient blobContainerAsyncClient, String str) {
        final LegacyBlobStorageEventPositionProvider legacyProvider =
                new LegacyBlobStorageEventPositionProvider(blobContainerAsyncClient, str);
        final Map<String, EventPosition> positions = legacyProvider.getInitialPartitionEventPosition();

        positions.forEach((partitionId, eventPosition) ->
                getLogger().info("Loaded Event Position [{}] for Partition [{}] from Legacy Checkpoint Storage", eventPosition, partitionId));

        return positions;
    }

    static {
        HashSet hashSet = new HashSet();
        hashSet.add(REL_SUCCESS);
        RELATIONSHIPS = Collections.unmodifiableSet(hashSet);
        hashSet.add(REL_PARSE_FAILURE);
        RECORD_RELATIONSHIPS = Collections.unmodifiableSet(hashSet);
    }
}
