/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.storage.azure.blob.history;

import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.relational.history.AbstractSchemaHistory;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.SchemaHistoryException;
import io.debezium.relational.history.SchemaHistoryListener;
import io.debezium.util.FunctionalReadWriteLock;
import io.debezium.util.Loggings;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.kafka.common.config.ConfigDef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Schema history implementation that persists {@link HistoryRecord}s in a single Azure Blob Storage blob.
 *
 * <p>All records are kept in an in-memory list. Each call to {@link #storeRecord(HistoryRecord)} appends the
 * new record to that list and then re-uploads the complete list to the configured blob, one JSON document
 * per line, replacing the previous blob content. On {@link #start()} the blob (if present) is downloaded and
 * parsed back into the in-memory list.
 *
 * <p>Mutations of the in-memory list and uploads are performed under the write side of an internal
 * {@link FunctionalReadWriteLock}.
 */
public class AzureBlobSchemaHistory extends AbstractSchemaHistory {
    private static final Logger LOGGER = LoggerFactory.getLogger(AzureBlobSchemaHistory.class);

    /** Un-prefixed name of the connection string option. */
    public static final String ACCOUNT_CONNECTION_STRING_CONFIG = "azure.storage.account.connectionstring";
    /** Un-prefixed name of the container name option. */
    public static final String CONTAINER_NAME_CONFIG = "azure.storage.account.container.name";
    /** Un-prefixed name of the blob name option. */
    public static final String BLOB_NAME_CONFIG = "azure.storage.blob.name";

    /** Connection string used to build the {@link BlobServiceClient}. */
    public static final Field ACCOUNT_CONNECTION_STRING = Field.create("schema.history.internal.azure.storage.account.connectionstring")
            .withDisplayName("storage connection string")
            .withType(ConfigDef.Type.STRING)
            .withWidth(ConfigDef.Width.LONG)
            .withImportance(ConfigDef.Importance.HIGH);

    /** Name of the container that holds the history blob. */
    public static final Field CONTAINER_NAME = Field.create("schema.history.internal.azure.storage.account.container.name")
            .withDisplayName("container name")
            .withType(ConfigDef.Type.STRING)
            .withWidth(ConfigDef.Width.LONG)
            .withImportance(ConfigDef.Importance.HIGH);

    /** Name of the blob the history is written to. */
    public static final Field BLOB_NAME = Field.create("schema.history.internal.azure.storage.blob.name")
            .withDisplayName("blob name")
            .withType(ConfigDef.Type.STRING)
            .withWidth(ConfigDef.Width.LONG)
            .withImportance(ConfigDef.Importance.HIGH);

    /** All fields validated by {@link #configure}. */
    public static final Field.Set ALL_FIELDS = Field.setOf(ACCOUNT_CONNECTION_STRING, CONTAINER_NAME, BLOB_NAME);

    private final AtomicBoolean running = new AtomicBoolean();
    private final FunctionalReadWriteLock lock = FunctionalReadWriteLock.reentrant();
    private final DocumentWriter documentWriter = DocumentWriter.defaultWriter();
    private final DocumentReader reader = DocumentReader.defaultReader();
    private volatile BlobServiceClient blobServiceClient = null;
    private volatile BlobClient blobClient = null;
    private String container = null;
    private String blobName = null;
    private volatile List<HistoryRecord> records = new ArrayList<>();

    /**
     * Validates the Azure-specific fields and captures the container and blob names.
     *
     * @throws SchemaHistoryException if field validation reports problems (details are logged at error level)
     * @throws DebeziumException if the container name or the blob name is not set
     */
    public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) {
        super.configure(config, comparator, listener, useCatalogBeforeSchema);
        if (!config.validateAndRecord(ALL_FIELDS, LOGGER::error)) {
            throw new SchemaHistoryException(
                    "Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");
        }

        this.container = config.getString(CONTAINER_NAME);
        if (this.container == null) {
            throw new DebeziumException(CONTAINER_NAME + " is required to be set");
        }

        this.blobName = config.getString(BLOB_NAME);
        if (this.blobName == null) {
            throw new DebeziumException(BLOB_NAME + " is required to be set");
        }
    }

    /**
     * Lazily creates the Azure clients, creates the container when it does not exist yet, and loads the
     * records currently stored in the blob.
     *
     * <p>The in-memory list is <em>replaced</em> by the blob content rather than appended to, so calling
     * this method more than once does not duplicate records.
     *
     * @throws DebeziumException if no connection string is configured
     * @throws SchemaHistoryException if the blob content cannot be parsed
     */
    public synchronized void start() {
        if (this.blobServiceClient == null) {
            final String connectionString = this.config.getString(ACCOUNT_CONNECTION_STRING);
            if (connectionString == null) {
                // Fail with the same kind of message used for the other mandatory options
                // instead of handing null to the Azure client builder.
                throw new DebeziumException(ACCOUNT_CONNECTION_STRING + " is required to be set");
            }
            this.blobServiceClient = new BlobServiceClientBuilder()
                    .connectionString(connectionString)
                    .buildClient();
        }
        if (this.blobClient == null) {
            this.blobClient = this.blobServiceClient
                    .getBlobContainerClient(this.container)
                    .getBlobClient(this.blobName);
        }

        this.lock.write(() -> {
            // Only the call that flips the flag checks for / creates the container.
            if (this.running.compareAndSet(false, true) && !storageExists()) {
                initializeStorage();
            }
            if (this.blobClient.exists()) {
                // Swap in a freshly parsed list; the previous list is left untouched if parsing fails.
                this.records = loadRecordsFromBlob();
            }
        });
        super.start();
    }

    /**
     * Marks this history as stopped so that {@link #storeRecord(HistoryRecord)} rejects further records
     * until {@link #start()} is called again, then delegates to the parent implementation.
     */
    public synchronized void stop() {
        this.running.set(false);
        super.stop();
    }

    /**
     * Appends the record to the in-memory history and uploads the complete history to the blob,
     * overwriting the existing blob content.
     *
     * @param record the record to store; {@code null} is ignored
     * @throws IllegalStateException if {@link #start()} has not created the blob client yet, or if the
     *         history is not running
     * @throws SchemaHistoryException if a record cannot be serialized
     */
    protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
        if (this.blobClient == null) {
            throw new IllegalStateException("No Blob client is available. Ensure that 'start()' is called before storing database history records.");
        }
        if (record == null) {
            return;
        }

        LOGGER.trace("Storing record into database history: {}", record);
        this.lock.write(() -> {
            if (!this.running.get()) {
                throw new IllegalStateException("The history has been stopped and will not accept more records");
            }

            this.records.add(record);

            final ByteArrayOutputStream serialized = new ByteArrayOutputStream();
            try (BufferedWriter historyWriter = new BufferedWriter(new OutputStreamWriter(serialized, StandardCharsets.UTF_8))) {
                for (HistoryRecord stored : this.records) {
                    final String line = this.documentWriter.write(stored.document());
                    if (line != null) {
                        // Each record is preceded by a line separator; empty lines are skipped when reading.
                        historyWriter.newLine();
                        historyWriter.append(line);
                    }
                }
            }
            catch (IOException e) {
                Loggings.logErrorAndTraceRecord(this.logger, record, "Failed to convert record", e);
                throw new SchemaHistoryException("Failed to convert record", e);
            }

            // Second argument 'true' requests that an existing blob be overwritten.
            this.blobClient.upload(new ByteArrayInputStream(serialized.toByteArray()), true);
        });
    }

    /**
     * Passes every in-memory record, in stored order, to the given consumer while holding the write lock.
     *
     * @param records the consumer receiving each record
     */
    protected void recoverRecords(Consumer<HistoryRecord> records) {
        this.lock.write(() -> this.records.forEach(records));
    }

    /**
     * @return {@code true} if at least one record is held in memory
     */
    public boolean exists() {
        return !this.records.isEmpty();
    }

    /**
     * Checks whether the configured container exists and logs the outcome at info level.
     *
     * @return {@code true} if the container exists
     */
    public boolean storageExists() {
        final boolean containerExists = this.blobServiceClient.getBlobContainerClient(this.container).exists();
        if (containerExists) {
            LOGGER.info("Container '{}' used to store database history exists", this.container);
        }
        else {
            LOGGER.info("Container '{}' used to store database history does not exist yet", this.container);
        }
        return containerExists;
    }

    /**
     * Creates the configured container.
     */
    public void initializeStorage() {
        this.blobServiceClient.createBlobContainer(this.container);
    }

    public String toString() {
        return "Azure Blob Storage";
    }

    /**
     * Downloads the blob and parses each non-empty line into a {@link HistoryRecord}.
     *
     * @return a new list holding the parsed records in blob order
     * @throws SchemaHistoryException if a line cannot be read or parsed
     */
    private List<HistoryRecord> loadRecordsFromBlob() {
        final ByteArrayOutputStream downloaded = new ByteArrayOutputStream();
        this.blobClient.downloadStream(downloaded);

        final List<HistoryRecord> loaded = new ArrayList<>();
        try (BufferedReader historyReader = new BufferedReader(
                new InputStreamReader(new ByteArrayInputStream(downloaded.toByteArray()), StandardCharsets.UTF_8))) {
            String line;
            while ((line = historyReader.readLine()) != null) {
                if (!line.isEmpty()) {
                    loaded.add(new HistoryRecord(this.reader.read(line)));
                }
            }
        }
        catch (IOException e) {
            throw new SchemaHistoryException("Unable to read object content", e);
        }
        return loaded;
    }
}

