package org.apache.james.blob.cassandra;

import com.datastax.driver.core.Session;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
import javax.inject.Inject;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.blob.api.BucketName;
import org.apache.james.blob.api.HashBlobId;
import org.apache.james.blob.api.ObjectNotFoundException;
import org.apache.james.blob.cassandra.utils.DataChunker;
import org.apache.james.util.ReactorUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/apache/james/blob/cassandra/CassandraBlobStore.class */
public class CassandraBlobStore implements BlobStore {
    private static final int PREFETCH = 16;
    private static final int MAX_CONCURRENCY = 1;
    private final CassandraDefaultBucketDAO defaultBucketDAO;
    private final CassandraBucketDAO bucketDAO;
    private final DataChunker dataChunker;
    private final CassandraConfiguration configuration;
    private final HashBlobId.Factory blobIdFactory;

    @Inject
    CassandraBlobStore(CassandraDefaultBucketDAO cassandraDefaultBucketDAO, CassandraBucketDAO cassandraBucketDAO, CassandraConfiguration cassandraConfiguration, HashBlobId.Factory factory) {
        this.defaultBucketDAO = cassandraDefaultBucketDAO;
        this.bucketDAO = cassandraBucketDAO;
        this.configuration = cassandraConfiguration;
        this.blobIdFactory = factory;
        this.dataChunker = new DataChunker();
    }

    @VisibleForTesting
    public CassandraBlobStore(Session session) {
        this(new CassandraDefaultBucketDAO(session), new CassandraBucketDAO(new HashBlobId.Factory(), session), CassandraConfiguration.DEFAULT_CONFIGURATION, new HashBlobId.Factory());
    }

    public Mono<BlobId> save(BucketName bucketName, byte[] bArr) {
        Preconditions.checkNotNull(bArr);
        return saveAsMono(bucketName, bArr);
    }

    private Mono<BlobId> saveAsMono(BucketName bucketName, byte[] bArr) {
        HashBlobId forPayload = this.blobIdFactory.forPayload(bArr);
        return saveBlobParts(bucketName, bArr, forPayload).flatMap(num -> {
            return saveBlobPartReference(bucketName, forPayload, num).then(Mono.just(forPayload));
        });
    }

    private Mono<Integer> saveBlobParts(BucketName bucketName, byte[] bArr, BlobId blobId) {
        return Flux.fromStream(this.dataChunker.chunk(bArr, this.configuration.getBlobPartSize())).publishOn(Schedulers.elastic(), PREFETCH).flatMap(pair -> {
            return writePart(bucketName, blobId, ((Integer) pair.getKey()).intValue(), (ByteBuffer) pair.getValue()).then(Mono.just(getChunkNum(pair)));
        }).collect(Collectors.maxBy(Comparator.comparingInt(num -> {
            return num.intValue();
        }))).flatMap(Mono::justOrEmpty).map((v1) -> {
            return numToCount(v1);
        }).defaultIfEmpty(0);
    }

    private int numToCount(int i) {
        return i + MAX_CONCURRENCY;
    }

    private Integer getChunkNum(Pair<Integer, ByteBuffer> pair) {
        return (Integer) pair.getKey();
    }

    public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
        return readBlobParts(bucketName, blobId).collectList().map(this::byteBuffersToBytesArray);
    }

    public InputStream read(BucketName bucketName, BlobId blobId) {
        return ReactorUtils.toInputStream(readBlobParts(bucketName, blobId));
    }

    public BucketName getDefaultBucketName() {
        return BucketName.DEFAULT;
    }

    private Flux<ByteBuffer> readBlobParts(BucketName bucketName, BlobId blobId) {
        return Flux.range(0, ((Integer) selectRowCount(bucketName, blobId).publishOn(Schedulers.elastic()).single().onErrorResume(NoSuchElementException.class, noSuchElementException -> {
            return Mono.error(new ObjectNotFoundException(String.format("Could not retrieve blob metadata for %s", blobId)));
        }).block()).intValue()).publishOn(Schedulers.elastic(), PREFETCH).flatMapSequential(num -> {
            return readPart(bucketName, blobId, num).single().onErrorResume(NoSuchElementException.class, noSuchElementException2 -> {
                return Mono.error(new ObjectNotFoundException(String.format("Missing blob part for blobId %s and position %d", blobId, num)));
            });
        }, MAX_CONCURRENCY, PREFETCH);
    }

    public Mono<BlobId> save(BucketName bucketName, InputStream inputStream) {
        Preconditions.checkNotNull(inputStream);
        return Mono.fromCallable(() -> {
            return IOUtils.toByteArray(inputStream);
        }).flatMap(bArr -> {
            return saveAsMono(bucketName, bArr);
        });
    }

    public Mono<Void> deleteBucket(BucketName bucketName) {
        Preconditions.checkNotNull(bucketName);
        Preconditions.checkArgument(!isDefaultBucket(bucketName), "Deleting the default bucket is forbidden");
        return this.bucketDAO.listAll().filter(pair -> {
            return ((BucketName) pair.getKey()).equals(bucketName);
        }).map((v0) -> {
            return v0.getValue();
        }).flatMap(blobId -> {
            return delete(bucketName, blobId);
        }).then();
    }

    public Mono<Void> delete(BucketName bucketName, BlobId blobId) {
        return isDefaultBucket(bucketName) ? this.defaultBucketDAO.deletePosition(blobId).then(this.defaultBucketDAO.deleteParts(blobId)) : this.bucketDAO.deletePosition(bucketName, blobId).then(this.bucketDAO.deleteParts(bucketName, blobId));
    }

    private Mono<ByteBuffer> readPart(BucketName bucketName, BlobId blobId, Integer num) {
        return isDefaultBucket(bucketName) ? this.defaultBucketDAO.readPart(blobId, num.intValue()) : this.bucketDAO.readPart(bucketName, blobId, num.intValue());
    }

    private Mono<Integer> selectRowCount(BucketName bucketName, BlobId blobId) {
        return isDefaultBucket(bucketName) ? this.defaultBucketDAO.selectRowCount(blobId) : this.bucketDAO.selectRowCount(bucketName, blobId);
    }

    private Mono<Void> saveBlobPartReference(BucketName bucketName, BlobId blobId, Integer num) {
        return isDefaultBucket(bucketName) ? this.defaultBucketDAO.saveBlobPartsReferences(blobId, num.intValue()) : this.bucketDAO.saveBlobPartsReferences(bucketName, blobId, num.intValue());
    }

    private Mono<Void> writePart(BucketName bucketName, BlobId blobId, int i, ByteBuffer byteBuffer) {
        return isDefaultBucket(bucketName) ? this.defaultBucketDAO.writePart(byteBuffer, blobId, i) : this.bucketDAO.writePart(byteBuffer, bucketName, blobId, i);
    }

    private boolean isDefaultBucket(BucketName bucketName) {
        return bucketName.equals(getDefaultBucketName());
    }

    private byte[] byteBuffersToBytesArray(List<ByteBuffer> list) {
        return list.stream().reduce(ByteBuffer.allocate(list.stream().mapToInt((v0) -> {
            return v0.remaining();
        }).sum()), (byteBuffer, byteBuffer2) -> {
            return byteBuffer.put(byteBuffer2);
        }).array();
    }
}
