package org.axonframework.eventstore.jpa;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.persistence.EntityManager;
import javax.sql.DataSource;
import org.axonframework.common.IdentifierValidator;
import org.axonframework.common.jpa.EntityManagerProvider;
import org.axonframework.domain.DomainEventMessage;
import org.axonframework.domain.DomainEventStream;
import org.axonframework.domain.GenericDomainEventMessage;
import org.axonframework.eventstore.EventStreamNotFoundException;
import org.axonframework.eventstore.EventVisitor;
import org.axonframework.eventstore.SnapshotEventStore;
import org.axonframework.eventstore.jpa.criteria.JpaCriteria;
import org.axonframework.eventstore.jpa.criteria.JpaCriteriaBuilder;
import org.axonframework.eventstore.jpa.criteria.ParameterRegistry;
import org.axonframework.eventstore.management.Criteria;
import org.axonframework.eventstore.management.CriteriaBuilder;
import org.axonframework.eventstore.management.EventStoreManagement;
import org.axonframework.repository.ConcurrencyException;
import org.axonframework.serializer.SerializedDomainEventData;
import org.axonframework.serializer.SerializedDomainEventMessage;
import org.axonframework.serializer.SerializedObject;
import org.axonframework.serializer.Serializer;
import org.axonframework.serializer.xml.XStreamSerializer;
import org.axonframework.upcasting.SimpleUpcasterChain;
import org.axonframework.upcasting.UpcastSerializedDomainEventData;
import org.axonframework.upcasting.UpcasterAware;
import org.axonframework.upcasting.UpcasterChain;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/axonframework/eventstore/jpa/JpaEventStore.class */
public class JpaEventStore implements SnapshotEventStore, EventStoreManagement, UpcasterAware {
    private static final Logger logger = LoggerFactory.getLogger(JpaEventStore.class);
    private static final int DEFAULT_BATCH_SIZE = 100;
    private static final int DEFAULT_MAX_SNAPSHOTS_ARCHIVED = 1;
    private final Serializer eventSerializer;
    private final EventEntryStore eventEntryStore;
    private int batchSize;
    private UpcasterChain upcasterChain;
    private final EntityManagerProvider entityManagerProvider;
    private int maxSnapshotsArchived;
    private PersistenceExceptionResolver persistenceExceptionResolver;

    /* loaded from: input_file:org/axonframework/eventstore/jpa/JpaEventStore$BatchingDomainEventStream.class */
    private final class BatchingDomainEventStream implements DomainEventStream {
        private int currentBatchSize;
        private Iterator<DomainEventMessage> currentBatch;
        private DomainEventMessage next;
        private final Object id;
        private final String typeId;

        private BatchingDomainEventStream(List<DomainEventMessage> list, Object obj, String str) {
            this.id = obj;
            this.typeId = str;
            this.currentBatchSize = list.size();
            this.currentBatch = list.iterator();
            if (this.currentBatch.hasNext()) {
                this.next = this.currentBatch.next();
            }
        }

        @Override // org.axonframework.domain.DomainEventStream
        public boolean hasNext() {
            return this.next != null;
        }

        @Override // org.axonframework.domain.DomainEventStream
        public DomainEventMessage next() {
            DomainEventMessage domainEventMessage = this.next;
            if (this.next != null && !this.currentBatch.hasNext() && this.currentBatchSize >= JpaEventStore.this.batchSize) {
                JpaEventStore.logger.debug("Fetching new batch for Aggregate [{}]", this.id);
                List fetchBatch = JpaEventStore.this.fetchBatch(this.typeId, this.id, this.next.getSequenceNumber() + 1);
                this.currentBatchSize = fetchBatch.size();
                this.currentBatch = fetchBatch.iterator();
            }
            this.next = this.currentBatch.hasNext() ? this.currentBatch.next() : null;
            return domainEventMessage;
        }

        @Override // org.axonframework.domain.DomainEventStream
        public DomainEventMessage peek() {
            return this.next;
        }
    }

    public JpaEventStore(EntityManagerProvider entityManagerProvider) {
        this(entityManagerProvider, new XStreamSerializer(), new DefaultEventEntryStore());
    }

    public JpaEventStore(EntityManagerProvider entityManagerProvider, EventEntryStore eventEntryStore) {
        this(entityManagerProvider, new XStreamSerializer(), eventEntryStore);
    }

    public JpaEventStore(EntityManagerProvider entityManagerProvider, Serializer serializer) {
        this(entityManagerProvider, serializer, new DefaultEventEntryStore());
    }

    public JpaEventStore(EntityManagerProvider entityManagerProvider, Serializer serializer, EventEntryStore eventEntryStore) {
        this.batchSize = DEFAULT_BATCH_SIZE;
        this.upcasterChain = SimpleUpcasterChain.EMPTY;
        this.maxSnapshotsArchived = DEFAULT_MAX_SNAPSHOTS_ARCHIVED;
        this.entityManagerProvider = entityManagerProvider;
        this.eventSerializer = serializer;
        this.eventEntryStore = eventEntryStore;
    }

    @Override // org.axonframework.eventstore.EventStore
    public void appendEvents(String str, DomainEventStream domainEventStream) {
        DomainEventMessage domainEventMessage = null;
        try {
            EntityManager entityManager = this.entityManagerProvider.getEntityManager();
            while (domainEventStream.hasNext()) {
                domainEventMessage = domainEventStream.next();
                IdentifierValidator.validateIdentifier(domainEventMessage.getAggregateIdentifier().getClass());
                this.eventEntryStore.persistEvent(str, domainEventMessage, this.eventSerializer.serialize(domainEventMessage.getPayload(), byte[].class), this.eventSerializer.serialize(domainEventMessage.getMetaData(), byte[].class), entityManager);
            }
            entityManager.flush();
        } catch (RuntimeException e) {
            if (this.persistenceExceptionResolver != null && this.persistenceExceptionResolver.isDuplicateKeyViolation(e)) {
                throw new ConcurrencyException(String.format("Concurrent modification detected for Aggregate identifier [%s], sequence: [%s]", domainEventMessage.getAggregateIdentifier(), Long.valueOf(domainEventMessage.getSequenceNumber())), e);
            }
            throw e;
        }
    }

    @Override // org.axonframework.eventstore.EventStore
    public DomainEventStream readEvents(String str, Object obj) {
        long j = -1;
        SerializedDomainEventData loadLastSnapshotEvent = this.eventEntryStore.loadLastSnapshotEvent(str, obj, this.entityManagerProvider.getEntityManager());
        GenericDomainEventMessage genericDomainEventMessage = null;
        if (loadLastSnapshotEvent != null) {
            try {
                genericDomainEventMessage = new GenericDomainEventMessage(obj, loadLastSnapshotEvent.getSequenceNumber(), this.eventSerializer.deserialize(loadLastSnapshotEvent.getPayload()), (Map) this.eventSerializer.deserialize(loadLastSnapshotEvent.getMetaData()));
                j = genericDomainEventMessage.getSequenceNumber();
            } catch (LinkageError e) {
                logger.warn("Error while reading snapshot event entry. Reconstructing aggregate on entire event stream. Caused by: {} {}", e.getClass().getName(), e.getMessage());
            } catch (RuntimeException e2) {
                logger.warn("Error while reading snapshot event entry. Reconstructing aggregate on entire event stream. Caused by: {} {}", e2.getClass().getName(), e2.getMessage());
            }
        }
        List<DomainEventMessage> fetchBatch = fetchBatch(str, obj, j + 1);
        if (genericDomainEventMessage != null) {
            fetchBatch.add(0, genericDomainEventMessage);
        }
        if (fetchBatch.isEmpty()) {
            throw new EventStreamNotFoundException(str, obj);
        }
        return new BatchingDomainEventStream(fetchBatch, obj, str);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public List<DomainEventMessage> fetchBatch(String str, Object obj, long j) {
        List<? extends SerializedDomainEventData> fetchBatch = this.eventEntryStore.fetchBatch(str, obj, j, this.batchSize, this.entityManagerProvider.getEntityManager());
        ArrayList arrayList = new ArrayList(fetchBatch.size());
        Iterator<? extends SerializedDomainEventData> it = fetchBatch.iterator();
        while (it.hasNext()) {
            arrayList.addAll(upcastAndDeserialize(it.next(), obj));
        }
        return arrayList;
    }

    private List<DomainEventMessage> upcastAndDeserialize(SerializedDomainEventData serializedDomainEventData, Object obj) {
        List<SerializedObject> upcast = this.upcasterChain.upcast(serializedDomainEventData.getPayload());
        ArrayList arrayList = new ArrayList(upcast.size());
        Iterator<SerializedObject> it = upcast.iterator();
        while (it.hasNext()) {
            arrayList.add(new SerializedDomainEventMessage(new UpcastSerializedDomainEventData(serializedDomainEventData, obj, it.next()), this.eventSerializer));
        }
        return arrayList;
    }

    @Override // org.axonframework.eventstore.SnapshotEventStore
    public void appendSnapshotEvent(String str, DomainEventMessage domainEventMessage) {
        this.eventEntryStore.persistSnapshot(str, domainEventMessage, this.eventSerializer.serialize(domainEventMessage.getPayload(), byte[].class), this.eventSerializer.serialize(domainEventMessage.getMetaData(), byte[].class), this.entityManagerProvider.getEntityManager());
        if (this.maxSnapshotsArchived > 0) {
            this.eventEntryStore.pruneSnapshots(str, domainEventMessage, this.maxSnapshotsArchived, this.entityManagerProvider.getEntityManager());
        }
    }

    @Override // org.axonframework.eventstore.management.EventStoreManagement
    public void visitEvents(EventVisitor eventVisitor) {
        doVisitEvents(eventVisitor, null, Collections.emptyMap());
    }

    @Override // org.axonframework.eventstore.management.EventStoreManagement
    public void visitEvents(Criteria criteria, EventVisitor eventVisitor) {
        StringBuilder sb = new StringBuilder();
        ParameterRegistry parameterRegistry = new ParameterRegistry();
        ((JpaCriteria) criteria).parse("e", sb, parameterRegistry);
        doVisitEvents(eventVisitor, sb.toString(), parameterRegistry.getParameters());
    }

    @Override // org.axonframework.eventstore.management.EventStoreManagement
    public CriteriaBuilder newCriteriaBuilder() {
        return new JpaCriteriaBuilder();
    }

    private void doVisitEvents(EventVisitor eventVisitor, String str, Map<String, Object> map) {
        EntityManager entityManager = this.entityManagerProvider.getEntityManager();
        int i = 0;
        boolean z = DEFAULT_MAX_SNAPSHOTS_ARCHIVED;
        while (z) {
            List<? extends SerializedDomainEventData> fetchFilteredBatch = this.eventEntryStore.fetchFilteredBatch(str, map, i, this.batchSize, entityManager);
            for (SerializedDomainEventData serializedDomainEventData : fetchFilteredBatch) {
                Iterator<DomainEventMessage> it = upcastAndDeserialize(serializedDomainEventData, serializedDomainEventData.getAggregateIdentifier()).iterator();
                while (it.hasNext()) {
                    eventVisitor.doWithEvent(it.next());
                }
            }
            z = fetchFilteredBatch.size() >= this.batchSize;
            i += this.batchSize;
        }
    }

    public void setDataSource(DataSource dataSource) throws SQLException {
        if (this.persistenceExceptionResolver == null) {
            this.persistenceExceptionResolver = new SQLErrorCodesResolver(dataSource);
        }
    }

    public void setPersistenceExceptionResolver(PersistenceExceptionResolver persistenceExceptionResolver) {
        this.persistenceExceptionResolver = persistenceExceptionResolver;
    }

    public void setBatchSize(int i) {
        this.batchSize = i;
    }

    @Override // org.axonframework.upcasting.UpcasterAware
    public void setUpcasterChain(UpcasterChain upcasterChain) {
        this.upcasterChain = upcasterChain;
    }

    public void setMaxSnapshotsArchived(int i) {
        this.maxSnapshotsArchived = i;
    }
}
