package org.apache.samza.system.azureblob.avro;

import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.Executor;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.azureblob.compression.Compression;
import org.apache.samza.system.azureblob.producer.AzureBlobWriter;
import org.apache.samza.system.azureblob.producer.AzureBlobWriterMetrics;
import org.apache.samza.system.azureblob.utils.BlobMetadataGeneratorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter.class */
/**
 * {@link AzureBlobWriter} that appends Avro-encoded records to Azure block blobs through an Avro
 * {@link DataFileWriter} layered on top of an {@link AzureBlobOutputStream}.
 *
 * <p>Messages may be {@link IndexedRecord}s (encoded here) or pre-encoded {@code byte[]} payloads.
 * The Avro schema is taken from the first {@link IndexedRecord} seen, so the very first message
 * written must be an {@link IndexedRecord}; otherwise an {@link IllegalStateException} is thrown.
 *
 * <p>A new blob is started when no blob is open yet, when the next record would push the current
 * blob past {@code maxBlobSize} (including {@link #DATAFILEWRITER_OVERHEAD}), or when the current
 * blob already holds {@code maxRecordsPerBlob} records. Every blob opened by this writer is
 * tracked and closed in {@link #close()}.
 *
 * <p>{@link #write}, {@link #flush} and {@link #close} synchronize on a single per-instance lock.
 */
public class AzureBlobAvroWriter implements AzureBlobWriter {
    private static final Logger LOG = LoggerFactory.getLogger(AzureBlobAvroWriter.class);
    private static final String PUBLISHED_FILE_NAME_DATE_FORMAT = "yyyy/MM/dd/HH/mm-ss";
    private static final String BLOB_NAME_AVRO = "%s/%s.avro%s";
    private static final String BLOB_NAME_RANDOM_STRING_AVRO = "%s/%s-%s.avro%s";

    // DateTimeFormatter is immutable and thread-safe, so it can be shared by all writer instances.
    // (A shared SimpleDateFormat is not safe here: the lock guarding blob creation is per instance.)
    private static final DateTimeFormatter UTC_FORMATTER = buildUTCFormatter();

    /**
     * Number of bytes added to the projected blob size when deciding whether the next record fits.
     * NOTE(review): presumably headroom for the Avro container's own buffering and metadata; confirm.
     */
    static final long DATAFILEWRITER_OVERHEAD = 1000000;

    // Null until the first blob is started (or supplied through the test constructor).
    private BlobWriterComponents currentBlobWriterComponents = null;
    // Every blob opened by this writer; all of them are closed in close().
    private final List<BlobWriterComponents> allBlobWriterComponents = new ArrayList<>();
    // Schema and datum writer are initialised lazily from the first IndexedRecord written.
    private Schema schema = null;
    private DatumWriter<IndexedRecord> datumWriter = null;
    private volatile boolean isClosed = false;
    // Guards the current blob components, the components list and the record counter.
    private final Object currentDataFileWriterLock = new Object();
    private volatile long recordsInCurrentBlob = 0L;

    private final Executor blobThreadPool;
    private final AzureBlobWriterMetrics metrics;
    private final int maxBlockFlushThresholdSize;
    private final long flushTimeoutMs;
    private final Compression compression;
    private final BlobContainerAsyncClient containerAsyncClient;
    private final String blobURLPrefix;
    private final long maxBlobSize;
    private final long maxRecordsPerBlob;
    private final boolean useRandomStringInBlobName;
    private final BlobMetadataGeneratorFactory blobMetadataGeneratorFactory;
    private final Config blobMetadataGeneratorConfig;
    private final String streamName;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/samza/system/azureblob/avro/AzureBlobAvroWriter$BlobWriterComponents.class */
    /**
     * Bundles the three objects that together represent one open blob: the Avro file writer, the
     * output stream it writes into, and the client of the blob being written. None may be null.
     */
    public class BlobWriterComponents {
        final DataFileWriter<IndexedRecord> dataFileWriter;
        final AzureBlobOutputStream azureBlobOutputStream;
        final BlockBlobAsyncClient blockBlobAsyncClient;

        /**
         * @param dataFileWriter Avro writer for the blob; must not be null
         * @param azureBlobOutputStream stream the Avro writer writes into; must not be null
         * @param blockBlobAsyncClient client of the blob being written; must not be null
         * @throws NullPointerException if any argument is null
         */
        public BlobWriterComponents(DataFileWriter<IndexedRecord> dataFileWriter, AzureBlobOutputStream azureBlobOutputStream, BlockBlobAsyncClient blockBlobAsyncClient) {
            Preconditions.checkNotNull(dataFileWriter, "DataFileWriter can not be null when creating WriterComponents for an Azure Blob.");
            Preconditions.checkNotNull(azureBlobOutputStream, "AzureBlobOutputStream can not be null when creating WriterComponents for an Azure Blob.");
            Preconditions.checkNotNull(blockBlobAsyncClient, "BlockBlobAsyncClient can not be null when creating WriterComponents for an Azure Blob.");
            this.dataFileWriter = dataFileWriter;
            this.azureBlobOutputStream = azureBlobOutputStream;
            this.blockBlobAsyncClient = blockBlobAsyncClient;
        }
    }

    /**
     * Creates a writer with no blob open; the first blob is created lazily by the first {@link #write}.
     *
     * @param containerAsyncClient client of the container in which blobs are created
     * @param blobURLPrefix prefix placed in front of the timestamp in every blob name
     * @param blobThreadPool executor handed to each {@link AzureBlobOutputStream}
     * @param metrics metrics handed to each {@link AzureBlobOutputStream}
     * @param blobMetadataGeneratorFactory factory handed to each {@link AzureBlobOutputStream}
     * @param blobMetadataGeneratorConfig config handed to each {@link AzureBlobOutputStream}
     * @param streamName stream name handed to each {@link AzureBlobOutputStream}
     * @param maxBlockFlushThresholdSize threshold handed to each {@link AzureBlobOutputStream}
     * @param flushTimeoutMs flush timeout handed to each {@link AzureBlobOutputStream}
     * @param compression compression handed to each stream; also supplies the blob name's file extension
     * @param useRandomStringInBlobName whether to add an 8-character random suffix to blob names
     * @param maxBlobSize size limit used to decide when to roll over to a new blob
     * @param maxRecordsPerBlob record-count limit used to decide when to roll over to a new blob
     */
    public AzureBlobAvroWriter(BlobContainerAsyncClient containerAsyncClient, String blobURLPrefix, Executor blobThreadPool, AzureBlobWriterMetrics metrics, BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName, int maxBlockFlushThresholdSize, long flushTimeoutMs, Compression compression, boolean useRandomStringInBlobName, long maxBlobSize, long maxRecordsPerBlob) {
        this.blobThreadPool = blobThreadPool;
        this.metrics = metrics;
        this.maxBlockFlushThresholdSize = maxBlockFlushThresholdSize;
        this.flushTimeoutMs = flushTimeoutMs;
        this.compression = compression;
        this.containerAsyncClient = containerAsyncClient;
        this.blobURLPrefix = blobURLPrefix;
        this.useRandomStringInBlobName = useRandomStringInBlobName;
        this.maxBlobSize = maxBlobSize;
        this.maxRecordsPerBlob = maxRecordsPerBlob;
        this.blobMetadataGeneratorFactory = blobMetadataGeneratorFactory;
        this.blobMetadataGeneratorConfig = blobMetadataGeneratorConfig;
        this.streamName = streamName;
    }

    /**
     * Appends one message to the current blob, rolling over to a new blob first when required.
     *
     * @param outgoingMessageEnvelope envelope whose message is an {@link IndexedRecord} or an
     *        already Avro-encoded {@code byte[]}
     * @throws IllegalArgumentException if the message is neither an {@link IndexedRecord} nor a {@code byte[]}
     * @throws IllegalStateException if a {@code byte[]} is written before any schema is known
     * @throws IOException if flushing the previous blob or appending to the current one fails
     */
    @Override // org.apache.samza.system.azureblob.producer.AzureBlobWriter
    public void write(OutgoingMessageEnvelope outgoingMessageEnvelope) throws IOException {
        Object message = outgoingMessageEnvelope.getMessage();
        Optional<IndexedRecord> optionalIndexedRecord;
        byte[] encodedRecord;
        if (message instanceof IndexedRecord) {
            IndexedRecord record = (IndexedRecord) message;
            optionalIndexedRecord = Optional.of(record);
            encodedRecord = encodeRecord(record);
        } else if (message instanceof byte[]) {
            optionalIndexedRecord = Optional.empty();
            encodedRecord = (byte[]) message;
        } else {
            throw new IllegalArgumentException("AzureBlobAvroWriter only supports IndexedRecord and byte[].");
        }
        synchronized (this.currentDataFileWriterLock) {
            // The null check must come first: the limit checks dereference the current components.
            if (this.currentBlobWriterComponents == null || willCurrentBlobExceedSize(encodedRecord) || willCurrentBlobExceedRecordLimit()) {
                startNextBlob(optionalIndexedRecord);
            }
            this.currentBlobWriterComponents.dataFileWriter.appendEncoded(ByteBuffer.wrap(encodedRecord));
            this.recordsInCurrentBlob++;
            // Keep the stream's own record count in step with every appended record.
            this.currentBlobWriterComponents.azureBlobOutputStream.incrementNumberOfRecordsInBlob();
        }
    }

    /**
     * Flushes the Avro writer of the current blob. Does nothing when no blob has been started yet.
     *
     * @throws IOException if the underlying flush fails
     */
    @Override // org.apache.samza.system.azureblob.producer.AzureBlobWriter
    public void flush() throws IOException {
        synchronized (this.currentDataFileWriterLock) {
            if (this.currentBlobWriterComponents == null) {
                // Nothing has been written yet, so there is nothing to flush.
                return;
            }
            this.currentBlobWriterComponents.dataFileWriter.flush();
        }
    }

    /**
     * Closes the Avro writer of every blob opened by this writer, in creation order.
     *
     * <p>The first blob that fails to close aborts the loop: later blobs are not closed and the
     * writer is not marked as closed.
     *
     * @throws IllegalStateException if this writer has already been closed
     * @throws SamzaException wrapping the {@link IOException} of the first blob that fails to close
     */
    @Override // org.apache.samza.system.azureblob.producer.AzureBlobWriter
    public void close() {
        synchronized (this.currentDataFileWriterLock) {
            if (this.isClosed) {
                throw new IllegalStateException("Attempting to close an already closed AzureBlobAvroWriter");
            }
            for (BlobWriterComponents components : this.allBlobWriterComponents) {
                try {
                    closeDataFileWriter(components.dataFileWriter, components.azureBlobOutputStream, components.blockBlobAsyncClient);
                } catch (IOException e) {
                    throw new SamzaException(e);
                }
            }
            this.isClosed = true;
        }
    }

    /**
     * Test-only constructor that allows injecting the components of an already open blob.
     * If any of {@code dataFileWriter}, {@code azureBlobOutputStream} or {@code blockBlobAsyncClient}
     * is null, the writer starts with no blob open, exactly like the public constructor.
     */
    @VisibleForTesting
    AzureBlobAvroWriter(BlobContainerAsyncClient containerAsyncClient, AzureBlobWriterMetrics metrics, Executor blobThreadPool, int maxBlockFlushThresholdSize, int flushTimeoutMs, String blobURLPrefix, DataFileWriter<IndexedRecord> dataFileWriter, AzureBlobOutputStream azureBlobOutputStream, BlockBlobAsyncClient blockBlobAsyncClient, BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName, long maxBlobSize, long maxRecordsPerBlob, Compression compression, boolean useRandomStringInBlobName) {
        if (dataFileWriter != null && azureBlobOutputStream != null && blockBlobAsyncClient != null) {
            this.currentBlobWriterComponents = new BlobWriterComponents(dataFileWriter, azureBlobOutputStream, blockBlobAsyncClient);
            // Track only real components; a null entry would make close() fail with an NPE.
            this.allBlobWriterComponents.add(this.currentBlobWriterComponents);
        }
        this.blobThreadPool = blobThreadPool;
        this.blobURLPrefix = blobURLPrefix;
        this.metrics = metrics;
        this.maxBlockFlushThresholdSize = maxBlockFlushThresholdSize;
        this.flushTimeoutMs = flushTimeoutMs;
        this.compression = compression;
        this.containerAsyncClient = containerAsyncClient;
        this.useRandomStringInBlobName = useRandomStringInBlobName;
        this.maxBlobSize = maxBlobSize;
        this.maxRecordsPerBlob = maxRecordsPerBlob;
        this.blobMetadataGeneratorFactory = blobMetadataGeneratorFactory;
        this.blobMetadataGeneratorConfig = blobMetadataGeneratorConfig;
        this.streamName = streamName;
    }

    /**
     * Serialises a record to Avro binary using the schema carried by the record itself.
     *
     * @param indexedRecord record to encode
     * @return the Avro binary encoding of the record
     * @throws SamzaException if encoding fails for any reason
     */
    @VisibleForTesting
    byte[] encodeRecord(IndexedRecord indexedRecord) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        Schema recordSchema = indexedRecord.getSchema();
        try {
            BinaryEncoder encoder = new EncoderFactory().binaryEncoder(out, null);
            DatumWriter<IndexedRecord> recordWriter;
            if (indexedRecord instanceof SpecificRecord) {
                recordWriter = new SpecificDatumWriter<>(recordSchema);
            } else {
                recordWriter = new GenericDatumWriter<>(recordSchema);
            }
            recordWriter.write(indexedRecord, encoder);
            // The encoder buffers internally; flush before reading the bytes back.
            encoder.flush();
            return out.toByteArray();
        } catch (Exception e) {
            throw new SamzaException("Unable to serialize Avro record using schema within the record: " + recordSchema.toString(), e);
        }
    }

    /** Builds the formatter that renders the timestamp part of blob names in UTC. */
    private static DateTimeFormatter buildUTCFormatter() {
        return DateTimeFormatter.ofPattern(PUBLISHED_FILE_NAME_DATE_FORMAT).withZone(ZoneOffset.UTC);
    }

    /**
     * Closes the Avro writer of one blob, logging (and rethrowing) any failure.
     *
     * @param dataFileWriter writer to close
     * @param azureBlobOutputStream stream belonging to the blob (not used directly here)
     * @param blockBlobAsyncClient client of the blob, used for its URL in log messages
     * @throws IOException if closing the writer fails
     */
    private void closeDataFileWriter(DataFileWriter<IndexedRecord> dataFileWriter, AzureBlobOutputStream azureBlobOutputStream, BlockBlobAsyncClient blockBlobAsyncClient) throws IOException {
        try {
            LOG.info("Closing the blob: {}", blockBlobAsyncClient.getBlobUrl().toString());
            dataFileWriter.close();
        } catch (Exception e) {
            // Pass the exception as the final argument so the stack trace is logged as well.
            LOG.error("Exception occurred during DataFileWriter.close for blob  {}. All blocks uploaded so far for this blob will be discarded to avoid invalid blobs.", blockBlobAsyncClient.getBlobUrl(), e);
            throw e;
        }
    }

    /**
     * Flushes and releases the current blob (if any), then creates the next blob and makes it current.
     * Must be called while holding {@code currentDataFileWriterLock}.
     *
     * @param optionalIndexedRecord record used to initialise the schema and datum writer on first use
     * @throws IllegalStateException if no schema is known yet and no record was supplied
     * @throws SamzaException if the new blob's stream or Avro writer cannot be created
     * @throws IOException if flushing the previous blob fails
     */
    private void startNextBlob(Optional<IndexedRecord> optionalIndexedRecord) throws IOException {
        if (this.currentBlobWriterComponents != null) {
            LOG.info("Starting new blob as current blob size is {} and max blob size is {} or number of records is {} and max records in blob is {}",
                this.currentBlobWriterComponents.azureBlobOutputStream.getSize(), this.maxBlobSize, this.recordsInCurrentBlob, this.maxRecordsPerBlob);
            this.currentBlobWriterComponents.dataFileWriter.flush();
            this.currentBlobWriterComponents.azureBlobOutputStream.releaseBuffer();
            this.recordsInCurrentBlob = 0L;
        }
        if (this.datumWriter == null) {
            IndexedRecord firstRecord = optionalIndexedRecord.orElseThrow(() -> new IllegalStateException("Writing without schema setup."));
            this.schema = firstRecord.getSchema();
            if (firstRecord instanceof SpecificRecord) {
                this.datumWriter = new SpecificDatumWriter<>(this.schema);
            } else {
                this.datumWriter = new GenericDatumWriter<>(this.schema);
            }
        }

        String timestamp = UTC_FORMATTER.format(Instant.now());
        String blobName;
        if (this.useRandomStringInBlobName) {
            String randomSuffix = UUID.randomUUID().toString().substring(0, 8);
            blobName = String.format(BLOB_NAME_RANDOM_STRING_AVRO, this.blobURLPrefix, timestamp, randomSuffix, this.compression.getFileExtension());
        } else {
            blobName = String.format(BLOB_NAME_AVRO, this.blobURLPrefix, timestamp, this.compression.getFileExtension());
        }
        LOG.info("Creating new blob: {}", blobName);

        BlockBlobAsyncClient blockBlobAsyncClient = this.containerAsyncClient.getBlobAsyncClient(blobName).getBlockBlobAsyncClient();
        DataFileWriter<IndexedRecord> dataFileWriter = new DataFileWriter<>(this.datumWriter);
        try {
            AzureBlobOutputStream azureBlobOutputStream = new AzureBlobOutputStream(blockBlobAsyncClient, this.blobThreadPool, this.metrics, this.blobMetadataGeneratorFactory, this.blobMetadataGeneratorConfig, this.streamName, this.flushTimeoutMs, this.maxBlockFlushThresholdSize, this.compression);
            dataFileWriter.create(this.schema, azureBlobOutputStream);
            dataFileWriter.setFlushOnEveryBlock(false);
            this.currentBlobWriterComponents = new BlobWriterComponents(dataFileWriter, azureBlobOutputStream, blockBlobAsyncClient);
            this.allBlobWriterComponents.add(this.currentBlobWriterComponents);
            LOG.info("Created new blob: {}", blobName);
        } catch (Exception e) {
            throw new SamzaException("Unable to create AzureBlobOutputStream", e);
        }
    }

    /** Returns true if appending {@code bArr} (plus the fixed overhead) would exceed {@code maxBlobSize}. */
    private boolean willCurrentBlobExceedSize(byte[] bArr) {
        long projectedSize = this.currentBlobWriterComponents.azureBlobOutputStream.getSize() + (long) bArr.length + DATAFILEWRITER_OVERHEAD;
        return projectedSize > this.maxBlobSize;
    }

    /** Returns true if appending one more record would exceed {@code maxRecordsPerBlob}. */
    private boolean willCurrentBlobExceedRecordLimit() {
        return this.recordsInCurrentBlob + 1 > this.maxRecordsPerBlob;
    }
}
