/*
 * Decompiled with CFR 0.152.
 */
package io.fluxcapacitor.javaclient.persisting.repository;

import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.common.api.modeling.Relationship;
import io.fluxcapacitor.javaclient.common.serialization.DeserializingMessage;
import io.fluxcapacitor.javaclient.common.serialization.Serializer;
import io.fluxcapacitor.javaclient.configuration.client.Client;
import io.fluxcapacitor.javaclient.modeling.Entity;
import io.fluxcapacitor.javaclient.modeling.ImmutableAggregateRoot;
import io.fluxcapacitor.javaclient.modeling.ImmutableEntity;
import io.fluxcapacitor.javaclient.persisting.caching.Cache;
import io.fluxcapacitor.javaclient.persisting.repository.AggregateRepository;
import io.fluxcapacitor.javaclient.tracking.ConsumerConfiguration;
import io.fluxcapacitor.javaclient.tracking.IndexUtils;
import io.fluxcapacitor.javaclient.tracking.client.DefaultTracker;
import java.beans.ConstructorProperties;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CachingAggregateRepository
implements AggregateRepository {
    private static final Logger log = LoggerFactory.getLogger(CachingAggregateRepository.class);
    public static Duration slowTrackingThreshold = Duration.ofSeconds(10L);
    private final AggregateRepository delegate;
    private final Client client;
    private final Cache cache;
    private final Cache relationshipsCache;
    private final Serializer serializer;
    private final AtomicBoolean started = new AtomicBoolean();
    private volatile long lastEventIndex = -1L;

    @Override
    public <T> Entity<T> load(String aggregateId, Class<T> type) {
        this.catchUpIfNeeded();
        return this.delegate.load(aggregateId, type);
    }

    @Override
    public <T> Entity<T> loadFor(String entityId, Class<?> defaultType) {
        this.catchUpIfNeeded();
        return this.delegate.loadFor(entityId, defaultType);
    }

    @Override
    public Map<String, Class<?>> getAggregatesFor(String entityId) {
        return this.delegate.getAggregatesFor(entityId);
    }

    protected void handleEvents(List<SerializedMessage> messages) {
        try {
            DeserializingMessage.handleBatch(this.serializer.deserializeMessages(messages.stream(), MessageType.EVENT)).forEach(this::handleEvent);
        }
        finally {
            messages.stream().reduce((a, b) -> b).map(SerializedMessage::getIndex).ifPresent(index -> {
                this.lastEventIndex = index;
                Cache cache = this.cache;
                synchronized (cache) {
                    this.cache.notifyAll();
                }
            });
        }
    }

    private void handleEvent(DeserializingMessage m) {
        String id = Entity.getAggregateId(m);
        Class<?> type = Entity.getAggregateType(m);
        if (id != null && type != null && this.cachingAllowed(type)) {
            try {
                if (Objects.equals(this.client.id(), m.getSerializedObject().getSource())) {
                    this.cache.computeIfPresent(id, (i, a) -> a.withEventIndex(m.getIndex(), m.getMessageId()));
                } else {
                    long index = m.getIndex();
                    this.delegate.load(id, type);
                    this.cache.computeIfPresent(id, (i, before) -> {
                        Long lastIndex = before.highestEventIndex();
                        if (lastIndex == null || lastIndex < index) {
                            boolean wasLoading = Entity.isLoading();
                            try {
                                Entity.loading.set(true);
                                ImmutableEntity after = before.apply(m);
                                this.updateRelationships((ImmutableAggregateRoot<?>)before, (ImmutableAggregateRoot<?>)after);
                                ImmutableEntity immutableEntity = after;
                                return immutableEntity;
                            }
                            finally {
                                Entity.loading.set(wasLoading);
                            }
                        }
                        return before;
                    });
                }
            }
            catch (Exception e) {
                log.error("Failed to handle event {} for aggregate {} (id {})", new Object[]{m.getMessageId(), type, id, e});
            }
        }
    }

    protected void updateRelationships(ImmutableAggregateRoot<?> before, ImmutableAggregateRoot<?> after) {
        Set<Relationship> associations = after.associations(before);
        Set<Relationship> dissociations = after.dissociations(before);
        dissociations.forEach(r -> this.relationshipsCache.computeIfPresent(r.getEntityId(), (id, map) -> {
            map.remove(r.getAggregateId());
            return map;
        }));
        associations.forEach(r -> this.relationshipsCache.computeIfPresent(r.getEntityId(), (id, map) -> {
            map.put(r.getAggregateId(), after.type());
            return map;
        }));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void catchUpIfNeeded() {
        this.startTrackerIfNeeded();
        DeserializingMessage current = DeserializingMessage.getCurrent();
        if (current != null && !Entity.isLoading()) {
            switch (current.getMessageType()) {
                case EVENT: 
                case NOTIFICATION: {
                    Long eventIndex = current.getIndex();
                    if (eventIndex == null || this.lastEventIndex >= eventIndex) break;
                    Cache cache = this.cache;
                    synchronized (cache) {
                        Instant start = Instant.now();
                        while (this.lastEventIndex < eventIndex) {
                            try {
                                this.cache.wait(5000L);
                            }
                            catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                                throw new IllegalStateException("Failed to load aggregate for event " + current, e);
                            }
                        }
                        Duration fetchDuration = Duration.between(start, Instant.now());
                        if (fetchDuration.compareTo(slowTrackingThreshold) > 0) {
                            log.warn("It took over {} for the aggregate repo tracking to get in sync. This may indicate that the repo has trouble keeping up.", (Object)fetchDuration);
                        }
                        break;
                    }
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void startTrackerIfNeeded() {
        if (this.started.compareAndSet(false, true)) {
            this.lastEventIndex = IndexUtils.indexForCurrentTime();
            DefaultTracker.start(this::handleEvents, ConsumerConfiguration.builder().messageType(MessageType.NOTIFICATION).minIndex(this.lastEventIndex).name(CachingAggregateRepository.class.getSimpleName()).build(), this.client);
            Cache cache = this.cache;
            synchronized (cache) {
                this.cache.notifyAll();
            }
        }
    }

    @Override
    public boolean cachingAllowed(Class<?> aggregateType) {
        return this.delegate.cachingAllowed(aggregateType);
    }

    @ConstructorProperties(value={"delegate", "client", "cache", "relationshipsCache", "serializer"})
    public CachingAggregateRepository(AggregateRepository delegate, Client client, Cache cache, Cache relationshipsCache, Serializer serializer) {
        this.delegate = delegate;
        this.client = client;
        this.cache = cache;
        this.relationshipsCache = relationshipsCache;
        this.serializer = serializer;
    }
}

