package io.debezium.storage.azure.blob.history;

import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.relational.history.AbstractSchemaHistory;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.SchemaHistoryException;
import io.debezium.relational.history.SchemaHistoryListener;
import io.debezium.util.FunctionalReadWriteLock;
import io.debezium.util.Loggings;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.kafka.common.config.ConfigDef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/storage/azure/blob/history/AzureBlobSchemaHistory.class */
public class AzureBlobSchemaHistory extends AbstractSchemaHistory {
    public static final String ACCOUNT_CONNECTION_STRING_CONFIG = "azure.storage.account.connectionstring";
    public static final String CONTAINER_NAME_CONFIG = "azure.storage.account.container.name";
    public static final String BLOB_NAME_CONFIG = "azure.storage.blob.name";
    private final AtomicBoolean running = new AtomicBoolean();
    private final FunctionalReadWriteLock lock = FunctionalReadWriteLock.reentrant();
    private final DocumentWriter documentWriter = DocumentWriter.defaultWriter();
    private final DocumentReader reader = DocumentReader.defaultReader();
    private volatile BlobServiceClient blobServiceClient = null;
    private volatile BlobClient blobClient = null;
    private String container = null;
    private String blobName = null;
    private volatile List<HistoryRecord> records = new ArrayList();
    private static final Logger LOGGER = LoggerFactory.getLogger(AzureBlobSchemaHistory.class);
    public static final Field ACCOUNT_CONNECTION_STRING = Field.create("schema.history.internal.azure.storage.account.connectionstring").withDisplayName("storage connection string").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.HIGH);
    public static final Field CONTAINER_NAME = Field.create("schema.history.internal.azure.storage.account.container.name").withDisplayName("container name").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.HIGH);
    public static final Field BLOB_NAME = Field.create("schema.history.internal.azure.storage.blob.name").withDisplayName("blob name").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.HIGH);
    public static final Field.Set ALL_FIELDS = Field.setOf(new Field[]{ACCOUNT_CONNECTION_STRING, CONTAINER_NAME, BLOB_NAME});

    public void configure(Configuration configuration, HistoryRecordComparator historyRecordComparator, SchemaHistoryListener schemaHistoryListener, boolean z) {
        super.configure(configuration, historyRecordComparator, schemaHistoryListener, z);
        Field.Set set = ALL_FIELDS;
        Logger logger = LOGGER;
        Objects.requireNonNull(logger);
        if (!configuration.validateAndRecord(set, logger::error)) {
            throw new SchemaHistoryException("Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");
        }
        this.container = configuration.getString(CONTAINER_NAME);
        if (this.container == null) {
            throw new DebeziumException(CONTAINER_NAME + " is required to be set");
        }
        this.blobName = configuration.getString(BLOB_NAME);
        if (this.blobName == null) {
            throw new DebeziumException(BLOB_NAME + " is required to be set");
        }
    }

    public synchronized void start() {
        if (this.blobServiceClient == null) {
            this.blobServiceClient = new BlobServiceClientBuilder().connectionString(this.config.getString(ACCOUNT_CONNECTION_STRING)).buildClient();
        }
        if (this.blobClient == null) {
            this.blobClient = this.blobServiceClient.getBlobContainerClient(this.container).getBlobClient(this.blobName);
        }
        this.lock.write(() -> {
            if (this.running.compareAndSet(false, true) && !storageExists()) {
                initializeStorage();
            }
            if (!this.blobClient.exists().booleanValue()) {
                return;
            }
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            this.blobClient.downloadStream(byteArrayOutputStream);
            try {
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(byteArrayOutputStream.toByteArray()), StandardCharsets.UTF_8));
                while (true) {
                    try {
                        String readLine = bufferedReader.readLine();
                        if (readLine == null) {
                            bufferedReader.close();
                            return;
                        } else if (!readLine.isEmpty()) {
                            this.records.add(new HistoryRecord(this.reader.read(readLine)));
                        }
                    } finally {
                    }
                }
            } catch (IOException e) {
                throw new SchemaHistoryException("Unable to read object content", e);
            }
        });
        super.start();
    }

    protected void storeRecord(HistoryRecord historyRecord) throws SchemaHistoryException {
        if (this.blobClient == null) {
            throw new IllegalStateException("No Blob client is available. Ensure that 'start()' is called before storing database history records.");
        }
        if (historyRecord == null) {
            return;
        }
        LOGGER.trace("Storing record into database history: {}", historyRecord);
        this.lock.write(() -> {
            if (!this.running.get()) {
                throw new IllegalStateException("The history has been stopped and will not accept more records");
            }
            this.records.add(historyRecord);
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            try {
                BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(byteArrayOutputStream, StandardCharsets.UTF_8));
                try {
                    Iterator<HistoryRecord> it = this.records.iterator();
                    while (it.hasNext()) {
                        String write = this.documentWriter.write(it.next().document());
                        if (write != null) {
                            bufferedWriter.newLine();
                            bufferedWriter.append((CharSequence) write);
                        }
                    }
                    bufferedWriter.close();
                    this.blobClient.upload(new ByteArrayInputStream(byteArrayOutputStream.toByteArray()), true);
                } finally {
                }
            } catch (IOException e) {
                Loggings.logErrorAndTraceRecord(this.logger, historyRecord, "Failed to convert record", e);
                throw new SchemaHistoryException("Failed to convert record", e);
            }
        });
    }

    protected void recoverRecords(Consumer<HistoryRecord> consumer) {
        this.lock.write(() -> {
            this.records.forEach(consumer);
        });
    }

    public boolean exists() {
        return !this.records.isEmpty();
    }

    public boolean storageExists() {
        boolean exists = this.blobServiceClient.getBlobContainerClient(this.container).exists();
        if (exists) {
            LOGGER.info("Container '{}' used to store database history exists", this.container);
        } else {
            LOGGER.info("Container '{}' used to store database history does not exist yet", this.container);
        }
        return exists;
    }

    public void initializeStorage() {
        this.blobServiceClient.createBlobContainer(this.container);
    }

    public String toString() {
        return "Azure Blob Storage";
    }
}
