/*
 * Decompiled with CFR 0.152.
 */
package io.fluxcapacitor.javaclient.persisting.eventsourcing;

import io.fluxcapacitor.common.Awaitable;
import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.api.Metadata;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.common.reflection.ReflectionUtils;
import io.fluxcapacitor.javaclient.common.Message;
import io.fluxcapacitor.javaclient.common.serialization.DeserializingMessage;
import io.fluxcapacitor.javaclient.common.serialization.DeserializingObject;
import io.fluxcapacitor.javaclient.modeling.AggregateRepository;
import io.fluxcapacitor.javaclient.modeling.AggregateRoot;
import io.fluxcapacitor.javaclient.persisting.caching.Cache;
import io.fluxcapacitor.javaclient.persisting.caching.NoOpCache;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.Aggregate;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.AggregateEventStream;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.DefaultEventSourcingHandlerFactory;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.EventSourcedModel;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.EventSourcingHandler;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.EventSourcingHandlerFactory;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.EventStore;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.EventStoreSerializer;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.NoOpSnapshotRepository;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.NoSnapshotTrigger;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.PeriodicSnapshotTrigger;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.SnapshotRepository;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.SnapshotTrigger;
import io.fluxcapacitor.javaclient.persisting.search.DocumentStore;
import io.fluxcapacitor.javaclient.tracking.handling.validation.ValidationUtils;
import java.beans.ConstructorProperties;
import java.time.Instant;
import java.time.temporal.TemporalAccessor;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultAggregateRepository
implements AggregateRepository {
    private static final Logger log = LoggerFactory.getLogger(DefaultAggregateRepository.class);
    private static final Function<String, String> keyFunction = aggregateId -> DefaultAggregateRepository.class.getSimpleName() + ":" + aggregateId;
    private final EventStore eventStore;
    private final SnapshotRepository snapshotRepository;
    private final Cache cache;
    private final DocumentStore documentStore;
    private final EventStoreSerializer serializer;
    private final EventSourcingHandlerFactory handlerFactory;
    private final Map<Class<?>, AggregateFactoryFunction> aggregateFactory = new ConcurrentHashMap();
    private final ThreadLocal<Collection<EventSourcedAggregate<?>>> loadedModels = new ThreadLocal();

    /**
     * Creates a repository that builds its event sourcing handlers with a
     * {@link DefaultEventSourcingHandlerFactory} configured with
     * {@code DeserializingMessage.defaultParameterResolvers}.
     *
     * <p>All other collaborators are passed unchanged to the six-argument constructor.
     *
     * @param eventStore         store used to read and append aggregate events
     * @param snapshotRepository repository used to read and write snapshots
     * @param cache              cache holding loaded aggregate models
     * @param documentStore      store used for searchable aggregates
     * @param serializer         serializer used when turning applied messages into events
     */
    public DefaultAggregateRepository(EventStore eventStore, SnapshotRepository snapshotRepository, Cache cache, DocumentStore documentStore, EventStoreSerializer serializer) {
        this(eventStore, snapshotRepository, cache, documentStore, serializer, new DefaultEventSourcingHandlerFactory(DeserializingMessage.defaultParameterResolvers));
    }

    /**
     * Tells whether this repository can handle the given aggregate type.
     *
     * @param aggregateType the candidate aggregate class
     * @return {@code true} if the class carries the {@link Aggregate} annotation
     */
    @Override
    public boolean supports(Class<?> aggregateType) {
        Aggregate annotation = aggregateType.getAnnotation(Aggregate.class);
        return annotation != null;
    }

    /**
     * Reports the {@code cached} setting of the {@link Aggregate} annotation on the given type.
     *
     * @param aggregateType the aggregate class to inspect
     * @return the value of {@link Aggregate#cached()} declared on the class
     * @throws UnsupportedOperationException if the class is not annotated with {@link Aggregate}
     */
    @Override
    public boolean cachingAllowed(Class<?> aggregateType) {
        return Optional.ofNullable(aggregateType.getAnnotation(Aggregate.class))
                .map(Aggregate::cached)
                .orElseThrow(() -> new UnsupportedOperationException("Unsupported aggregate type: " + aggregateType));
    }

    /**
     * Loads the aggregate with the given id.
     *
     * <p>With {@code onlyCached} set, only the repository cache is consulted: a cached model is wrapped in a
     * fresh, uninitialized aggregate, and {@code null} is returned when nothing is cached.
     *
     * <p>Otherwise an aggregate already tracked for the current thread (same id, compatible runtime type) is
     * reused. If there is none, a new aggregate is created and initialized.
     *
     * @param aggregateId   id of the aggregate
     * @param aggregateType requested aggregate class
     * @param readOnly      whether the returned aggregate rejects {@code apply}
     * @param onlyCached    whether to restrict the lookup to the cache
     * @return the aggregate, or {@code null} if {@code onlyCached} is set and no model is cached
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T> AggregateRoot<T> load(String aggregateId, Class<T> aggregateType, boolean readOnly, boolean onlyCached) {
        if (onlyCached) {
            EventSourcedModel<T> cachedModel =
                    (EventSourcedModel<T>) this.cache.getIfPresent(keyFunction.apply(aggregateId));
            if (cachedModel == null) {
                return null;
            }
            EventSourcedAggregate<T> wrapper = createAggregate(aggregateType, aggregateId, readOnly, false);
            wrapper.model = cachedModel;
            return wrapper;
        }
        Collection<EventSourcedAggregate<?>> tracked = this.loadedModels.get();
        if (tracked != null) {
            for (EventSourcedAggregate<?> candidate : tracked) {
                if (candidate.id.equals(aggregateId)
                        && aggregateType.isAssignableFrom(candidate.getAggregateType())) {
                    return (AggregateRoot<T>) candidate;
                }
            }
        }
        return createAggregate(aggregateType, aggregateId, readOnly, true);
    }

    /**
     * Creates an aggregate instance for the given type and id.
     *
     * <p>The per-type settings (handler, cache, snapshot repository and trigger, flags, collection, timestamp
     * function) are resolved once per aggregate class and kept in {@code aggregateFactory}. Later calls for
     * the same class reuse that factory.
     *
     * @param aggregateType class of the aggregate
     * @param aggregateId   id of the aggregate
     * @param readOnly      whether the aggregate rejects {@code apply}
     * @param initialize    whether {@code initialize()} is invoked on the new instance
     * @return the newly created aggregate
     */
    @SuppressWarnings("unchecked")
    protected <T> EventSourcedAggregate<T> createAggregate(Class<T> aggregateType, String aggregateId, boolean readOnly, boolean initialize) {
        AggregateFactoryFunction factory = this.aggregateFactory.computeIfAbsent(aggregateType, type -> {
            EventSourcingHandler<T> handler = this.handlerFactory.forType(aggregateType);
            Cache typeCache;
            if (isCached(aggregateType)) {
                typeCache = this.cache;
            } else {
                typeCache = NoOpCache.INSTANCE;
            }
            SnapshotRepository typeSnapshots = snapshotRepository(aggregateType);
            SnapshotTrigger typeTrigger = snapshotTrigger(aggregateType);
            boolean isEventSourced = eventSourced(aggregateType);
            boolean isSearchable = searchable(aggregateType);
            String typeCollection = collection(aggregateType);
            Function<AggregateRoot<?>, Instant> typeTimestamps = timestampFunction(aggregateType);
            return (id, ro, init) -> {
                EventSourcedAggregate<T> created = new EventSourcedAggregate<>(
                        aggregateType, handler, typeCache, this.serializer, this.eventStore, typeSnapshots,
                        typeTrigger, this.documentStore, isEventSourced, isSearchable, typeCollection,
                        typeTimestamps, ro, id);
                if (init) {
                    created.initialize();
                }
                return created;
            };
        });
        return (EventSourcedAggregate<T>) factory.create(aggregateId, readOnly, initialize);
    }

    /**
     * Selects the snapshot repository for an aggregate type.
     *
     * @param aggregateType class of the aggregate
     * @return the configured repository when the type's snapshot period is positive, otherwise
     *         {@link NoOpSnapshotRepository#INSTANCE}
     */
    protected SnapshotRepository snapshotRepository(Class<?> aggregateType) {
        if (snapshotPeriod(aggregateType) > 0) {
            return this.snapshotRepository;
        }
        return NoOpSnapshotRepository.INSTANCE;
    }

    /**
     * Selects the snapshot trigger for an aggregate type.
     *
     * @param aggregateType class of the aggregate
     * @return a {@link PeriodicSnapshotTrigger} with the type's snapshot period when that period is positive,
     *         otherwise {@link NoSnapshotTrigger#INSTANCE}
     */
    protected SnapshotTrigger snapshotTrigger(Class<?> aggregateType) {
        int period = snapshotPeriod(aggregateType);
        if (period <= 0) {
            return NoSnapshotTrigger.INSTANCE;
        }
        return new PeriodicSnapshotTrigger(period);
    }

    /**
     * Determines the snapshot period for an aggregate type.
     *
     * <p>For an annotated type the declared {@link Aggregate#snapshotPeriod()} is used when the aggregate is
     * event sourced or searchable. Otherwise the period is {@code 1}. For a type without the annotation the
     * annotation's declared default is used.
     *
     * @param aggregateType class of the aggregate
     * @return the snapshot period to use for the type
     * @throws IllegalStateException if the {@code snapshotPeriod} attribute cannot be found on {@link Aggregate}
     */
    protected int snapshotPeriod(Class<?> aggregateType) {
        Aggregate annotation = aggregateType.getAnnotation(Aggregate.class);
        if (annotation != null) {
            return annotation.eventSourced() || annotation.searchable() ? annotation.snapshotPeriod() : 1;
        }
        // The reflective lookup is only needed (and only performed) when the annotation is absent.
        try {
            return (Integer) Aggregate.class.getMethod("snapshotPeriod").getDefaultValue();
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException("Aggregate annotation does not declare attribute 'snapshotPeriod'", e);
        }
    }

    /**
     * Determines whether aggregates of the given type are cached.
     *
     * @param aggregateType class of the aggregate
     * @return {@link Aggregate#cached()} as declared on the type, or the annotation's declared default when
     *         the type is not annotated
     * @throws IllegalStateException if the {@code cached} attribute cannot be found on {@link Aggregate}
     */
    protected boolean isCached(Class<?> aggregateType) {
        Aggregate annotation = aggregateType.getAnnotation(Aggregate.class);
        if (annotation != null) {
            return annotation.cached();
        }
        // The reflective lookup is only needed (and only performed) when the annotation is absent.
        try {
            return (Boolean) Aggregate.class.getMethod("cached").getDefaultValue();
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException("Aggregate annotation does not declare attribute 'cached'", e);
        }
    }

    /**
     * Resolves the collection name for an aggregate type.
     *
     * @param aggregateType class of the aggregate
     * @return the non-empty {@link Aggregate#collection()} declared on the type, otherwise the simple name
     *         of the class
     */
    protected String collection(Class<?> aggregateType) {
        Aggregate annotation = aggregateType.getAnnotation(Aggregate.class);
        if (annotation != null) {
            String configured = annotation.collection();
            if (!configured.isEmpty()) {
                return configured;
            }
        }
        return aggregateType.getSimpleName();
    }

    /**
     * Determines whether aggregates of the given type are event sourced.
     *
     * @param aggregateType class of the aggregate
     * @return {@link Aggregate#eventSourced()} as declared on the type, or the annotation's declared default
     *         when the type is not annotated
     * @throws IllegalStateException if the {@code eventSourced} attribute cannot be found on {@link Aggregate}
     */
    protected boolean eventSourced(Class<?> aggregateType) {
        Aggregate annotation = aggregateType.getAnnotation(Aggregate.class);
        if (annotation != null) {
            return annotation.eventSourced();
        }
        // The reflective lookup is only needed (and only performed) when the annotation is absent.
        try {
            return (Boolean) Aggregate.class.getMethod("eventSourced").getDefaultValue();
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException("Aggregate annotation does not declare attribute 'eventSourced'", e);
        }
    }

    /**
     * Determines whether aggregates of the given type are searchable.
     *
     * @param aggregateType class of the aggregate
     * @return {@link Aggregate#searchable()} as declared on the type, or the annotation's declared default
     *         when the type is not annotated
     * @throws IllegalStateException if the {@code searchable} attribute cannot be found on {@link Aggregate}
     */
    protected boolean searchable(Class<?> aggregateType) {
        Aggregate annotation = aggregateType.getAnnotation(Aggregate.class);
        if (annotation != null) {
            return annotation.searchable();
        }
        // The reflective lookup is only needed (and only performed) when the annotation is absent.
        try {
            return (Boolean) Aggregate.class.getMethod("searchable").getDefaultValue();
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException("Aggregate annotation does not declare attribute 'searchable'", e);
        }
    }

    /**
     * Builds the function that yields the timestamp of an aggregate of the given type.
     *
     * <p>Without a non-blank {@link Aggregate#timestampPath()} the function is {@code AggregateRoot::timestamp}.
     * With a path, the property at that path is read from the aggregate value and converted with
     * {@link Instant#from(TemporalAccessor)}. If the property read yields nothing, a warning is logged once
     * per returned function and the aggregate's own timestamp is used.
     *
     * @param aggregateType class of the aggregate
     * @return function mapping an aggregate root to its timestamp
     */
    protected Function<AggregateRoot<?>, Instant> timestampFunction(Class<?> aggregateType) {
        AtomicBoolean warnedAboutMissingProperty = new AtomicBoolean();
        Aggregate annotation = aggregateType.getAnnotation(Aggregate.class);
        String path = annotation == null ? null : annotation.timestampPath();
        if (path == null || path.isBlank()) {
            return AggregateRoot::timestamp;
        }
        return aggregateRoot -> {
            Optional<Instant> fromProperty = ReflectionUtils.readProperty(path, aggregateRoot.get())
                    .map(t -> Instant.from((TemporalAccessor) t));
            if (fromProperty.isPresent()) {
                return fromProperty.get();
            }
            // Only the first miss is logged to avoid flooding the log.
            if (warnedAboutMissingProperty.compareAndSet(false, true)) {
                log.warn("Aggregate type {} does not declare the timestamp path property '{}'",
                        aggregateRoot.get().getClass().getSimpleName(), path);
            }
            return aggregateRoot.timestamp();
        };
    }

    /**
     * Creates a repository from its collaborators. Each argument is stored as-is in the field of the same name.
     *
     * @param eventStore         store used to read and append aggregate events
     * @param snapshotRepository repository used to read and write snapshots
     * @param cache              cache holding loaded aggregate models
     * @param documentStore      store used for searchable aggregates
     * @param serializer         serializer used when turning applied messages into events
     * @param handlerFactory     factory providing the event sourcing handler per aggregate type
     */
    @ConstructorProperties({"eventStore", "snapshotRepository", "cache", "documentStore", "serializer", "handlerFactory"})
    public DefaultAggregateRepository(EventStore eventStore, SnapshotRepository snapshotRepository, Cache cache, DocumentStore documentStore, EventStoreSerializer serializer, EventSourcingHandlerFactory handlerFactory) {
        // persistence collaborators
        this.eventStore = eventStore;
        this.snapshotRepository = snapshotRepository;
        this.documentStore = documentStore;
        this.cache = cache;
        // (de)serialization and handler resolution
        this.serializer = serializer;
        this.handlerFactory = handlerFactory;
    }

    @FunctionalInterface
    protected static interface AggregateFactoryFunction {
        public EventSourcedAggregate<?> create(String var1, boolean var2, boolean var3);
    }

    protected class EventSourcedAggregate<T>
    implements AggregateRoot<T> {
        private final Class<T> aggregateType;
        private final EventSourcingHandler<T> eventSourcingHandler;
        private final Cache cache;
        private final EventStoreSerializer serializer;
        private final EventStore eventStore;
        private final SnapshotRepository snapshotRepository;
        private final SnapshotTrigger snapshotTrigger;
        private final DocumentStore documentStore;
        private final boolean eventSourced;
        private final boolean searchable;
        private final String collection;
        private final Function<AggregateRoot<?>, Instant> timestampFunction;
        private final List<DeserializingMessage> unpublishedEvents = new ArrayList<DeserializingMessage>();
        private final boolean readOnly;
        private final String id;
        private EventSourcedModel<T> model;
        private boolean updated;

        /**
         * Populates {@code model} for this aggregate.
         *
         * <p>A cached model is used when one is present and its value is either {@code null} or an instance
         * of the requested aggregate type. Otherwise the model is rebuilt:
         * <ol>
         * <li>the starting point is the stored document (searchable, non event sourced aggregates) or the
         * stored snapshot (all other aggregates); a stored value of an incompatible type is discarded with a
         * warning, and an empty model carrying only id and type is used when nothing usable is stored;</li>
         * <li>for event sourced aggregates, the events returned by the event store for the starting point's
         * sequence number are applied one by one through the event sourcing handler.</li>
         * </ol>
         *
         * <p>This method does not write the resulting model to the cache.
         */
        protected void initialize() {
            this.model = Optional.ofNullable((EventSourcedModel)this.cache.getIfPresent(keyFunction.apply(this.id))).filter(a -> a.get() == null || this.aggregateType.isAssignableFrom(a.get().getClass())).orElseGet(() -> {
                // Cache miss (or incompatible cached type): start from the document or the snapshot.
                EventSourcedModel model = (this.searchable && !this.eventSourced ? this.documentStore.getDocument(this.id, this.collection).map(d -> EventSourcedModel.builder().id(this.id).type(this.aggregateType).model(d).build()) : this.snapshotRepository.getSnapshot(this.id)).filter(a -> {
                    boolean assignable;
                    boolean bl = assignable = a.get() == null || this.aggregateType.isAssignableFrom(a.get().getClass());
                    if (!assignable) {
                        log.warn("Could not load aggregate {} because the requested type {} is not assignable to the stored type {}", new Object[]{this.id, this.aggregateType, a.get().getClass()});
                    }
                    return assignable;
                }).orElseGet(() -> EventSourcedModel.builder().id(this.id).type(this.aggregateType).build());
                if (!this.eventSourced) {
                    return model;
                }
                // Replay events on top of the starting model; each event yields a new model linked to its predecessor.
                AggregateEventStream<DeserializingMessage> eventStream = this.eventStore.getEvents(this.id, model.sequenceNumber());
                Iterator<DeserializingMessage> iterator = eventStream.iterator();
                while (iterator.hasNext()) {
                    DeserializingMessage event = iterator.next();
                    model = model.toBuilder().sequenceNumber(model.sequenceNumber() + 1L).type(this.aggregateType).id(this.id).lastEventId(event.getSerializedObject().getMessageId()).lastEventIndex(event.getSerializedObject().getIndex()).timestamp(Instant.ofEpochMilli(event.getSerializedObject().getTimestamp())).model(this.eventSourcingHandler.invoke(model, event)).previous(model).build();
                }
                // The stream's last sequence number, when available, overrides the locally counted one.
                return model.toBuilder().sequenceNumber(eventStream.getLastSequenceNumber().orElse(model.sequenceNumber())).build();
            });
        }

        /**
         * Returns the runtime class of the current aggregate value, or the requested aggregate type when no
         * model or no value is available.
         *
         * @return the effective aggregate class
         */
        public Class<?> getAggregateType() {
            if (model != null) {
                T value = model.get();
                if (value != null) {
                    return value.getClass();
                }
            }
            return aggregateType;
        }

        /**
         * Applies the given message as an event to this aggregate.
         *
         * <p>The message metadata is extended with {@code $aggregateId} and {@code $aggregateType}. The event
         * is passed to the event sourcing handler and the resulting model becomes the current model. The event
         * is queued for publication only after the handler has produced the new model, so a handler failure
         * does not leave an unapplied event in the queue.
         *
         * @param message the message to apply
         * @return this aggregate
         * @throws UnsupportedOperationException if this aggregate is read-only
         */
        @Override
        public AggregateRoot<T> apply(Message message) {
            if (this.readOnly) {
                throw new UnsupportedOperationException(String.format("Not allowed to apply a %s. The model is readonly.", message));
            }
            Metadata metadata = message.getMetadata().with(new Object[]{"$aggregateId", this.id, "$aggregateType", this.getAggregateType().getName()});
            Message eventMessage = message.withMetadata(metadata);
            DeserializingMessage deserializingMessage = new DeserializingMessage(new DeserializingObject<byte[], SerializedMessage>(this.serializer.serialize(eventMessage), type -> this.serializer.convert(eventMessage.getPayload(), type)), MessageType.EVENT);
            // Build the next model first: if the handler throws, neither the model nor the event queue changes.
            EventSourcedModel<T> next = this.model.toBuilder()
                    .sequenceNumber(this.model.sequenceNumber() + 1L)
                    .model(this.eventSourcingHandler.invoke(this.model, deserializingMessage))
                    .previous(this.model)
                    .lastEventId(eventMessage.getMessageId())
                    .timestamp(eventMessage.getTimestamp())
                    .lastEventIndex(deserializingMessage.getSerializedObject().getIndex())
                    .build();
            this.unpublishedEvents.add(deserializingMessage);
            this.updateModel(next);
            return this;
        }

        /**
         * Replaces the aggregate value with the result of the given function, without recording an event.
         *
         * <p>A warning is logged when this aggregate is event sourced.
         *
         * @param function operator producing the new aggregate value from the current one
         * @return this aggregate
         */
        @Override
        public AggregateRoot<T> update(UnaryOperator<T> function) {
            if (eventSourced) {
                log.warn("An event sourced aggregate is updated without applying an event. This is typically a mistake. On aggregate: {}", this);
            }
            EventSourcedModel<T> replacement = (EventSourcedModel<T>) model.update(function);
            updateModel(replacement);
            return this;
        }

        /**
         * Installs the given model as the current one and makes sure this aggregate is committed later.
         *
         * <p>The first updated aggregate on a thread creates the thread-local collection of tracked
         * aggregates and registers a commit callback. The callback runs at batch completion when
         * {@link Aggregate#commitInBatch()} is set on this aggregate's type, and at message completion
         * otherwise. Later updates on the same thread only add their aggregate to the collection if it is
         * not already tracked.
         *
         * <p>If waiting for the commit fails, the ids of all tracked aggregates are invalidated in the
         * repository's shared cache. The shared cache is used rather than this aggregate's own cache, which
         * may be a no-op cache, so that uncommitted models of other aggregate types are evicted as well.
         *
         * @param model the new model of this aggregate
         */
        protected void updateModel(EventSourcedModel<T> model) {
            this.model = model;
            this.updated = true;
            if (DefaultAggregateRepository.this.loadedModels.get() == null) {
                DefaultAggregateRepository.this.loadedModels.set(Collections.asLifoQueue(new ArrayDeque<EventSourcedAggregate<?>>()));
                DefaultAggregateRepository.this.loadedModels.get().add(this);
                Runnable commit = () -> {
                    Collection<EventSourcedAggregate<?>> models = DefaultAggregateRepository.this.loadedModels.get();
                    // Clear the thread-local before committing so later updates start a fresh unit of work.
                    DefaultAggregateRepository.this.loadedModels.remove();
                    models.stream().map(EventSourcedAggregate::commit).reduce(Awaitable::join).ifPresent(a -> {
                        try {
                            a.await();
                        }
                        catch (Exception e) {
                            List<String> aggregateIds = models.stream().map(m -> m.id).collect(Collectors.toList());
                            log.error("Failed to commit events for aggregates {}. Clearing aggregates from the cache.", aggregateIds, (Object)e);
                            // Evict from the shared cache: this aggregate's own cache may be a no-op cache.
                            aggregateIds.forEach(id -> DefaultAggregateRepository.this.cache.invalidate(keyFunction.apply(id)));
                        }
                    });
                };
                // Dereferences the annotation directly, so the aggregate type must carry @Aggregate.
                if (this.aggregateType.getAnnotation(Aggregate.class).commitInBatch()) {
                    DeserializingMessage.whenBatchCompletes(commit);
                } else {
                    DeserializingMessage.whenMessageCompletes(commit);
                }
            } else if (DefaultAggregateRepository.this.loadedModels.get().stream().noneMatch(e -> e == this)) {
                DefaultAggregateRepository.this.loadedModels.get().add(this);
            }
        }

        /**
         * Checks that the given commands are legal for this aggregate, in order.
         *
         * <p>The first command is validated against the current model. Each following command is validated
         * against the model that results from applying all preceding commands. Those intermediate models are
         * only used for validation: the aggregate's own model and its pending events are not touched.
         *
         * @param commands commands to validate, in the order they would be applied
         * @return this aggregate
         * @throws E if {@code ValidationUtils.assertLegal} rejects one of the commands
         */
        @Override
        public <E extends Exception> AggregateRoot<T> assertLegal(Object ... commands) throws E {
            EventSourcedModel<T> projected = this.model;
            for (int i = 0; i < commands.length; i++) {
                Object command = commands[i];
                ValidationUtils.assertLegal(command, projected);
                if (i < commands.length - 1) {
                    // Carry the accumulated state forward so later commands see the effect of earlier ones.
                    projected = this.forceApply(projected, command instanceof Message ? (Message)command : new Message(command));
                }
            }
            return this;
        }

        /**
         * Computes the model that results from applying the given message to the given model, without
         * changing this aggregate and without queuing the event for publication.
         *
         * <p>As in {@code apply}, the message metadata is extended with {@code $aggregateId} and
         * {@code $aggregateType}, and the enriched message is the one that is serialized and handed to the
         * event sourcing handler.
         *
         * @param model   the model to apply the message to
         * @param message the message to apply
         * @return the resulting model, linked to {@code model} as its predecessor
         */
        protected EventSourcedModel<T> forceApply(EventSourcedModel<T> model, Message message) {
            Message eventMessage = message.withMetadata(message.getMetadata().with(new Object[]{"$aggregateId", this.id, "$aggregateType", this.getAggregateType().getName()}));
            // Serialize the enriched message so the handler sees the aggregate metadata.
            DeserializingMessage deserializingMessage = new DeserializingMessage(new DeserializingObject<byte[], SerializedMessage>(this.serializer.serialize(eventMessage), type -> this.serializer.convert(eventMessage.getPayload(), type)), MessageType.EVENT);
            return model.toBuilder().sequenceNumber(model.sequenceNumber() + 1L).model(this.eventSourcingHandler.invoke(model, deserializingMessage)).previous(model).lastEventId(eventMessage.getMessageId()).timestamp(eventMessage.getTimestamp()).lastEventIndex(deserializingMessage.getSerializedObject().getIndex()).build();
        }

        /**
         * Returns the aggregate value held by the current model.
         *
         * @return the value reported by the model's {@code get()}
         */
        @Override
        public T get() {
            return model.get();
        }

        /**
         * Returns the predecessor recorded on the current model.
         *
         * @return the value reported by the model's {@code previous()}
         */
        @Override
        public AggregateRoot<T> previous() {
            return model.previous();
        }

        /**
         * Returns the id of the last event recorded on the current model.
         *
         * @return the value reported by the model's {@code lastEventId()}
         */
        @Override
        public String lastEventId() {
            return model.lastEventId();
        }

        /**
         * Returns the index of the last event recorded on the current model.
         *
         * @return the value reported by the model's {@code lastEventIndex()}
         */
        @Override
        public Long lastEventIndex() {
            return model.lastEventIndex();
        }

        /**
         * Returns the timestamp recorded on the current model.
         *
         * @return the value reported by the model's {@code timestamp()}
         */
        @Override
        public Instant timestamp() {
            return model.timestamp();
        }

        /**
         * Returns the id this aggregate was created with.
         *
         * @return the aggregate id
         */
        @Override
        public String id() {
            return id;
        }

        /**
         * Returns the aggregate class this instance was created for.
         *
         * @return the requested aggregate type
         */
        @Override
        public Class<T> type() {
            return aggregateType;
        }

        /**
         * Renders the collection, the event sourced / searchable / read-only flags and the current model.
         *
         * @return a textual description of this aggregate
         */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("EventSourcedAggregate{collection='");
            text.append(collection);
            text.append("', eventSourced=").append(eventSourced);
            text.append(", searchable=").append(searchable);
            text.append(", readOnly=").append(readOnly);
            text.append(", model=").append(model);
            text.append("}");
            return text.toString();
        }

        /**
         * Persists the pending changes of this aggregate, if it was updated.
         *
         * <p>In order: the model is put in this aggregate's cache; unpublished events, if any, are handed to
         * the event store and a snapshot is stored when the snapshot trigger asks for one; for searchable
         * aggregates the value is indexed in the document store, or its document is deleted when the value is
         * {@code null}. A failure in any step is logged and the cache entry is invalidated; the exception is
         * not rethrown. The pending events and the updated flag are always reset afterwards.
         *
         * @return the {@link Awaitable} returned by the event store when events were stored, otherwise a
         *         ready one
         */
        protected Awaitable commit() {
            if (!updated) {
                return Awaitable.ready();
            }
            Awaitable outcome = Awaitable.ready();
            try {
                cache.put(keyFunction.apply(model.id()), model);
                boolean hasPendingEvents = !unpublishedEvents.isEmpty();
                if (hasPendingEvents) {
                    // Hand over a copy: the pending list is cleared in the finally block below.
                    List<DeserializingMessage> batch = new ArrayList<DeserializingMessage>(unpublishedEvents);
                    outcome = eventStore.storeEvents(model.id(), collection, model.sequenceNumber(), batch);
                    if (snapshotTrigger.shouldCreateSnapshot(model, unpublishedEvents)) {
                        snapshotRepository.storeSnapshot(model);
                    }
                }
                if (searchable) {
                    T value = model.get();
                    if (value != null) {
                        documentStore.index(value, model.id(), collection, timestampFunction.apply(model));
                    } else {
                        documentStore.deleteDocument(model.id(), collection);
                    }
                }
            } catch (Exception e) {
                log.error("Failed to commit new events of aggregate {}", model.id(), e);
                cache.invalidate(keyFunction.apply(model.id()));
            } finally {
                unpublishedEvents.clear();
                updated = false;
            }
            return outcome;
        }

        /**
         * Creates an aggregate wrapper. Each argument is stored as-is in the field of the same name; the model
         * is not loaded here.
         *
         * @param aggregateType        requested aggregate class
         * @param eventSourcingHandler handler that applies events to a model
         * @param cache                cache used by this aggregate
         * @param serializer           serializer used when turning applied messages into events
         * @param eventStore           store used to read and append events
         * @param snapshotRepository   repository used to read and write snapshots
         * @param snapshotTrigger      trigger deciding when a snapshot is stored
         * @param documentStore        store used for searchable aggregates
         * @param eventSourced         whether the aggregate is event sourced
         * @param searchable           whether the aggregate is searchable
         * @param collection           collection name passed to the event and document stores
         * @param timestampFunction    function yielding the timestamp used when indexing
         * @param readOnly             whether {@code apply} is rejected
         * @param id                   id of the aggregate
         */
        @ConstructorProperties({"aggregateType", "eventSourcingHandler", "cache", "serializer", "eventStore", "snapshotRepository", "snapshotTrigger", "documentStore", "eventSourced", "searchable", "collection", "timestampFunction", "readOnly", "id"})
        public EventSourcedAggregate(Class<T> aggregateType, EventSourcingHandler<T> eventSourcingHandler, Cache cache, EventStoreSerializer serializer, EventStore eventStore, SnapshotRepository snapshotRepository, SnapshotTrigger snapshotTrigger, DocumentStore documentStore, boolean eventSourced, boolean searchable, String collection, Function<AggregateRoot<?>, Instant> timestampFunction, boolean readOnly, String id) {
            // identity
            this.id = id;
            this.aggregateType = aggregateType;
            this.readOnly = readOnly;
            // per-type configuration
            this.eventSourced = eventSourced;
            this.searchable = searchable;
            this.collection = collection;
            this.timestampFunction = timestampFunction;
            // collaborators
            this.eventSourcingHandler = eventSourcingHandler;
            this.cache = cache;
            this.serializer = serializer;
            this.eventStore = eventStore;
            this.snapshotRepository = snapshotRepository;
            this.snapshotTrigger = snapshotTrigger;
            this.documentStore = documentStore;
        }
    }
}

