package io.camunda.zeebe.backup.s3;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.camunda.zeebe.backup.api.Backup;
import io.camunda.zeebe.backup.api.BackupIdentifier;
import io.camunda.zeebe.backup.api.BackupIdentifierWildcard;
import io.camunda.zeebe.backup.api.BackupStatus;
import io.camunda.zeebe.backup.api.BackupStatusCode;
import io.camunda.zeebe.backup.api.BackupStore;
import io.camunda.zeebe.backup.api.NamedFileSet;
import io.camunda.zeebe.backup.common.BackupIdentifierImpl;
import io.camunda.zeebe.backup.common.BackupImpl;
import io.camunda.zeebe.backup.common.NamedFileSetImpl;
import io.camunda.zeebe.backup.s3.S3BackupStoreException;
import io.camunda.zeebe.backup.s3.manifest.Manifest;
import io.camunda.zeebe.backup.s3.manifest.NoBackupManifest;
import io.camunda.zeebe.backup.s3.manifest.ValidBackupManifest;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.awscore.defaultsmode.DefaultsMode;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.core.retry.RetryMode;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;

/* loaded from: input_file:io/camunda/zeebe/backup/s3/S3BackupStore.class */
public final class S3BackupStore implements BackupStore {
    static final String SNAPSHOT_PREFIX = "snapshot/";
    static final String SEGMENTS_PREFIX = "segments/";
    static final String MANIFEST_OBJECT_KEY = "manifest.json";
    private final S3BackupConfig config;
    private final S3AsyncClient client;
    static final ObjectMapper MAPPER = new ObjectMapper().registerModule(new Jdk8Module()).registerModule(new JavaTimeModule());
    private static final Logger LOG = LoggerFactory.getLogger(S3BackupStore.class);
    private static final Pattern BACKUP_IDENTIFIER_PATTERN = Pattern.compile("^(?<partitionId>\\d+)/(?<checkpointId>\\d+)/(?<nodeId>\\d+).*");

    public S3BackupStore(S3BackupConfig s3BackupConfig) {
        this(s3BackupConfig, buildClient(s3BackupConfig));
    }

    public S3BackupStore(S3BackupConfig s3BackupConfig, S3AsyncClient s3AsyncClient) {
        this.config = s3BackupConfig;
        this.client = s3AsyncClient;
    }

    private static Optional<BackupIdentifier> tryParseKeyAsId(String str) {
        Matcher matcher = BACKUP_IDENTIFIER_PATTERN.matcher(str);
        if (matcher.matches()) {
            try {
                return Optional.of(new BackupIdentifierImpl(Integer.parseInt(matcher.group("nodeId")), Integer.parseInt(matcher.group("partitionId")), Long.parseLong(matcher.group("checkpointId"))));
            } catch (NumberFormatException e) {
                LOG.warn("Tried interpreting key {} as a BackupIdentifier but failed", str, e);
            }
        }
        return Optional.empty();
    }

    private static String wildcardPrefix(BackupIdentifierWildcard backupIdentifierWildcard) {
        return (String) Stream.of((Object[]) new Optional[]{backupIdentifierWildcard.partitionId(), backupIdentifierWildcard.checkpointId(), backupIdentifierWildcard.nodeId()}).takeWhile((v0) -> {
            return v0.isPresent();
        }).map((v0) -> {
            return v0.get();
        }).map(obj -> {
            return obj.toString();
        }).collect(Collectors.joining("/"));
    }

    public static String objectPrefix(BackupIdentifier backupIdentifier) {
        return "%s/%s/%s/".formatted(Integer.valueOf(backupIdentifier.partitionId()), Long.valueOf(backupIdentifier.checkpointId()), Integer.valueOf(backupIdentifier.nodeId()));
    }

    public static void validateConfig(S3BackupConfig s3BackupConfig) {
        if (s3BackupConfig.bucketName() == null || s3BackupConfig.bucketName().isEmpty()) {
            throw new IllegalArgumentException("Configuration for S3 backup store is incomplete. bucketName must not be empty.");
        }
        if (s3BackupConfig.region().isEmpty()) {
            LOG.warn("No region configured for S3 backup store. Region will be determined from environment (see https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html#automatically-determine-the-aws-region-from-the-environment)");
        }
        if (s3BackupConfig.endpoint().isEmpty()) {
            LOG.warn("No endpoint configured for S3 backup store. Endpoint will be determined from the region");
        }
        if (s3BackupConfig.credentials().isEmpty()) {
            LOG.warn("Access credentials (accessKey, secretKey) not configured for S3 backup store. Credentials will be determined from environment (see https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html#credentials-chain)");
        }
        buildClient(s3BackupConfig).close();
    }

    public CompletableFuture<Void> save(Backup backup) {
        LOG.info("Saving {}", backup.id());
        return updateManifestObject(backup.id(), Manifest::expectNoBackup, noBackupManifest -> {
            return noBackupManifest.asInProgress(backup);
        }).thenComposeAsync(validBackupManifest -> {
            return CompletableFuture.allOf(saveSnapshotFiles(backup), saveSegmentFiles(backup)).thenComposeAsync(r7 -> {
                return updateManifestObject(backup.id(), Manifest::expectInProgress, (v0) -> {
                    return v0.asCompleted();
                });
            }).exceptionallyComposeAsync((Function<Throwable, ? extends CompletionStage<U>>) th -> {
                return updateManifestObject(backup.id(), manifest -> {
                    return manifest.asFailed(th);
                }).thenCompose(validBackupManifest -> {
                    return CompletableFuture.failedStage(th);
                });
            });
        }).thenApply((Function<? super U, ? extends U>) validBackupManifest2 -> {
            return null;
        });
    }

    public CompletableFuture<BackupStatus> getStatus(BackupIdentifier backupIdentifier) {
        LOG.info("Querying status of {}", backupIdentifier);
        return readManifestObject(backupIdentifier).thenApply((v0) -> {
            return v0.toStatus();
        });
    }

    public CompletableFuture<Collection<BackupStatus>> list(BackupIdentifierWildcard backupIdentifierWildcard) {
        LOG.info("Querying status of {}", backupIdentifierWildcard);
        return readManifestObjects(backupIdentifierWildcard).thenApplyAsync(collection -> {
            return collection.stream().map((v0) -> {
                return v0.toStatus();
            }).toList();
        });
    }

    public CompletableFuture<Void> delete(BackupIdentifier backupIdentifier) {
        LOG.info("Deleting {}", backupIdentifier);
        return readManifestObject(backupIdentifier).thenApply(manifest -> {
            if (manifest.statusCode() == BackupStatusCode.IN_PROGRESS) {
                throw new S3BackupStoreException.BackupInInvalidStateException("Can't delete in-progress backup %s, must be marked as failed first".formatted(manifest.mo2id()));
            }
            return manifest.mo2id();
        }).thenComposeAsync((Function<? super U, ? extends CompletionStage<U>>) this::listBackupObjects).thenComposeAsync((v1) -> {
            return deleteBackupObjects(v1);
        });
    }

    public CompletableFuture<Backup> restore(BackupIdentifier backupIdentifier, Path path) {
        LOG.info("Restoring {} to {}", backupIdentifier, path);
        String objectPrefix = objectPrefix(backupIdentifier);
        return readManifestObject(backupIdentifier).thenApply(Manifest::expectCompleted).thenComposeAsync((Function<? super U, ? extends CompletionStage<U>>) completedBackupManifest -> {
            return downloadNamedFileSet(objectPrefix + "segments/", completedBackupManifest.segmentFileNames(), path).thenCombineAsync((CompletionStage) downloadNamedFileSet(objectPrefix + "snapshot/", completedBackupManifest.snapshotFileNames(), path), (namedFileSet, namedFileSet2) -> {
                return new BackupImpl(backupIdentifier, completedBackupManifest.descriptor(), namedFileSet2, namedFileSet);
            });
        });
    }

    public CompletableFuture<BackupStatusCode> markFailed(BackupIdentifier backupIdentifier, String str) {
        LOG.info("Marking {} as failed", backupIdentifier);
        return updateManifestObject(backupIdentifier, manifest -> {
            return manifest.asFailed(str);
        }).thenApply((v0) -> {
            return v0.statusCode();
        });
    }

    public CompletableFuture<Void> closeAsync() {
        this.client.close();
        return CompletableFuture.completedFuture(null);
    }

    private CompletableFuture<NamedFileSet> downloadNamedFileSet(String str, Set<String> set, Path path) {
        LOG.debug("Downloading {} files from prefix {} to {}", new Object[]{Integer.valueOf(set.size()), str, path});
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        return CompletableFuture.allOf((CompletableFuture[]) set.stream().map(str2 -> {
            Path resolve = path.resolve(str2);
            return this.client.getObject(builder -> {
                builder.bucket(this.config.bucketName()).key(str + str2);
            }, resolve).thenApply(getObjectResponse -> {
                return (Path) concurrentHashMap.put(str2, resolve);
            });
        }).toArray(i -> {
            return new CompletableFuture[i];
        })).thenApply(r5 -> {
            return new NamedFileSetImpl(concurrentHashMap);
        });
    }

    private CompletableFuture<List<ObjectIdentifier>> listBackupObjects(BackupIdentifier backupIdentifier) {
        LOG.debug("Listing objects of {}", backupIdentifier);
        return this.client.listObjectsV2(builder -> {
            builder.bucket(this.config.bucketName()).prefix(objectPrefix(backupIdentifier));
        }).thenApplyAsync(listObjectsV2Response -> {
            return listObjectsV2Response.contents().stream().map((v0) -> {
                return v0.key();
            }).map(str -> {
                return (ObjectIdentifier) ObjectIdentifier.builder().key(str).build();
            }).toList();
        });
    }

    private CompletableFuture<Void> deleteBackupObjects(Collection<ObjectIdentifier> collection) {
        LOG.debug("Deleting {} objects", Integer.valueOf(collection.size()));
        return collection.isEmpty() ? CompletableFuture.completedFuture(null) : this.client.deleteObjects(builder -> {
            builder.bucket(this.config.bucketName()).delete(builder -> {
                builder.objects(collection).quiet(true);
            });
        }).thenApplyAsync(deleteObjectsResponse -> {
            if (deleteObjectsResponse.errors().isEmpty()) {
                return null;
            }
            throw new S3BackupStoreException.BackupDeletionIncomplete("Not all objects belonging to the backup were deleted successfully: " + deleteObjectsResponse.errors());
        });
    }

    private SdkPublisher<BackupIdentifier> findBackupIds(BackupIdentifierWildcard backupIdentifierWildcard) {
        String wildcardPrefix = wildcardPrefix(backupIdentifierWildcard);
        LOG.debug("Using prefix {} to search for manifest files matching {}", wildcardPrefix, backupIdentifierWildcard);
        SdkPublisher map = this.client.listObjectsV2Paginator(builder -> {
            builder.bucket(this.config.bucketName()).prefix(wildcardPrefix);
        }).contents().filter(s3Object -> {
            return s3Object.key().endsWith(MANIFEST_OBJECT_KEY);
        }).map((v0) -> {
            return v0.key();
        }).map(S3BackupStore::tryParseKeyAsId).filter((v0) -> {
            return v0.isPresent();
        }).map((v0) -> {
            return v0.get();
        });
        Objects.requireNonNull(backupIdentifierWildcard);
        return map.filter(backupIdentifierWildcard::matches);
    }

    private CompletableFuture<Collection<Manifest>> readManifestObjects(BackupIdentifierWildcard backupIdentifierWildcard) {
        AsyncAggregatingSubscriber asyncAggregatingSubscriber = new AsyncAggregatingSubscriber(16L);
        findBackupIds(backupIdentifierWildcard).map(this::readManifestObject).subscribe(asyncAggregatingSubscriber);
        return asyncAggregatingSubscriber.result();
    }

    CompletableFuture<Manifest> readManifestObject(BackupIdentifier backupIdentifier) {
        LOG.debug("Reading manifest object of {}", backupIdentifier);
        return this.client.getObject(builder -> {
            builder.bucket(this.config.bucketName()).key(objectPrefix(backupIdentifier) + "manifest.json");
        }, AsyncResponseTransformer.toBytes()).thenApply(responseBytes -> {
            try {
                return (Manifest) MAPPER.readValue(responseBytes.asInputStream(), ValidBackupManifest.class);
            } catch (JsonParseException e) {
                throw new S3BackupStoreException.ManifestParseException("Failed to parse manifest object", e);
            } catch (IOException e2) {
                throw new S3BackupStoreException.BackupReadException("Failed to read manifest object", e2);
            }
        }).exceptionally(th -> {
            if (th.getCause() instanceof NoSuchKeyException) {
                LOG.debug("Found no manifest for backup {}", backupIdentifier);
                return new NoBackupManifest(BackupIdentifierImpl.from(backupIdentifier));
            }
            Throwable cause = th.getCause();
            if (cause instanceof S3BackupStoreException) {
                throw ((S3BackupStoreException) cause);
            }
            throw new S3BackupStoreException.BackupReadException("Failed to read manifest of %s".formatted(backupIdentifier), th);
        });
    }

    <T> CompletableFuture<ValidBackupManifest> updateManifestObject(BackupIdentifier backupIdentifier, Function<Manifest, T> function, Function<T, ValidBackupManifest> function2) {
        return updateManifestObject(backupIdentifier, manifest -> {
            return (ValidBackupManifest) function2.apply(function.apply(manifest));
        });
    }

    CompletableFuture<ValidBackupManifest> updateManifestObject(BackupIdentifier backupIdentifier, Function<Manifest, ValidBackupManifest> function) {
        return readManifestObject(backupIdentifier).thenApply((Function<? super Manifest, ? extends U>) function).thenComposeAsync((Function<? super U, ? extends CompletionStage<U>>) this::writeManifestObject);
    }

    CompletableFuture<ValidBackupManifest> writeManifestObject(ValidBackupManifest validBackupManifest) {
        LOG.debug("Updating manifest of {} to {}", validBackupManifest.mo2id(), validBackupManifest);
        try {
            return this.client.putObject(builder -> {
                builder.bucket(this.config.bucketName()).key(objectPrefix(validBackupManifest.mo2id()) + "manifest.json").build();
            }, AsyncRequestBody.fromBytes(MAPPER.writeValueAsBytes(validBackupManifest))).thenApply(putObjectResponse -> {
                return validBackupManifest;
            });
        } catch (JsonProcessingException e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    private CompletableFuture<Void> saveSnapshotFiles(Backup backup) {
        LOG.debug("Saving snapshot files for {}", backup.id());
        String str = objectPrefix(backup.id()) + "snapshot/";
        return CompletableFuture.allOf((CompletableFuture[]) backup.snapshot().namedFiles().entrySet().stream().map(entry -> {
            return saveNamedFile(str, (String) entry.getKey(), (Path) entry.getValue());
        }).toList().toArray(new CompletableFuture[0]));
    }

    private CompletableFuture<Void> saveSegmentFiles(Backup backup) {
        LOG.debug("Saving segment files for {}", backup.id());
        String str = objectPrefix(backup.id()) + "segments/";
        return CompletableFuture.allOf((CompletableFuture[]) backup.segments().namedFiles().entrySet().stream().map(entry -> {
            return saveNamedFile(str, (String) entry.getKey(), (Path) entry.getValue());
        }).toList().toArray(new CompletableFuture[0]));
    }

    private CompletableFuture<PutObjectResponse> saveNamedFile(String str, String str2, Path path) {
        LOG.trace("Saving file {}({}) in prefix {}", new Object[]{str2, path, str});
        return this.client.putObject(builder -> {
            builder.bucket(this.config.bucketName()).key(str + str2);
        }, AsyncRequestBody.fromFile(path));
    }

    public static S3AsyncClient buildClient(S3BackupConfig s3BackupConfig) {
        S3AsyncClientBuilder builder = S3AsyncClient.builder();
        builder.defaultsMode(DefaultsMode.AUTO);
        builder.httpClient(NettyNioAsyncHttpClient.builder().connectionAcquisitionTimeout(Duration.ofSeconds(45L)).build());
        builder.overrideConfiguration(builder2 -> {
            builder2.retryPolicy(RetryMode.ADAPTIVE);
        });
        s3BackupConfig.endpoint().ifPresent(str -> {
            builder.endpointOverride(URI.create(str));
        });
        s3BackupConfig.region().ifPresent(str2 -> {
            builder.region(Region.of(str2));
        });
        s3BackupConfig.credentials().ifPresent(credentials -> {
            builder.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(credentials.accessKey(), credentials.secretKey())));
        });
        s3BackupConfig.apiCallTimeout().ifPresent(duration -> {
            builder.overrideConfiguration(builder3 -> {
                builder3.apiCallTimeout(duration);
            });
        });
        return (S3AsyncClient) builder.build();
    }
}
