package com.agorapulse.micronaut.amazon.awssdk.dynamodb;

import com.agorapulse.micronaut.amazon.awssdk.dynamodb.annotation.SecondaryPartitionKey;
import com.agorapulse.micronaut.amazon.awssdk.dynamodb.annotation.SecondarySortKey;
import com.agorapulse.micronaut.amazon.awssdk.dynamodb.builder.Builders;
import com.agorapulse.micronaut.amazon.awssdk.dynamodb.builder.DetachedQuery;
import com.agorapulse.micronaut.amazon.awssdk.dynamodb.builder.DetachedScan;
import com.agorapulse.micronaut.amazon.awssdk.dynamodb.builder.DetachedUpdate;
import com.agorapulse.micronaut.amazon.awssdk.dynamodb.builder.UpdateBuilder;
import com.agorapulse.micronaut.amazon.awssdk.dynamodb.events.DynamoDbEvent;
import com.agorapulse.micronaut.amazon.awssdk.dynamodb.exception.FailedBatchRequestException;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.beans.BeanIntrospection;
import io.micronaut.core.beans.BeanProperty;
import java.lang.annotation.Annotation;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import software.amazon.awssdk.enhanced.dynamodb.DynamoDbAsyncTable;
import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedAsyncClient;
import software.amazon.awssdk.enhanced.dynamodb.Key;
import software.amazon.awssdk.enhanced.dynamodb.TableMetadata;
import software.amazon.awssdk.enhanced.dynamodb.TableSchema;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSecondaryPartitionKey;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSecondarySortKey;
import software.amazon.awssdk.enhanced.dynamodb.model.BatchWriteResult;
import software.amazon.awssdk.enhanced.dynamodb.model.EnhancedGlobalSecondaryIndex;
import software.amazon.awssdk.enhanced.dynamodb.model.EnhancedLocalSecondaryIndex;
import software.amazon.awssdk.enhanced.dynamodb.model.ReadBatch;
import software.amazon.awssdk.enhanced.dynamodb.model.WriteBatch;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.Projection;
import software.amazon.awssdk.services.dynamodb.model.ProjectionType;

/* loaded from: input_file:com/agorapulse/micronaut/amazon/awssdk/dynamodb/DefaultAsyncDynamoDbService.class */
public class DefaultAsyncDynamoDbService<T> implements AsyncDynamoDbService<T> {
    private final Class<T> itemType;
    private final DynamoDbEnhancedAsyncClient enhancedClient;
    private final DynamoDbAsyncClient client;
    private final AttributeConversionHelper attributeConversionHelper;
    private final ApplicationEventPublisher<DynamoDbEvent<T>> publisher;
    private final DynamoDbAsyncTable<T> table;

    public DefaultAsyncDynamoDbService(Class<T> cls, DynamoDbEnhancedAsyncClient dynamoDbEnhancedAsyncClient, DynamoDbAsyncClient dynamoDbAsyncClient, AttributeConversionHelper attributeConversionHelper, ApplicationEventPublisher<DynamoDbEvent<T>> applicationEventPublisher, DynamoDbAsyncTable<T> dynamoDbAsyncTable) {
        this.itemType = cls;
        this.enhancedClient = dynamoDbEnhancedAsyncClient;
        this.client = dynamoDbAsyncClient;
        this.attributeConversionHelper = attributeConversionHelper;
        this.publisher = applicationEventPublisher;
        this.table = dynamoDbAsyncTable;
    }

    @Override // com.agorapulse.micronaut.amazon.awssdk.dynamodb.AsyncDynamoDbService
    public Class<T> getItemType() {
        return this.itemType;
    }

    @Override // com.agorapulse.micronaut.amazon.awssdk.dynamodb.AsyncDynamoDbService
    public DynamoDbAsyncTable<T> getTable() {
        return this.table;
    }

    @Override // com.agorapulse.micronaut.amazon.awssdk.dynamodb.AsyncDynamoDbService
    public Publisher<T> query(DetachedQuery<T> detachedQuery) {
        return Flux.from(detachedQuery.mo37query(this.table, this.attributeConversionHelper)).map(this::postLoad);
    }

    @Override // com.agorapulse.micronaut.amazon.awssdk.dynamodb.AsyncDynamoDbService
    public Publisher<T> scan(DetachedScan<T> detachedScan) {
        return Flux.from(detachedScan.mo40scan(this.table, this.attributeConversionHelper)).map(this::postLoad);
    }

    @Override // com.agorapulse.micronaut.amazon.awssdk.dynamodb.AsyncDynamoDbService
    public Publisher<T> findAll(Object obj, Object obj2) {
        return Flux.from(simplePartitionAndSort(obj, obj2).mo37query(this.table, this.attributeConversionHelper)).map(this::postLoad);
    }

    @Override // com.agorapulse.micronaut.amazon.awssdk.dynamodb.AsyncDynamoDbService
    public <R> Publisher<R> update(DetachedUpdate<T, R> detachedUpdate) {
        return detachedUpdate.update(this.table, this.client, this.attributeConversionHelper, this.publisher);
    }

    @Override // com.agorapulse.micronaut.amazon.awssdk.dynamodb.AsyncDynamoDbService
    public <R> Publisher<R> updateAll(Publisher<T> publisher, UpdateBuilder<T, R> updateBuilder) {
        BeanIntrospection beanIntrospection = EntityIntrospection.getBeanIntrospection(this.table);
        TableMetadata tableMetadata = this.table.tableSchema().tableMetadata();
        return Flux.from(publisher).map(this::postLoad).flatMap(obj -> {
            beanIntrospection.getProperty(tableMetadata.primaryPartitionKey()).ifPresent(beanProperty -> {
                updateBuilder.partitionKey(beanProperty.get(obj));
            });
            Optional primarySortKey = tableMetadata.primarySortKey();
            Objects.requireNonNull(beanIntrospection);
            primarySortKey.flatMap(beanIntrospection::getProperty).ifPresent(beanProperty2 -> {
                updateBuilder.sortKey(beanProperty2.get(obj));
            });
            return updateBuilder.update(this.table, this.client, this.attributeConversionHelper, this.publisher);
        });
    }

    @Override // com.agorapulse.micronaut.amazon.awssdk.dynamodb.AsyncDynamoDbService
    public Publisher<T> save(T t) {
        this.publisher.publishEvent(DynamoDbEvent.prePersist(t));
        return Mono.fromFuture(this.table.updateItem(t)).flatMap(obj -> {
            return Mono.fromCallable(() -> {
                this.publisher.publishEvent(DynamoDbEvent.postPersist(obj));
                return obj;
            });
        });
    }

    @Override // com.agorapulse.micronaut.amazon.awssdk.dynamodb.AsyncDynamoDbService
    public Publisher<T> saveAll(Publisher<T> publisher, int i) {
        return Flux.from(publisher).buffer(withinBatchSizeBounds(i)).flatMap(list -> {
            return Mono.fromFuture(this.enhancedClient.batchWriteItem(builder -> {
                builder.writeBatches(list.stream().map(obj -> {
                    this.publisher.publishEvent(DynamoDbEvent.prePersist(obj));
                    return WriteBatch.builder(this.table.tableSchema().itemType().rawClass()).mappedTableResource(this.table).addPutItem(obj).build();
                }).toList());
            })).zipWith(Mono.just(list));
        }).flatMap(tuple2 -> {
            List unprocessedPutItemsForTable = ((BatchWriteResult) tuple2.getT1()).unprocessedPutItemsForTable(this.table);
            return unprocessedPutItemsForTable.isEmpty() ? Flux.fromIterable((Iterable) tuple2.getT2()).doOnNext(obj -> {
                this.publisher.publishEvent(DynamoDbEvent.postPersist(obj));
            }) : Flux.error(new FailedBatchRequestException("Failed to save items", unprocessedPutItemsForTable));
        });
    }

    @Override // com.agorapulse.micronaut.amazon.awssdk.dynamodb.AsyncDynamoDbService
    public Publisher<T> delete(Object obj, @Nullable Object obj2) {
        return (Publisher) doWithKey(obj, obj2, this::delete);
    }

    @Override // com.agorapulse.micronaut.amazon.awssdk.dynamodb.AsyncDynamoDbService
    public Publisher<T> delete(T t) {
        this.publisher.publishEvent(DynamoDbEvent.preRemove(t));
        return Mono.fromFuture(this.table.deleteItem(this.table.keyFrom(t))).map(obj -> {
            this.publisher.publishEvent(DynamoDbEvent.postRemove(obj));
            return obj;
        });
    }

    @Override // com.agorapulse.micronaut.amazon.awssdk.dynamodb.AsyncDynamoDbService
    public Publisher<T> delete(Key key) {
        this.publisher.publishEvent(DynamoDbEvent.preRemove(this.table.tableSchema().mapToItem(key.primaryKeyMap(this.table.tableSchema()))));
        return Mono.fromFuture(this.table.deleteItem(key)).map(obj -> {
            this.publisher.publishEvent(DynamoDbEvent.postRemove(obj));
            return obj;
        });
    }

    @Override // com.agorapulse.micronaut.amazon.awssdk.dynamodb.AsyncDynamoDbService
    public Publisher<T> deleteAll(Publisher<T> publisher, int i) {
        return Flux.from(publisher).buffer(withinBatchSizeBounds(i)).flatMap(list -> {
            return Mono.fromFuture(this.enhancedClient.batchWriteItem(builder -> {
                builder.writeBatches(list.stream().map(obj -> {
                    this.publisher.publishEvent(DynamoDbEvent.preRemove(obj));
                    return WriteBatch.builder(this.table.tableSchema().itemType().rawClass()).mappedTableResource(this.table).addDeleteItem(obj).build();
                }).toList());
            })).zipWith(Mono.just(list));
        }).flatMap(tuple2 -> {
            List unprocessedDeleteItemsForTable = ((BatchWriteResult) tuple2.getT1()).unprocessedDeleteItemsForTable(this.table);
            if (!unprocessedDeleteItemsForTable.isEmpty()) {
                return Flux.error(new FailedBatchRequestException("Failed to delete items", unprocessedDeleteItemsForTable));
            }
            ((List) tuple2.getT2()).forEach(obj -> {
                this.publisher.publishEvent(DynamoDbEvent.postRemove(obj));
            });
            return Flux.fromIterable((Iterable) tuple2.getT2());
        });
    }

    @Override // com.agorapulse.micronaut.amazon.awssdk.dynamodb.AsyncDynamoDbService
    public Publisher<T> get(Object obj, Object obj2) {
        return (Publisher) doWithKey(obj, obj2, this::get);
    }

    @Override // com.agorapulse.micronaut.amazon.awssdk.dynamodb.AsyncDynamoDbService
    public Publisher<T> getAll(Object obj, Publisher<?> publisher, int i) {
        return (Publisher<T>) doWithKeys(obj, publisher, (attributeValue, publisher2) -> {
            return getAll(attributeValue, (Publisher<AttributeValue>) publisher2, i);
        });
    }

    @Override // com.agorapulse.micronaut.amazon.awssdk.dynamodb.AsyncDynamoDbService
    public Publisher<T> getAll(Publisher<?> publisher, int i) {
        return (Publisher<T>) doWithKeys(publisher, publisher2 -> {
            return getAllByAttributeValue(publisher2, i);
        });
    }

    @Override // com.agorapulse.micronaut.amazon.awssdk.dynamodb.AsyncDynamoDbService
    public Publisher<T> get(Key key) {
        return Mono.fromFuture(this.table.getItem(key)).map(this::postLoad);
    }

    @Override // com.agorapulse.micronaut.amazon.awssdk.dynamodb.AsyncDynamoDbService
    public Publisher<Long> count(DetachedQuery<T> detachedQuery) {
        return detachedQuery.mo36count(this.table, this.attributeConversionHelper);
    }

    @Override // com.agorapulse.micronaut.amazon.awssdk.dynamodb.AsyncDynamoDbService
    public Publisher<Long> count(DetachedScan<T> detachedScan) {
        return detachedScan.mo39count(this.table, this.attributeConversionHelper);
    }

    @Override // com.agorapulse.micronaut.amazon.awssdk.dynamodb.AsyncDynamoDbService
    public Publisher<Long> count(Object obj, @Nullable Object obj2) {
        return count(simplePartitionAndSort(obj, obj2));
    }

    @Override // com.agorapulse.micronaut.amazon.awssdk.dynamodb.AsyncDynamoDbService
    public Publisher<Boolean> createTable() {
        Map<String, ProjectionType> projectionTypes = getProjectionTypes();
        TableMetadata tableMetadata = this.table.tableSchema().tableMetadata();
        return Mono.fromFuture(this.table.createTable(builder -> {
            ArrayList arrayList = new ArrayList();
            ArrayList arrayList2 = new ArrayList();
            tableMetadata.indices().forEach(indexMetadata -> {
                if (TableMetadata.primaryIndexName().equals(indexMetadata.name())) {
                    return;
                }
                ProjectionType projectionType = (ProjectionType) projectionTypes.getOrDefault(indexMetadata.name(), ProjectionType.KEYS_ONLY);
                if (tableMetadata.primaryPartitionKey().equals(tableMetadata.indexPartitionKey(indexMetadata.name()))) {
                    arrayList.add(EnhancedLocalSecondaryIndex.create(indexMetadata.name(), (Projection) Projection.builder().projectionType(projectionType).build()));
                } else {
                    arrayList2.add(EnhancedGlobalSecondaryIndex.builder().indexName(indexMetadata.name()).projection((Projection) Projection.builder().projectionType(projectionType).build()).build());
                }
            });
            if (!arrayList.isEmpty()) {
                builder.localSecondaryIndices(arrayList);
            }
            if (arrayList2.isEmpty()) {
                return;
            }
            builder.globalSecondaryIndices(arrayList2);
        })).then(Mono.just(true)).onErrorReturn(false);
    }

    private static int withinBatchSizeBounds(int i) {
        return Math.max(2, Math.min(i, 25));
    }

    private DetachedQuery<T> simplePartitionAndSort(Object obj, Object obj2) {
        return (DetachedQuery) doWithKey(obj, obj2, key -> {
            return key.sortKeyValue().isPresent() ? Builders.query(queryBuilder -> {
                queryBuilder.partitionKey(key.partitionKeyValue()).sortKey(keyConditionCollector -> {
                    keyConditionCollector.eq(key.sortKeyValue().get());
                });
            }) : Builders.query(queryBuilder2 -> {
                queryBuilder2.partitionKey(key.partitionKeyValue());
            });
        });
    }

    private Publisher<T> getAllByAttributeValue(Publisher<AttributeValue> publisher, int i) {
        TableSchema tableSchema = this.table.tableSchema();
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        AtomicInteger atomicInteger = new AtomicInteger();
        return Flux.from(publisher).buffer(withinBatchSizeBounds(i)).map(list -> {
            return this.enhancedClient.batchGetItem(builder -> {
                builder.readBatches(list.stream().map(attributeValue -> {
                    concurrentHashMap.put(attributeValue, Integer.valueOf(atomicInteger.getAndIncrement()));
                    return ReadBatch.builder(tableSchema.itemType().rawClass()).mappedTableResource(this.table).addGetItem(Key.builder().partitionValue(attributeValue).build()).build();
                }).toList());
            });
        }).flatMap(batchGetResultPagePublisher -> {
            return Flux.from(batchGetResultPagePublisher.resultsForTable(this.table)).map(this::postLoad);
        }).sort(Comparator.comparingInt(obj -> {
            return ((Integer) concurrentHashMap.getOrDefault(tableSchema.attributeValue(obj, tableSchema.tableMetadata().primaryPartitionKey()), 0)).intValue();
        }));
    }

    private Publisher<T> getAll(AttributeValue attributeValue, Publisher<AttributeValue> publisher, int i) {
        TableSchema tableSchema = this.table.tableSchema();
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        AtomicInteger atomicInteger = new AtomicInteger();
        return Flux.from(publisher).buffer(withinBatchSizeBounds(i)).map(list -> {
            return this.enhancedClient.batchGetItem(builder -> {
                builder.readBatches(list.stream().map(attributeValue2 -> {
                    concurrentHashMap.put(attributeValue2, Integer.valueOf(atomicInteger.getAndIncrement()));
                    return ReadBatch.builder(tableSchema.itemType().rawClass()).mappedTableResource(this.table).addGetItem(Key.builder().partitionValue(attributeValue).sortValue(attributeValue2).build()).build();
                }).toList());
            });
        }).flatMap(batchGetResultPagePublisher -> {
            return Flux.from(batchGetResultPagePublisher.resultsForTable(this.table)).map(this::postLoad);
        }).sort(Comparator.comparingInt(obj -> {
            return ((Integer) concurrentHashMap.getOrDefault(tableSchema.attributeValue(obj, (String) tableSchema.tableMetadata().primarySortKey().get()), 0)).intValue();
        }));
    }

    private Map<String, ProjectionType> getProjectionTypes() {
        HashMap hashMap = new HashMap();
        EntityIntrospection.getBeanIntrospection(this.table).getBeanProperties().forEach(beanProperty -> {
            AnnotationValue annotation = beanProperty.getAnnotation(com.agorapulse.micronaut.amazon.awssdk.dynamodb.annotation.Projection.class);
            if (annotation == null) {
                return;
            }
            ArrayList arrayList = new ArrayList();
            arrayList.addAll(collectIndicesFromAnnotation(beanProperty, DynamoDbSecondarySortKey.class));
            arrayList.addAll(collectIndicesFromAnnotation(beanProperty, SecondarySortKey.class));
            arrayList.addAll(collectIndicesFromAnnotation(beanProperty, DynamoDbSecondaryPartitionKey.class));
            arrayList.addAll(collectIndicesFromAnnotation(beanProperty, SecondaryPartitionKey.class));
            if (arrayList.isEmpty()) {
                return;
            }
            ProjectionType projectionType = (ProjectionType) annotation.enumValue(ProjectionType.class).orElse(ProjectionType.KEYS_ONLY);
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                hashMap.put((String) it.next(), projectionType);
            }
        });
        return hashMap;
    }

    private List<String> collectIndicesFromAnnotation(BeanProperty<T, Object> beanProperty, Class<? extends Annotation> cls) {
        return (List) beanProperty.findAnnotation(cls).map(annotationValue -> {
            return Arrays.asList(annotationValue.stringValues("indexNames"));
        }).orElse(Collections.emptyList());
    }

    private T postLoad(T t) {
        this.publisher.publishEvent(DynamoDbEvent.postLoad(t));
        return t;
    }

    private <R> R doWithKey(Object obj, Object obj2, Function<Key, R> function) {
        String primaryPartitionKey = this.table.tableSchema().tableMetadata().primaryPartitionKey();
        if (obj == null) {
            throw new IllegalArgumentException("Partition key " + primaryPartitionKey + " cannot be null");
        }
        AttributeValue convert = this.attributeConversionHelper.convert(this.table, primaryPartitionKey, obj);
        if (obj2 == null) {
            return function.apply(Key.builder().partitionValue(convert).build());
        }
        return function.apply(Key.builder().partitionValue(convert).sortValue(this.attributeConversionHelper.convert(this.table, (String) this.table.tableSchema().tableMetadata().primarySortKey().get(), obj2)).build());
    }

    private <R> Publisher<R> doWithKeys(Object obj, Publisher<?> publisher, BiFunction<AttributeValue, Publisher<AttributeValue>, Publisher<R>> biFunction) {
        String primaryPartitionKey = this.table.tableSchema().tableMetadata().primaryPartitionKey();
        if (obj == null) {
            throw new IllegalArgumentException("Partition key " + primaryPartitionKey + " cannot be null");
        }
        AttributeValue convert = this.attributeConversionHelper.convert(this.table, primaryPartitionKey, obj);
        Optional primarySortKey = this.table.tableSchema().tableMetadata().primarySortKey();
        return biFunction.apply(convert, Flux.from(publisher).map(obj2 -> {
            return this.attributeConversionHelper.convert(this.table, (String) primarySortKey.get(), obj2);
        }));
    }

    private <R> Publisher<R> doWithKeys(Publisher<?> publisher, Function<Publisher<AttributeValue>, Publisher<R>> function) {
        String primaryPartitionKey = this.table.tableSchema().tableMetadata().primaryPartitionKey();
        return function.apply(Flux.from(publisher).map(obj -> {
            return this.attributeConversionHelper.convert(this.table, primaryPartitionKey, obj);
        }));
    }
}
