package io.fluxcapacitor.javaclient.persisting.repository;

import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.common.api.modeling.Relationship;
import io.fluxcapacitor.javaclient.FluxCapacitor;
import io.fluxcapacitor.javaclient.common.serialization.DeserializingMessage;
import io.fluxcapacitor.javaclient.common.serialization.Serializer;
import io.fluxcapacitor.javaclient.configuration.client.Client;
import io.fluxcapacitor.javaclient.modeling.Entity;
import io.fluxcapacitor.javaclient.modeling.ImmutableAggregateRoot;
import io.fluxcapacitor.javaclient.persisting.caching.Cache;
import io.fluxcapacitor.javaclient.tracking.ConsumerConfiguration;
import io.fluxcapacitor.javaclient.tracking.IndexUtils;
import io.fluxcapacitor.javaclient.tracking.client.DefaultTracker;
import java.beans.ConstructorProperties;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/fluxcapacitor/javaclient/persisting/repository/CachingAggregateRepository.class */
/**
 * {@link AggregateRepository} decorator that forwards all lookups to a delegate repository while a background
 * tracker keeps previously cached aggregates (and the relationships cache) up to date.
 *
 * <p>Before an aggregate is loaded while an event or notification is being handled, the calling thread blocks
 * until the tracker has processed messages up to the index of the message being handled. The tracker is started
 * lazily on the first call to {@link #load}, {@link #loadFor} or {@link #catchUpIfNeeded()}.
 */
public class CachingAggregateRepository implements AggregateRepository {
    private static final Logger log = LoggerFactory.getLogger(CachingAggregateRepository.class);

    /**
     * If a caller had to wait longer than this for the tracker to catch up, a warning is logged.
     */
    public static Duration slowTrackingThreshold = Duration.ofSeconds(10);

    private final AggregateRepository delegate;
    private final Client client;
    /** Aggregate cache; also serves as the monitor that waiting loaders and the tracker synchronize on. */
    private final Cache cache;
    private final Cache relationshipsCache;
    private final Serializer serializer;
    private final AtomicBoolean started = new AtomicBoolean();
    /** Index of the last message seen by the tracker, or the tracker's start index if none was seen yet. */
    private volatile long lastEventIndex = -1;

    @ConstructorProperties({"delegate", "client", "cache", "relationshipsCache", "serializer"})
    public CachingAggregateRepository(AggregateRepository delegate, Client client, Cache cache,
                                      Cache relationshipsCache, Serializer serializer) {
        this.delegate = delegate;
        this.client = client;
        this.cache = cache;
        this.relationshipsCache = relationshipsCache;
        this.serializer = serializer;
    }

    /**
     * Loads the aggregate with the given id via the delegate, after waiting for the tracker if needed.
     *
     * @throws NullPointerException if {@code aggregateId} is null
     */
    @Override // io.fluxcapacitor.javaclient.persisting.repository.AggregateRepository
    public <T> Entity<T> load(@NonNull String aggregateId, Class<T> type) {
        Objects.requireNonNull(aggregateId, "aggregateId is marked non-null but is null");
        catchUpIfNeeded();
        return delegate.load(aggregateId, type);
    }

    /**
     * Loads the aggregate associated with the given entity id via the delegate, after waiting for the tracker
     * if needed.
     *
     * @throws NullPointerException if {@code entityId} is null
     */
    @Override // io.fluxcapacitor.javaclient.persisting.repository.AggregateRepository
    public <T> Entity<T> loadFor(@NonNull String entityId, Class<?> defaultType) {
        Objects.requireNonNull(entityId, "entityId is marked non-null but is null");
        catchUpIfNeeded();
        return delegate.loadFor(entityId, defaultType);
    }

    /**
     * Returns the delegate's answer directly; unlike the load methods this does not start or wait for the tracker.
     *
     * @throws NullPointerException if {@code entityId} is null
     */
    @Override // io.fluxcapacitor.javaclient.persisting.repository.AggregateRepository
    public Map<String, Class<?>> getAggregatesFor(@NonNull String entityId) {
        Objects.requireNonNull(entityId, "entityId is marked non-null but is null");
        return delegate.getAggregatesFor(entityId);
    }

    /**
     * Delegates the decision whether aggregates of the given type may be cached.
     *
     * @throws NullPointerException if {@code aggregateType} is null
     */
    @Override // io.fluxcapacitor.javaclient.persisting.repository.AggregateRepository
    public boolean cachingAllowed(@NonNull Class<?> aggregateType) {
        Objects.requireNonNull(aggregateType, "aggregateType is marked non-null but is null");
        return delegate.cachingAllowed(aggregateType);
    }

    /**
     * Tracker callback. Applies every message of the batch to the cache and afterwards records the index of the
     * last message in the batch, waking up any threads blocked in {@link #catchUpIfNeeded()}.
     *
     * <p>The tracker subscribes to {@link MessageType#NOTIFICATION} but the payloads are deserialized as
     * {@link MessageType#EVENT}.
     */
    protected void handleEvents(List<SerializedMessage> messages) {
        try {
            DeserializingMessage.handleBatch(serializer.deserializeMessages(messages.stream(), MessageType.EVENT))
                    .forEach(this::handleEvent);
        } finally {
            // Advance the index even if handling failed, so waiting loaders are never blocked by a bad batch.
            if (!messages.isEmpty()) {
                Long lastIndex = messages.get(messages.size() - 1).getIndex();
                if (lastIndex != null) {
                    lastEventIndex = lastIndex;
                    synchronized (cache) {
                        cache.notifyAll();
                    }
                }
            }
        }
    }

    /**
     * Updates the cached aggregate (if there is one) that the given event belongs to. Failures are logged and
     * not propagated, so one bad event does not stop the rest of the batch.
     */
    private void handleEvent(DeserializingMessage message) {
        String aggregateId = Entity.getAggregateId(message);
        Class<?> aggregateType = Entity.getAggregateType(message);
        if (aggregateId == null || aggregateType == null || !cachingAllowed(aggregateType)) {
            return;
        }
        // NOTE(review): lambda parameter types below rely on inference against Cache.computeIfPresent; the
        // decompiler may have dropped explicit type arguments here -- confirm against the Cache interface.
        try {
            if (Objects.equals(client.id(), message.getSerializedObject().getSource())) {
                // Event published by this client: only record its index and message id on the cached aggregate.
                cache.computeIfPresent(aggregateId, (key, cached) ->
                        cached.withEventIndex(message.getIndex(), message.getMessageId()));
            } else {
                long eventIndex = message.getIndex();
                cache.computeIfPresent(aggregateId, (key, cached) -> {
                    Long highestEventIndex = cached.highestEventIndex();
                    if (highestEventIndex != null && highestEventIndex >= eventIndex) {
                        return cached; // the cached aggregate already reflects this event
                    }
                    boolean wasLoading = Entity.isLoading();
                    try {
                        Entity.loading.set(true);
                        ImmutableAggregateRoot<?> updated = cached.apply(message);
                        updateRelationships(cached, updated);
                        return updated;
                    } finally {
                        // Restore the flag to whatever it was before, whether or not applying succeeded.
                        Entity.loading.set(wasLoading);
                    }
                });
            }
        } catch (Throwable e) {
            log.error("Failed to handle event {} for aggregate {} (id {})",
                      message.getMessageId(), aggregateType, aggregateId, e);
        }
    }

    /**
     * Brings the relationships cache in line with the change from {@code before} to {@code after}: removed
     * relationships are dropped from, and new ones added to, entries that are already present in the cache.
     */
    protected void updateRelationships(ImmutableAggregateRoot<?> before, ImmutableAggregateRoot<?> after) {
        // Computed before any cache entry is touched, matching the original evaluation order.
        Set<Relationship> associations = after.associations(before);
        after.dissociations(before).forEach(removed ->
                relationshipsCache.computeIfPresent(removed.getEntityId(), (key, aggregates) -> {
                    aggregates.remove(removed.getAggregateId());
                    return aggregates;
                }));
        associations.forEach(added ->
                relationshipsCache.computeIfPresent(added.getEntityId(), (key, aggregates) -> {
                    aggregates.put(added.getAggregateId(), after.type());
                    return aggregates;
                }));
    }

    /**
     * Starts the tracker if necessary and, when called while an event or notification is being handled (and no
     * entity is currently being loaded), blocks until the tracker has reached the index of that message.
     *
     * @throws IllegalStateException if the calling thread is interrupted while waiting
     */
    protected void catchUpIfNeeded() {
        startTrackerIfNeeded();
        DeserializingMessage current = DeserializingMessage.getCurrent();
        if (current == null || Entity.isLoading()) {
            return;
        }
        switch (current.getMessageType()) {
            case EVENT:
            case NOTIFICATION:
                awaitTracker(current);
                break;
            default:
                break;
        }
    }

    /**
     * Blocks until {@link #lastEventIndex} has reached the index of the given message. Returns immediately if
     * the message has no index or the tracker is already there.
     */
    private void awaitTracker(DeserializingMessage current) {
        Long index = current.getIndex();
        if (index == null || lastEventIndex >= index) {
            return;
        }
        long requiredIndex = index;
        synchronized (cache) {
            Instant start = Instant.now();
            while (lastEventIndex < requiredIndex) {
                try {
                    // Timed wait: the condition is re-checked at least every 5 seconds.
                    cache.wait(5000L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Failed to load aggregate for event " + current, e);
                }
            }
            Duration waited = Duration.between(start, Instant.now());
            if (waited.compareTo(slowTrackingThreshold) > 0) {
                log.warn("It took over {} for the aggregate repo tracking to get in sync. This may indicate that the repo has trouble keeping up.", waited);
            }
        }
    }

    /**
     * Starts the notification tracker exactly once. Tracking begins at the index derived from the current time,
     * which also becomes the initial {@link #lastEventIndex}.
     */
    protected void startTrackerIfNeeded() {
        if (!started.compareAndSet(false, true)) {
            return;
        }
        Consumer<List<SerializedMessage>> handler = this::handleEvents;
        long startIndex = IndexUtils.indexFromTimestamp(FluxCapacitor.currentTime());
        lastEventIndex = startIndex;
        ConsumerConfiguration configuration = ConsumerConfiguration.builder()
                .messageType(MessageType.NOTIFICATION)
                .ignoreSegment(true)
                .minIndex(startIndex)
                .name(String.format("%s_%s", client.name(), CachingAggregateRepository.class.getSimpleName()))
                .build();
        DefaultTracker.start(handler, configuration, client);
        // Wake up threads that started waiting before lastEventIndex was initialised.
        synchronized (cache) {
            cache.notifyAll();
        }
    }
}
