package org.apache.james.blob.cassandra;

import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Bytes;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.stream.Collectors;
import javax.inject.Inject;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.blob.api.HashBlobId;
import org.apache.james.blob.api.ObjectStoreException;
import org.apache.james.blob.cassandra.BlobTable;
import org.apache.james.blob.cassandra.utils.DataChunker;
import org.apache.james.blob.cassandra.utils.PipedStreamSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/apache/james/blob/cassandra/CassandraBlobsDAO.class */
public class CassandraBlobsDAO implements BlobStore {
    private static final int PREFETCH = 16;
    private static final int MAX_CONCURRENCY = 2;
    private final CassandraAsyncExecutor cassandraAsyncExecutor;
    private final PreparedStatement insert;
    private final PreparedStatement insertPart;
    private final PreparedStatement select;
    private final PreparedStatement selectPart;
    private final DataChunker dataChunker;
    private final CassandraConfiguration configuration;
    private final HashBlobId.Factory blobIdFactory;

    @Inject
    public CassandraBlobsDAO(Session session, CassandraConfiguration cassandraConfiguration, HashBlobId.Factory factory) {
        this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
        this.configuration = cassandraConfiguration;
        this.blobIdFactory = factory;
        this.dataChunker = new DataChunker();
        this.insert = prepareInsert(session);
        this.select = prepareSelect(session);
        this.insertPart = prepareInsertPart(session);
        this.selectPart = prepareSelectPart(session);
    }

    @VisibleForTesting
    public CassandraBlobsDAO(Session session) {
        this(session, CassandraConfiguration.DEFAULT_CONFIGURATION, new HashBlobId.Factory());
    }

    private PreparedStatement prepareSelect(Session session) {
        return session.prepare(QueryBuilder.select().from(BlobTable.TABLE_NAME).where(QueryBuilder.eq(BlobTable.ID, QueryBuilder.bindMarker(BlobTable.ID))));
    }

    private PreparedStatement prepareSelectPart(Session session) {
        return session.prepare(QueryBuilder.select().from(BlobTable.BlobParts.TABLE_NAME).where(QueryBuilder.eq(BlobTable.ID, QueryBuilder.bindMarker(BlobTable.ID))).and(QueryBuilder.eq(BlobTable.BlobParts.CHUNK_NUMBER, QueryBuilder.bindMarker(BlobTable.BlobParts.CHUNK_NUMBER))));
    }

    private PreparedStatement prepareInsert(Session session) {
        return session.prepare(QueryBuilder.insertInto(BlobTable.TABLE_NAME).value(BlobTable.ID, QueryBuilder.bindMarker(BlobTable.ID)).value(BlobTable.NUMBER_OF_CHUNK, QueryBuilder.bindMarker(BlobTable.NUMBER_OF_CHUNK)));
    }

    private PreparedStatement prepareInsertPart(Session session) {
        return session.prepare(QueryBuilder.insertInto(BlobTable.BlobParts.TABLE_NAME).value(BlobTable.ID, QueryBuilder.bindMarker(BlobTable.ID)).value(BlobTable.BlobParts.CHUNK_NUMBER, QueryBuilder.bindMarker(BlobTable.BlobParts.CHUNK_NUMBER)).value(BlobTable.BlobParts.DATA, QueryBuilder.bindMarker(BlobTable.BlobParts.DATA)));
    }

    public Mono<BlobId> save(byte[] bArr) {
        Preconditions.checkNotNull(bArr);
        return saveAsMono(bArr);
    }

    private Mono<BlobId> saveAsMono(byte[] bArr) {
        HashBlobId forPayload = this.blobIdFactory.forPayload(bArr);
        return saveBlobParts(bArr, forPayload).flatMap(num -> {
            return saveBlobPartsReferences(forPayload, num.intValue());
        });
    }

    private Mono<Integer> saveBlobParts(byte[] bArr, BlobId blobId) {
        return Flux.fromStream(this.dataChunker.chunk(bArr, this.configuration.getBlobPartSize())).publishOn(Schedulers.elastic(), PREFETCH).flatMap(pair -> {
            return writePart((ByteBuffer) pair.getValue(), blobId, getChunkNum(pair).intValue());
        }).collect(Collectors.maxBy(Comparator.comparingInt(num -> {
            return num.intValue();
        }))).flatMap(Mono::justOrEmpty).map((v1) -> {
            return numToCount(v1);
        }).defaultIfEmpty(0);
    }

    private int numToCount(int i) {
        return i + 1;
    }

    private Integer getChunkNum(Pair<Integer, ByteBuffer> pair) {
        return (Integer) pair.getKey();
    }

    private Mono<Integer> writePart(ByteBuffer byteBuffer, BlobId blobId, int i) {
        return this.cassandraAsyncExecutor.executeVoidReactor(this.insertPart.bind().setString(BlobTable.ID, blobId.asString()).setInt(BlobTable.BlobParts.CHUNK_NUMBER, i).setBytes(BlobTable.BlobParts.DATA, byteBuffer)).then(Mono.just(Integer.valueOf(i)));
    }

    private Mono<BlobId> saveBlobPartsReferences(BlobId blobId, int i) {
        return this.cassandraAsyncExecutor.executeVoidReactor(this.insert.bind().setString(BlobTable.ID, blobId.asString()).setInt(BlobTable.NUMBER_OF_CHUNK, i)).then(Mono.just(blobId));
    }

    public Mono<byte[]> readBytes(BlobId blobId) {
        return readBlobParts(blobId).collectList().map(list -> {
            return Bytes.concat((byte[][]) list.toArray((Object[]) new byte[0]));
        });
    }

    private Mono<Integer> selectRowCount(BlobId blobId) {
        return this.cassandraAsyncExecutor.executeSingleRowReactor(this.select.bind().setString(BlobTable.ID, blobId.asString())).map(row -> {
            return Integer.valueOf(row.getInt(BlobTable.NUMBER_OF_CHUNK));
        });
    }

    private byte[] rowToData(Row row) {
        byte[] bArr = new byte[row.getBytes(BlobTable.BlobParts.DATA).remaining()];
        row.getBytes(BlobTable.BlobParts.DATA).get(bArr);
        return bArr;
    }

    private Mono<byte[]> readPart(BlobId blobId, int i) {
        return this.cassandraAsyncExecutor.executeSingleRowReactor(this.selectPart.bind().setString(BlobTable.ID, blobId.asString()).setInt(BlobTable.BlobParts.CHUNK_NUMBER, i)).map(this::rowToData).switchIfEmpty(Mono.error(new IllegalStateException(String.format("Missing blob part for blobId %s and position %d", blobId, Integer.valueOf(i)))));
    }

    public InputStream read(BlobId blobId) {
        PipedInputStream pipedInputStream = new PipedInputStream();
        readBlobParts(blobId).subscribe(new PipedStreamSubscriber(pipedInputStream));
        return pipedInputStream;
    }

    private Flux<byte[]> readBlobParts(BlobId blobId) {
        return Flux.range(0, ((Integer) selectRowCount(blobId).publishOn(Schedulers.elastic()).switchIfEmpty(Mono.error(new ObjectStoreException(String.format("Could not retrieve blob metadata for %s", blobId)))).block()).intValue()).publishOn(Schedulers.elastic(), PREFETCH).flatMapSequential(num -> {
            return readPart(blobId, num.intValue());
        }, MAX_CONCURRENCY, PREFETCH);
    }

    public Mono<BlobId> save(InputStream inputStream) {
        Preconditions.checkNotNull(inputStream);
        return Mono.fromCallable(() -> {
            return IOUtils.toByteArray(inputStream);
        }).flatMap(this::saveAsMono);
    }
}
