package org.springframework.data.aerospike.core;

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.Operation;
import com.aerospike.client.Record;
import com.aerospike.client.Value;
import com.aerospike.client.policy.RecordExistsAction;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.client.query.KeyRecord;
import com.aerospike.client.reactor.AerospikeReactorClient;
import com.aerospike.helper.query.Qualifier;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.aerospike.convert.AerospikeWriteData;
import org.springframework.data.aerospike.convert.MappingAerospikeConverter;
import org.springframework.data.aerospike.mapping.AerospikeMappingContext;
import org.springframework.data.aerospike.mapping.AerospikePersistentEntity;
import org.springframework.data.aerospike.repository.query.Query;
import org.springframework.data.domain.Sort;
import org.springframework.data.mapping.context.MappingContext;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/springframework/data/aerospike/core/ReactiveAerospikeTemplate.class */
public class ReactiveAerospikeTemplate extends BaseAerospikeTemplate implements ReactiveAerospikeOperations {
    private static final Logger log = LoggerFactory.getLogger(ReactiveAerospikeTemplate.class);
    private final AerospikeReactorClient reactorClient;

    public ReactiveAerospikeTemplate(AerospikeClient aerospikeClient, String str, MappingAerospikeConverter mappingAerospikeConverter, AerospikeMappingContext aerospikeMappingContext, AerospikeExceptionTranslator aerospikeExceptionTranslator, AerospikeReactorClient aerospikeReactorClient) {
        super(aerospikeClient, str, mappingAerospikeConverter, aerospikeMappingContext, aerospikeExceptionTranslator);
        Assert.notNull(aerospikeReactorClient, "Aerospike reactor client must not be null!");
        this.reactorClient = aerospikeReactorClient;
    }

    @Override // org.springframework.data.aerospike.core.ReactiveAerospikeOperations
    public <T> Mono<T> save(T t) {
        Assert.notNull(t, "Object to save must not be null!");
        AerospikeWriteData writeData = writeData(t);
        return writeData.hasVersion() ? doPersistWithVersionAndHandleCasError(t, writeData, expectGenerationCasAwareSavePolicy(writeData)) : doPersistAndHandleError(t, writeData, ignoreGenerationSavePolicy(writeData, RecordExistsAction.REPLACE));
    }

    @Override // org.springframework.data.aerospike.core.ReactiveAerospikeOperations
    public <T> Flux<T> insertAll(Collection<? extends T> collection) {
        return Flux.fromIterable(collection).flatMap(this::insert);
    }

    @Override // org.springframework.data.aerospike.core.ReactiveAerospikeOperations
    public <T> Mono<T> insert(T t) {
        Assert.notNull(t, "Document must not be null!");
        AerospikeWriteData writeData = writeData(t);
        WritePolicy ignoreGenerationSavePolicy = ignoreGenerationSavePolicy(writeData, RecordExistsAction.CREATE_ONLY);
        return writeData.hasVersion() ? doPersistWithVersionAndHandleError(t, writeData, ignoreGenerationSavePolicy) : doPersistAndHandleError(t, writeData, ignoreGenerationSavePolicy);
    }

    @Override // org.springframework.data.aerospike.core.ReactiveAerospikeOperations
    public <T> Mono<T> update(T t) {
        Assert.notNull(t, "Document must not be null!");
        AerospikeWriteData writeData = writeData(t);
        return writeData.hasVersion() ? doPersistWithVersionAndHandleCasError(t, writeData, expectGenerationSavePolicy(writeData, RecordExistsAction.REPLACE_ONLY)) : doPersistAndHandleError(t, writeData, ignoreGenerationSavePolicy(writeData, RecordExistsAction.REPLACE_ONLY));
    }

    @Override // org.springframework.data.aerospike.core.ReactiveAerospikeOperations
    public <T> Flux<T> findAll(Class<T> cls) {
        return Flux.fromStream(findAllUsingQuery(cls, null, (Qualifier[]) null));
    }

    @Override // org.springframework.data.aerospike.core.ReactiveAerospikeOperations
    public <T> Mono<T> add(T t, Map<String, Long> map) {
        Assert.notNull(t, "Object to add to must not be null!");
        Assert.notNull(map, "Values must not be null!");
        AerospikeWriteData writeData = writeData(t);
        Operation[] operationArr = new Operation[map.size() + 1];
        int i = 0;
        for (Map.Entry<String, Long> entry : map.entrySet()) {
            operationArr[i] = new Operation(Operation.Type.ADD, entry.getKey(), Value.get(entry.getValue()));
            i++;
        }
        operationArr[i] = Operation.get();
        WritePolicy writePolicy = new WritePolicy(this.client.writePolicyDefault);
        writePolicy.expiration = writeData.getExpiration();
        return executeOperationsOnValue(t, writeData, operationArr, writePolicy);
    }

    @Override // org.springframework.data.aerospike.core.ReactiveAerospikeOperations
    public <T> Mono<T> add(T t, String str, long j) {
        Assert.notNull(t, "Object to add to must not be null!");
        Assert.notNull(str, "Bin name must not be null!");
        AerospikeWriteData writeData = writeData(t);
        WritePolicy writePolicy = new WritePolicy(this.client.writePolicyDefault);
        writePolicy.expiration = writeData.getExpiration();
        return executeOperationsOnValue(t, writeData, new Operation[]{Operation.add(new Bin(str, j)), Operation.get(str)}, writePolicy);
    }

    @Override // org.springframework.data.aerospike.core.ReactiveAerospikeOperations
    public <T> Mono<T> append(T t, Map<String, String> map) {
        Assert.notNull(t, "Object to append to must not be null!");
        Assert.notNull(map, "Values must not be null!");
        return executeOperationsOnValue(t, writeData(t), OperationUtils.operations(map, Operation.Type.APPEND, Operation.get()), null);
    }

    @Override // org.springframework.data.aerospike.core.ReactiveAerospikeOperations
    public <T> Mono<T> append(T t, String str, String str2) {
        Assert.notNull(t, "Object to append to must not be null!");
        return executeOperationsOnValue(t, writeData(t), new Operation[]{Operation.append(new Bin(str, str2)), Operation.get(str)}, null);
    }

    @Override // org.springframework.data.aerospike.core.ReactiveAerospikeOperations
    public <T> Mono<T> prepend(T t, Map<String, String> map) {
        Assert.notNull(t, "Object to prepend to must not be null!");
        Assert.notNull(map, "Values must not be null!");
        return executeOperationsOnValue(t, writeData(t), OperationUtils.operations(map, Operation.Type.PREPEND, Operation.get()), null);
    }

    @Override // org.springframework.data.aerospike.core.ReactiveAerospikeOperations
    public <T> Mono<T> prepend(T t, String str, String str2) {
        Assert.notNull(t, "Object to prepend to must not be null!");
        return executeOperationsOnValue(t, writeData(t), new Operation[]{Operation.prepend(new Bin(str, str2)), Operation.get(str)}, null);
    }

    private <T> Mono<T> executeOperationsOnValue(T t, AerospikeWriteData aerospikeWriteData, Operation[] operationArr, WritePolicy writePolicy) {
        return this.reactorClient.operate(writePolicy, aerospikeWriteData.getKey(), operationArr).filter(keyRecord -> {
            return Objects.nonNull(keyRecord.record);
        }).map(keyRecord2 -> {
            return mapToEntity(keyRecord2.key, getEntityClass(t), keyRecord2.record);
        }).onErrorMap(this::translateError);
    }

    @Override // org.springframework.data.aerospike.core.ReactiveAerospikeOperations
    public <T> Mono<T> findById(Object obj, Class<T> cls) {
        AerospikePersistentEntity<?> aerospikePersistentEntity = (AerospikePersistentEntity) this.mappingContext.getRequiredPersistentEntity(cls);
        Key key = getKey(obj, aerospikePersistentEntity);
        if (!aerospikePersistentEntity.isTouchOnRead()) {
            return this.reactorClient.get(key).filter(keyRecord -> {
                return Objects.nonNull(keyRecord.record);
            }).map(keyRecord2 -> {
                return mapToEntity(keyRecord2.key, cls, keyRecord2.record);
            }).onErrorMap(this::translateError);
        }
        Assert.state(!aerospikePersistentEntity.hasExpirationProperty(), "Touch on read is not supported for entity without expiration property");
        return getAndTouch(key, aerospikePersistentEntity.getExpiration()).filter(keyRecord3 -> {
            return Objects.nonNull(keyRecord3.record);
        }).map(keyRecord4 -> {
            return mapToEntity(keyRecord4.key, cls, keyRecord4.record);
        }).onErrorResume(th -> {
            return (th instanceof AerospikeException) && ((AerospikeException) th).getResultCode() == 2;
        }, th2 -> {
            return Mono.empty();
        }).onErrorMap(this::translateError);
    }

    @Override // org.springframework.data.aerospike.core.ReactiveAerospikeOperations
    public <T> Flux<T> findByIds(Iterable<?> iterable, Class<T> cls) {
        Assert.notNull(iterable, "List of ids must not be null!");
        Assert.notNull(cls, "Type must not be null!");
        AerospikePersistentEntity aerospikePersistentEntity = (AerospikePersistentEntity) this.mappingContext.getRequiredPersistentEntity(cls);
        Flux map = Flux.fromIterable(iterable).map(obj -> {
            return getKey(obj, aerospikePersistentEntity);
        });
        AerospikeReactorClient aerospikeReactorClient = this.reactorClient;
        aerospikeReactorClient.getClass();
        return map.flatMap(aerospikeReactorClient::get).filter(keyRecord -> {
            return Objects.nonNull(keyRecord.record);
        }).map(keyRecord2 -> {
            return mapToEntity(keyRecord2.key, cls, keyRecord2.record);
        });
    }

    @Override // org.springframework.data.aerospike.core.ReactiveAerospikeOperations
    public <T> Flux<T> find(Query query, Class<T> cls) {
        Assert.notNull(query, "Query must not be null!");
        Assert.notNull(cls, "Type must not be null!");
        return Flux.fromStream(findAllUsingQuery(cls, query));
    }

    @Override // org.springframework.data.aerospike.core.ReactiveAerospikeOperations
    public <T> Flux<T> findInRange(long j, long j2, Sort sort, Class<T> cls) {
        Assert.notNull(cls, "Type for count must not be null!");
        Assert.notNull(cls, "Type must not be null!");
        return Flux.fromStream(findAllUsingQuery(cls, null, (Qualifier[]) null).skip(j).limit(j2));
    }

    @Override // org.springframework.data.aerospike.core.ReactiveAerospikeOperations
    public <T> Mono<Long> count(Query query, Class<T> cls) {
        Assert.notNull(query, "Query must not be null!");
        Assert.notNull(cls, "Type must not be null!");
        return Flux.fromStream(findAllRecordsUsingQuery(cls, query)).count();
    }

    @Override // org.springframework.data.aerospike.core.ReactiveAerospikeOperations
    public <T> Mono<T> execute(Supplier<T> supplier) {
        Assert.notNull(supplier, "Supplier must not be null!");
        return Mono.fromSupplier(supplier).onErrorMap(this::translateError);
    }

    @Override // org.springframework.data.aerospike.core.ReactiveAerospikeOperations
    public <T> Mono<Boolean> exists(Object obj, Class<T> cls) {
        Assert.notNull(obj, "Id must not be null!");
        Assert.notNull(cls, "Type must not be null!");
        return this.reactorClient.exists(getKey(obj, (AerospikePersistentEntity) this.mappingContext.getRequiredPersistentEntity(cls))).map((v0) -> {
            return Objects.nonNull(v0);
        }).defaultIfEmpty(false).onErrorMap(this::translateError);
    }

    @Override // org.springframework.data.aerospike.core.ReactiveAerospikeOperations
    public <T> Mono<Boolean> delete(Object obj, Class<T> cls) {
        Assert.notNull(obj, "Id must not be null!");
        Assert.notNull(cls, "Type must not be null!");
        return this.reactorClient.delete(ignoreGenerationDeletePolicy(), getKey(obj, (AerospikePersistentEntity) this.mappingContext.getRequiredPersistentEntity(cls))).map(key -> {
            return true;
        }).onErrorMap(this::translateError);
    }

    @Override // org.springframework.data.aerospike.core.ReactiveAerospikeOperations
    public <T> Mono<Boolean> delete(T t) {
        Assert.notNull(t, "Object to delete must not be null!");
        return this.reactorClient.delete(ignoreGenerationDeletePolicy(), writeData(t).getKey()).map(key -> {
            return true;
        }).onErrorMap(this::translateError);
    }

    private <T> Mono<T> doPersistAndHandleError(T t, AerospikeWriteData aerospikeWriteData, WritePolicy writePolicy) {
        return this.reactorClient.put(writePolicy, aerospikeWriteData.getKey(), aerospikeWriteData.getBinsAsArray()).map(key -> {
            return t;
        }).onErrorMap(this::translateError);
    }

    private <T> Mono<T> doPersistWithVersionAndHandleCasError(T t, AerospikeWriteData aerospikeWriteData, WritePolicy writePolicy) {
        return putAndGetHeader(aerospikeWriteData, writePolicy).map(record -> {
            return updateVersion(t, record);
        }).onErrorMap(AerospikeException.class, this::translateCasError);
    }

    private <T> Mono<T> doPersistWithVersionAndHandleError(T t, AerospikeWriteData aerospikeWriteData, WritePolicy writePolicy) {
        return putAndGetHeader(aerospikeWriteData, writePolicy).map(record -> {
            return updateVersion(t, record);
        }).onErrorMap(AerospikeException.class, this::translateError);
    }

    private <T> Mono<Record> putAndGetHeader(AerospikeWriteData aerospikeWriteData, WritePolicy writePolicy) {
        return this.reactorClient.operate(writePolicy, aerospikeWriteData.getKey(), OperationUtils.operations(aerospikeWriteData.getBinsAsArray(), (Function<Bin, Operation>) Operation::put, Operation.getHeader())).map(keyRecord -> {
            return keyRecord.record;
        });
    }

    private Mono<KeyRecord> getAndTouch(Key key, int i) {
        WritePolicy writePolicy = new WritePolicy(this.client.writePolicyDefault);
        writePolicy.expiration = i;
        return this.reactorClient.operate(writePolicy, key, new Operation[]{Operation.touch(), Operation.get()});
    }

    private Throwable translateError(Throwable th) {
        return th instanceof AerospikeException ? translateError((AerospikeException) th) : th;
    }

    @Override // org.springframework.data.aerospike.core.BaseAerospikeTemplate
    public /* bridge */ /* synthetic */ String getNamespace() {
        return super.getNamespace();
    }

    @Override // org.springframework.data.aerospike.core.BaseAerospikeTemplate, org.springframework.data.aerospike.core.AerospikeOperations
    public /* bridge */ /* synthetic */ MappingContext getMappingContext() {
        return super.getMappingContext();
    }

    @Override // org.springframework.data.aerospike.core.BaseAerospikeTemplate, org.springframework.data.aerospike.core.AerospikeOperations
    public /* bridge */ /* synthetic */ String getSetName(Class cls) {
        return super.getSetName(cls);
    }
}
