package org.apache.james.blob.objectstorage.aws;

import com.github.fge.lambdas.Throwing;
import com.github.steveash.guavate.Guavate;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.io.ByteSource;
import com.google.common.io.FileBackedOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import org.apache.commons.io.IOUtils;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BlobStoreDAO;
import org.apache.james.blob.api.BucketName;
import org.apache.james.blob.api.ObjectNotFoundException;
import org.apache.james.blob.api.ObjectStoreIOException;
import org.apache.james.lifecycle.api.Startable;
import org.apache.james.util.DataChunker;
import org.apache.james.util.ReactorUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.pool.InstrumentedPool;
import reactor.pool.PoolBuilder;
import reactor.util.retry.RetryBackoffSpec;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Configuration;
import software.amazon.awssdk.services.s3.model.BucketAlreadyOwnedByYouException;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.NoSuchBucketException;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.S3Object;

/* loaded from: input_file:org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.class */
public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
    /** Size, in bytes, of the chunks an input stream is split into before being published to S3 (1 MiB). */
    private static final int CHUNK_SIZE = 1024 * 1024;
    /** Number of listed objects grouped into a single delete request when emptying a bucket. */
    private static final int EMPTY_BUCKET_BATCH_SIZE = 1000;
    /** Threshold, in bytes, handed to the {@link FileBackedOutputStream} buffering uploads (100 KiB). */
    private static final int FILE_THRESHOLD = 100 * 1024;
    /** Initial delay of the backoff applied when a save is retried. */
    private static final Duration FIRST_BACK_OFF = Duration.ofMillis(100);
    /** A practically unbounded duration ({@link Long#MAX_VALUE} milliseconds). */
    private static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE);
    /** Presumably the {@code eager} flag for {@code Mono.using}, which receives {@code false} in the save paths. */
    private static final boolean LAZY = false;
    /** Number of retries granted to a failing save; matches the value used by {@code createBucketOnRetry}. */
    private static final int MAX_RETRIES = 5;
    /** Pool handing out the S3 clients every operation of this DAO runs on. */
    private final InstrumentedPool<S3AsyncClient> clientPool;
    /** Translates caller-facing bucket names into the names actually used on S3. */
    private final BucketNameResolver bucketNameResolver;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO$FluxResponse.class */
    /**
     * Mutable holder for the outcome of a streamed GetObject call: the SDK response, the body
     * exposed as a {@link Flux} of byte buffers, and the future the response transformer
     * completes once the body stream is available (or fails).
     */
    public static class FluxResponse {
        final CompletableFuture<FluxResponse> supportingCompletableFuture;
        GetObjectResponse sdkResponse;
        Flux<ByteBuffer> flux;

        private FluxResponse() {
            this.supportingCompletableFuture = new CompletableFuture<>();
        }
    }

    /**
     * Creates the DAO from its configuration: prepares the pool of S3 clients and the bucket
     * name resolver.
     *
     * <p>Each pooled client is built with static credentials, the configured endpoint and
     * region, and path-style bucket addressing.
     *
     * @param s3BlobStoreConfiguration source of the credentials, endpoint, region, bucket prefix and namespace
     */
    @Inject
    S3BlobStoreDAO(S3BlobStoreConfiguration s3BlobStoreConfiguration) {
        AwsS3AuthConfiguration authConfiguration = s3BlobStoreConfiguration.getSpecificAuthConfiguration();
        S3Configuration pathStyleConfiguration = S3Configuration.builder()
            .pathStyleAccessEnabled(true)
            .build();

        // Credentials, endpoint and region are read each time the pool asks for a new client.
        Mono<S3AsyncClient> clientFactory = Mono.fromCallable(() -> {
            AwsBasicCredentials credentials = AwsBasicCredentials.create(
                authConfiguration.getAccessKeyId(),
                authConfiguration.getSecretKey());
            return S3AsyncClient.builder()
                .credentialsProvider(StaticCredentialsProvider.create(credentials))
                .httpClientBuilder(NettyNioAsyncHttpClient.builder()
                    .maxConcurrency(100)
                    .maxPendingConnectionAcquires(10000))
                .endpointOverride(authConfiguration.getEndpoint())
                .region(s3BlobStoreConfiguration.getRegion().asAws())
                .serviceConfiguration(pathStyleConfiguration)
                .build();
        });

        this.clientPool = PoolBuilder.from(clientFactory)
            .acquisitionScheduler(Schedulers.elastic())
            // Clients evicted from the pool are closed.
            .destroyHandler(client -> Mono.fromRunnable(client::close))
            .maxPendingAcquireUnbounded()
            .sizeUnbounded()
            .fifo();

        this.bucketNameResolver = BucketNameResolver.builder()
            .prefix(s3BlobStoreConfiguration.getBucketPrefix())
            .namespace(s3BlobStoreConfiguration.getNamespace())
            .build();
    }

    /** Warms up the client pool and blocks until the warm-up has completed. */
    public void start() {
        clientPool.warmup().block();
    }

    /** Disposes of the client pool. */
    @PreDestroy
    @Override
    public void close() {
        clientPool.dispose();
    }

    /**
     * Opens a blob as an input stream, blocking until the S3 response stream is available.
     *
     * @param bucketName logical bucket name; resolved through the bucket name resolver first
     * @param blobId identifier of the blob, used as the S3 object key
     * @return a stream over the blob content
     * @throws ObjectNotFoundException when S3 reports that the bucket or the key does not exist
     */
    public InputStream read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException {
        BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
        return getObject(resolvedBucketName, blobId)
            .map(response -> ReactorUtils.toInputStream(response.flux))
            .onErrorMap(NoSuchBucketException.class,
                e -> new ObjectNotFoundException("Bucket not found " + resolvedBucketName.asString(), e))
            // Name the blob that is missing, not only the bucket it was looked up in.
            .onErrorMap(NoSuchKeyException.class,
                e -> new ObjectNotFoundException("Blob not found " + blobId.asString() + " in bucket " + resolvedBucketName.asString(), e))
            .block();
    }

    /**
     * Issues a GetObject request on a pooled client and exposes the body as a stream.
     *
     * @param bucketName already-resolved bucket name
     * @param blobId identifier of the blob, used as the S3 object key
     * @return the {@link FluxResponse} emitted once the body stream has been handed over by the SDK
     */
    private Mono<FluxResponse> getObject(BucketName bucketName, BlobId blobId) {
        return clientPool.withPoolable(client -> Mono.fromFuture(() ->
                client.getObject(
                    request -> request.bucket(bucketName.asString()).key(blobId.asString()),
                    newFluxResponseTransformer())))
            .next();
    }

    /** Creates a response transformer that publishes the object body as a {@link Flux} inside a {@link FluxResponse}. */
    private static AsyncResponseTransformer<GetObjectResponse, FluxResponse> newFluxResponseTransformer() {
        return new AsyncResponseTransformer<GetObjectResponse, FluxResponse>() {
            private FluxResponse response;

            @Override
            public CompletableFuture<FluxResponse> prepare() {
                // A fresh holder per prepare() call; its future is what the SDK call resolves to.
                response = new FluxResponse();
                return response.supportingCompletableFuture;
            }

            @Override
            public void onResponse(GetObjectResponse getObjectResponse) {
                response.sdkResponse = getObjectResponse;
            }

            @Override
            public void onStream(SdkPublisher<ByteBuffer> sdkPublisher) {
                // The future completes as soon as the body publisher exists, before any byte is consumed.
                response.flux = Flux.from(sdkPublisher);
                response.supportingCompletableFuture.complete(response);
            }

            @Override
            public void exceptionOccurred(Throwable th) {
                response.supportingCompletableFuture.completeExceptionally(th);
            }
        };
    }

    /* renamed from: readBytes, reason: merged with bridge method [inline-methods] */
    /**
     * Reads a whole blob into memory.
     *
     * @param bucketName logical bucket name; resolved through the bucket name resolver first
     * @param blobId identifier of the blob, used as the S3 object key
     * @return the blob content; the Mono fails with {@link ObjectNotFoundException} when S3
     *         reports that the bucket or the key does not exist
     */
    public Mono<byte[]> m6readBytes(BucketName bucketName, BlobId blobId) {
        BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
        return clientPool.withPoolable(client -> Mono.fromFuture(() ->
                client.getObject(
                    request -> request.bucket(resolvedBucketName.asString()).key(blobId.asString()),
                    AsyncResponseTransformer.toBytes())))
            .next()
            .onErrorMap(NoSuchBucketException.class,
                e -> new ObjectNotFoundException("Bucket not found " + resolvedBucketName.asString(), e))
            // Name the blob that is missing, not only the bucket it was looked up in.
            .onErrorMap(NoSuchKeyException.class,
                e -> new ObjectNotFoundException("Blob not found " + blobId.asString() + " in bucket " + resolvedBucketName.asString(), e))
            .map(bytes -> bytes.asByteArray());
    }

    /* renamed from: save, reason: merged with bridge method [inline-methods] */
    /**
     * Stores an in-memory payload as a blob.
     *
     * <p>Failures go through {@code createBucketOnRetry}, which creates the bucket before
     * retrying when it is missing.
     *
     * @param bucketName logical bucket name; resolved through the bucket name resolver first
     * @param blobId identifier of the blob, used as the S3 object key
     * @param bArr content to upload
     * @return a Mono completing once the upload has succeeded
     */
    public Mono<Void> m5save(BucketName bucketName, BlobId blobId, byte[] bArr) {
        BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
        return clientPool.withPoolable(client -> Mono.fromFuture(() ->
                client.putObject(
                    request -> request
                        .bucket(resolvedBucketName.asString())
                        .key(blobId.asString())
                        .contentLength((long) bArr.length),
                    AsyncRequestBody.fromBytes(bArr))))
            .next()
            .retryWhen(createBucketOnRetry(resolvedBucketName))
            .then();
    }

    /* renamed from: save, reason: merged with bridge method [inline-methods] */
    /**
     * Stores the content of an input stream as a blob.
     *
     * @param bucketName logical bucket name
     * @param blobId identifier of the blob
     * @param inputStream content to upload; must not be null
     * @return a Mono completing once the upload has succeeded
     */
    public Mono<Void> m4save(BucketName bucketName, BlobId blobId, InputStream inputStream) {
        InputStream content = Preconditions.checkNotNull(inputStream);
        return uploadUsingFile(bucketName, blobId, content);
    }

    /**
     * Copies the stream into a {@link FileBackedOutputStream}, then uploads the buffered
     * content as a {@link ByteSource}.
     *
     * <p>The buffer is reset once the upload terminates. An {@link IOException} raised along
     * the way is reported as an {@link ObjectStoreIOException}.
     */
    private Mono<Void> uploadUsingFile(BucketName bucketName, BlobId blobId, InputStream inputStream) {
        return Mono.using(
                () -> new FileBackedOutputStream(FILE_THRESHOLD),
                buffer -> Mono.fromCallable(() -> IOUtils.copy(inputStream, buffer))
                    // The copied byte count is not needed; only the buffered content is.
                    .flatMap(copiedByteCount -> m3save(bucketName, blobId, buffer.asByteSource())),
                Throwing.consumer(FileBackedOutputStream::reset),
                LAZY)
            .onErrorMap(IOException.class, e -> new ObjectStoreIOException("Error saving blob", e));
    }

    /* renamed from: save, reason: merged with bridge method [inline-methods] */
    /**
     * Stores the content of a {@link ByteSource} as a blob, streaming it to S3 in chunks of
     * {@code CHUNK_SIZE} bytes.
     *
     * <p>A stream is opened from the source for each attempt and closed when the attempt
     * terminates. Failures go through {@code createBucketOnRetry}, which creates the bucket
     * before retrying when it is missing. {@link IOException} and {@link SdkClientException}
     * are reported as {@link ObjectStoreIOException}.
     *
     * @param bucketName logical bucket name; resolved through the bucket name resolver first
     * @param blobId identifier of the blob, used as the S3 object key
     * @param byteSource content to upload; its size is sent as the content length
     * @return a Mono completing once the upload has succeeded
     */
    public Mono<Void> m3save(BucketName bucketName, BlobId blobId, ByteSource byteSource) {
        BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
        return Mono.using(
                byteSource::openStream,
                stream -> clientPool.withPoolable(client -> Mono.fromFuture(() ->
                        client.putObject(
                            // The explicit type argument is required: the call is the receiver of
                            // sneakyThrow(), so it has no target type to infer the builder type from.
                            Throwing.<software.amazon.awssdk.services.s3.model.PutObjectRequest.Builder>consumer(
                                    request -> request
                                        .bucket(resolvedBucketName.asString())
                                        .contentLength(byteSource.size())
                                        .key(blobId.asString()))
                                .sneakyThrow(),
                            AsyncRequestBody.fromPublisher(DataChunker.chunkStream(stream, CHUNK_SIZE)))))
                    .next(),
                Throwing.consumer(InputStream::close),
                LAZY)
            .retryWhen(createBucketOnRetry(resolvedBucketName))
            .onErrorMap(IOException.class, e -> new ObjectStoreIOException("Error saving blob", e))
            .onErrorMap(SdkClientException.class, e -> new ObjectStoreIOException("Error saving blob", e))
            .then();
    }

    /**
     * Builds the retry policy applied to saves: exponential backoff starting at
     * {@code FIRST_BACK_OFF}, at most {@code MAX_RETRIES} attempts.
     *
     * <p>Before each retry, a {@link NoSuchBucketException} triggers the creation of the
     * bucket (an already-owned bucket is not an error). Any other failure is re-emitted from
     * the pre-retry hook instead of being retried.
     *
     * @param bucketName already-resolved name of the bucket to create when it is missing
     */
    private RetryBackoffSpec createBucketOnRetry(BucketName bucketName) {
        return RetryBackoffSpec.backoff(MAX_RETRIES, FIRST_BACK_OFF)
            .maxAttempts(MAX_RETRIES)
            .doBeforeRetryAsync(retrySignal -> {
                if (retrySignal.failure() instanceof NoSuchBucketException) {
                    return clientPool.withPoolable(client -> Mono
                            .fromFuture(client.createBucket(request -> request.bucket(bucketName.asString())))
                            .onErrorResume(BucketAlreadyOwnedByYouException.class, alreadyOwned -> Mono.empty()))
                        .next()
                        .then();
                }
                return Mono.error(retrySignal.failure());
            });
    }

    /* renamed from: delete, reason: merged with bridge method [inline-methods] */
    /**
     * Deletes a blob. A missing bucket is not treated as an error.
     *
     * @param bucketName logical bucket name; resolved through the bucket name resolver first
     * @param blobId identifier of the blob, used as the S3 object key
     * @return a Mono completing once the delete request has been answered
     */
    public Mono<Void> m2delete(BucketName bucketName, BlobId blobId) {
        BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
        return clientPool.withPoolable(client -> Mono.fromFuture(() ->
                client.deleteObject(request -> request
                    .bucket(resolvedBucketName.asString())
                    .key(blobId.asString()))))
            .next()
            .then()
            .onErrorResume(NoSuchBucketException.class, missingBucket -> Mono.empty());
    }

    /* renamed from: deleteBucket, reason: merged with bridge method [inline-methods] */
    /**
     * Deletes a bucket after resolving its name through the bucket name resolver.
     *
     * @param bucketName logical bucket name
     * @return a Mono completing once the deletion attempt has finished
     */
    public Mono<Void> m1deleteBucket(BucketName bucketName) {
        BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
        return deleteResolvedBucket(resolvedBucketName);
    }

    /**
     * Empties a bucket, then deletes it, on a best-effort basis.
     *
     * <p>A failure while emptying does not prevent the deletion attempt, and a failure of
     * the deletion itself is swallowed: the returned Mono always completes normally.
     *
     * @param bucketName already-resolved bucket name
     */
    private Mono<Void> deleteResolvedBucket(BucketName bucketName) {
        return emptyBucket(bucketName)
            // Still try to delete the bucket when it could not be emptied.
            .onErrorResume(emptyingFailure -> Mono.just(bucketName))
            .flatMap(ignored -> clientPool.withPoolable(client -> Mono.fromFuture(() ->
                    client.deleteBucket(request -> request.bucket(bucketName.asString()))))
                .next())
            .onErrorResume(deletionFailure -> Mono.empty())
            .then();
    }

    /**
     * Deletes every object of a bucket.
     *
     * <p>The listing is paginated: a single list call returns only one page of keys, so all
     * pages are walked to make sure larger buckets end up empty. Keys are deleted in batches
     * of {@code EMPTY_BUCKET_BATCH_SIZE}.
     *
     * @param bucketName already-resolved bucket name
     * @return the bucket name, emitted once all delete batches have completed
     */
    private Mono<BucketName> emptyBucket(BucketName bucketName) {
        return clientPool.withPoolable(client ->
                Flux.from(client.listObjectsV2Paginator(request -> request.bucket(bucketName.asString())))
                    .flatMapIterable(page -> page.contents()))
            .window(EMPTY_BUCKET_BATCH_SIZE)
            .flatMap(this::buildListForBatch, 16)
            .flatMap(identifiers -> deleteObjects(bucketName, identifiers), 16)
            .then(Mono.just(bucketName));
    }

    /**
     * Collects the keys of a batch of listed objects into an immutable list of identifiers.
     *
     * @param flux the objects of one batch
     * @return the identifiers of these objects, in the order they were emitted
     */
    private Mono<List<ObjectIdentifier>> buildListForBatch(Flux<S3Object> flux) {
        Flux<ObjectIdentifier> identifiers = flux
            .map(object -> ObjectIdentifier.builder().key(object.key()).build());
        return identifiers.collect(Guavate.toImmutableList());
    }

    /**
     * Deletes a batch of objects from a bucket with a single request.
     *
     * @param bucketName already-resolved bucket name
     * @param list identifiers of the objects to delete
     * @return the response of the delete request
     */
    private Mono<DeleteObjectsResponse> deleteObjects(BucketName bucketName, List<ObjectIdentifier> list) {
        // The two lambda parameters need distinct names: a lambda parameter may not redeclare
        // a parameter of an enclosing lambda.
        return clientPool.withPoolable(client -> Mono.fromFuture(() ->
                client.deleteObjects(request -> request
                    .bucket(bucketName.asString())
                    .delete(deletion -> deletion.objects(list)))))
            .next();
    }

    /**
     * Lists every bucket visible to the client and deletes each of them, contents included.
     * Bucket names are used as returned by S3, without going through the bucket name resolver.
     *
     * @return a Mono completing once every bucket deletion attempt has finished
     */
    @VisibleForTesting
    public Mono<Void> deleteAllBuckets() {
        return clientPool.withPoolable(client -> Mono.fromFuture(client::listBuckets)
                .flatMapIterable(response -> response.buckets())
                .flatMap(bucket -> deleteResolvedBucket(BucketName.of(bucket.name())), 16))
            .then();
    }
}
