package io.fluxcapacitor.javaclient.persisting.eventsourcing;

import io.fluxcapacitor.common.Awaitable;
import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.reflection.ReflectionUtils;
import io.fluxcapacitor.javaclient.common.Message;
import io.fluxcapacitor.javaclient.common.serialization.DeserializingMessage;
import io.fluxcapacitor.javaclient.common.serialization.DeserializingObject;
import io.fluxcapacitor.javaclient.modeling.AggregateRepository;
import io.fluxcapacitor.javaclient.modeling.AggregateRoot;
import io.fluxcapacitor.javaclient.modeling.AssertLegal;
import io.fluxcapacitor.javaclient.persisting.caching.Cache;
import io.fluxcapacitor.javaclient.persisting.caching.NoOpCache;
import io.fluxcapacitor.javaclient.persisting.search.DocumentStore;
import io.fluxcapacitor.javaclient.tracking.handling.validation.ValidationUtils;
import java.beans.ConstructorProperties;
import java.time.Instant;
import java.time.temporal.TemporalAccessor;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/fluxcapacitor/javaclient/persisting/eventsourcing/DefaultAggregateRepository.class */
public class DefaultAggregateRepository implements AggregateRepository {
    private static final Logger log = LoggerFactory.getLogger(DefaultAggregateRepository.class);
    private static final Function<String, String> keyFunction = str -> {
        return DefaultAggregateRepository.class.getSimpleName() + ":" + str;
    };
    private final EventStore eventStore;
    private final SnapshotRepository snapshotRepository;
    private final Cache cache;
    private final DocumentStore documentStore;
    private final EventStoreSerializer serializer;
    private final EventSourcingHandlerFactory handlerFactory;
    private final Map<Class<?>, AggregateFactoryFunction> aggregateFactory;
    private final ThreadLocal<Collection<EventSourcedAggregate<?>>> loadedModels;

    /* JADX INFO: Access modifiers changed from: protected */
    @FunctionalInterface
    /* loaded from: input_file:io/fluxcapacitor/javaclient/persisting/eventsourcing/DefaultAggregateRepository$AggregateFactoryFunction.class */
    public interface AggregateFactoryFunction {
        EventSourcedAggregate<?> create(String str, boolean z, boolean z2);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/fluxcapacitor/javaclient/persisting/eventsourcing/DefaultAggregateRepository$EventSourcedAggregate.class */
    public class EventSourcedAggregate<T> implements AggregateRoot<T> {
        private final Class<T> aggregateType;
        private final EventSourcingHandler<T> eventSourcingHandler;
        private final Cache cache;
        private final EventStoreSerializer serializer;
        private final EventStore eventStore;
        private final SnapshotRepository snapshotRepository;
        private final SnapshotTrigger snapshotTrigger;
        private final DocumentStore documentStore;
        private final boolean eventSourced;
        private final boolean searchable;
        private final String collection;
        private final Function<AggregateRoot<?>, Instant> timestampFunction;
        private final List<DeserializingMessage> unpublishedEvents = new ArrayList();
        private final boolean readOnly;
        private final String id;
        private EventSourcedModel<T> model;
        private boolean updated;

        protected void initialize() {
            this.model = (EventSourcedModel) Optional.ofNullable((EventSourcedModel) this.cache.getIfPresent(DefaultAggregateRepository.keyFunction.apply(this.id))).filter(eventSourcedModel -> {
                return this.aggregateType.isAssignableFrom(eventSourcedModel.get().getClass());
            }).orElseGet(() -> {
                EventSourcedModel<T> orElse = this.snapshotRepository.getSnapshot(this.id).filter(eventSourcedModel2 -> {
                    return this.aggregateType.isAssignableFrom(eventSourcedModel2.get().getClass());
                }).orElse(EventSourcedModel.builder().id(this.id).type(this.aggregateType).build());
                if (!this.eventSourced) {
                    return orElse.toBuilder().sequenceNumber(orElse.sequenceNumber()).build();
                }
                AggregateEventStream<DeserializingMessage> events = this.eventStore.getEvents(this.id, orElse.sequenceNumber());
                Iterator<DeserializingMessage> it = events.iterator();
                while (it.hasNext()) {
                    DeserializingMessage next = it.next();
                    orElse = orElse.toBuilder().sequenceNumber(orElse.sequenceNumber() + 1).type(this.aggregateType).id(this.id).lastEventId(next.getSerializedObject().getMessageId()).lastEventIndex(next.getSerializedObject().getIndex()).timestamp(Instant.ofEpochMilli(next.getSerializedObject().getTimestamp().longValue())).model(this.eventSourcingHandler.invoke(orElse, next)).previous(orElse).build();
                }
                return orElse.toBuilder().sequenceNumber(events.getLastSequenceNumber().orElse(Long.valueOf(orElse.sequenceNumber())).longValue()).build();
            });
        }

        public Class<?> getAggregateType() {
            return (Class) Optional.ofNullable(this.model).map((v0) -> {
                return v0.get();
            }).map(obj -> {
                return obj.getClass();
            }).orElse(this.aggregateType);
        }

        @Override // io.fluxcapacitor.javaclient.modeling.AggregateRoot
        public AggregateRoot<T> apply(Message message) {
            if (this.readOnly) {
                throw new UnsupportedOperationException(String.format("Not allowed to apply a %s. The model is readonly.", message));
            }
            Message withMetadata = message.withMetadata(message.getMetadata().with(new Object[]{AggregateRoot.AGGREGATE_ID_METADATA_KEY, this.id, AggregateRoot.AGGREGATE_TYPE_METADATA_KEY, getAggregateType().getName()}));
            DeserializingMessage deserializingMessage = new DeserializingMessage(new DeserializingObject(this.serializer.serialize(withMetadata), cls -> {
                return this.serializer.convert(withMetadata.getPayload(), cls);
            }), MessageType.EVENT);
            this.unpublishedEvents.add(deserializingMessage);
            updateModel(this.model.toBuilder().sequenceNumber(this.model.sequenceNumber() + 1).model(this.eventSourcingHandler.invoke(this.model, deserializingMessage)).previous(this.model).lastEventId(withMetadata.getMessageId()).timestamp(withMetadata.getTimestamp()).lastEventIndex(deserializingMessage.getSerializedObject().getIndex()).build());
            return this;
        }

        @Override // io.fluxcapacitor.javaclient.modeling.AggregateRoot
        public AggregateRoot<T> update(UnaryOperator<T> unaryOperator) {
            if (this.eventSourced) {
                DefaultAggregateRepository.log.warn("An event sourced aggregate is updated without applying an event. This is usually a mistake. On aggregate: {}", this);
            }
            updateModel(this.model.update((UnaryOperator) unaryOperator));
            return this;
        }

        protected void updateModel(EventSourcedModel<T> eventSourcedModel) {
            this.model = eventSourcedModel;
            this.updated = true;
            if (DefaultAggregateRepository.this.loadedModels.get() != null) {
                if (DefaultAggregateRepository.this.loadedModels.get().stream().noneMatch(eventSourcedAggregate -> {
                    return eventSourcedAggregate == this;
                })) {
                    DefaultAggregateRepository.this.loadedModels.get().add(this);
                    return;
                }
                return;
            }
            DefaultAggregateRepository.this.loadedModels.set(Collections.asLifoQueue(new ArrayDeque()));
            DefaultAggregateRepository.this.loadedModels.get().add(this);
            Runnable runnable = () -> {
                Collection<EventSourcedAggregate<?>> collection = DefaultAggregateRepository.this.loadedModels.get();
                DefaultAggregateRepository.this.loadedModels.remove();
                collection.stream().map((v0) -> {
                    return v0.commit();
                }).reduce((v0, v1) -> {
                    return v0.join(v1);
                }).ifPresent(awaitable -> {
                    try {
                        awaitable.await();
                    } catch (Exception e) {
                        List list = (List) collection.stream().map(eventSourcedAggregate2 -> {
                            return eventSourcedAggregate2.id;
                        }).collect(Collectors.toList());
                        DefaultAggregateRepository.log.error("Failed to commit events for aggregates {}. Clearing aggregates from the cache.", list, e);
                        list.forEach(str -> {
                            this.cache.invalidate(DefaultAggregateRepository.keyFunction.apply(str));
                        });
                    }
                });
            };
            if (((Aggregate) this.aggregateType.getAnnotation(Aggregate.class)).commitInBatch()) {
                DeserializingMessage.whenBatchCompletes(runnable);
            } else {
                DeserializingMessage.whenMessageCompletes(runnable);
            }
        }

        @Override // io.fluxcapacitor.javaclient.modeling.AggregateRoot
        public <E extends Exception> AggregateRoot<T> assertLegal(Object... objArr) throws Exception {
            switch (objArr.length) {
                case AssertLegal.DEFAULT_PRIORITY /* 0 */:
                    return this;
                case 1:
                    ValidationUtils.assertLegal(objArr[0], this.model);
                    return this;
                default:
                    EventSourcedModel<T> eventSourcedModel = this.model;
                    Iterator<T> it = Arrays.stream(objArr).iterator();
                    while (it.hasNext()) {
                        T next = it.next();
                        ValidationUtils.assertLegal(next, eventSourcedModel);
                        if (it.hasNext()) {
                            eventSourcedModel = forceApply(this.model, next instanceof Message ? (Message) next : new Message(next));
                        }
                    }
                    return this;
            }
        }

        protected EventSourcedModel<T> forceApply(EventSourcedModel<T> eventSourcedModel, Message message) {
            Message withMetadata = message.withMetadata(message.getMetadata().with(new Object[]{AggregateRoot.AGGREGATE_ID_METADATA_KEY, this.id, AggregateRoot.AGGREGATE_TYPE_METADATA_KEY, getAggregateType().getName()}));
            DeserializingMessage deserializingMessage = new DeserializingMessage(new DeserializingObject(this.serializer.serialize(message), cls -> {
                return this.serializer.convert(withMetadata.getPayload(), cls);
            }), MessageType.EVENT);
            return eventSourcedModel.toBuilder().sequenceNumber(eventSourcedModel.sequenceNumber() + 1).model(this.eventSourcingHandler.invoke(eventSourcedModel, deserializingMessage)).previous(eventSourcedModel).lastEventId(withMetadata.getMessageId()).timestamp(withMetadata.getTimestamp()).lastEventIndex(deserializingMessage.getSerializedObject().getIndex()).build();
        }

        @Override // io.fluxcapacitor.javaclient.modeling.AggregateRoot
        public T get() {
            return this.model.get();
        }

        @Override // io.fluxcapacitor.javaclient.modeling.AggregateRoot
        public AggregateRoot<T> previous() {
            return this.model.previous();
        }

        @Override // io.fluxcapacitor.javaclient.modeling.AggregateRoot
        public String lastEventId() {
            return this.model.lastEventId();
        }

        @Override // io.fluxcapacitor.javaclient.modeling.AggregateRoot
        public Long lastEventIndex() {
            return this.model.lastEventIndex();
        }

        @Override // io.fluxcapacitor.javaclient.modeling.AggregateRoot
        public Instant timestamp() {
            return this.model.timestamp();
        }

        @Override // io.fluxcapacitor.javaclient.modeling.AggregateRoot
        public String id() {
            return this.id;
        }

        @Override // io.fluxcapacitor.javaclient.modeling.AggregateRoot
        public Class<T> type() {
            return this.aggregateType;
        }

        public String toString() {
            return "EventSourcedAggregate{collection='" + this.collection + "', eventSourced=" + this.eventSourced + ", searchable=" + this.searchable + ", readOnly=" + this.readOnly + ", model=" + this.model + "}";
        }

        protected Awaitable commit() {
            Awaitable ready = Awaitable.ready();
            try {
                if (this.updated) {
                    this.cache.put(DefaultAggregateRepository.keyFunction.apply(this.model.id()), this.model);
                    if (!this.unpublishedEvents.isEmpty()) {
                        ready = this.eventStore.storeEvents(this.model.id(), this.collection, this.model.sequenceNumber(), new ArrayList(this.unpublishedEvents));
                        if (this.snapshotTrigger.shouldCreateSnapshot(this.model, this.unpublishedEvents)) {
                            this.snapshotRepository.storeSnapshot(this.model);
                        }
                    }
                    if (this.searchable) {
                        T t = this.model.get();
                        if (t == null) {
                            this.documentStore.deleteDocument(this.model.id(), this.collection);
                        } else {
                            this.documentStore.index(t, this.model.id(), this.collection, this.timestampFunction.apply(this.model));
                        }
                    }
                }
            } catch (Exception e) {
                DefaultAggregateRepository.log.error("Failed to commit new events of aggregate {}", this.model.id(), e);
                this.cache.invalidate(DefaultAggregateRepository.keyFunction.apply(this.model.id()));
            } finally {
                this.unpublishedEvents.clear();
                this.updated = false;
            }
            return ready;
        }

        @ConstructorProperties({"aggregateType", "eventSourcingHandler", "cache", "serializer", "eventStore", "snapshotRepository", "snapshotTrigger", "documentStore", "eventSourced", "searchable", "collection", "timestampFunction", "readOnly", "id"})
        public EventSourcedAggregate(Class<T> cls, EventSourcingHandler<T> eventSourcingHandler, Cache cache, EventStoreSerializer eventStoreSerializer, EventStore eventStore, SnapshotRepository snapshotRepository, SnapshotTrigger snapshotTrigger, DocumentStore documentStore, boolean z, boolean z2, String str, Function<AggregateRoot<?>, Instant> function, boolean z3, String str2) {
            this.aggregateType = cls;
            this.eventSourcingHandler = eventSourcingHandler;
            this.cache = cache;
            this.serializer = eventStoreSerializer;
            this.eventStore = eventStore;
            this.snapshotRepository = snapshotRepository;
            this.snapshotTrigger = snapshotTrigger;
            this.documentStore = documentStore;
            this.eventSourced = z;
            this.searchable = z2;
            this.collection = str;
            this.timestampFunction = function;
            this.readOnly = z3;
            this.id = str2;
        }
    }

    public DefaultAggregateRepository(EventStore eventStore, SnapshotRepository snapshotRepository, Cache cache, DocumentStore documentStore, EventStoreSerializer eventStoreSerializer) {
        this(eventStore, snapshotRepository, cache, documentStore, eventStoreSerializer, new DefaultEventSourcingHandlerFactory(DeserializingMessage.defaultParameterResolvers));
    }

    @Override // io.fluxcapacitor.javaclient.modeling.AggregateRepository
    public boolean supports(Class<?> cls) {
        return cls.isAnnotationPresent(Aggregate.class);
    }

    @Override // io.fluxcapacitor.javaclient.modeling.AggregateRepository
    public boolean cachingAllowed(Class<?> cls) {
        Aggregate aggregate = (Aggregate) cls.getAnnotation(Aggregate.class);
        if (aggregate == null) {
            throw new UnsupportedOperationException("Unsupported aggregate type: " + cls);
        }
        return aggregate.cached();
    }

    @Override // io.fluxcapacitor.javaclient.modeling.AggregateRepository
    public <T> AggregateRoot<T> load(String str, Class<T> cls, boolean z, boolean z2) {
        return z2 ? (AggregateRoot) Optional.ofNullable((EventSourcedModel) this.cache.getIfPresent(keyFunction.apply(str))).map(eventSourcedModel -> {
            EventSourcedAggregate createAggregate = createAggregate(cls, str, z, false);
            createAggregate.model = eventSourcedModel;
            return createAggregate;
        }).orElse(null) : (AggregateRoot) ((Collection) Optional.ofNullable(this.loadedModels.get()).orElse(Collections.emptyList())).stream().filter(eventSourcedAggregate -> {
            return eventSourcedAggregate.id.equals(str) && cls.isAssignableFrom(eventSourcedAggregate.getAggregateType());
        }).map(eventSourcedAggregate2 -> {
            return eventSourcedAggregate2;
        }).findAny().orElseGet(() -> {
            return createAggregate(cls, str, z, true);
        });
    }

    protected <T> EventSourcedAggregate<T> createAggregate(Class<T> cls, String str, boolean z, boolean z2) {
        return (EventSourcedAggregate<T>) this.aggregateFactory.computeIfAbsent(cls, cls2 -> {
            EventSourcingHandler forType = this.handlerFactory.forType(cls);
            Cache cache = isCached(cls) ? this.cache : NoOpCache.INSTANCE;
            SnapshotRepository snapshotRepository = snapshotRepository(cls);
            SnapshotTrigger snapshotTrigger = snapshotTrigger(cls);
            boolean eventSourced = eventSourced(cls);
            boolean searchable = searchable(cls);
            String collection = collection(cls);
            Function<AggregateRoot<?>, Instant> timestampFunction = timestampFunction(cls);
            return (str2, z3, z4) -> {
                EventSourcedAggregate eventSourcedAggregate = new EventSourcedAggregate(cls, forType, cache, this.serializer, this.eventStore, snapshotRepository, snapshotTrigger, this.documentStore, eventSourced, searchable, collection, timestampFunction, z3, str2);
                if (z4) {
                    eventSourcedAggregate.initialize();
                }
                return eventSourcedAggregate;
            };
        }).create(str, z, z2);
    }

    protected SnapshotRepository snapshotRepository(Class<?> cls) {
        return snapshotPeriod(cls) > 0 ? this.snapshotRepository : NoOpSnapshotRepository.INSTANCE;
    }

    protected SnapshotTrigger snapshotTrigger(Class<?> cls) {
        int snapshotPeriod = snapshotPeriod(cls);
        return snapshotPeriod > 0 ? new PeriodicSnapshotTrigger(snapshotPeriod) : NoSnapshotTrigger.INSTANCE;
    }

    protected int snapshotPeriod(Class<?> cls) {
        return ((Integer) Optional.ofNullable((Aggregate) cls.getAnnotation(Aggregate.class)).map(aggregate -> {
            return Integer.valueOf(aggregate.eventSourced() ? aggregate.snapshotPeriod() : 1);
        }).orElse(Integer.valueOf(((Integer) Aggregate.class.getMethod("snapshotPeriod", new Class[0]).getDefaultValue()).intValue()))).intValue();
    }

    protected boolean isCached(Class<?> cls) {
        return ((Boolean) Optional.ofNullable((Aggregate) cls.getAnnotation(Aggregate.class)).map((v0) -> {
            return v0.cached();
        }).orElse(Boolean.valueOf(((Boolean) Aggregate.class.getMethod("cached", new Class[0]).getDefaultValue()).booleanValue()))).booleanValue();
    }

    protected String collection(Class<?> cls) {
        return (String) Optional.ofNullable((Aggregate) cls.getAnnotation(Aggregate.class)).map((v0) -> {
            return v0.collection();
        }).filter(str -> {
            return !str.isEmpty();
        }).orElse(cls.getSimpleName());
    }

    protected boolean eventSourced(Class<?> cls) {
        return ((Boolean) Optional.ofNullable((Aggregate) cls.getAnnotation(Aggregate.class)).map((v0) -> {
            return v0.eventSourced();
        }).orElse(Boolean.valueOf(((Boolean) Aggregate.class.getMethod("eventSourced", new Class[0]).getDefaultValue()).booleanValue()))).booleanValue();
    }

    protected boolean searchable(Class<?> cls) {
        return ((Boolean) Optional.ofNullable((Aggregate) cls.getAnnotation(Aggregate.class)).map((v0) -> {
            return v0.searchable();
        }).orElse(Boolean.valueOf(((Boolean) Aggregate.class.getMethod("searchable", new Class[0]).getDefaultValue()).booleanValue()))).booleanValue();
    }

    protected Function<AggregateRoot<?>, Instant> timestampFunction(Class<?> cls) {
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        return (Function) Optional.ofNullable((Aggregate) cls.getAnnotation(Aggregate.class)).map((v0) -> {
            return v0.timestampPath();
        }).filter(str -> {
            return !str.isBlank();
        }).map(str2 -> {
            return aggregateRoot -> {
                return (Instant) ReflectionUtils.readProperty(str2, aggregateRoot.get()).map(obj -> {
                    return Instant.from((TemporalAccessor) obj);
                }).orElseGet(() -> {
                    if (atomicBoolean.compareAndSet(false, true)) {
                        log.warn("Aggregate type {} does not declare the timestamp path property '{}'", aggregateRoot.get().getClass().getSimpleName(), str2);
                    }
                    return aggregateRoot.timestamp();
                });
            };
        }).orElse((v0) -> {
            return v0.timestamp();
        });
    }

    @ConstructorProperties({"eventStore", "snapshotRepository", "cache", "documentStore", "serializer", "handlerFactory"})
    public DefaultAggregateRepository(EventStore eventStore, SnapshotRepository snapshotRepository, Cache cache, DocumentStore documentStore, EventStoreSerializer eventStoreSerializer, EventSourcingHandlerFactory eventSourcingHandlerFactory) {
        this.aggregateFactory = new ConcurrentHashMap();
        this.loadedModels = new ThreadLocal<>();
        this.eventStore = eventStore;
        this.snapshotRepository = snapshotRepository;
        this.cache = cache;
        this.documentStore = documentStore;
        this.serializer = eventStoreSerializer;
        this.handlerFactory = eventSourcingHandlerFactory;
    }
}
