/*
 * Decompiled with CFR 0.152.
 */
package io.fluxcapacitor.javaclient.persisting.eventsourcing;

import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.api.Metadata;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.javaclient.common.Message;
import io.fluxcapacitor.javaclient.common.serialization.DeserializingMessage;
import io.fluxcapacitor.javaclient.common.serialization.DeserializingObject;
import io.fluxcapacitor.javaclient.modeling.Aggregate;
import io.fluxcapacitor.javaclient.modeling.AggregateRepository;
import io.fluxcapacitor.javaclient.persisting.caching.Cache;
import io.fluxcapacitor.javaclient.persisting.caching.NoOpCache;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.DefaultEventSourcingHandlerFactory;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.EventSourced;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.EventSourcedModel;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.EventSourcingHandler;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.EventSourcingHandlerFactory;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.EventStore;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.EventStoreSerializer;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.NoOpSnapshotRepository;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.NoSnapshotTrigger;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.PeriodicSnapshotTrigger;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.SnapshotRepository;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.SnapshotTrigger;
import java.beans.ConstructorProperties;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventSourcingRepository
implements AggregateRepository {
    private static final Logger log = LoggerFactory.getLogger(EventSourcingRepository.class);
    /** Builds the cache key for an aggregate id: the simple class name of this repository, a colon, then the id. */
    private static final Function<String, String> keyFunction = aggregateId -> EventSourcingRepository.class.getSimpleName() + ":" + aggregateId;
    private final EventStore eventStore;
    private final SnapshotRepository snapshotRepository;
    private final Cache cache;
    private final EventStoreSerializer serializer;
    private final EventSourcingHandlerFactory handlerFactory;
    /** Per aggregate type: a factory that turns an aggregate id into an initialized aggregate. Filled lazily by createAggregate. */
    private final Map<Class<?>, Function<String, EventSourcedAggregate<?>>> aggregateFactory = new ConcurrentHashMap<>();
    /** Aggregates loaded by the current thread that still have to be committed; {@code null} when nothing is being tracked. */
    private final ThreadLocal<Collection<EventSourcedAggregate<?>>> loadedModels = new ThreadLocal<>();

    /**
     * Creates a repository that uses a {@link DefaultEventSourcingHandlerFactory} built from
     * {@code DeserializingMessage.defaultParameterResolvers}.
     *
     * @param eventStore         store used to read and append domain events
     * @param snapshotRepository repository used to read and write snapshots
     * @param cache              cache used for loaded aggregate models
     * @param serializer         serializer used for applied events
     */
    public EventSourcingRepository(EventStore eventStore,
                                   SnapshotRepository snapshotRepository,
                                   Cache cache,
                                   EventStoreSerializer serializer) {
        this(
                eventStore,
                snapshotRepository,
                cache,
                serializer,
                new DefaultEventSourcingHandlerFactory(DeserializingMessage.defaultParameterResolvers)
        );
    }

    /**
     * Tells whether this repository can handle the given aggregate type.
     *
     * @param aggregateType the type to check
     * @return {@code true} if the type carries the {@link EventSourced} annotation
     */
    @Override
    public boolean supports(Class<?> aggregateType) {
        EventSourced annotation = aggregateType.getAnnotation(EventSourced.class);
        return annotation != null;
    }

    /**
     * Loads the aggregate with the given id.
     *
     * <p>With {@code onlyCached} the cache is consulted and nothing else; the result is {@code null} when
     * the cache holds no entry for the id.
     *
     * <p>Otherwise, when the message currently being handled is a command, aggregates loaded on this thread
     * are tracked in a thread-local collection, so loading the same id again returns the same instance. A
     * callback registered via {@code DeserializingMessage.whenBatchCompletes} clears that collection and
     * commits every tracked aggregate. When there is no current message, or it is not a command, a fresh
     * aggregate is created without tracking; it is created while no collection is tracked and is therefore
     * read-only.
     *
     * @param aggregateId   id of the aggregate
     * @param aggregateType type of the aggregate
     * @param onlyCached    if {@code true}, only look in the cache
     * @param <T>           aggregate type
     * @return the aggregate, or {@code null} if {@code onlyCached} is set and nothing is cached
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T> Aggregate<T> load(String aggregateId, Class<T> aggregateType, boolean onlyCached) {
        if (onlyCached) {
            EventSourcedModel<T> cached = (EventSourcedModel<T>) this.cache.getIfPresent(keyFunction.apply(aggregateId));
            return cached;
        }
        Collection<EventSourcedAggregate<?>> loaded = this.loadedModels.get();
        if (loaded == null) {
            DeserializingMessage current = DeserializingMessage.getCurrent();
            // The message type must actually be compared; mapping to a Boolean and calling isPresent() would
            // accept every non-null message.
            if (current == null || current.getMessageType() != MessageType.COMMAND) {
                return this.createAggregate(aggregateType, aggregateId);
            }
            loaded = Collections.asLifoQueue(new ArrayDeque<>());
            this.loadedModels.set(loaded);
            DeserializingMessage.whenBatchCompletes(() -> {
                Collection<EventSourcedAggregate<?>> models = this.loadedModels.get();
                // Clear the thread-local before committing so it is reset even if a commit fails.
                this.loadedModels.remove();
                models.forEach(EventSourcedAggregate::commit);
            });
        }
        for (EventSourcedAggregate<?> candidate : loaded) {
            if (candidate.id.equals(aggregateId)) {
                return (Aggregate<T>) candidate;
            }
        }
        EventSourcedAggregate<T> created = this.createAggregate(aggregateType, aggregateId);
        loaded.add(created);
        return created;
    }

    /**
     * Creates and initializes an aggregate of the given type and id.
     *
     * <p>The per-type settings (event sourcing handler, cache, snapshot repository, snapshot trigger and
     * domain) are resolved once per aggregate type and kept in {@code aggregateFactory}. The aggregate is
     * read-only when no loaded-model collection is tracked on the current thread at creation time.
     *
     * @param aggregateType type of the aggregate
     * @param aggregateId   id of the aggregate
     * @param <T>           aggregate type
     * @return the initialized aggregate
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    protected <T> EventSourcedAggregate<T> createAggregate(Class<T> aggregateType, String aggregateId) {
        Function<String, EventSourcedAggregate<?>> factory = this.aggregateFactory.computeIfAbsent(aggregateType, type -> {
            EventSourcingHandler handler = this.handlerFactory.forType(aggregateType);
            Cache aggregateCache = this.isCached(aggregateType) ? this.cache : NoOpCache.INSTANCE;
            SnapshotRepository snapshots = this.snapshotRepository(aggregateType);
            SnapshotTrigger trigger = this.snapshotTrigger(aggregateType);
            String aggregateDomain = this.domain(aggregateType);
            return id -> {
                boolean readOnly = this.loadedModels.get() == null;
                EventSourcedAggregate<T> aggregate = new EventSourcedAggregate<T>(
                        aggregateType, handler, aggregateCache, this.serializer, this.eventStore,
                        snapshots, trigger, aggregateDomain, readOnly, id);
                aggregate.initialize();
                return aggregate;
            };
        });
        return (EventSourcedAggregate<T>) factory.apply(aggregateId);
    }

    /**
     * Selects the snapshot repository for an aggregate type.
     *
     * <p>The snapshot period comes from the type's {@link EventSourced} annotation; if the type is not
     * annotated, the annotation's declared default for {@code snapshotPeriod} is used. The reflective
     * default lookup only runs when the annotation is absent.
     *
     * @param aggregateType type of the aggregate
     * @return the configured snapshot repository if the period is positive, otherwise
     *         {@code NoOpSnapshotRepository.INSTANCE}
     * @throws IllegalStateException if {@code EventSourced} has no {@code snapshotPeriod} attribute
     */
    protected SnapshotRepository snapshotRepository(Class<?> aggregateType) {
        EventSourced annotation = aggregateType.getAnnotation(EventSourced.class);
        int frequency;
        if (annotation != null) {
            frequency = annotation.snapshotPeriod();
        } else {
            try {
                frequency = (Integer) EventSourced.class.getMethod("snapshotPeriod").getDefaultValue();
            } catch (NoSuchMethodException e) {
                throw new IllegalStateException("EventSourced annotation has no snapshotPeriod attribute", e);
            }
        }
        return frequency > 0 ? this.snapshotRepository : NoOpSnapshotRepository.INSTANCE;
    }

    /**
     * Creates the snapshot trigger for an aggregate type.
     *
     * <p>The snapshot period comes from the type's {@link EventSourced} annotation; if the type is not
     * annotated, the annotation's declared default for {@code snapshotPeriod} is used. The reflective
     * default lookup only runs when the annotation is absent.
     *
     * @param aggregateType type of the aggregate
     * @return a new {@link PeriodicSnapshotTrigger} with the period if it is positive, otherwise
     *         {@code NoSnapshotTrigger.INSTANCE}
     * @throws IllegalStateException if {@code EventSourced} has no {@code snapshotPeriod} attribute
     */
    protected SnapshotTrigger snapshotTrigger(Class<?> aggregateType) {
        EventSourced annotation = aggregateType.getAnnotation(EventSourced.class);
        int frequency;
        if (annotation != null) {
            frequency = annotation.snapshotPeriod();
        } else {
            try {
                frequency = (Integer) EventSourced.class.getMethod("snapshotPeriod").getDefaultValue();
            } catch (NoSuchMethodException e) {
                throw new IllegalStateException("EventSourced annotation has no snapshotPeriod attribute", e);
            }
        }
        return frequency > 0 ? new PeriodicSnapshotTrigger(frequency) : NoSnapshotTrigger.INSTANCE;
    }

    /**
     * Tells whether aggregates of the given type should be cached.
     *
     * <p>The value comes from the type's {@link EventSourced} annotation; if the type is not annotated, the
     * annotation's declared default for {@code cached} is used. The reflective default lookup only runs
     * when the annotation is absent.
     *
     * @param aggregateType type of the aggregate
     * @return the {@code cached} setting for the type
     * @throws IllegalStateException if {@code EventSourced} has no {@code cached} attribute
     */
    protected boolean isCached(Class<?> aggregateType) {
        EventSourced annotation = aggregateType.getAnnotation(EventSourced.class);
        if (annotation != null) {
            return annotation.cached();
        }
        try {
            return (Boolean) EventSourced.class.getMethod("cached").getDefaultValue();
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException("EventSourced annotation has no cached attribute", e);
        }
    }

    /**
     * Determines the domain name for an aggregate type.
     *
     * @param aggregateType type of the aggregate
     * @return the non-empty {@code domain} of the type's {@link EventSourced} annotation, or the simple
     *         class name when the annotation is absent or its domain is empty
     */
    protected String domain(Class<?> aggregateType) {
        String fallback = aggregateType.getSimpleName();
        EventSourced annotation = aggregateType.getAnnotation(EventSourced.class);
        if (annotation == null) {
            return fallback;
        }
        String configured = annotation.domain();
        if (configured == null || configured.isEmpty()) {
            return fallback;
        }
        return configured;
    }

    /**
     * Creates a repository from its collaborators.
     *
     * @param eventStore         store used to read and append domain events
     * @param snapshotRepository repository used to read and write snapshots
     * @param cache              cache used for loaded aggregate models
     * @param serializer         serializer used for applied events
     * @param handlerFactory     factory that supplies the event sourcing handler per aggregate type
     */
    @ConstructorProperties(value={"eventStore", "snapshotRepository", "cache", "serializer", "handlerFactory"})
    public EventSourcingRepository(EventStore eventStore,
                                   SnapshotRepository snapshotRepository,
                                   Cache cache,
                                   EventStoreSerializer serializer,
                                   EventSourcingHandlerFactory handlerFactory) {
        this.handlerFactory = handlerFactory;
        this.serializer = serializer;
        this.cache = cache;
        this.snapshotRepository = snapshotRepository;
        this.eventStore = eventStore;
    }

    protected static class EventSourcedAggregate<T>
    implements Aggregate<T> {
        // Aggregate class; its name is written to the "$aggregateType" metadata entry of applied events.
        private final Class<T> aggregateType;
        // Handler invoked with the current model and an event to produce the next model.
        private final EventSourcingHandler<T> eventSourcingHandler;
        private final Cache cache;
        private final EventStoreSerializer serializer;
        private final EventStore eventStore;
        private final SnapshotRepository snapshotRepository;
        private final SnapshotTrigger snapshotTrigger;
        // Domain passed to the event store when events are stored.
        private final String domain;
        // Events applied since the last commit; emptied at the end of commit().
        private final List<DeserializingMessage> unpublishedEvents = new ArrayList<DeserializingMessage>();
        // When true, apply() throws UnsupportedOperationException.
        private final boolean readOnly;
        private final String id;
        // Current state; assigned in initialize() and replaced on every apply().
        private EventSourcedModel<T> model;

        /**
         * Loads the state of this aggregate.
         *
         * <p>Uses the cached model when one is present. Otherwise starts from the stored snapshot (or an
         * empty model with only the id set) and replays the domain events that the event store returns for
         * the model's sequence number, updating sequence number, last event id, timestamp and model per event.
         */
        @SuppressWarnings("unchecked")
        protected void initialize() {
            EventSourcedModel<T> cached = (EventSourcedModel<T>) this.cache.getIfPresent(keyFunction.apply(this.id));
            if (cached != null) {
                this.model = cached;
                return;
            }
            EventSourcedModel<T> current = this.snapshotRepository.<T>getSnapshot(this.id)
                    .orElse(EventSourcedModel.<T>builder().id(this.id).build());
            List<DeserializingMessage> events =
                    this.eventStore.getDomainEvents(this.id, current.sequenceNumber()).collect(Collectors.toList());
            for (DeserializingMessage event : events) {
                current = current.toBuilder()
                        .sequenceNumber(current.sequenceNumber() + 1L)
                        .lastEventId(event.getSerializedObject().getMessageId())
                        .timestamp(Instant.ofEpochMilli(event.getSerializedObject().getTimestamp()))
                        .model(this.eventSourcingHandler.invoke(current.get(), event))
                        .build();
            }
            this.model = current;
        }

        /**
         * Applies an event to this aggregate and queues it for the next commit.
         *
         * <p>The aggregate id and aggregate type name are written into the message's metadata under
         * {@code $aggregateId} and {@code $aggregateType}. The event is added to the unpublished events only
         * after the event sourcing handler has produced the new model, so a handler failure does not leave
         * an event queued that the model does not reflect.
         *
         * @param eventMessage the event to apply
         * @return this aggregate
         * @throws UnsupportedOperationException if this aggregate is read-only
         */
        @Override
        public Aggregate<T> apply(Message eventMessage) {
            if (this.readOnly) {
                throw new UnsupportedOperationException(String.format("Not allowed to apply a %s. The model is readonly.", eventMessage));
            }
            Metadata metadata = eventMessage.getMetadata();
            metadata.put("$aggregateId", this.id);
            metadata.put("$aggregateType", this.aggregateType.getName());
            DeserializingMessage deserializingMessage = new DeserializingMessage(new DeserializingObject<byte[], SerializedMessage>(this.serializer.serialize(eventMessage), eventMessage::getPayload), MessageType.EVENT);
            this.model = this.model.toBuilder()
                    .sequenceNumber(this.model.sequenceNumber() + 1L)
                    .lastEventId(eventMessage.getMessageId())
                    .timestamp(eventMessage.getTimestamp())
                    .model(this.eventSourcingHandler.invoke(this.model.get(), deserializingMessage))
                    .build();
            // Queue the event only once the model update succeeded.
            this.unpublishedEvents.add(deserializingMessage);
            return this;
        }

        /**
         * Returns the current aggregate value.
         *
         * @return the value held by the current model
         */
        @Override
        public T get() {
            EventSourcedModel<T> current = this.model;
            return current.get();
        }

        /**
         * Returns the id of the last event reflected in the current model.
         *
         * @return the last event id recorded on the current model
         */
        @Override
        public String lastEventId() {
            EventSourcedModel<T> current = this.model;
            return current.lastEventId();
        }

        /**
         * Returns the timestamp recorded on the current model.
         *
         * @return the model timestamp
         */
        @Override
        public Instant timestamp() {
            EventSourcedModel<T> current = this.model;
            return current.timestamp();
        }

        /**
         * Persists the events applied since the last commit; does nothing when there are none.
         *
         * <p>Puts the current model in the cache, stores the unpublished events in the event store and
         * stores a snapshot when the snapshot trigger asks for one. If any of these steps throws, the error
         * is logged and the cache entry for this aggregate is invalidated. The unpublished events are
         * cleared in every case.
         */
        protected void commit() {
            if (this.unpublishedEvents.isEmpty()) {
                return;
            }
            try {
                String aggregateId = this.model.id();
                this.cache.put(keyFunction.apply(aggregateId), this.model);
                // Pass a copy: the backing list is cleared in the finally block.
                List<DeserializingMessage> batch = new ArrayList<DeserializingMessage>(this.unpublishedEvents);
                this.eventStore.storeDomainEvents(aggregateId, this.domain, this.model.sequenceNumber(), batch);
                boolean snapshotDue = this.snapshotTrigger.shouldCreateSnapshot(this.model, this.unpublishedEvents);
                if (snapshotDue) {
                    this.snapshotRepository.storeSnapshot(this.model);
                }
            } catch (Exception e) {
                log.error("Failed to commit new events of aggregate {}", this.model.id(), e);
                this.cache.invalidate(keyFunction.apply(this.model.id()));
            } finally {
                this.unpublishedEvents.clear();
            }
        }

        /**
         * Creates an aggregate wrapper; {@code initialize()} must be called to load its state.
         *
         * @param aggregateType        aggregate class
         * @param eventSourcingHandler handler that applies events to the model
         * @param cache                cache consulted on initialize and updated on commit
         * @param serializer           serializer used for applied events
         * @param eventStore           store used to read and append domain events
         * @param snapshotRepository   repository used to read and write snapshots
         * @param snapshotTrigger      decides on commit whether a snapshot is stored
         * @param domain               domain passed to the event store when storing events
         * @param readOnly             whether applying events is forbidden
         * @param id                   aggregate id
         */
        @ConstructorProperties(value={"aggregateType", "eventSourcingHandler", "cache", "serializer", "eventStore", "snapshotRepository", "snapshotTrigger", "domain", "readOnly", "id"})
        public EventSourcedAggregate(Class<T> aggregateType,
                                     EventSourcingHandler<T> eventSourcingHandler,
                                     Cache cache,
                                     EventStoreSerializer serializer,
                                     EventStore eventStore,
                                     SnapshotRepository snapshotRepository,
                                     SnapshotTrigger snapshotTrigger,
                                     String domain,
                                     boolean readOnly,
                                     String id) {
            // Identity
            this.id = id;
            this.aggregateType = aggregateType;
            this.domain = domain;
            this.readOnly = readOnly;
            // Collaborators
            this.eventSourcingHandler = eventSourcingHandler;
            this.serializer = serializer;
            this.cache = cache;
            this.eventStore = eventStore;
            this.snapshotRepository = snapshotRepository;
            this.snapshotTrigger = snapshotTrigger;
        }
    }
}

