package org.apache.samza.system.azureblob.avro;

import com.azure.storage.blob.models.AccessTier;
import com.azure.storage.blob.models.BlobHttpHeaders;
import com.azure.storage.blob.models.BlobRequestConditions;
import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
import com.google.common.annotations.VisibleForTesting;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.samza.AzureException;
import org.apache.samza.config.Config;
import org.apache.samza.system.azureblob.compression.Compression;
import org.apache.samza.system.azureblob.producer.AzureBlobWriterMetrics;
import org.apache.samza.system.azureblob.utils.BlobMetadataContext;
import org.apache.samza.system.azureblob.utils.BlobMetadataGenerator;
import org.apache.samza.system.azureblob.utils.BlobMetadataGeneratorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;

/* loaded from: input_file:org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.class */
public class AzureBlobOutputStream extends OutputStream {
    private static final Logger LOG = LoggerFactory.getLogger(AzureBlobOutputStream.class);
    private static final int MAX_ATTEMPT = 3;
    private static final int MAX_BLOCKS_IN_AZURE_BLOB = 50000;
    private final long flushTimeoutMs;
    private final BlockBlobAsyncClient blobAsyncClient;
    private final Executor blobThreadPool;
    private Optional<ByteArrayOutputStream> byteArrayOutputStream;
    private final ArrayList<String> blockList;
    private final Set<CompletableFuture<Void>> pendingUpload;
    private final int maxBlockFlushThresholdSize;
    private final AzureBlobWriterMetrics metrics;
    private final Compression compression;
    private volatile boolean isClosed;
    private long totalUploadedBlockSize;
    private int blockNum;
    private final BlobMetadataGeneratorFactory blobMetadataGeneratorFactory;
    private final Config blobMetadataGeneratorConfig;
    private String streamName;

    public AzureBlobOutputStream(BlockBlobAsyncClient blockBlobAsyncClient, Executor executor, AzureBlobWriterMetrics azureBlobWriterMetrics, BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config config, String str, long j, int i, Compression compression) {
        this(blockBlobAsyncClient, executor, azureBlobWriterMetrics, blobMetadataGeneratorFactory, config, str, j, i, new ByteArrayOutputStream(i), compression);
    }

    @Override // java.io.OutputStream
    public synchronized void write(int i) {
        if (!this.byteArrayOutputStream.isPresent()) {
            throw new IllegalStateException("Internal Buffer must have been released earlier for blob " + this.blobAsyncClient.getBlobUrl().toString());
        }
        if (this.byteArrayOutputStream.get().size() + 1 > this.maxBlockFlushThresholdSize) {
            uploadBlockAsync();
        }
        this.byteArrayOutputStream.get().write(i);
        this.metrics.updateWriteByteMetrics(1L);
    }

    @Override // java.io.OutputStream
    public synchronized void write(byte[] bArr, int i, int i2) {
        if (!this.byteArrayOutputStream.isPresent()) {
            throw new IllegalStateException("Internal Buffer must have been released earlier for blob " + this.blobAsyncClient.getBlobUrl().toString());
        }
        int i3 = i2;
        int i4 = i;
        while (i3 > 0) {
            int min = Math.min(this.maxBlockFlushThresholdSize - this.byteArrayOutputStream.get().size(), i3);
            this.byteArrayOutputStream.get().write(bArr, i4, min);
            i4 += min;
            i3 -= min;
            if (this.byteArrayOutputStream.get().size() >= this.maxBlockFlushThresholdSize) {
                uploadBlockAsync();
            }
        }
        this.metrics.updateWriteByteMetrics(i2);
    }

    @Override // java.io.OutputStream, java.io.Flushable
    public synchronized void flush() {
        if (this.byteArrayOutputStream.isPresent()) {
            uploadBlockAsync();
        }
    }

    @Override // java.io.OutputStream, java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() {
        if (this.isClosed) {
            LOG.info("{}: already closed", this.blobAsyncClient.getBlobUrl().toString());
            return;
        }
        LOG.info("{}: Close", this.blobAsyncClient.getBlobUrl().toString());
        try {
            try {
                if (this.byteArrayOutputStream.isPresent()) {
                    this.byteArrayOutputStream.get().close();
                }
                if (this.blockList.size() == 0) {
                    return;
                }
                CompletableFuture<Void> allOf = CompletableFuture.allOf((CompletableFuture[]) this.pendingUpload.toArray(new CompletableFuture[0]));
                LOG.info("Closing blob: {} PendingUpload:{} ", this.blobAsyncClient.getBlobUrl().toString(), Integer.valueOf(this.pendingUpload.size()));
                allOf.get(this.flushTimeoutMs, TimeUnit.MILLISECONDS);
                LOG.info("For blob: {} committing blockList size:{}", this.blobAsyncClient.getBlobUrl().toString(), Integer.valueOf(this.blockList.size()));
                this.metrics.updateAzureCommitMetrics();
                commitBlob(this.blockList, getBlobMetadataGenerator().getBlobMetadata(new BlobMetadataContext(this.streamName, this.totalUploadedBlockSize)));
                clearAndMarkClosed();
            } catch (Exception e) {
                throw new AzureException(String.format("Close blob %s failed with exception. Total pending sends %d", this.blobAsyncClient.getBlobUrl().toString(), Integer.valueOf(this.pendingUpload.size())), e);
            }
        } finally {
            clearAndMarkClosed();
        }
    }

    public synchronized long getSize() {
        return this.byteArrayOutputStream.isPresent() ? this.byteArrayOutputStream.get().size() + this.totalUploadedBlockSize : this.totalUploadedBlockSize;
    }

    public synchronized void releaseBuffer() throws IOException {
        if (this.byteArrayOutputStream.isPresent()) {
            this.byteArrayOutputStream.get().close();
            this.byteArrayOutputStream = Optional.empty();
            LOG.info("Internal buffer has been released for blob " + this.blobAsyncClient.getBlobUrl().toString() + ". Writes are no longer entertained.");
        }
    }

    @VisibleForTesting
    AzureBlobOutputStream(BlockBlobAsyncClient blockBlobAsyncClient, Executor executor, AzureBlobWriterMetrics azureBlobWriterMetrics, BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config config, String str, long j, int i, ByteArrayOutputStream byteArrayOutputStream, Compression compression) {
        this.pendingUpload = ConcurrentHashMap.newKeySet();
        this.isClosed = false;
        this.totalUploadedBlockSize = 0L;
        this.byteArrayOutputStream = Optional.of(byteArrayOutputStream);
        this.blobAsyncClient = blockBlobAsyncClient;
        this.blockList = new ArrayList<>();
        this.blockNum = 0;
        this.blobThreadPool = executor;
        this.flushTimeoutMs = j;
        this.maxBlockFlushThresholdSize = i;
        this.metrics = azureBlobWriterMetrics;
        this.compression = compression;
        this.blobMetadataGeneratorFactory = blobMetadataGeneratorFactory;
        this.blobMetadataGeneratorConfig = config;
        this.streamName = str;
    }

    @VisibleForTesting
    void commitBlob(ArrayList<String> arrayList, Map<String, String> map) {
        this.blobAsyncClient.commitBlockListWithResponse(arrayList, (BlobHttpHeaders) null, map, (AccessTier) null, (BlobRequestConditions) null).block();
    }

    @VisibleForTesting
    void stageBlock(String str, ByteBuffer byteBuffer, int i) {
        this.blobAsyncClient.stageBlock(str, Flux.just(byteBuffer), i).block();
    }

    @VisibleForTesting
    void clearAndMarkClosed() {
        this.blockList.clear();
        this.pendingUpload.stream().forEach(completableFuture -> {
            completableFuture.cancel(true);
        });
        this.pendingUpload.clear();
        this.isClosed = true;
    }

    @VisibleForTesting
    BlobMetadataGenerator getBlobMetadataGenerator() throws Exception {
        return this.blobMetadataGeneratorFactory.getBlobMetadataGeneratorInstance(this.blobMetadataGeneratorConfig);
    }

    private synchronized void uploadBlockAsync() {
        if (this.byteArrayOutputStream.isPresent()) {
            long size = this.byteArrayOutputStream.get().size();
            if (size == 0) {
                return;
            }
            LOG.info("Blob: {} uploadBlock. Size:{}", this.blobAsyncClient.getBlobUrl().toString(), Long.valueOf(size));
            final String format = String.format("%05d", Integer.valueOf(this.blockNum));
            final String encodeToString = Base64.getEncoder().encodeToString(format.getBytes());
            this.blockList.add(encodeToString);
            final byte[] byteArray = this.byteArrayOutputStream.get().toByteArray();
            this.byteArrayOutputStream.get().reset();
            this.totalUploadedBlockSize += byteArray.length;
            CompletableFuture<Void> runAsync = CompletableFuture.runAsync(new Runnable() { // from class: org.apache.samza.system.azureblob.avro.AzureBlobOutputStream.1
                @Override // java.lang.Runnable
                public void run() {
                    int i = 0;
                    byte[] compress = AzureBlobOutputStream.this.compression.compress(byteArray);
                    int length = compress.length;
                    while (i < AzureBlobOutputStream.MAX_ATTEMPT) {
                        try {
                            ByteBuffer wrap = ByteBuffer.wrap(compress, 0, length);
                            AzureBlobOutputStream.this.metrics.updateCompressByteMetrics(length);
                            AzureBlobOutputStream.LOG.info("{} Upload block start for blob: {} for block size:{}.", new Object[]{AzureBlobOutputStream.this.blobAsyncClient.getBlobUrl().toString(), format, Integer.valueOf(length)});
                            AzureBlobOutputStream.this.metrics.updateAzureUploadMetrics();
                            AzureBlobOutputStream.this.stageBlock(encodeToString, wrap, length);
                            return;
                        } catch (Exception e) {
                            i++;
                            AzureBlobOutputStream.LOG.error("Upload block for blob: " + AzureBlobOutputStream.this.blobAsyncClient.getBlobUrl().toString() + " failed for blockid: " + format + " due to exception. AttemptCount: " + i, e);
                            if (i == AzureBlobOutputStream.MAX_ATTEMPT) {
                                throw new AzureException("Exceeded number of attempts. Max attempts is: 3", e);
                            }
                        }
                    }
                }
            }, this.blobThreadPool);
            this.pendingUpload.add(runAsync);
            runAsync.handle((r8, th) -> {
                if (th != null) {
                    throw new AzureException("Blob upload failed for blob " + this.blobAsyncClient.getBlobUrl().toString() + " and block with id: " + format, th);
                }
                LOG.info("Upload block for blob: {} with blockid: {} finished.", this.blobAsyncClient.getBlobUrl().toString(), format);
                this.pendingUpload.remove(runAsync);
                return r8;
            });
            this.blockNum++;
            if (this.blockNum >= MAX_BLOCKS_IN_AZURE_BLOB) {
                throw new AzureException("Azure blob only supports 50000 blocks in a blob. Current number of blocks is " + this.blockNum);
            }
        }
    }
}
