package org.apache.samza.system.azureblob.producer;

import com.azure.core.http.HttpClient;
import com.azure.core.http.ProxyOptions;
import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
import com.azure.core.http.policy.HttpLogDetailLevel;
import com.azure.core.http.policy.HttpLogOptions;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobServiceAsyncClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.models.BlobErrorCode;
import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.blob.models.SkuName;
import com.azure.storage.blob.models.StorageAccountInfo;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.samza.config.Config;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemProducerException;
import org.apache.samza.system.azureblob.AzureBlobConfig;
import org.apache.samza.system.azureblob.compression.CompressionFactory;
import org.apache.samza.system.azureblob.utils.BlobMetadataGeneratorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/system/azureblob/producer/AzureBlobSystemProducer.class */
public class AzureBlobSystemProducer implements SystemProducer {
    private static final Logger LOG = LoggerFactory.getLogger(AzureBlobSystemProducer.class.getName());
    private static final String BLOB_NAME_PREFIX = "%s";
    private static final String BLOB_NAME_PARTITION_PREFIX = "%s/%s";
    private static final String AZURE_URL = "https://%s.blob.core.windows.net";
    private static final int PREMIUM_MAX_BLOCK_SIZE = 104857600;
    private static final int STANDARD_MAX_BLOCK_SIZE = 4194304;
    private BlobContainerAsyncClient containerAsyncClient;
    private final String systemName;
    private final AzureBlobConfig config;
    private final Map<String, Map<String, AzureBlobWriter>> writerMap;
    private final AzureBlobWriterFactory writerFactory;
    private final int blockFlushThresholdSize;
    private final long flushTimeoutMs;
    private final long closeTimeout;
    private final ThreadPoolExecutor asyncBlobThreadPool;
    private final AzureBlobSystemProducerMetrics metrics;
    private final BlobMetadataGeneratorFactory blobMetadataGeneratorFactory;
    private final Config blobMetadataGeneratorConfig;
    private volatile boolean isStarted = false;
    private volatile boolean isStopped = false;
    private final Map<String, Object> sourceWriterCreationLockMap = new ConcurrentHashMap();
    private final Map<String, ReadWriteLock> sourceSendFlushLockMap = new ConcurrentHashMap();

    public AzureBlobSystemProducer(String str, AzureBlobConfig azureBlobConfig, MetricsRegistry metricsRegistry) {
        Preconditions.checkNotNull(str, "System name can not be null when creating AzureBlobSystemProducer");
        Preconditions.checkNotNull(azureBlobConfig, "Config can not be null when creating AzureBlobSystemProducer");
        Preconditions.checkNotNull(metricsRegistry, "Metrics registry can not be null when creating AzureBlobSystemProducer");
        System.setProperty("AZURE_LOG_LEVEL", "1");
        this.systemName = str;
        this.config = azureBlobConfig;
        String azureBlobWriterFactoryClassName = this.config.getAzureBlobWriterFactoryClassName(this.systemName);
        try {
            this.writerFactory = (AzureBlobWriterFactory) Class.forName(azureBlobWriterFactoryClassName).newInstance();
            this.flushTimeoutMs = this.config.getFlushTimeoutMs(this.systemName);
            this.closeTimeout = this.config.getCloseTimeoutMs(this.systemName);
            this.blockFlushThresholdSize = this.config.getMaxFlushThresholdSize(this.systemName);
            int azureBlobThreadPoolCount = this.config.getAzureBlobThreadPoolCount(this.systemName);
            int blockingQueueSize = this.config.getBlockingQueueSize(this.systemName);
            LOG.info("SystemName: {} block flush size:{}", str, Integer.valueOf(this.blockFlushThresholdSize));
            LOG.info("SystemName: {} thread count:{}", str, Integer.valueOf(azureBlobThreadPoolCount));
            this.asyncBlobThreadPool = new ThreadPoolExecutor(azureBlobThreadPoolCount, azureBlobThreadPoolCount, 60L, TimeUnit.SECONDS, new LinkedBlockingDeque(blockingQueueSize), new ThreadPoolExecutor.CallerRunsPolicy());
            this.writerMap = new ConcurrentHashMap();
            this.metrics = new AzureBlobSystemProducerMetrics(str, azureBlobConfig.getAzureAccountName(str), metricsRegistry);
            String systemBlobMetadataPropertiesGeneratorFactory = this.config.getSystemBlobMetadataPropertiesGeneratorFactory(this.systemName);
            try {
                this.blobMetadataGeneratorFactory = (BlobMetadataGeneratorFactory) Class.forName(systemBlobMetadataPropertiesGeneratorFactory).newInstance();
                this.blobMetadataGeneratorConfig = this.config.getSystemBlobMetadataGeneratorConfigs(str);
            } catch (Exception e) {
                throw new SystemProducerException("Could not create blob metadata generator factory with name " + systemBlobMetadataPropertiesGeneratorFactory, e);
            }
        } catch (Exception e2) {
            throw new SystemProducerException("Could not create writer factory with name " + azureBlobWriterFactoryClassName, e2);
        }
    }

    public synchronized void start() {
        if (this.isStarted) {
            LOG.warn("Attempting to start an already started producer.");
            return;
        }
        setupAzureContainer(this.config.getAzureAccountName(this.systemName), this.config.getAzureAccountKey(this.systemName));
        LOG.info("Starting producer.");
        this.isStarted = true;
    }

    public synchronized void stop() {
        if (!this.isStarted) {
            LOG.warn("Attempting to stop a producer that was not started.");
            return;
        }
        if (this.isStopped) {
            LOG.warn("Attempting to stop an already stopped producer.");
            return;
        }
        try {
            try {
                this.writerMap.forEach((str, map) -> {
                    flush(str);
                });
                this.asyncBlobThreadPool.shutdown();
                this.isStarted = false;
                this.writerMap.clear();
                this.isStopped = true;
            } catch (Exception e) {
                throw new SystemProducerException("Stop failed with exception.", e);
            }
        } catch (Throwable th) {
            this.writerMap.clear();
            this.isStopped = true;
            throw th;
        }
    }

    public synchronized void register(String str) {
        LOG.info("Registering source {}", str);
        if (this.isStarted) {
            throw new SystemProducerException("Cannot register once the producer is started.");
        }
        if (this.writerMap.containsKey(str)) {
            LOG.warn("Source: {} already registered", str);
            return;
        }
        this.writerMap.put(str, new ConcurrentHashMap());
        this.sourceWriterCreationLockMap.put(str, new Object());
        this.sourceSendFlushLockMap.put(str, new ReentrantReadWriteLock());
        this.metrics.register(str);
    }

    public void send(String str, OutgoingMessageEnvelope outgoingMessageEnvelope) {
        if (!this.isStarted) {
            throw new SystemProducerException("Trying to send before producer has started.");
        }
        if (this.isStopped) {
            throw new SystemProducerException("Sending after producer has been stopped.");
        }
        ReadWriteLock readWriteLock = this.sourceSendFlushLockMap.get(str);
        if (readWriteLock == null) {
            throw new SystemProducerException("Attempting to send to source: " + str + " but it was not registered");
        }
        readWriteLock.readLock().lock();
        try {
            try {
                getOrCreateWriter(str, outgoingMessageEnvelope).write(outgoingMessageEnvelope);
                this.metrics.updateWriteMetrics(str);
                readWriteLock.readLock().unlock();
            } catch (Exception e) {
                this.metrics.updateErrorMetrics(str);
                String partitionKey = getPartitionKey(outgoingMessageEnvelope);
                throw new SystemProducerException("Send failed for source: " + str + ", system: " + this.systemName + ", stream: " + outgoingMessageEnvelope.getSystemStream().getStream() + ", partitionKey: " + ((Object) (partitionKey != null ? partitionKey : "null")), e);
            }
        } catch (Throwable th) {
            readWriteLock.readLock().unlock();
            throw th;
        }
    }

    public void flush(String str) {
        if (!this.isStarted) {
            throw new SystemProducerException("Trying to flush before producer has started.");
        }
        if (this.isStopped) {
            throw new SystemProducerException("Flushing after producer has been stopped.");
        }
        ReadWriteLock readWriteLock = this.sourceSendFlushLockMap.get(str);
        if (readWriteLock == null) {
            throw new SystemProducerException("Attempting to flush source: " + str + " but it was not registered");
        }
        readWriteLock.writeLock().lock();
        Map<String, AzureBlobWriter> map = this.writerMap.get(str);
        try {
            try {
                flushWriters(map);
                closeWriters(str, map);
                map.clear();
                readWriteLock.writeLock().unlock();
            } catch (Exception e) {
                this.metrics.updateErrorMetrics(str);
                throw new SystemProducerException("Flush failed for system:" + this.systemName + " and source: " + str, e);
            }
        } catch (Throwable th) {
            map.clear();
            readWriteLock.writeLock().unlock();
            throw th;
        }
    }

    @VisibleForTesting
    void setupAzureContainer(String str, String str2) {
        HttpClient createDefault;
        try {
            StorageSharedKeyCredential storageSharedKeyCredential = new StorageSharedKeyCredential(str, str2);
            if (this.config.getUseProxy(this.systemName)) {
                LOG.info("HTTP Proxy setup for AzureBlob pipeline");
                createDefault = new NettyAsyncHttpClientBuilder().proxy(new ProxyOptions(ProxyOptions.Type.HTTP, new InetSocketAddress(this.config.getAzureProxyHostname(this.systemName), this.config.getAzureProxyPort(this.systemName)))).build();
            } else {
                createDefault = HttpClient.createDefault();
            }
            String format = String.format(Locale.ROOT, AZURE_URL, str);
            HttpLogOptions httpLogOptions = new HttpLogOptions();
            httpLogOptions.setLogLevel(HttpLogDetailLevel.BASIC);
            BlobServiceAsyncClient buildAsyncClient = new BlobServiceClientBuilder().httpLogOptions(httpLogOptions).endpoint(format).credential(storageSharedKeyCredential).httpClient(createDefault).buildAsyncClient();
            SkuName skuName = ((StorageAccountInfo) buildAsyncClient.getAccountInfo().block()).getSkuName();
            long maxFlushThresholdSize = this.config.getMaxFlushThresholdSize(this.systemName);
            boolean z = SkuName.PREMIUM_LRS == skuName;
            if (z && maxFlushThresholdSize > 104857600) {
                throw new SystemProducerException("Azure storage account with name: " + str + " is a premium account and can only handle upto " + PREMIUM_MAX_BLOCK_SIZE + " threshold size. Given flush threshold size is " + maxFlushThresholdSize);
            }
            if (!z && maxFlushThresholdSize > 4194304) {
                throw new SystemProducerException("Azure storage account with name: " + str + " is a standard account and can only handle upto " + STANDARD_MAX_BLOCK_SIZE + " threshold size. Given flush threshold size is " + maxFlushThresholdSize);
            }
            this.containerAsyncClient = buildAsyncClient.getBlobContainerAsyncClient(this.systemName);
            createContainerIfNotExists(this.containerAsyncClient);
        } catch (Exception e) {
            this.metrics.updateAzureContainerMetrics();
            throw new SystemProducerException("Failed to set up Azure container for SystemName: " + this.systemName, e);
        }
    }

    @VisibleForTesting
    AzureBlobWriter getOrCreateWriter(String str, OutgoingMessageEnvelope outgoingMessageEnvelope) {
        String str2;
        String format;
        String partitionKey = getPartitionKey(outgoingMessageEnvelope);
        if (partitionKey == null) {
            str2 = outgoingMessageEnvelope.getSystemStream().getStream();
            format = String.format(BLOB_NAME_PREFIX, outgoingMessageEnvelope.getSystemStream().getStream());
        } else {
            str2 = outgoingMessageEnvelope.getSystemStream().getStream() + "/" + partitionKey;
            format = String.format(BLOB_NAME_PARTITION_PREFIX, outgoingMessageEnvelope.getSystemStream().getStream(), partitionKey);
        }
        Map<String, AzureBlobWriter> map = this.writerMap.get(str);
        if (map == null) {
            throw new SystemProducerException("Attempting to send to source: " + str + " but it is not registered");
        }
        AzureBlobWriter azureBlobWriter = map.get(str2);
        if (azureBlobWriter == null) {
            synchronized (this.sourceWriterCreationLockMap.get(str)) {
                azureBlobWriter = map.get(str2);
                if (azureBlobWriter == null) {
                    azureBlobWriter = createNewWriter(format, new AzureBlobWriterMetrics(this.metrics.getAggregateMetrics(), this.metrics.getSystemMetrics(), this.metrics.getSourceMetrics(str)), outgoingMessageEnvelope.getSystemStream().getStream());
                    map.put(str2, azureBlobWriter);
                }
            }
        }
        return azureBlobWriter;
    }

    private void createContainerIfNotExists(BlobContainerAsyncClient blobContainerAsyncClient) {
        try {
            blobContainerAsyncClient.create().block();
        } catch (BlobStorageException e) {
            if (e.getErrorCode() == BlobErrorCode.RESOURCE_NOT_FOUND) {
                LOG.error("Error creating the container url " + blobContainerAsyncClient.getBlobContainerUrl().toString() + " with status code: " + e.getResponse().getStatusCode(), e);
            } else if (e.getErrorCode() == BlobErrorCode.CONTAINER_BEING_DELETED) {
                LOG.error("Container is being deleted. Container URL is: " + blobContainerAsyncClient.getBlobContainerUrl().toString(), e);
            } else if (e.getErrorCode() == BlobErrorCode.CONTAINER_ALREADY_EXISTS) {
                return;
            }
            throw e;
        }
    }

    private String getPartitionKey(OutgoingMessageEnvelope outgoingMessageEnvelope) {
        Object partitionKey = outgoingMessageEnvelope.getPartitionKey();
        if (partitionKey == null || !(partitionKey instanceof String)) {
            return null;
        }
        return (String) partitionKey;
    }

    private void flushWriters(Map<String, AzureBlobWriter> map) {
        map.forEach((str, azureBlobWriter) -> {
            try {
                LOG.info("Flushing topic:{}", str);
                azureBlobWriter.flush();
            } catch (IOException e) {
                throw new SystemProducerException("Close failed for topic " + str, e);
            }
        });
    }

    private void closeWriters(String str, Map<String, AzureBlobWriter> map) throws Exception {
        ConcurrentHashMap.KeySetView newKeySet = ConcurrentHashMap.newKeySet();
        try {
            map.forEach((str2, azureBlobWriter) -> {
                LOG.info("Closing topic:{}", str2);
                CompletableFuture<Void> runAsync = CompletableFuture.runAsync(new Runnable() { // from class: org.apache.samza.system.azureblob.producer.AzureBlobSystemProducer.1
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            azureBlobWriter.close();
                        } catch (IOException e) {
                            throw new SystemProducerException("Close failed for topic " + str2, e);
                        }
                    }
                }, this.asyncBlobThreadPool);
                newKeySet.add(runAsync);
                runAsync.handle((r8, th) -> {
                    map.remove(azureBlobWriter);
                    if (th != null) {
                        throw new SystemProducerException("Close failed for topic " + str2, th);
                    }
                    LOG.info("Blob close finished for stream " + str2);
                    return r8;
                });
            });
            CompletableFuture<Void> allOf = CompletableFuture.allOf((CompletableFuture[]) newKeySet.toArray(new CompletableFuture[0]));
            LOG.info("Flush source: {} has pending closes: {} ", str, Integer.valueOf(newKeySet.size()));
            allOf.get(this.closeTimeout, TimeUnit.MILLISECONDS);
            newKeySet.clear();
        } catch (Throwable th) {
            newKeySet.clear();
            throw th;
        }
    }

    @VisibleForTesting
    AzureBlobWriter createNewWriter(String str, AzureBlobWriterMetrics azureBlobWriterMetrics, String str2) {
        try {
            return this.writerFactory.getWriterInstance(this.containerAsyncClient, str, this.asyncBlobThreadPool, azureBlobWriterMetrics, this.blobMetadataGeneratorFactory, this.blobMetadataGeneratorConfig, str2, this.blockFlushThresholdSize, this.flushTimeoutMs, CompressionFactory.getInstance().getCompression(this.config.getCompressionType(this.systemName)), this.config.getSuffixRandomStringToBlobName(this.systemName), this.config.getMaxBlobSize(this.systemName), this.config.getMaxMessagesPerBlob(this.systemName));
        } catch (Exception e) {
            throw new RuntimeException("Failed to create a writer for the producer.", e);
        }
    }
}
