package org.apache.james.blob.cassandra;

import com.github.fge.lambdas.Throwing;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.io.ByteSource;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import javax.inject.Inject;
import javax.inject.Named;
import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BlobStoreDAO;
import org.apache.james.blob.api.BucketName;
import org.apache.james.blob.api.ObjectNotFoundException;
import org.apache.james.blob.api.ObjectStoreIOException;
import org.apache.james.metrics.api.Metric;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.util.DataChunker;
import org.apache.james.util.ReactorUtils;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/apache/james/blob/cassandra/CassandraBlobStoreDAO.class */
/**
 * {@link BlobStoreDAO} implementation that stores each blob as a sequence of fixed-size parts.
 *
 * <p>Blobs belonging to the injected default bucket are delegated to
 * {@link CassandraDefaultBucketDAO}; blobs of any other bucket are delegated to
 * {@link CassandraBucketDAO}. A blob is written as indexed parts followed by a reference holding
 * the number of parts, and read back by first fetching that number and then every part in order.
 *
 * <p>NOTE(review): the methods prefixed with {@code m<number>} carry names generated by the
 * decompiler (see the "renamed from" remark on each of them); they are kept as-is so that existing
 * callers of this file keep working.
 */
public class CassandraBlobStoreDAO implements BlobStoreDAO {
    /** Value passed as the {@code eager} flag of {@code Mono.using}. */
    public static final boolean LAZY = false;
    public static final String CASSANDRA_BLOBSTORE_CL_ONE_MISS_COUNT_METRIC_NAME = "cassandraBlobStoreClOneMisses";
    public static final String CASSANDRA_BLOBSTORE_CL_ONE_HIT_COUNT_METRIC_NAME = "cassandraBlobStoreClOneHits";
    private static final Logger LOGGER = LoggerFactory.getLogger(CassandraBlobStoreDAO.class);
    /** Maximum number of blob deletions running concurrently in bulk deletes. */
    private static final int DELETE_CONCURRENCY = 16;

    private final CassandraDefaultBucketDAO defaultBucketDAO;
    private final CassandraBucketDAO bucketDAO;
    private final CassandraConfiguration configuration;
    private final BucketName defaultBucket;
    private final Metric metricClOneHitCount;
    private final Metric metricClOneMissCount;

    /**
     * Wires the DAO and registers the two hit/miss metrics.
     *
     * @param cassandraDefaultBucketDAO storage used for blobs of the default bucket
     * @param cassandraBucketDAO storage used for blobs of every other bucket
     * @param cassandraConfiguration supplies the blob part size and the optimistic read switch
     * @param bucketName the bucket considered to be the default one
     * @param metricFactory used to generate the hit and miss metrics
     */
    @Inject
    @VisibleForTesting
    public CassandraBlobStoreDAO(CassandraDefaultBucketDAO cassandraDefaultBucketDAO, CassandraBucketDAO cassandraBucketDAO, CassandraConfiguration cassandraConfiguration, @Named("defaultBucket") BucketName bucketName, MetricFactory metricFactory) {
        this.defaultBucketDAO = cassandraDefaultBucketDAO;
        this.bucketDAO = cassandraBucketDAO;
        this.configuration = cassandraConfiguration;
        this.defaultBucket = bucketName;
        this.metricClOneMissCount = metricFactory.generate(CASSANDRA_BLOBSTORE_CL_ONE_MISS_COUNT_METRIC_NAME);
        this.metricClOneHitCount = metricFactory.generate(CASSANDRA_BLOBSTORE_CL_ONE_HIT_COUNT_METRIC_NAME);
        // NOTE(review): the warning is only logged when the variable is explicitly set to "false";
        // when it is unset nothing is logged. Confirm this is the intended default.
        if (Objects.equals(System.getenv("cassandra.blob.store.disable.startup.warning"), "false")) {
            LOGGER.warn("WARNING: JAMES-3591 Cassandra is not made to store large binary content, its use will be suboptimal compared to  alternatives (namely S3 compatible BlobStores backed by for instance S3, MinIO or Ozone)");
        }
    }

    /**
     * Exposes the parts of a blob as a single {@link InputStream}.
     *
     * @param bucketName bucket holding the blob
     * @param blobId identifier of the blob
     * @return a stream over the concatenated blob parts
     */
    public InputStream read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException {
        return ReactorUtils.toInputStream(readBlobParts(bucketName, blobId));
    }

    /**
     * Reactive counterpart of {@link #read(BucketName, BlobId)}.
     *
     * <p>The stream is created on subscription rather than when this method is called, so each
     * subscriber gets its own stream and any failure while building it is delivered as an error
     * signal instead of being thrown to the caller.
     *
     * @param bucketName bucket holding the blob
     * @param blobId identifier of the blob
     * @return a publisher emitting one stream over the blob content
     */
    public Publisher<InputStream> readReactive(BucketName bucketName, BlobId blobId) {
        return Mono.fromCallable(() -> read(bucketName, blobId));
    }

    /**
     * Reads all the parts of a blob and concatenates them into one byte array.
     *
     * @param bucketName bucket holding the blob
     * @param blobId identifier of the blob
     * @return the full blob content
     */
    /* renamed from: readBytes, reason: merged with bridge method [inline-methods] */
    public Mono<byte[]> m12readBytes(BucketName bucketName, BlobId blobId) {
        return readBlobParts(bucketName, blobId)
            .collectList()
            .map(this::byteBuffersToBytesArray);
    }

    /**
     * Saves a byte array, chunked using the configured blob part size.
     *
     * @param bucketName destination bucket
     * @param blobId identifier to store the blob under
     * @param bArr content to store, must not be null
     * @return completes once the parts and the part count reference are written
     */
    /* renamed from: save, reason: merged with bridge method [inline-methods] */
    public Mono<Void> m11save(BucketName bucketName, BlobId blobId, byte[] bArr) {
        Preconditions.checkNotNull(bArr);
        return Mono.fromCallable(() -> DataChunker.chunk(bArr, this.configuration.getBlobPartSize()))
            .flatMap(chunks -> save(bucketName, blobId, (Flux<ByteBuffer>) chunks));
    }

    /**
     * Saves the content of a stream, chunked using the configured blob part size.
     *
     * <p>The chunking of the stream is subscribed on the bounded elastic scheduler. Any error is
     * mapped to an {@link ObjectStoreIOException} wrapping the original cause.
     *
     * @param bucketName destination bucket, must not be null
     * @param blobId identifier to store the blob under
     * @param inputStream content to store, must not be null
     * @return completes once the parts and the part count reference are written
     */
    /* renamed from: save, reason: merged with bridge method [inline-methods] */
    public Mono<Void> m10save(BucketName bucketName, BlobId blobId, InputStream inputStream) {
        Preconditions.checkNotNull(bucketName);
        Preconditions.checkNotNull(inputStream);
        return Mono.fromCallable(() -> ReactorUtils.toChunks(inputStream, this.configuration.getBlobPartSize())
                .subscribeOn(Schedulers.boundedElastic()))
            .flatMap(chunks -> save(bucketName, blobId, (Flux<ByteBuffer>) chunks))
            .onErrorMap(cause -> new ObjectStoreIOException("Exception occurred while saving input stream", cause));
    }

    /**
     * Saves the content of a {@link ByteSource} by opening a buffered stream on it, saving that
     * stream and closing it afterwards.
     *
     * @param bucketName destination bucket
     * @param blobId identifier to store the blob under
     * @param byteSource content to store, must not be null
     * @return completes once the content is saved
     */
    /* renamed from: save, reason: merged with bridge method [inline-methods] */
    public Mono<Void> m9save(BucketName bucketName, BlobId blobId, ByteSource byteSource) {
        Objects.requireNonNull(byteSource);
        return Mono.using(
            byteSource::openBufferedStream,
            stream -> m10save(bucketName, blobId, stream),
            // An exact method reference lets the consumer's type parameter be inferred as InputStream.
            Throwing.consumer(InputStream::close).sneakyThrow(),
            LAZY);
    }

    /** Writes every part, then records how many parts were written. */
    private Mono<Void> save(BucketName bucketName, BlobId blobId, Flux<ByteBuffer> flux) {
        return saveBlobParts(bucketName, blobId, flux)
            .flatMap(partCount -> saveBlobPartReference(bucketName, blobId, partCount));
    }

    /** Writes the parts one after the other, in order, and emits the number of parts written. */
    private Mono<Integer> saveBlobParts(BucketName bucketName, BlobId blobId, Flux<ByteBuffer> flux) {
        return flux.index()
            .concatMap(indexedPart -> writePart(bucketName, blobId, indexedPart.getT1().intValue(), indexedPart.getT2()))
            .count()
            .map(Long::intValue);
    }

    /** Writes a single part at position {@code i}; emits 1 so that written parts can be counted. */
    private Mono<Integer> writePart(BucketName bucketName, BlobId blobId, int i, ByteBuffer byteBuffer) {
        if (isDefaultBucket(bucketName)) {
            return this.defaultBucketDAO.writePart(byteBuffer, blobId, i).thenReturn(1);
        }
        return this.bucketDAO.writePart(byteBuffer, bucketName, blobId, i).thenReturn(1);
    }

    /** Records the number of parts composing the blob. */
    private Mono<Void> saveBlobPartReference(BucketName bucketName, BlobId blobId, Integer num) {
        if (isDefaultBucket(bucketName)) {
            return this.defaultBucketDAO.saveBlobPartsReferences(blobId, num.intValue());
        }
        return this.bucketDAO.saveBlobPartsReferences(bucketName, blobId, num.intValue());
    }

    /** Tells whether the given bucket is the injected default bucket. */
    private boolean isDefaultBucket(BucketName bucketName) {
        return bucketName.equals(this.defaultBucket);
    }

    /**
     * Deletes a blob: first its position (part count) entry, then its parts.
     *
     * @param bucketName bucket holding the blob
     * @param blobId identifier of the blob to delete
     * @return completes once both deletions are done
     */
    /* renamed from: delete, reason: merged with bridge method [inline-methods] */
    public Mono<Void> m8delete(BucketName bucketName, BlobId blobId) {
        if (isDefaultBucket(bucketName)) {
            return this.defaultBucketDAO.deletePosition(blobId)
                .then(this.defaultBucketDAO.deleteParts(blobId));
        }
        return this.bucketDAO.deletePosition(bucketName, blobId)
            .then(this.bucketDAO.deleteParts(bucketName, blobId));
    }

    /**
     * Deletes several blobs of a bucket, running at most {@value #DELETE_CONCURRENCY} deletions
     * concurrently.
     *
     * @param bucketName bucket holding the blobs
     * @param collection identifiers of the blobs to delete
     * @return completes once every blob is deleted
     */
    public Publisher<Void> delete(BucketName bucketName, Collection<BlobId> collection) {
        return Flux.fromIterable(collection)
            .flatMap(blobId -> m8delete(bucketName, blobId), DELETE_CONCURRENCY)
            .then();
    }

    /**
     * Deletes every blob listed for the given bucket, running at most {@value #DELETE_CONCURRENCY}
     * deletions concurrently.
     *
     * @param bucketName bucket to delete, must not be null and must not be the default bucket
     * @return completes once every blob of the bucket is deleted
     * @throws IllegalArgumentException when asked to delete the default bucket
     */
    /* renamed from: deleteBucket, reason: merged with bridge method [inline-methods] */
    public Mono<Void> m7deleteBucket(BucketName bucketName) {
        Preconditions.checkNotNull(bucketName);
        Preconditions.checkArgument(!isDefaultBucket(bucketName), "Deleting the default bucket is forbidden");
        return this.bucketDAO.listAll()
            .filter(entry -> entry.getKey().equals(bucketName))
            .map(entry -> entry.getValue())
            .flatMap(blobId -> m8delete(bucketName, blobId), DELETE_CONCURRENCY)
            .then();
    }

    /**
     * Reads one part, going through the optimistic read path when the configuration enables it.
     */
    private Mono<ByteBuffer> readPart(BucketName bucketName, BlobId blobId, Integer num) {
        if (!this.configuration.isOptimisticConsistencyLevel()) {
            return readPartClDefault(bucketName, blobId, num);
        }
        return withClOneFallback(
            readPartClOne(bucketName, blobId, num),
            readPartClDefault(bucketName, blobId, num));
    }

    private Mono<ByteBuffer> readPartClOne(BucketName bucketName, BlobId blobId, Integer num) {
        if (isDefaultBucket(bucketName)) {
            return this.defaultBucketDAO.readPartClOne(blobId, num.intValue());
        }
        return this.bucketDAO.readPartClOne(bucketName, blobId, num.intValue());
    }

    private Mono<ByteBuffer> readPartClDefault(BucketName bucketName, BlobId blobId, Integer num) {
        if (isDefaultBucket(bucketName)) {
            return this.defaultBucketDAO.readPart(blobId, num.intValue());
        }
        return this.bucketDAO.readPart(bucketName, blobId, num.intValue());
    }

    /**
     * Reads the number of parts of a blob, going through the optimistic read path when the
     * configuration enables it.
     */
    private Mono<Integer> selectRowCount(BucketName bucketName, BlobId blobId) {
        if (!this.configuration.isOptimisticConsistencyLevel()) {
            return selectRowCountClDefault(bucketName, blobId);
        }
        return withClOneFallback(
            selectRowCountClOne(bucketName, blobId),
            selectRowCountClDefault(bucketName, blobId));
    }

    private Mono<Integer> selectRowCountClOne(BucketName bucketName, BlobId blobId) {
        if (isDefaultBucket(bucketName)) {
            return this.defaultBucketDAO.selectRowCountClOne(blobId);
        }
        return this.bucketDAO.selectRowCountClOne(bucketName, blobId);
    }

    private Mono<Integer> selectRowCountClDefault(BucketName bucketName, BlobId blobId) {
        if (isDefaultBucket(bucketName)) {
            return this.defaultBucketDAO.selectRowCount(blobId);
        }
        return this.bucketDAO.selectRowCount(bucketName, blobId);
    }

    /**
     * Combines an optimistic read with its fallback: a value emitted by {@code clOneRead} counts
     * as a hit; when it completes empty a miss is counted and {@code clDefaultRead} is used.
     */
    private <T> Mono<T> withClOneFallback(Mono<T> clOneRead, Mono<T> clDefaultRead) {
        return clOneRead
            .doOnNext(value -> this.metricClOneHitCount.increment())
            .switchIfEmpty(Mono.fromRunnable(this.metricClOneMissCount::increment).then(clDefaultRead));
    }

    /**
     * Emits the parts of a blob in order.
     *
     * <p>Fails with {@link ObjectNotFoundException} when the part count cannot be found or when
     * any of the expected parts is missing.
     */
    private Flux<ByteBuffer> readBlobParts(BucketName bucketName, BlobId blobId) {
        return selectRowCount(bucketName, blobId)
            .single()
            .onErrorMap(NoSuchElementException.class, missingMetadata ->
                new ObjectNotFoundException(String.format("Could not retrieve blob metadata for %s", blobId)))
            .flatMapMany(rowCount -> Flux.range(0, rowCount.intValue())
                // concatMap keeps the parts in position order.
                .concatMap(position -> readPart(bucketName, blobId, position)
                    .single()
                    .onErrorMap(NoSuchElementException.class, missingPart ->
                        new ObjectNotFoundException(String.format("Missing blob part for blobId %s and position %d", blobId.asString(), position)))));
    }

    /**
     * Concatenates the remaining bytes of every buffer, in list order, into a single array.
     * Reading advances the position of the source buffers.
     */
    private byte[] byteBuffersToBytesArray(List<ByteBuffer> list) {
        int totalSize = 0;
        for (ByteBuffer part : list) {
            totalSize += part.remaining();
        }
        ByteBuffer target = ByteBuffer.allocate(totalSize);
        for (ByteBuffer part : list) {
            target.put(part);
        }
        return target.array();
    }

    /**
     * Lists the distinct bucket names found in the non-default bucket storage.
     *
     * @return a publisher of bucket names, without duplicates
     */
    public Publisher<BucketName> listBuckets() {
        return this.bucketDAO.listAll()
            .map(entry -> entry.getLeft())
            .distinct();
    }

    /**
     * Lists the blobs of a bucket.
     *
     * @param bucketName bucket to list
     * @return a publisher of the blob identifiers stored in that bucket
     */
    public Publisher<BlobId> listBlobs(BucketName bucketName) {
        if (isDefaultBucket(bucketName)) {
            return this.defaultBucketDAO.listBlobs();
        }
        return this.bucketDAO.listAll(bucketName);
    }
}
