package io.debezium.storage.s3.history;

import io.debezium.DebeziumException;
import io.debezium.annotation.NotThreadSafe;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.relational.history.AbstractSchemaHistory;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.SchemaHistoryException;
import io.debezium.relational.history.SchemaHistoryListener;
import io.debezium.util.FunctionalReadWriteLock;
import io.debezium.util.Loggings;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.apache.kafka.common.config.ConfigDef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.core.sync.ResponseTransformer;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;

@NotThreadSafe
/* loaded from: input_file:io/debezium/storage/s3/history/S3SchemaHistory.class */
public class S3SchemaHistory extends AbstractSchemaHistory {
    public static final String ACCESS_KEY_ID_CONFIG = "s3.access.key.id";
    public static final String SECRET_ACCESS_KEY_CONFIG = "s3.secret.access.key";
    public static final String OBJECT_CONTENT_TYPE = "text/plain";
    private final AtomicBoolean running = new AtomicBoolean();
    private final FunctionalReadWriteLock lock = FunctionalReadWriteLock.reentrant();
    private final DocumentWriter documentWriter = DocumentWriter.defaultWriter();
    private final DocumentReader reader = DocumentReader.defaultReader();
    private String bucket = null;
    private String objectName = null;
    private Region region = null;
    private URI endpoint = null;
    private AwsCredentialsProvider credentialsProvider = null;
    private volatile S3Client client = null;
    private volatile List<HistoryRecord> records = new ArrayList();
    private static final Logger LOGGER = LoggerFactory.getLogger(S3SchemaHistory.class);
    public static final Field ACCESS_KEY_ID = Field.create("schema.history.internal.s3.access.key.id").withDisplayName("S3 access key id").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.HIGH);
    public static final Field SECRET_ACCESS_KEY = Field.create("schema.history.internal.s3.secret.access.key").withDisplayName("S3 secret access key").withType(ConfigDef.Type.PASSWORD).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.HIGH);
    public static final String REGION_CONFIG = "schema.history.internal.s3.region.name";
    public static final Field REGION = Field.create(REGION_CONFIG).withDisplayName("S3 region").withWidth(ConfigDef.Width.LONG).withType(ConfigDef.Type.STRING).withImportance(ConfigDef.Importance.MEDIUM);
    public static final String BUCKET_CONFIG = "schema.history.internal.s3.bucket.name";
    public static final Field BUCKET = Field.create(BUCKET_CONFIG).withDisplayName("S3 bucket").withType(ConfigDef.Type.STRING).withImportance(ConfigDef.Importance.HIGH);
    public static final String OBJECT_NAME_CONFIG = "schema.history.internal.s3.object.name";
    public static final Field OBJECT_NAME = Field.create(OBJECT_NAME_CONFIG).withDisplayName("S3 Object name").withType(ConfigDef.Type.STRING).withImportance(ConfigDef.Importance.HIGH).withDescription("The name of the object under which the history is stored.");
    public static final String ENDPOINT_CONFIG = "schema.history.internal.s3.endpoint";
    public static final Field ENDPOINT = Field.create(ENDPOINT_CONFIG).withDisplayName("S3 endpoint").withType(ConfigDef.Type.STRING).withImportance(ConfigDef.Importance.LOW);
    public static final Field.Set ALL_FIELDS = Field.setOf(new Field[]{ACCESS_KEY_ID, SECRET_ACCESS_KEY, REGION, BUCKET, ENDPOINT});

    public void configure(Configuration configuration, HistoryRecordComparator historyRecordComparator, SchemaHistoryListener schemaHistoryListener, boolean z) {
        super.configure(configuration, historyRecordComparator, schemaHistoryListener, z);
        Field.Set set = ALL_FIELDS;
        Logger logger = LOGGER;
        Objects.requireNonNull(logger);
        if (!configuration.validateAndRecord(set, logger::error)) {
            throw new SchemaHistoryException("Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");
        }
        this.bucket = configuration.getString(BUCKET);
        if (this.bucket == null) {
            throw new DebeziumException(BUCKET + " is required to be set");
        }
        this.objectName = configuration.getString(OBJECT_NAME);
        if (this.objectName == null) {
            throw new DebeziumException(OBJECT_NAME + " is required to be set");
        }
        String string = configuration.getString(REGION);
        if (string == null) {
            throw new DebeziumException(REGION + " is required to be set");
        }
        this.region = Region.of(string);
        LOGGER.info("Database history will be stored in bucket '{}' under key '{}' using region '{}'", new Object[]{this.bucket, this.objectName, this.region});
        String string2 = configuration.getString(ENDPOINT);
        if (string2 != null) {
            LOGGER.info("Using explicitly configured endpoint " + string2);
            this.endpoint = URI.create(string2);
        }
        if (configuration.getString(ACCESS_KEY_ID) == null && configuration.getString(SECRET_ACCESS_KEY) == null) {
            LOGGER.info("DefaultCreadentialsProvider is used for authentication");
            this.credentialsProvider = DefaultCredentialsProvider.create();
        } else {
            LOGGER.info("StaticCredentialsProvider is used for authentication");
            this.credentialsProvider = StaticCredentialsProvider.create(AwsBasicCredentials.create(configuration.getString(ACCESS_KEY_ID), configuration.getString(SECRET_ACCESS_KEY)));
        }
    }

    public synchronized void start() {
        if (this.client == null) {
            S3ClientBuilder region = S3Client.builder().credentialsProvider(this.credentialsProvider).region(this.region);
            if (this.endpoint != null) {
                region.endpointOverride(this.endpoint);
            }
            this.client = (S3Client) region.build();
        }
        this.lock.write(() -> {
            if (!this.running.compareAndSet(false, true)) {
                return;
            }
            if (!storageExists()) {
                initializeStorage();
            }
            InputStream inputStream = null;
            try {
                inputStream = (InputStream) this.client.getObject((GetObjectRequest) GetObjectRequest.builder().bucket(this.bucket).key(this.objectName).responseContentType(OBJECT_CONTENT_TYPE).build(), ResponseTransformer.toInputStream());
            } catch (NoSuchKeyException e) {
            } catch (S3Exception e2) {
                throw new SchemaHistoryException("Can't retrieve history object from S3", e2);
            }
            if (inputStream == null) {
                return;
            }
            try {
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
                while (true) {
                    try {
                        String readLine = bufferedReader.readLine();
                        if (readLine == null) {
                            bufferedReader.close();
                            return;
                        } else if (!readLine.isEmpty()) {
                            this.records.add(new HistoryRecord(this.reader.read(readLine)));
                        }
                    } finally {
                    }
                }
            } catch (IOException e3) {
                throw new SchemaHistoryException("Unable to read object content", e3);
            }
        });
        super.start();
    }

    public synchronized void stop() {
        if (this.running.compareAndSet(true, false) && this.client != null) {
            this.client.close();
        }
        super.stop();
    }

    protected void storeRecord(HistoryRecord historyRecord) throws SchemaHistoryException {
        if (this.client == null) {
            throw new IllegalStateException("No S3 client is available. Ensure that 'start()' is called before storing database history records.");
        }
        if (historyRecord == null) {
            return;
        }
        LOGGER.trace("Storing record into database history: {}", historyRecord);
        this.lock.write(() -> {
            if (!this.running.get()) {
                throw new IllegalStateException("The history has been stopped and will not accept more records");
            }
            this.records.add(historyRecord);
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            try {
                BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(byteArrayOutputStream, StandardCharsets.UTF_8));
                try {
                    Iterator<HistoryRecord> it = this.records.iterator();
                    while (it.hasNext()) {
                        String write = this.documentWriter.write(it.next().document());
                        if (write != null) {
                            bufferedWriter.newLine();
                            bufferedWriter.append((CharSequence) write);
                        }
                    }
                    bufferedWriter.close();
                    try {
                        this.client.putObject((PutObjectRequest) PutObjectRequest.builder().bucket(this.bucket).key(this.objectName).contentType(OBJECT_CONTENT_TYPE).build(), RequestBody.fromBytes(byteArrayOutputStream.toByteArray()));
                    } catch (S3Exception e) {
                        throw new SchemaHistoryException("Can not store record to S3", e);
                    }
                } finally {
                }
            } catch (IOException e2) {
                Loggings.logErrorAndTraceRecord(this.logger, historyRecord, "Failed to convert record", e2);
                throw new SchemaHistoryException("Failed to convert record", e2);
            }
        });
    }

    protected void recoverRecords(Consumer<HistoryRecord> consumer) {
        this.lock.write(() -> {
            this.records.forEach(consumer);
        });
    }

    public boolean exists() {
        return !this.records.isEmpty();
    }

    public boolean storageExists() {
        Stream map = this.client.listBuckets().buckets().stream().map((v0) -> {
            return v0.name();
        });
        String str = this.bucket;
        Objects.requireNonNull(str);
        boolean anyMatch = map.anyMatch((v1) -> {
            return r1.equals(v1);
        });
        if (anyMatch) {
            LOGGER.info("Bucket '{}' used to store database history exists", this.bucket);
        } else {
            LOGGER.info("Bucket '{}' used to store database history does not exist yet", this.bucket);
        }
        return anyMatch;
    }

    public void initializeStorage() {
        super.initializeStorage();
        LOGGER.info("Creating S3 bucket '{}' used to store database history", this.bucket);
        this.client.createBucket((CreateBucketRequest) CreateBucketRequest.builder().bucket(this.bucket).build());
    }

    public String toString() {
        return "S3";
    }
}
