package org.axonframework.eventsourcing.eventstore.jpa;

import java.sql.SQLException;
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import javax.persistence.EntityManager;
import javax.sql.DataSource;
import org.axonframework.common.Assert;
import org.axonframework.common.DateTimeUtils;
import org.axonframework.common.ObjectUtils;
import org.axonframework.common.jdbc.PersistenceExceptionResolver;
import org.axonframework.common.jpa.EntityManagerProvider;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventsourcing.DomainEventMessage;
import org.axonframework.eventsourcing.eventstore.BatchingEventStorageEngine;
import org.axonframework.eventsourcing.eventstore.DomainEventData;
import org.axonframework.eventsourcing.eventstore.EventUtils;
import org.axonframework.eventsourcing.eventstore.GapAwareTrackingToken;
import org.axonframework.eventsourcing.eventstore.GenericDomainEventEntry;
import org.axonframework.eventsourcing.eventstore.TrackedDomainEventData;
import org.axonframework.eventsourcing.eventstore.TrackedEventData;
import org.axonframework.eventsourcing.eventstore.TrackingToken;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.upcasting.event.EventUpcaster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/axonframework/eventsourcing/eventstore/jpa/JpaEventStorageEngine.class */
/**
 * Event storage engine that reads and writes domain events and snapshots through a JPA
 * {@link EntityManager} obtained from an {@link EntityManagerProvider}.
 * <p>
 * Tracked events are read using {@link GapAwareTrackingToken}s. Gaps in the global index are kept in the
 * token for events whose timestamp lies within the configured gap timeout, and are cleaned up once the
 * number of gaps in a token exceeds the gap cleaning threshold.
 */
public class JpaEventStorageEngine extends BatchingEventStorageEngine {
    private static final int DEFAULT_GAP_CLEANING_THRESHOLD = 250;
    private static final Logger logger = LoggerFactory.getLogger(JpaEventStorageEngine.class);
    /** Default gap timeout, in milliseconds (see {@link #gapTimeoutFrame()}). */
    private static final int DEFAULT_GAP_TIMEOUT = 60000;
    private static final long DEFAULT_LOWEST_GLOBAL_SEQUENCE = 1;
    private static final int DEFAULT_MAX_GAP_OFFSET = 10000;

    private final EntityManagerProvider entityManagerProvider;
    private final long lowestGlobalSequence;
    private final int maxGapOffset;
    private final TransactionManager transactionManager;
    private final boolean explicitFlush;
    private int gapTimeout = DEFAULT_GAP_TIMEOUT;
    private int gapCleaningThreshold = DEFAULT_GAP_CLEANING_THRESHOLD;

    /**
     * Creates an engine that passes {@code null} for every optional setting (so the defaults of this class
     * and of the superclass constructor apply) and flushes explicitly after each write.
     *
     * @param entityManagerProvider provider of the {@link EntityManager} used for all queries and writes
     * @param transactionManager    transaction manager used to wrap the read queries
     */
    public JpaEventStorageEngine(EntityManagerProvider entityManagerProvider, TransactionManager transactionManager) {
        this(null, null, null, null, null, entityManagerProvider, transactionManager, null, null, true);
    }

    /**
     * Creates an engine that uses the same serializer for both serializer arguments and resolves
     * persistence exceptions with a {@link SQLErrorCodesResolver} built from the given data source.
     *
     * @param serializer            serializer, passed twice to the delegated constructor
     * @param eventUpcaster         upcaster forwarded to the superclass
     * @param dataSource            data source handed to {@link SQLErrorCodesResolver}
     * @param entityManagerProvider provider of the {@link EntityManager}
     * @param transactionManager    transaction manager used to wrap the read queries
     * @throws SQLException declared by the delegated constructor that creates the {@link SQLErrorCodesResolver}
     */
    public JpaEventStorageEngine(Serializer serializer, EventUpcaster eventUpcaster, DataSource dataSource, EntityManagerProvider entityManagerProvider, TransactionManager transactionManager) throws SQLException {
        this(serializer, eventUpcaster, dataSource, serializer, entityManagerProvider, transactionManager);
    }

    /**
     * Creates an engine that resolves persistence exceptions with a {@link SQLErrorCodesResolver} built
     * from the given data source.
     *
     * @param serializer            first serializer forwarded to the superclass
     * @param eventUpcaster         upcaster forwarded to the superclass
     * @param dataSource            data source handed to {@link SQLErrorCodesResolver}
     * @param serializer2           second serializer forwarded to the superclass
     * @param entityManagerProvider provider of the {@link EntityManager}
     * @param transactionManager    transaction manager used to wrap the read queries
     * @throws SQLException declared because constructing the {@link SQLErrorCodesResolver} may fail
     */
    public JpaEventStorageEngine(Serializer serializer, EventUpcaster eventUpcaster, DataSource dataSource, Serializer serializer2, EntityManagerProvider entityManagerProvider, TransactionManager transactionManager) throws SQLException {
        this(serializer, eventUpcaster, new SQLErrorCodesResolver(dataSource), serializer2, entityManagerProvider, transactionManager);
    }

    /**
     * Creates an engine that uses the same serializer for both serializer arguments.
     *
     * @param serializer                   serializer, passed twice to the delegated constructor
     * @param eventUpcaster                upcaster forwarded to the superclass
     * @param persistenceExceptionResolver resolver forwarded to the superclass
     * @param entityManagerProvider        provider of the {@link EntityManager}
     * @param transactionManager           transaction manager used to wrap the read queries
     */
    public JpaEventStorageEngine(Serializer serializer, EventUpcaster eventUpcaster, PersistenceExceptionResolver persistenceExceptionResolver, EntityManagerProvider entityManagerProvider, TransactionManager transactionManager) {
        this(serializer, eventUpcaster, persistenceExceptionResolver, serializer, entityManagerProvider, transactionManager);
    }

    /**
     * Creates an engine with explicit flushing enabled and {@code null} (i.e. default) values for the
     * remaining optional settings.
     *
     * @param serializer                   first serializer forwarded to the superclass
     * @param eventUpcaster                upcaster forwarded to the superclass
     * @param persistenceExceptionResolver resolver forwarded to the superclass
     * @param serializer2                  second serializer forwarded to the superclass
     * @param entityManagerProvider        provider of the {@link EntityManager}
     * @param transactionManager           transaction manager used to wrap the read queries
     */
    public JpaEventStorageEngine(Serializer serializer, EventUpcaster eventUpcaster, PersistenceExceptionResolver persistenceExceptionResolver, Serializer serializer2, EntityManagerProvider entityManagerProvider, TransactionManager transactionManager) {
        this(serializer, eventUpcaster, persistenceExceptionResolver, serializer2, null, entityManagerProvider, transactionManager, null, null, true);
    }

    /**
     * Fully parameterized constructor; all other constructors delegate here.
     *
     * @param serializer                   first serializer forwarded to the superclass
     * @param eventUpcaster                upcaster forwarded to the superclass
     * @param persistenceExceptionResolver resolver forwarded to the superclass
     * @param serializer2                  second serializer forwarded to the superclass
     * @param num                          forwarded unchanged to the superclass constructor (presumably the
     *                                     batch size - TODO confirm against {@code BatchingEventStorageEngine})
     * @param entityManagerProvider        provider of the {@link EntityManager}
     * @param transactionManager           transaction manager used to wrap the read queries
     * @param l                            lowest global sequence to assume when building the gaps of a first
     *                                     token; {@code null} selects the default of {@code 1}
     * @param num2                         maximum gap offset passed to {@code GapAwareTrackingToken#advanceTo};
     *                                     {@code null} selects the default of {@code 10000}
     * @param z                            whether to call {@link EntityManager#flush()} explicitly after
     *                                     appending events or storing a snapshot
     */
    public JpaEventStorageEngine(Serializer serializer, EventUpcaster eventUpcaster, PersistenceExceptionResolver persistenceExceptionResolver, Serializer serializer2, Integer num, EntityManagerProvider entityManagerProvider, TransactionManager transactionManager, Long l, Integer num2, boolean z) {
        super(serializer, eventUpcaster, persistenceExceptionResolver, serializer2, num);
        this.entityManagerProvider = entityManagerProvider;
        this.lowestGlobalSequence = ObjectUtils.getOrDefault(l, DEFAULT_LOWEST_GLOBAL_SEQUENCE);
        this.maxGapOffset = ObjectUtils.getOrDefault(num2, DEFAULT_MAX_GAP_OFFSET);
        this.transactionManager = transactionManager;
        this.explicitFlush = z;
    }

    /**
     * Fetches up to {@code i} events whose global index is greater than the index of the given token, or
     * whose global index is one of the token's gaps, ordered by ascending global index.
     *
     * @param trackingToken token to resume from; must be {@code null} or a {@link GapAwareTrackingToken}
     * @param i             maximum number of rows to fetch
     * @return the fetched events, each paired with the token that results from advancing to it
     */
    @Override // org.axonframework.eventsourcing.eventstore.BatchingEventStorageEngine
    protected List<? extends TrackedEventData<?>> fetchTrackedEvents(TrackingToken trackingToken, int i) {
        Assert.isTrue(trackingToken == null || trackingToken instanceof GapAwareTrackingToken,
                      () -> String.format("Token [%s] is of the wrong type. Expected [%s]",
                                          trackingToken, GapAwareTrackingToken.class.getSimpleName()));
        GapAwareTrackingToken startToken = cleanedToken((GapAwareTrackingToken) trackingToken);

        List<Object[]> rows = transactionManager.fetchInTransaction(() -> {
            String query = "SELECT e.globalIndex, e.type, e.aggregateIdentifier, e.sequenceNumber, "
                    + "e.eventIdentifier, e.timeStamp, e.payloadType, e.payloadRevision, e.payload, e.metaData "
                    + "FROM " + domainEventEntryEntityName() + " e WHERE e.globalIndex > :token";
            if (startToken == null || startToken.getGaps().isEmpty()) {
                // No gaps to revisit: only look past the last seen index (-1 when starting from scratch).
                return entityManager()
                        .createQuery(query + " ORDER BY e.globalIndex ASC", Object[].class)
                        .setParameter("token", startToken == null ? -1L : startToken.getIndex())
                        .setMaxResults(i)
                        .getResultList();
            }
            return entityManager()
                    .createQuery(query + " OR e.globalIndex IN :gaps ORDER BY e.globalIndex ASC", Object[].class)
                    .setParameter("gaps", startToken.getGaps())
                    .setParameter("token", startToken.getIndex())
                    .setMaxResults(i)
                    .getResultList();
        });

        List<TrackedEventData<?>> trackedEvents = new ArrayList<>(rows.size());
        GapAwareTrackingToken currentToken = startToken;
        for (Object[] row : rows) {
            long globalIndex = (Long) row[0];
            GenericDomainEventEntry<?> entry = new GenericDomainEventEntry<>(
                    (String) row[1], (String) row[2], (Long) row[3], (String) row[4], row[5],
                    (String) row[6], (String) row[7], row[8], row[9]);
            // Gaps are only tracked for events that are newer than the gap timeout frame.
            boolean allowGaps = entry.getTimestamp().isAfter(gapTimeoutFrame());
            currentToken = currentToken == null
                    ? GapAwareTrackingToken.newInstance(globalIndex, initialGaps(globalIndex, allowGaps))
                    : currentToken.advanceTo(globalIndex, maxGapOffset, allowGaps);
            trackedEvents.add(new TrackedDomainEventData<>(currentToken, entry));
        }
        return trackedEvents;
    }

    /**
     * Builds the gap collection for the very first token: every index from the lowest global sequence (or
     * {@code globalIndex}, whichever is smaller) up to but excluding {@code globalIndex} when gaps are
     * allowed, otherwise an empty sorted set.
     */
    private Collection<Long> initialGaps(long globalIndex, boolean allowGaps) {
        if (!allowGaps) {
            return Collections.emptySortedSet();
        }
        return LongStream.range(Math.min(lowestGlobalSequence, globalIndex), globalIndex)
                         .boxed()
                         .collect(Collectors.toCollection(TreeSet::new));
    }

    /**
     * Returns a token from which old gaps have been removed when the given token holds more gaps than the
     * gap cleaning threshold; otherwise returns the given token unchanged ({@code null} stays {@code null}).
     * <p>
     * The stored events between the first gap and one past the last gap are inspected. Cleaning stops at
     * the first event that either fills a gap or is newer than the gap timeout frame.
     */
    private GapAwareTrackingToken cleanedToken(GapAwareTrackingToken gapAwareTrackingToken) {
        GapAwareTrackingToken cleaned = gapAwareTrackingToken;
        if (gapAwareTrackingToken == null || gapAwareTrackingToken.getGaps().size() <= gapCleaningThreshold) {
            return cleaned;
        }
        // NOTE(review): this query has no ORDER BY, while the loop below appears to rely on rows arriving in
        // ascending globalIndex order - confirm with the persistence provider / schema.
        List<Object[]> rows = transactionManager.fetchInTransaction(() -> entityManager()
                .createQuery("SELECT e.globalIndex, e.timeStamp FROM " + domainEventEntryEntityName()
                                     + " e WHERE e.globalIndex >= :firstGapOffset AND e.globalIndex <= :maxGlobalIndex",
                             Object[].class)
                .setParameter("firstGapOffset", gapAwareTrackingToken.getGaps().first())
                .setParameter("maxGlobalIndex", gapAwareTrackingToken.getGaps().last() + 1L)
                .getResultList());
        for (Object[] row : rows) {
            try {
                Instant timestamp = DateTimeUtils.parseInstant(row[1].toString());
                long globalIndex = (Long) row[0];
                if (cleaned.getGaps().contains(globalIndex) || timestamp.isAfter(gapTimeoutFrame())) {
                    // Either a gap got filled or the event is too recent: stop cleaning.
                    break;
                }
                if (cleaned.getGaps().contains(globalIndex - 1)) {
                    // The index right before this (old) event is still a gap: advance past it without gaps.
                    cleaned = cleaned.advanceTo(globalIndex - 1, maxGapOffset, false);
                }
            } catch (DateTimeParseException e) {
                // Best effort: an unparseable timestamp is logged and the row is skipped.
                logger.info("Unable to parse timestamp to clean old gaps", e);
            }
        }
        return cleaned;
    }

    /**
     * Returns the instant that lies {@code gapTimeout} milliseconds before the current time of
     * {@code GenericEventMessage.clock}.
     */
    private Instant gapTimeoutFrame() {
        return GenericEventMessage.clock.instant().minus(gapTimeout, ChronoUnit.MILLIS);
    }

    /**
     * Fetches up to {@code i} events of the aggregate {@code str} with a sequence number of at least
     * {@code j}, ordered by ascending sequence number.
     *
     * @param str aggregate identifier
     * @param j   first sequence number to include
     * @param i   maximum number of rows to fetch
     * @return the matching event entries
     */
    @Override // org.axonframework.eventsourcing.eventstore.BatchingEventStorageEngine
    @SuppressWarnings("unchecked") // untyped JPQL constructor expression yields a raw List
    protected List<? extends DomainEventData<?>> fetchDomainEvents(String str, long j, int i) {
        return transactionManager.fetchInTransaction(() -> entityManager()
                .createQuery("SELECT new org.axonframework.eventsourcing.eventstore.GenericDomainEventEntry("
                                     + "e.type, e.aggregateIdentifier, e.sequenceNumber, e.eventIdentifier, "
                                     + "e.timeStamp, e.payloadType, e.payloadRevision, e.payload, e.metaData) FROM "
                                     + domainEventEntryEntityName()
                                     + " e WHERE e.aggregateIdentifier = :id AND e.sequenceNumber >= :seq "
                                     + "ORDER BY e.sequenceNumber ASC")
                .setParameter("id", str)
                .setParameter("seq", j)
                .setMaxResults(i)
                .getResultList());
    }

    /**
     * Reads the snapshot with the highest sequence number for the aggregate {@code str}.
     *
     * @param str aggregate identifier
     * @return a stream holding at most one snapshot entry
     */
    @Override // org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine
    @SuppressWarnings("unchecked") // untyped JPQL constructor expression yields a raw List
    protected Stream<? extends DomainEventData<?>> readSnapshotData(String str) {
        return transactionManager.fetchInTransaction(() -> entityManager()
                .createQuery("SELECT new org.axonframework.eventsourcing.eventstore.GenericDomainEventEntry("
                                     + "e.type, e.aggregateIdentifier, e.sequenceNumber, e.eventIdentifier, "
                                     + "e.timeStamp, e.payloadType, e.payloadRevision, e.payload, e.metaData) FROM "
                                     + snapshotEventEntryEntityName()
                                     + " e WHERE e.aggregateIdentifier = :id ORDER BY e.sequenceNumber DESC")
                .setParameter("id", str)
                .setMaxResults(1)
                .getResultList()
                .stream());
    }

    /**
     * Persists one entity per event (created via {@link #createEventEntity}) and flushes when explicit
     * flushing is enabled. Any exception is handed to {@code handlePersistenceException} together with the
     * first event of the list. Does nothing for an empty list.
     *
     * @param list       events to append
     * @param serializer serializer used to create the event entities
     */
    @Override // org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine
    protected void appendEvents(List<? extends EventMessage<?>> list, Serializer serializer) {
        if (list.isEmpty()) {
            return;
        }
        try {
            EntityManager entityManager = entityManager();
            list.stream()
                .map(event -> createEventEntity(event, serializer))
                .forEach(entityManager::persist);
            if (explicitFlush) {
                entityManager().flush();
            }
        } catch (Exception e) {
            handlePersistenceException(e, list.get(0));
        }
    }

    /**
     * Merges the snapshot entity for the given message, deletes snapshots of the same aggregate with a
     * lower sequence number, and flushes when explicit flushing is enabled. Any exception is handed to
     * {@code handlePersistenceException}.
     *
     * @param domainEventMessage snapshot event to store
     * @param serializer         serializer used to create the snapshot entity
     */
    @Override // org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine
    protected void storeSnapshot(DomainEventMessage<?> domainEventMessage, Serializer serializer) {
        try {
            entityManager().merge(createSnapshotEntity(domainEventMessage, serializer));
            deleteSnapshots(domainEventMessage.getAggregateIdentifier(), domainEventMessage.getSequenceNumber());
            if (explicitFlush) {
                entityManager().flush();
            }
        } catch (Exception e) {
            handlePersistenceException(e, domainEventMessage);
        }
    }

    /**
     * Looks up the highest stored sequence number of the aggregate {@code str}.
     *
     * @param str aggregate identifier
     * @return the highest sequence number, or an empty optional when the query yields no row or {@code null}
     */
    @Override // org.axonframework.eventsourcing.eventstore.EventStorageEngine
    public Optional<Long> lastSequenceNumberFor(String str) {
        List<Long> results = entityManager()
                .createQuery("SELECT MAX(e.sequenceNumber) FROM " + domainEventEntryEntityName()
                                     + " e WHERE e.aggregateIdentifier = :aggregateId", Long.class)
                .setParameter("aggregateId", str)
                .getResultList();
        if (results.isEmpty()) {
            return Optional.empty();
        }
        return Optional.ofNullable(results.get(0));
    }

    /**
     * Creates a token positioned one before the lowest stored global index.
     *
     * @return the token, or {@code null} when the query yields no value
     */
    @Override // org.axonframework.eventsourcing.eventstore.EventStorageEngine
    public TrackingToken createTailToken() {
        List<Long> results = entityManager()
                .createQuery("SELECT MIN(e.globalIndex) - 1 FROM " + domainEventEntryEntityName() + " e", Long.class)
                .getResultList();
        return createToken(results);
    }

    /**
     * Creates a token positioned at the highest stored global index.
     *
     * @return the token, or {@code null} when the query yields no value
     */
    @Override // org.axonframework.eventsourcing.eventstore.EventStorageEngine
    public TrackingToken createHeadToken() {
        List<Long> results = entityManager()
                .createQuery("SELECT MAX(e.globalIndex) FROM " + domainEventEntryEntityName() + " e", Long.class)
                .getResultList();
        return createToken(results);
    }

    /**
     * Creates a token positioned one before the lowest global index among the events whose stored
     * timestamp is greater than or equal to the formatted {@code instant}.
     *
     * @param instant point in time to start from
     * @return the token, or {@code null} when the query yields no value
     */
    @Override // org.axonframework.eventsourcing.eventstore.EventStorageEngine
    public TrackingToken createTokenAt(Instant instant) {
        List<Long> results = entityManager()
                .createQuery("SELECT MIN(e.globalIndex) - 1 FROM " + domainEventEntryEntityName()
                                     + " e WHERE e.timeStamp >= :dateTime", Long.class)
                .setParameter("dateTime", DateTimeUtils.formatInstant(instant))
                .getResultList();
        return createToken(results);
    }

    /**
     * Wraps the first element of the given query result in a gap-less {@link GapAwareTrackingToken}.
     *
     * @return the token, or {@code null} when the list is empty or its first element is {@code null}
     */
    private TrackingToken createToken(List<Long> list) {
        if (list.isEmpty() || list.get(0) == null) {
            return null;
        }
        return GapAwareTrackingToken.newInstance(list.get(0), Collections.emptySet());
    }

    /**
     * Deletes all snapshots of the aggregate {@code str} whose sequence number is lower than {@code j}.
     *
     * @param str aggregate identifier
     * @param j   sequence number below which snapshots are removed
     */
    protected void deleteSnapshots(String str, long j) {
        entityManager()
                .createQuery("DELETE FROM " + snapshotEventEntryEntityName()
                                     + " e WHERE e.aggregateIdentifier = :aggregateIdentifier"
                                     + " AND e.sequenceNumber < :sequenceNumber")
                .setParameter("aggregateIdentifier", str)
                .setParameter("sequenceNumber", j)
                .executeUpdate();
    }

    /**
     * Creates the entity to persist for the given event; this implementation returns a
     * {@link DomainEventEntry}.
     */
    protected Object createEventEntity(EventMessage<?> eventMessage, Serializer serializer) {
        return new DomainEventEntry(EventUtils.asDomainEventMessage(eventMessage), serializer);
    }

    /**
     * Creates the entity to persist for the given snapshot; this implementation returns a
     * {@link SnapshotEventEntry}.
     */
    protected Object createSnapshotEntity(DomainEventMessage<?> domainEventMessage, Serializer serializer) {
        return new SnapshotEventEntry(domainEventMessage, serializer);
    }

    /** Returns the entity name used in queries on domain events: the simple name of {@link DomainEventEntry}. */
    protected String domainEventEntryEntityName() {
        return DomainEventEntry.class.getSimpleName();
    }

    /** Returns the entity name used in queries on snapshots: the simple name of {@link SnapshotEventEntry}. */
    protected String snapshotEventEntryEntityName() {
        return SnapshotEventEntry.class.getSimpleName();
    }

    /** Returns the {@link EntityManager} supplied by the configured provider. */
    protected EntityManager entityManager() {
        return entityManagerProvider.getEntityManager();
    }

    /**
     * Sets the gap timeout: events older than this many milliseconds no longer cause gaps to be tracked.
     *
     * @param i timeout in milliseconds
     */
    public void setGapTimeout(int i) {
        this.gapTimeout = i;
    }

    /**
     * Sets the number of gaps a token may hold before old gaps are cleaned up on the next fetch.
     *
     * @param i threshold; cleaning happens only when the gap count is strictly greater than this value
     */
    public void setGapCleaningThreshold(int i) {
        this.gapCleaningThreshold = i;
    }
}
