package org.apache.james.blob.cassandra;

import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.github.steveash.guavate.Guavate;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Bytes;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.ObjectStore;
import org.apache.james.blob.cassandra.BlobTable;
import org.apache.james.blob.cassandra.CassandraBlobId;
import org.apache.james.blob.cassandra.utils.DataChunker;
import org.apache.james.util.FluentFutureStream;
import org.apache.james.util.OptionalUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/james/blob/cassandra/CassandraBlobsDAO.class */
public class CassandraBlobsDAO implements ObjectStore {
    private static final Logger LOGGER = LoggerFactory.getLogger(CassandraBlobsDAO.class);
    private final CassandraAsyncExecutor cassandraAsyncExecutor;
    private final PreparedStatement insert;
    private final PreparedStatement insertPart;
    private final PreparedStatement select;
    private final PreparedStatement selectPart;
    private final DataChunker dataChunker;
    private final CassandraConfiguration configuration;
    private final CassandraBlobId.Factory blobIdFactory;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/james/blob/cassandra/CassandraBlobsDAO$BlobPart.class */
    public static class BlobPart {
        private final BlobId blobId;
        private final int position;
        private final Optional<Row> row;

        public BlobPart(BlobId blobId, int i, Optional<Row> optional) {
            Preconditions.checkNotNull(blobId);
            Preconditions.checkArgument(i >= 0, "position need to be positive");
            this.blobId = blobId;
            this.position = i;
            this.row = optional;
        }
    }

    @Inject
    public CassandraBlobsDAO(Session session, CassandraConfiguration cassandraConfiguration, CassandraBlobId.Factory factory) {
        this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
        this.configuration = cassandraConfiguration;
        this.blobIdFactory = factory;
        this.dataChunker = new DataChunker();
        this.insert = prepareInsert(session);
        this.select = prepareSelect(session);
        this.insertPart = prepareInsertPart(session);
        this.selectPart = prepareSelectPart(session);
    }

    @VisibleForTesting
    public CassandraBlobsDAO(Session session) {
        this(session, CassandraConfiguration.DEFAULT_CONFIGURATION, new CassandraBlobId.Factory());
    }

    private PreparedStatement prepareSelect(Session session) {
        return session.prepare(QueryBuilder.select().from(BlobTable.TABLE_NAME).where(QueryBuilder.eq(BlobTable.ID, QueryBuilder.bindMarker(BlobTable.ID))));
    }

    private PreparedStatement prepareSelectPart(Session session) {
        return session.prepare(QueryBuilder.select().from(BlobTable.BlobParts.TABLE_NAME).where(QueryBuilder.eq(BlobTable.ID, QueryBuilder.bindMarker(BlobTable.ID))).and(QueryBuilder.eq(BlobTable.BlobParts.CHUNK_NUMBER, QueryBuilder.bindMarker(BlobTable.BlobParts.CHUNK_NUMBER))));
    }

    private PreparedStatement prepareInsert(Session session) {
        return session.prepare(QueryBuilder.insertInto(BlobTable.TABLE_NAME).value(BlobTable.ID, QueryBuilder.bindMarker(BlobTable.ID)).value(BlobTable.NUMBER_OF_CHUNK, QueryBuilder.bindMarker(BlobTable.NUMBER_OF_CHUNK)));
    }

    private PreparedStatement prepareInsertPart(Session session) {
        return session.prepare(QueryBuilder.insertInto(BlobTable.BlobParts.TABLE_NAME).value(BlobTable.ID, QueryBuilder.bindMarker(BlobTable.ID)).value(BlobTable.BlobParts.CHUNK_NUMBER, QueryBuilder.bindMarker(BlobTable.BlobParts.CHUNK_NUMBER)).value(BlobTable.BlobParts.DATA, QueryBuilder.bindMarker(BlobTable.BlobParts.DATA)));
    }

    public CompletableFuture<BlobId> save(byte[] bArr) {
        Preconditions.checkNotNull(bArr);
        CassandraBlobId m1forPayload = this.blobIdFactory.m1forPayload(bArr);
        return saveBlobParts(bArr, m1forPayload).thenCompose(num -> {
            return saveBlobPartsReferences(m1forPayload, num.intValue());
        }).thenApply((Function<? super U, ? extends U>) r3 -> {
            return m1forPayload;
        });
    }

    private CompletableFuture<Integer> saveBlobParts(byte[] bArr, CassandraBlobId cassandraBlobId) {
        return FluentFutureStream.of(this.dataChunker.chunk(bArr, this.configuration.getBlobPartSize()).map(pair -> {
            return writePart((ByteBuffer) pair.getRight(), cassandraBlobId, ((Integer) pair.getKey()).intValue()).thenApply(r4 -> {
                return Pair.of(pair.getKey(), r4);
            });
        })).completableFuture().thenApply(stream -> {
            return (Integer) getLastOfStream(stream).map(pair2 -> {
                return Integer.valueOf(((Integer) pair2.getLeft()).intValue() + 1);
            }).orElse(0);
        });
    }

    private static <T> Optional<T> getLastOfStream(Stream<T> stream) {
        return stream.reduce((obj, obj2) -> {
            return obj2;
        });
    }

    private CompletableFuture<Void> writePart(ByteBuffer byteBuffer, CassandraBlobId cassandraBlobId, int i) {
        return this.cassandraAsyncExecutor.executeVoid(this.insertPart.bind().setString(BlobTable.ID, cassandraBlobId.asString()).setInt(BlobTable.BlobParts.CHUNK_NUMBER, i).setBytes(BlobTable.BlobParts.DATA, byteBuffer));
    }

    private CompletableFuture<Void> saveBlobPartsReferences(CassandraBlobId cassandraBlobId, int i) {
        return this.cassandraAsyncExecutor.executeVoid(this.insert.bind().setString(BlobTable.ID, cassandraBlobId.asString()).setInt(BlobTable.NUMBER_OF_CHUNK, i));
    }

    public CompletableFuture<byte[]> read(BlobId blobId) {
        return this.cassandraAsyncExecutor.executeSingleRow(this.select.bind().setString(BlobTable.ID, blobId.asString())).thenCompose(optional -> {
            return toDataParts(optional, blobId);
        }).thenApply(this::concatenateDataParts);
    }

    private CompletableFuture<Stream<BlobPart>> toDataParts(Optional<Row> optional, BlobId blobId) {
        return (CompletableFuture) optional.map(row -> {
            return FluentFutureStream.of(IntStream.range(0, row.getInt(BlobTable.NUMBER_OF_CHUNK)).mapToObj(i -> {
                return readPart(blobId, i);
            })).completableFuture();
        }).orElseGet(() -> {
            LOGGER.warn("Could not retrieve blob metadata for {}", blobId);
            return CompletableFuture.completedFuture(Stream.empty());
        });
    }

    /* JADX WARN: Type inference failed for: r1v7, types: [byte[], java.lang.Object[]] */
    private byte[] concatenateDataParts(Stream<BlobPart> stream) {
        ImmutableList immutableList = (ImmutableList) stream.map(blobPart -> {
            return OptionalUtils.executeIfEmpty(blobPart.row, () -> {
                LOGGER.warn("Missing blob part for blobId {} and position {}", blobPart.blobId, Integer.valueOf(blobPart.position));
            });
        }).flatMap(OptionalUtils::toStream).map(this::rowToData).collect(Guavate.toImmutableList());
        return Bytes.concat((byte[][]) immutableList.toArray((Object[]) new byte[immutableList.size()]));
    }

    private byte[] rowToData(Row row) {
        byte[] bArr = new byte[row.getBytes(BlobTable.BlobParts.DATA).remaining()];
        row.getBytes(BlobTable.BlobParts.DATA).get(bArr);
        return bArr;
    }

    private CompletableFuture<BlobPart> readPart(BlobId blobId, int i) {
        return this.cassandraAsyncExecutor.executeSingleRow(this.selectPart.bind().setString(BlobTable.ID, blobId.asString()).setInt(BlobTable.BlobParts.CHUNK_NUMBER, i)).thenApply(optional -> {
            return new BlobPart(blobId, i, optional);
        });
    }
}
