package io.fluxcapacitor.axonclient.eventhandling;

import io.fluxcapacitor.axonclient.common.serialization.AxonMessageSerializer;
import io.fluxcapacitor.common.ConsistentHashing;
import io.fluxcapacitor.common.api.Data;
import io.fluxcapacitor.common.api.Metadata;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.javaclient.common.serialization.jackson.JacksonSerializer;
import io.fluxcapacitor.javaclient.eventsourcing.client.EventStoreClient;
import io.fluxcapacitor.javaclient.keyvalue.DefaultKeyValueStore;
import io.fluxcapacitor.javaclient.keyvalue.KeyValueStore;
import io.fluxcapacitor.javaclient.keyvalue.client.KeyValueClient;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.axonframework.common.Registration;
import org.axonframework.eventhandling.AbstractEventBus;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventsourcing.DomainEventMessage;
import org.axonframework.eventsourcing.eventstore.DomainEventStream;
import org.axonframework.eventsourcing.eventstore.EventStore;
import org.axonframework.eventsourcing.eventstore.TrackingEventStream;
import org.axonframework.eventsourcing.eventstore.TrackingToken;
import org.axonframework.monitoring.MessageMonitor;
import org.axonframework.monitoring.NoOpMessageMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/fluxcapacitor/axonclient/eventhandling/FluxCapacitorEventStore.class */
public class FluxCapacitorEventStore extends AbstractEventBus implements EventStore {
    private static final Logger log = LoggerFactory.getLogger(FluxCapacitorEventStore.class);
    private final EventStoreClient delegate;
    private final KeyValueStore keyValueStore;
    private final AxonMessageSerializer serializer;

    public FluxCapacitorEventStore(EventStoreClient eventStoreClient, AxonMessageSerializer axonMessageSerializer, KeyValueClient keyValueClient) {
        this(NoOpMessageMonitor.INSTANCE, eventStoreClient, keyValueClient, axonMessageSerializer);
    }

    public FluxCapacitorEventStore(MessageMonitor<? super EventMessage<?>> messageMonitor, EventStoreClient eventStoreClient, KeyValueClient keyValueClient, AxonMessageSerializer axonMessageSerializer) {
        super(messageMonitor);
        this.delegate = eventStoreClient;
        this.keyValueStore = new DefaultKeyValueStore(keyValueClient, new JacksonSerializer());
        this.serializer = axonMessageSerializer;
    }

    protected void appendEvents(List<? extends EventMessage<?>> list) throws Exception {
        List<SerializedMessage> convert = convert(list);
        int i = 0;
        while (true) {
            int i2 = i;
            if (i2 >= list.size()) {
                return;
            }
            String aggregateId = getAggregateId(list.get(i2));
            int i3 = i2 + 1;
            while (i3 < list.size() && Objects.equals(aggregateId, getAggregateId(list.get(i3)))) {
                i3++;
            }
            DomainEventMessage domainEventMessage = (EventMessage) list.get(i3 - 1);
            long j = 0;
            String str = null;
            if (domainEventMessage instanceof DomainEventMessage) {
                DomainEventMessage domainEventMessage2 = domainEventMessage;
                j = domainEventMessage2.getSequenceNumber();
                str = domainEventMessage2.getType();
            }
            this.delegate.storeEvents(aggregateId, str, j, convert.subList(i2, i3)).await();
            i = i3;
        }
    }

    protected List<SerializedMessage> convert(List<? extends EventMessage<?>> list) {
        return (List) list.stream().map(eventMessage -> {
            SerializedMessage serializedMessage = new SerializedMessage(new Data(this.serializer.serializeEvent(eventMessage), eventMessage.getPayloadType().getName(), 0), Metadata.empty(), eventMessage.getIdentifier());
            serializedMessage.setSegment(Integer.valueOf(ConsistentHashing.computeSegment(getAggregateId(eventMessage))));
            return serializedMessage;
        }).collect(Collectors.toList());
    }

    private String getAggregateId(EventMessage<?> eventMessage) {
        if (eventMessage instanceof DomainEventMessage) {
            return ((DomainEventMessage) eventMessage).getAggregateIdentifier();
        }
        return null;
    }

    public DomainEventStream readEvents(String str) {
        Optional empty;
        try {
            Optional ofNullable = Optional.ofNullable((SerializedSnapshot) this.keyValueStore.get(snapshotKey(str)));
            AxonMessageSerializer axonMessageSerializer = this.serializer;
            axonMessageSerializer.getClass();
            empty = ofNullable.map(axonMessageSerializer::deserializeSnapshot);
        } catch (Exception | LinkageError e) {
            log.warn("Error reading snapshot. Reconstructing aggregate from entire event stream. Caused by: {} {}", e.getClass().getName(), e.getMessage());
            empty = Optional.empty();
            this.keyValueStore.delete(snapshotKey(str));
        }
        return (DomainEventStream) empty.map(domainEventMessage -> {
            return DomainEventStream.concat(DomainEventStream.of(domainEventMessage), readEvents(str, domainEventMessage.getSequenceNumber()));
        }).orElse(readEvents(str, 0L));
    }

    protected Stream<? extends DomainEventMessage<?>> stagedDomainEventMessages(String str) {
        return queuedMessages().stream().filter(eventMessage -> {
            return eventMessage instanceof DomainEventMessage;
        }).map(eventMessage2 -> {
            return (DomainEventMessage) eventMessage2;
        }).filter(domainEventMessage -> {
            return str.equals(domainEventMessage.getAggregateIdentifier());
        });
    }

    public DomainEventStream readEvents(String str, long j) {
        return DomainEventStream.concat(this.serializer.deserializeDomainEvents(this.delegate.getEvents(str, j - 1)), DomainEventStream.of(stagedDomainEventMessages(str).filter(domainEventMessage -> {
            return domainEventMessage.getSequenceNumber() >= j;
        })));
    }

    public void storeSnapshot(DomainEventMessage<?> domainEventMessage) {
        this.keyValueStore.store(snapshotKey(domainEventMessage.getAggregateIdentifier()), new SerializedSnapshot(domainEventMessage.getAggregateIdentifier(), domainEventMessage.getSequenceNumber(), new Data(this.serializer.serializeDomainEvent(domainEventMessage), domainEventMessage.getPayloadType().getName(), 0)));
    }

    protected void prepareCommit(List<? extends EventMessage<?>> list) {
        super.prepareCommit(list);
        try {
            appendEvents(list);
        } catch (Exception e) {
            throw new IllegalStateException("Could not append events " + list, e);
        }
    }

    public Registration subscribe(Consumer<List<? extends EventMessage<?>>> consumer) {
        throw new UnsupportedOperationException("Subscribing event handlers are not supported");
    }

    /* renamed from: openStream, reason: merged with bridge method [inline-methods] */
    public TrackingEventStream m7openStream(TrackingToken trackingToken) {
        throw new UnsupportedOperationException("Tracking is supported via a dedicated event processor");
    }

    protected String snapshotKey(String str) {
        return "$snapshot_" + str;
    }
}
