/*
 * Decompiled with CFR 0.152.
 */
package io.fluxcapacitor.javaclient.persisting.caching;

import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.javaclient.common.Message;
import io.fluxcapacitor.javaclient.common.serialization.DeserializingMessage;
import io.fluxcapacitor.javaclient.common.serialization.Serializer;
import io.fluxcapacitor.javaclient.modeling.Aggregate;
import io.fluxcapacitor.javaclient.modeling.AggregateIdResolver;
import io.fluxcapacitor.javaclient.modeling.AggregateRepository;
import io.fluxcapacitor.javaclient.modeling.AggregateTypeResolver;
import io.fluxcapacitor.javaclient.persisting.caching.Cache;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.EventSourcingHandler;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.EventSourcingHandlerFactory;
import io.fluxcapacitor.javaclient.tracking.client.TrackingClient;
import io.fluxcapacitor.javaclient.tracking.client.TrackingUtils;
import java.beans.ConstructorProperties;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CachingAggregateRepository
implements AggregateRepository {
    // Class-wide logger.
    private static final Logger log = LoggerFactory.getLogger(CachingAggregateRepository.class);
    // Builds the cache key for an aggregate id: "<simple class name>:<aggregateId>".
    private static final Function<String, String> keyFunction = aggregateId -> CachingAggregateRepository.class.getSimpleName() + ":" + aggregateId;
    // Repository that loads are forwarded to when this repository's own cache cannot serve them.
    private final AggregateRepository delegate;
    // Supplies the event sourcing handler for an aggregate type when tracked events are applied.
    private final EventSourcingHandlerFactory handlerFactory;
    // Holds the aggregates maintained by the tracker; its monitor is also used for wait/notifyAll between loaders and the tracker.
    private final Cache cache;
    // Used as prefix of the tracker name passed to TrackingUtils.start.
    private final String clientName;
    // Client handed to TrackingUtils.start to receive the messages processed by handleEvents.
    private final TrackingClient trackingClient;
    // Deserializes the tracked messages in handleEvents.
    private final Serializer serializer;
    // Flipped to true by the first doLoad call so that tracking is started only once.
    private final AtomicBoolean started = new AtomicBoolean();
    // Index of the most recent event seen by handleEvent; loaders compare it to the index of the message they are handling.
    private final AtomicLong lastEventIndex = new AtomicLong();

    /**
     * Loads the aggregate with the given id.
     * <p>
     * While a command is being handled the call is forwarded unchanged to the delegate. For any other situation the
     * delegate is first asked with {@code onlyCached = true}; if that yields nothing, this repository's own cache is
     * consulted via {@code doLoad}, and only when that also yields nothing is the delegate asked again with the
     * caller's {@code onlyCached} value.
     *
     * @param aggregateId   id of the aggregate, must not be {@code null}
     * @param aggregateType type of the aggregate, must not be {@code null}
     * @param onlyCached    passed through to the delegate on the forwarding and fallback paths
     * @return the loaded aggregate, or whatever the delegate returns on the fallback path
     * @throws NullPointerException if {@code aggregateId} or {@code aggregateType} is {@code null}
     */
    @Override
    public <T> Aggregate<T> load(@NonNull String aggregateId, @NonNull Class<T> aggregateType, boolean onlyCached) {
        Objects.requireNonNull(aggregateId, "aggregateId is marked non-null but is null");
        Objects.requireNonNull(aggregateType, "aggregateType is marked non-null but is null");

        DeserializingMessage message = DeserializingMessage.getCurrent();
        boolean handlingCommand = message != null && message.getMessageType() == MessageType.COMMAND;
        if (handlingCommand) {
            return this.delegate.load(aggregateId, aggregateType, onlyCached);
        }

        Aggregate<T> fromDelegateCache = this.delegate.load(aggregateId, aggregateType, true);
        if (fromDelegateCache != null) {
            return fromDelegateCache;
        }

        Aggregate<T> fromOwnCache = this.doLoad(aggregateId);
        if (fromOwnCache != null) {
            return fromOwnCache;
        }
        return this.delegate.load(aggregateId, aggregateType, onlyCached);
    }

    /**
     * Looks up the aggregate in this repository's own cache.
     * <p>
     * The very first invocation only starts the tracker that feeds {@link #handleEvents} and returns {@code null}.
     * On later invocations, if the message currently being handled is an event or notification whose index is greater
     * than the last index recorded by the tracker, the calling thread waits on the cache monitor (re-checking at
     * least every 5 seconds) until the recorded index has caught up.
     *
     * @param aggregateId id of the aggregate to look up
     * @return the cached aggregate, or {@code null} if tracking was just started, nothing is cached for the id, or
     * the thread was interrupted while waiting
     */
    @SuppressWarnings("unchecked")
    private <T> RefreshingAggregate<T> doLoad(String aggregateId) {
        boolean firstCall = this.started.compareAndSet(false, true);
        if (firstCall) {
            log.info("Start tracking notifications");
            String trackerName = String.format("%s_%s", this.clientName, CachingAggregateRepository.class.getSimpleName());
            TrackingUtils.start(trackerName, this.trackingClient, this::handleEvents);
            return null;
        }

        DeserializingMessage message = DeserializingMessage.getCurrent();
        if (message != null) {
            switch (message.getMessageType()) {
                case EVENT:
                case NOTIFICATION:
                    Long awaitedIndex = message.getSerializedObject().getIndex();
                    boolean trackerBehind = awaitedIndex != null && this.lastEventIndex.get() < awaitedIndex;
                    if (trackerBehind) {
                        synchronized (this.cache) {
                            // Re-check under the monitor; handleEvent notifies after advancing the index.
                            while (this.lastEventIndex.get() < awaitedIndex) {
                                try {
                                    this.cache.wait(5000L);
                                } catch (InterruptedException e) {
                                    Thread.currentThread().interrupt();
                                    log.warn("Failed to load aggregate for event {}", message, e);
                                    return null;
                                }
                            }
                        }
                    }
                    break;
                default:
                    break;
            }
        }
        return (RefreshingAggregate<T>) this.cache.getIfPresent(keyFunction.apply(aggregateId));
    }

    /**
     * Processes a batch of tracked messages, applying every event that carries an aggregate id and type to the
     * cached aggregate via {@link #handleEvent}.
     * <p>
     * Events that do not belong to an aggregate are not applied, but they still advance the last seen event index and
     * wake up waiting loaders. Without this, a thread that loads an aggregate while handling such an event would keep
     * waiting until some later aggregate event happened to arrive.
     *
     * @param messages the serialized messages received from the tracker
     */
    protected void handleEvents(List<SerializedMessage> messages) {
        this.serializer.deserializeMessages(messages.stream(), false, MessageType.EVENT).forEach(m -> {
            String aggregateId = AggregateIdResolver.getAggregateId(m);
            Class<?> aggregateType = AggregateTypeResolver.getAggregateType(m);
            if (aggregateId == null || aggregateType == null) {
                // Nothing to apply, but record progress so loaders waiting for this index are released.
                this.lastEventIndex.updateAndGet(i -> Optional.ofNullable(((DeserializingMessage) m).getSerializedObject().getIndex()).orElse(i));
                synchronized (this.cache) {
                    this.cache.notifyAll();
                }
                return;
            }
            try {
                this.handleEvent((DeserializingMessage) m, aggregateId, aggregateType);
            } catch (Exception e) {
                log.error("Failed to handle event for aggregate with id {} of type {}", aggregateId, aggregateType, e);
            }
        });
    }

    /**
     * Applies a single tracked event to the cached aggregate it belongs to.
     * <ul>
     * <li>If nothing is cached and the handler can create an aggregate from the event, the aggregate is created from
     * the event and cached as in sync. The event is applied exactly once in that case.</li>
     * <li>Otherwise, if nothing is cached, or the cached aggregate is out of sync and older than the event, the
     * aggregate is reloaded from the delegate repository.</li>
     * <li>Otherwise, if the cached aggregate is in sync, the event is applied to it; a failure to do so evicts the
     * entry.</li>
     * <li>Otherwise the cached aggregate is kept as is.</li>
     * </ul>
     * Regardless of the outcome, the last seen event index is advanced and threads waiting on the cache monitor are
     * notified.
     *
     * @param event       the deserialized event
     * @param aggregateId id of the aggregate the event belongs to
     * @param type        type of the aggregate the event belongs to
     */
    // Raw types are kept deliberately: the handler and cached value are used with a wildcard aggregate type.
    @SuppressWarnings({"rawtypes", "unchecked"})
    protected void handleEvent(DeserializingMessage event, String aggregateId, Class<?> type) {
        try {
            EventSourcingHandler handler = this.handlerFactory.forType(type);
            String cacheKey = keyFunction.apply(aggregateId);
            String eventId = event.getSerializedObject().getMessageId();
            Instant timestamp = Instant.ofEpochMilli(event.getSerializedObject().getTimestamp());
            RefreshingAggregate aggregate = (RefreshingAggregate) this.cache.getIfPresent(cacheKey);

            boolean createdFromEvent = false;
            if (aggregate == null && handler.canHandle(null, event)) {
                try {
                    aggregate = new RefreshingAggregate(handler.invoke(null, event), eventId, timestamp, true);
                    createdFromEvent = true;
                } catch (Exception e) {
                    // Best effort: fall back to loading from the delegate below.
                    log.debug("Could not create aggregate with id {} of type {} from event, falling back to delegate",
                              aggregateId, type, e);
                }
            }

            // An aggregate that was just created from this event already reflects it; do not apply the event again.
            if (!createdFromEvent) {
                if (aggregate == null || (!aggregate.inSync() && aggregate.timestamp().isBefore(timestamp))) {
                    aggregate = Optional.ofNullable(this.delegate.load(aggregateId, type))
                            .map(a -> new RefreshingAggregate(a.get(), a.lastEventId(), a.timestamp(),
                                                              Objects.equals(a.lastEventId(), eventId)))
                            .orElseGet(() -> {
                                log.warn("Delegate repository did not contain aggregate with id {} of type {}", (Object) aggregateId, (Object) type);
                                return null;
                            });
                } else if (aggregate.inSync()) {
                    try {
                        aggregate = new RefreshingAggregate(handler.invoke(aggregate.get(), event), eventId, timestamp, true);
                    } catch (Exception e) {
                        log.warn("Failed to update aggregate with id {} of type {}", new Object[]{aggregateId, type, e});
                        aggregate = null;
                    }
                }
            }

            if (aggregate == null) {
                this.cache.invalidate(cacheKey);
            } else {
                this.cache.put(cacheKey, aggregate);
            }
        } finally {
            // Always record progress and wake up loaders, even if applying the event failed.
            this.lastEventIndex.updateAndGet(i -> Optional.ofNullable(event.getSerializedObject().getIndex()).orElse(i));
            synchronized (this.cache) {
                this.cache.notifyAll();
            }
        }
    }

    /**
     * Reports whether the given aggregate type is supported; the answer is taken from the delegate repository.
     *
     * @param aggregateType the aggregate type to check
     * @return the delegate's answer for {@code aggregateType}
     */
    @Override
    public boolean supports(Class<?> aggregateType) {
        final AggregateRepository target = this.delegate;
        return target.supports(aggregateType);
    }

    /**
     * Creates a caching repository on top of the given delegate. Tracking is not started here; it starts lazily on
     * the first lookup in this repository's own cache.
     *
     * @param delegate       repository that loads are forwarded to
     * @param handlerFactory supplies event sourcing handlers per aggregate type
     * @param cache          cache holding the aggregates maintained from tracked events
     * @param clientName     prefix of the tracker name
     * @param trackingClient client used to track events
     * @param serializer     deserializes tracked messages
     */
    @ConstructorProperties(value={"delegate", "handlerFactory", "cache", "clientName", "trackingClient", "serializer"})
    public CachingAggregateRepository(AggregateRepository delegate, EventSourcingHandlerFactory handlerFactory, Cache cache, String clientName, TrackingClient trackingClient, Serializer serializer) {
        // Collaborators used when loading.
        this.delegate = delegate;
        this.cache = cache;
        // Collaborators used when tracking and applying events.
        this.clientName = clientName;
        this.trackingClient = trackingClient;
        this.serializer = serializer;
        this.handlerFactory = handlerFactory;
    }

    /**
     * Immutable, read-only aggregate snapshot stored in the cache.
     * <p>
     * Besides the model it records the id and timestamp of the last event it is based on, plus an {@code inSync}
     * flag that the enclosing repository uses to decide whether the next event can be applied directly or the
     * aggregate has to be reloaded.
     *
     * @param <T> the model type
     */
    private static final class RefreshingAggregate<T>
    implements Aggregate<T> {
        private final T model;
        private final String lastEventId;
        private final Instant timestamp;
        private final boolean inSync;

        /**
         * @param model       the aggregate model, may be {@code null}
         * @param lastEventId id of the last event this snapshot is based on
         * @param timestamp   timestamp associated with that event
         * @param inSync      whether the snapshot may be updated by applying the next event directly
         */
        @ConstructorProperties(value={"model", "lastEventId", "timestamp", "inSync"})
        public RefreshingAggregate(T model, String lastEventId, Instant timestamp, boolean inSync) {
            this.model = model;
            this.lastEventId = lastEventId;
            this.timestamp = timestamp;
            this.inSync = inSync;
        }

        /** @return a new, empty builder */
        public static <T> RefreshingAggregateBuilder<T> builder() {
            return new RefreshingAggregateBuilder<>();
        }

        /** @return a builder pre-populated with this instance's values */
        public RefreshingAggregateBuilder<T> toBuilder() {
            return new RefreshingAggregateBuilder<T>().model(this.model).lastEventId(this.lastEventId).timestamp(this.timestamp).inSync(this.inSync);
        }

        /** @return the model held by this snapshot */
        @Override
        public T get() {
            return this.model;
        }

        /**
         * Always fails: this aggregate is read-only.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public Aggregate<T> apply(Message eventMessage) {
            throw new UnsupportedOperationException(String.format("Not allowed to apply a %s. The aggregate is readonly.", eventMessage));
        }

        /** @return the model held by this snapshot (same value as {@link #get()}) */
        public T model() {
            return this.model;
        }

        @Override
        public String lastEventId() {
            return this.lastEventId;
        }

        @Override
        public Instant timestamp() {
            return this.timestamp;
        }

        /** @return the in-sync flag given at construction */
        public boolean inSync() {
            return this.inSync;
        }

        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof RefreshingAggregate)) {
                return false;
            }
            RefreshingAggregate<?> other = (RefreshingAggregate<?>) o;
            return this.inSync == other.inSync
                    && Objects.equals(this.model, other.model)
                    && Objects.equals(this.lastEventId, other.lastEventId)
                    && Objects.equals(this.timestamp, other.timestamp);
        }

        @Override
        public int hashCode() {
            // Same formula and constants as before, so hash values are unchanged.
            final int prime = 59;
            int result = 1;
            result = result * prime + (this.model == null ? 43 : this.model.hashCode());
            result = result * prime + (this.lastEventId == null ? 43 : this.lastEventId.hashCode());
            result = result * prime + (this.timestamp == null ? 43 : this.timestamp.hashCode());
            result = result * prime + (this.inSync ? 79 : 97);
            return result;
        }

        @Override
        public String toString() {
            return "CachingAggregateRepository.RefreshingAggregate(model=" + this.model + ", lastEventId=" + this.lastEventId + ", timestamp=" + this.timestamp + ", inSync=" + this.inSync + ")";
        }

        /**
         * Mutable builder for {@link RefreshingAggregate}.
         *
         * @param <T> the model type
         */
        public static class RefreshingAggregateBuilder<T> {
            private T model;
            private String lastEventId;
            private Instant timestamp;
            private boolean inSync;

            RefreshingAggregateBuilder() {
            }

            public RefreshingAggregateBuilder<T> model(T model) {
                this.model = model;
                return this;
            }

            public RefreshingAggregateBuilder<T> lastEventId(String lastEventId) {
                this.lastEventId = lastEventId;
                return this;
            }

            public RefreshingAggregateBuilder<T> timestamp(Instant timestamp) {
                this.timestamp = timestamp;
                return this;
            }

            public RefreshingAggregateBuilder<T> inSync(boolean inSync) {
                this.inSync = inSync;
                return this;
            }

            /** @return a new aggregate built from the values set so far */
            public RefreshingAggregate<T> build() {
                return new RefreshingAggregate<T>(this.model, this.lastEventId, this.timestamp, this.inSync);
            }

            @Override
            public String toString() {
                return "CachingAggregateRepository.RefreshingAggregate.RefreshingAggregateBuilder(model=" + this.model + ", lastEventId=" + this.lastEventId + ", timestamp=" + this.timestamp + ", inSync=" + this.inSync + ")";
            }
        }
    }
}

