package dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql;

import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.bus.EventStoreEventBus;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.eventstream.AggregateEventStream;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.eventstream.AggregateType;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.eventstream.PersistedEvent;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.gap.EventStreamGapHandler;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.gap.NoEventStreamGapHandler;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.gap.PostgresqlEventStreamGapHandler;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.gap.SubscriptionGapHandler;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.interceptor.EventStoreInterceptor;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.interceptor.EventStoreInterceptorChain;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.operations.AppendToStream;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.operations.FetchStream;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.operations.LoadEvent;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.operations.LoadEvents;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.operations.LoadEventsByGlobalOrder;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.operations.LoadLastPersistedEventRelatedTo;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.persistence.AggregateEventStreamConfiguration;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.persistence.AggregateEventStreamPersistenceStrategy;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.serializer.AggregateIdSerializer;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.transaction.EventStoreUnitOfWork;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.transaction.EventStoreUnitOfWorkFactory;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.types.GlobalEventOrder;
import dk.cloudcreate.essentials.components.foundation.types.SubscriberId;
import dk.cloudcreate.essentials.components.foundation.types.Tenant;
import dk.cloudcreate.essentials.reactive.EventBus;
import dk.cloudcreate.essentials.shared.Exceptions;
import dk.cloudcreate.essentials.shared.FailFast;
import dk.cloudcreate.essentials.shared.MessageFormatter;
import dk.cloudcreate.essentials.shared.interceptor.DefaultInterceptorChain;
import dk.cloudcreate.essentials.types.LongRange;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jdbi.v3.core.ConnectionException;
import org.jdbi.v3.core.statement.UnableToExecuteStatementException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:dk/cloudcreate/essentials/components/eventsourced/eventstore/postgresql/PostgresqlEventStore.class */
public final class PostgresqlEventStore<CONFIG extends AggregateEventStreamConfiguration> implements ConfigurableEventStore<CONFIG> {
    private static final Logger log = LoggerFactory.getLogger(PostgresqlEventStore.class);
    private static final SubscriberId NO_SUBSCRIBER_ID = SubscriberId.of("NoSubscriberId");
    private final EventStoreUnitOfWorkFactory<EventStoreUnitOfWork> unitOfWorkFactory;
    private final AggregateEventStreamPersistenceStrategy<CONFIG> persistenceStrategy;
    private final ConcurrentMap<Class<?>, InMemoryProjector> inMemoryProjectorPerProjectionType;
    private final HashSet<InMemoryProjector> inMemoryProjectors;
    private final List<EventStoreInterceptor> eventStoreInterceptors;
    private final EventStoreEventBus eventStoreEventBus;
    private final EventStreamGapHandler<CONFIG> eventStreamGapHandler;

    /* loaded from: input_file:dk/cloudcreate/essentials/components/eventsourced/eventstore/postgresql/PostgresqlEventStore$PollEventStoreTask.class */
    private class PollEventStoreTask implements Runnable {
        private final long demandForEvents;
        private final FluxSink<PersistedEvent> sink;
        private final AggregateType aggregateType;
        private final Optional<Tenant> onlyIncludeEventIfItBelongsToTenant;
        private final String eventStreamLogName;
        private final Logger eventStoreStreamLog;
        private final Optional<Duration> pollingInterval;
        private final AtomicInteger consecutiveNoPersistedEventsReturned;
        private final long batchFetchSize;
        private final AtomicLong lastBatchSizeForThisQuery;
        private final AtomicLong nextFromInclusiveGlobalOrder;
        private final Optional<SubscriptionGapHandler> subscriptionGapHandler;

        public PollEventStoreTask(long j, FluxSink<PersistedEvent> fluxSink, AggregateType aggregateType, Optional<Tenant> optional, String str, Logger logger, Optional<Duration> optional2, AtomicInteger atomicInteger, long j2, AtomicLong atomicLong, AtomicLong atomicLong2, Optional<SubscriptionGapHandler> optional3) {
            this.demandForEvents = j;
            this.sink = fluxSink;
            this.aggregateType = aggregateType;
            this.onlyIncludeEventIfItBelongsToTenant = optional;
            this.eventStreamLogName = str;
            this.eventStoreStreamLog = logger;
            this.pollingInterval = optional2;
            this.consecutiveNoPersistedEventsReturned = atomicInteger;
            this.batchFetchSize = j2;
            this.lastBatchSizeForThisQuery = atomicLong;
            this.nextFromInclusiveGlobalOrder = atomicLong2;
            this.subscriptionGapHandler = optional3;
        }

        @Override // java.lang.Runnable
        public void run() {
            this.eventStoreStreamLog.debug("[{}] Polling worker - Started with initial demand for events {}", this.eventStreamLogName, Long.valueOf(this.demandForEvents));
            long millis = this.pollingInterval.orElse(Duration.ofMillis(500L)).toMillis();
            long j = this.demandForEvents;
            while (j > 0 && !this.sink.isCancelled()) {
                long pollForEvents = pollForEvents(j);
                j -= pollForEvents;
                this.eventStoreStreamLog.trace("[{}] Polling worker published {} event(s) - Outstanding demand for events {}", new Object[]{this.eventStreamLogName, Long.valueOf(pollForEvents), Long.valueOf(j)});
                try {
                    Thread.sleep(millis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            this.eventStoreStreamLog.debug("[{}] Polling worker - Completed with remaining demand for events {}. Is Cancelled: {}", new Object[]{this.eventStreamLogName, Long.valueOf(j), Boolean.valueOf(this.sink.isCancelled())});
        }

        private long pollForEvents(long j) {
            this.eventStoreStreamLog.trace("[{}] Polling worker - Polling for {} events", this.eventStreamLogName, Long.valueOf(j));
            try {
                EventStoreUnitOfWork orCreateNewUnitOfWork = PostgresqlEventStore.this.unitOfWorkFactory.getOrCreateNewUnitOfWork();
                try {
                    long resolveBatchSizeForThisQuery = PostgresqlEventStore.this.resolveBatchSizeForThisQuery(this.aggregateType, this.eventStreamLogName, this.eventStoreStreamLog, this.lastBatchSizeForThisQuery.get(), Math.min(this.batchFetchSize, j), this.consecutiveNoPersistedEventsReturned, this.nextFromInclusiveGlobalOrder, orCreateNewUnitOfWork);
                    if (resolveBatchSizeForThisQuery == 0) {
                        this.consecutiveNoPersistedEventsReturned.set(0);
                        this.lastBatchSizeForThisQuery.set(j);
                        this.eventStoreStreamLog.debug("[{}] Polling worker - Skipping polling as no new events have been persisted since last poll", this.eventStreamLogName);
                        return 0L;
                    }
                    this.lastBatchSizeForThisQuery.set(resolveBatchSizeForThisQuery);
                    this.eventStoreStreamLog.trace("[{}] Polling worker - Using batchSizeForThisQuery: {}", this.eventStreamLogName, Long.valueOf(resolveBatchSizeForThisQuery));
                    LongRange from = LongRange.from(this.nextFromInclusiveGlobalOrder.get(), resolveBatchSizeForThisQuery);
                    List<GlobalEventOrder> list = (List) this.subscriptionGapHandler.map(subscriptionGapHandler -> {
                        return subscriptionGapHandler.findTransientGapsToIncludeInQuery(this.aggregateType, from);
                    }).orElse(null);
                    List list2 = (List) PostgresqlEventStore.this.loadEventsByGlobalOrder(this.aggregateType, from, list, this.onlyIncludeEventIfItBelongsToTenant).collect(Collectors.toList());
                    this.subscriptionGapHandler.ifPresent(subscriptionGapHandler2 -> {
                        subscriptionGapHandler2.reconcileGaps(this.aggregateType, from, list2, list);
                    });
                    orCreateNewUnitOfWork.commit();
                    if (list2.size() <= 0) {
                        this.consecutiveNoPersistedEventsReturned.incrementAndGet();
                        this.eventStoreStreamLog.trace("[{}] Polling worker - loadEventsByGlobalOrder using globalOrderRange {} and transientGapsToIncludeInQuery {} returned no events", new Object[]{this.eventStreamLogName, from, list});
                        return 0L;
                    }
                    this.consecutiveNoPersistedEventsReturned.set(0);
                    if (PostgresqlEventStore.log.isTraceEnabled()) {
                        this.eventStoreStreamLog.debug("[{}] Polling worker - loadEventsByGlobalOrder using globalOrderRange {} and transientGapsToIncludeInQuery {} returned {} events: {}", new Object[]{this.eventStreamLogName, from, list, Integer.valueOf(list2.size()), list2.stream().map((v0) -> {
                            return v0.globalEventOrder();
                        }).collect(Collectors.toList())});
                    } else {
                        this.eventStoreStreamLog.debug("[{}] Polling worker - loadEventsByGlobalOrder using globalOrderRange {} and transientGapsToIncludeInQuery {} returned {} events", new Object[]{this.eventStreamLogName, from, list, Integer.valueOf(list2.size())});
                    }
                    List list3 = list2;
                    if (list2.size() > j) {
                        this.eventStoreStreamLog.debug("[{}] Polling worker - Found {} event(s) to publish, but will only publish {} of the found event(s) as this matches with the remainingDemandForEvents", new Object[]{this.eventStreamLogName, Integer.valueOf(list2.size()), Long.valueOf(j)});
                        list3 = list2.subList(0, (int) j);
                    }
                    for (int i = 0; i < list3.size(); i++) {
                        if (this.sink.isCancelled()) {
                            this.eventStoreStreamLog.debug("[{}] Polling worker - Is Cancelled: true. Skipping publishing further events (has only published {} out of the planned {} events)", new Object[]{this.eventStreamLogName, Integer.valueOf(i + 1), Integer.valueOf(list3.size())});
                            return i;
                        }
                        publishEventToSink((PersistedEvent) list3.get(i));
                    }
                    return list3.size();
                } catch (RuntimeException e) {
                    PostgresqlEventStore.log.error(MessageFormatter.msg("[{}] Polling worker - Polling failed", new Object[]{this.eventStreamLogName}), e);
                    if (orCreateNewUnitOfWork != null) {
                        try {
                            this.eventStoreStreamLog.debug("[{}] Polling worker - rolling back UnitOfWork due to error during polling", this.eventStreamLogName);
                            orCreateNewUnitOfWork.rollback(e);
                        } catch (Exception e2) {
                            PostgresqlEventStore.log.error(MessageFormatter.msg("[{}] Polling worker - Failed to rollback unit of work", new Object[]{this.eventStreamLogName}), e2);
                        }
                    }
                    this.eventStoreStreamLog.error(MessageFormatter.msg("[{}] Polling worker - Returning Error for '{}' EventStream with nextFromInclusiveGlobalOrder {}", new Object[]{this.eventStreamLogName, this.aggregateType, Long.valueOf(this.nextFromInclusiveGlobalOrder.get())}), e);
                    return 0L;
                }
            } catch (Exception e3) {
                Throwable rootCause = Exceptions.getRootCause(e3);
                if (e3.getMessage().contains("has been closed") || (rootCause instanceof IOException) || (rootCause instanceof ConnectionException) || (rootCause instanceof UnableToExecuteStatementException)) {
                    this.eventStoreStreamLog.debug(MessageFormatter.msg("[{}] Polling worker - Experienced a Postgresql Connection issue while creating a UnitOfWork", new Object[]{this.eventStreamLogName}), e3);
                    return 0L;
                }
                PostgresqlEventStore.log.error(MessageFormatter.msg("[{}] Polling worker - Experienced an error while creating a UnitOfWork", new Object[]{this.eventStreamLogName}), e3);
                this.sink.error(e3);
                return 0L;
            }
        }

        private void publishEventToSink(PersistedEvent persistedEvent) {
            this.eventStoreStreamLog.trace("[{}] Polling worker - Publishing '{}' Event '{}' with globalOrder {} to Flux", new Object[]{this.eventStreamLogName, persistedEvent.aggregateType(), persistedEvent.event().getEventTypeOrNamePersistenceValue(), persistedEvent.globalEventOrder()});
            this.sink.next(persistedEvent);
            long longValue = persistedEvent.globalEventOrder().longValue() + 1;
            this.eventStoreStreamLog.trace("[{}] Polling worker - Updating nextFromInclusiveGlobalOrder from {} to {}", new Object[]{this.eventStreamLogName, Long.valueOf(this.nextFromInclusiveGlobalOrder.get()), Long.valueOf(longValue)});
            this.nextFromInclusiveGlobalOrder.set(longValue);
        }
    }

    public <STRATEGY extends AggregateEventStreamPersistenceStrategy<CONFIG>> PostgresqlEventStore(EventStoreUnitOfWorkFactory eventStoreUnitOfWorkFactory, STRATEGY strategy) {
        this(eventStoreUnitOfWorkFactory, strategy, Optional.empty(), postgresqlEventStore -> {
            return new NoEventStreamGapHandler();
        });
    }

    public <STRATEGY extends AggregateEventStreamPersistenceStrategy<CONFIG>> PostgresqlEventStore(EventStoreUnitOfWorkFactory eventStoreUnitOfWorkFactory, STRATEGY strategy, Optional<EventStoreEventBus> optional, Function<PostgresqlEventStore<CONFIG>, EventStreamGapHandler<CONFIG>> function) {
        this.unitOfWorkFactory = (EventStoreUnitOfWorkFactory) FailFast.requireNonNull(eventStoreUnitOfWorkFactory, "No unitOfWorkFactory provided");
        this.persistenceStrategy = (AggregateEventStreamPersistenceStrategy) FailFast.requireNonNull(strategy, "No eventStreamPersistenceStrategy provided");
        FailFast.requireNonNull(optional, "No eventStoreLocalEventBus option provided");
        FailFast.requireNonNull(function, "No eventStreamGapHandlerFactory provided");
        this.eventStoreEventBus = optional.orElseGet(() -> {
            return new EventStoreEventBus(eventStoreUnitOfWorkFactory);
        });
        this.eventStreamGapHandler = function.apply(this);
        this.eventStoreInterceptors = new ArrayList();
        this.inMemoryProjectors = new HashSet<>();
        this.inMemoryProjectorPerProjectionType = new ConcurrentHashMap();
    }

    public static <CONFIG extends AggregateEventStreamConfiguration, STRATEGY extends AggregateEventStreamPersistenceStrategy<CONFIG>> PostgresqlEventStore withoutGapHandling(EventStoreUnitOfWorkFactory eventStoreUnitOfWorkFactory, STRATEGY strategy) {
        return new PostgresqlEventStore(eventStoreUnitOfWorkFactory, strategy, Optional.empty(), postgresqlEventStore -> {
            return new NoEventStreamGapHandler();
        });
    }

    public static <CONFIG extends AggregateEventStreamConfiguration, STRATEGY extends AggregateEventStreamPersistenceStrategy<CONFIG>> PostgresqlEventStore withGapHandling(EventStoreUnitOfWorkFactory eventStoreUnitOfWorkFactory, STRATEGY strategy) {
        return new PostgresqlEventStore(eventStoreUnitOfWorkFactory, strategy, Optional.empty(), postgresqlEventStore -> {
            return new PostgresqlEventStreamGapHandler(postgresqlEventStore, eventStoreUnitOfWorkFactory);
        });
    }

    public AggregateEventStreamPersistenceStrategy<CONFIG> getPersistenceStrategy() {
        return this.persistenceStrategy;
    }

    public EventStreamGapHandler<CONFIG> getEventStreamGapHandler() {
        return this.eventStreamGapHandler;
    }

    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStore
    public EventBus localEventBus() {
        return this.eventStoreEventBus;
    }

    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.ConfigurableEventStore
    public ConfigurableEventStore<CONFIG> addGenericInMemoryProjector(InMemoryProjector inMemoryProjector) {
        this.inMemoryProjectors.add((InMemoryProjector) FailFast.requireNonNull(inMemoryProjector, "No inMemoryProjection"));
        return this;
    }

    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.ConfigurableEventStore
    public ConfigurableEventStore<CONFIG> removeGenericInMemoryProjector(InMemoryProjector inMemoryProjector) {
        this.inMemoryProjectors.remove(FailFast.requireNonNull(inMemoryProjector, "No inMemoryProjection"));
        return this;
    }

    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.ConfigurableEventStore
    public ConfigurableEventStore<CONFIG> addSpecificInMemoryProjector(Class<?> cls, InMemoryProjector inMemoryProjector) {
        this.inMemoryProjectorPerProjectionType.put((Class) FailFast.requireNonNull(cls, "No projectionType provided"), (InMemoryProjector) FailFast.requireNonNull(inMemoryProjector, "No inMemoryProjection"));
        return this;
    }

    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.ConfigurableEventStore
    public ConfigurableEventStore<CONFIG> removeSpecificInMemoryProjector(Class<?> cls) {
        this.inMemoryProjectorPerProjectionType.remove(FailFast.requireNonNull(cls, "No projectionType provided"));
        return this;
    }

    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.ConfigurableEventStore
    public ConfigurableEventStore<CONFIG> addEventStoreInterceptor(EventStoreInterceptor eventStoreInterceptor) {
        this.eventStoreInterceptors.add((EventStoreInterceptor) FailFast.requireNonNull(eventStoreInterceptor, "No eventStoreInterceptor provided"));
        DefaultInterceptorChain.sortInterceptorsByOrder(this.eventStoreInterceptors);
        return this;
    }

    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.ConfigurableEventStore
    public ConfigurableEventStore<CONFIG> removeEventStoreInterceptor(EventStoreInterceptor eventStoreInterceptor) {
        this.eventStoreInterceptors.remove(FailFast.requireNonNull(eventStoreInterceptor, "No eventStoreInterceptor provided"));
        DefaultInterceptorChain.sortInterceptorsByOrder(this.eventStoreInterceptors);
        return this;
    }

    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStore
    public List<EventStoreInterceptor> getEventStoreInterceptors() {
        return Collections.unmodifiableList(this.eventStoreInterceptors);
    }

    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStore
    public <ID> AggregateEventStream<ID> appendToStream(AppendToStream<ID> appendToStream) {
        FailFast.requireNonNull(appendToStream, "You must supply an AppendToStream operation instance");
        EventStoreUnitOfWork requiredUnitOfWork = this.unitOfWorkFactory.getRequiredUnitOfWork();
        return (AggregateEventStream) EventStoreInterceptorChain.newInterceptorChainForOperation(appendToStream, this, this.eventStoreInterceptors, (eventStoreInterceptor, eventStoreInterceptorChain) -> {
            return eventStoreInterceptor.intercept(appendToStream, eventStoreInterceptorChain);
        }, () -> {
            AggregateEventStream<STREAM_ID> persist = this.persistenceStrategy.persist(requiredUnitOfWork, appendToStream.aggregateType, appendToStream.aggregateId, appendToStream.getAppendEventsAfterEventOrder(), appendToStream.getEventsToAppend());
            requiredUnitOfWork.registerEventsPersisted(persist.eventList());
            return persist;
        }).proceed();
    }

    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStore
    public <ID> Optional<PersistedEvent> loadLastPersistedEventRelatedTo(LoadLastPersistedEventRelatedTo<ID> loadLastPersistedEventRelatedTo) {
        FailFast.requireNonNull(loadLastPersistedEventRelatedTo, "You must supply an LoadLastPersistedEventRelatedTo operation instance");
        return (Optional) EventStoreInterceptorChain.newInterceptorChainForOperation(loadLastPersistedEventRelatedTo, this, this.eventStoreInterceptors, (eventStoreInterceptor, eventStoreInterceptorChain) -> {
            return eventStoreInterceptor.intercept(loadLastPersistedEventRelatedTo, eventStoreInterceptorChain);
        }, () -> {
            return this.persistenceStrategy.loadLastPersistedEventRelatedTo(this.unitOfWorkFactory.getRequiredUnitOfWork(), loadLastPersistedEventRelatedTo.aggregateType, loadLastPersistedEventRelatedTo.aggregateId);
        }).proceed();
    }

    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStore
    public Optional<PersistedEvent> loadEvent(LoadEvent loadEvent) {
        FailFast.requireNonNull(loadEvent, "You must supply an LoadEvent operation instance");
        return (Optional) EventStoreInterceptorChain.newInterceptorChainForOperation(loadEvent, this, this.eventStoreInterceptors, (eventStoreInterceptor, eventStoreInterceptorChain) -> {
            return eventStoreInterceptor.intercept(loadEvent, (EventStoreInterceptorChain<LoadEvent, Optional<PersistedEvent>>) eventStoreInterceptorChain);
        }, () -> {
            return this.persistenceStrategy.loadEvent(this.unitOfWorkFactory.getRequiredUnitOfWork(), loadEvent.aggregateType, loadEvent.eventId);
        }).proceed();
    }

    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStore
    public List<PersistedEvent> loadEvents(LoadEvents loadEvents) {
        FailFast.requireNonNull(loadEvents, "You must supply an LoadEvents operation instance");
        return (List) EventStoreInterceptorChain.newInterceptorChainForOperation(loadEvents, this, this.eventStoreInterceptors, (eventStoreInterceptor, eventStoreInterceptorChain) -> {
            return eventStoreInterceptor.intercept(loadEvents, (EventStoreInterceptorChain<LoadEvents, List<PersistedEvent>>) eventStoreInterceptorChain);
        }, () -> {
            return this.persistenceStrategy.loadEvents(this.unitOfWorkFactory.getRequiredUnitOfWork(), loadEvents.aggregateType, loadEvents.eventIds);
        }).proceed();
    }

    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStore
    public <ID> Optional<AggregateEventStream<ID>> fetchStream(FetchStream<ID> fetchStream) {
        FailFast.requireNonNull(fetchStream, "You must supply an LoadEvent operation instance");
        return (Optional) EventStoreInterceptorChain.newInterceptorChainForOperation(fetchStream, this, this.eventStoreInterceptors, (eventStoreInterceptor, eventStoreInterceptorChain) -> {
            return eventStoreInterceptor.intercept(fetchStream, eventStoreInterceptorChain);
        }, () -> {
            return this.persistenceStrategy.loadAggregateEvents(this.unitOfWorkFactory.getRequiredUnitOfWork(), fetchStream.aggregateType, fetchStream.aggregateId, fetchStream.getEventOrderRange(), fetchStream.getTenant());
        }).proceed();
    }

    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStore
    public Optional<GlobalEventOrder> findHighestGlobalEventOrderPersisted(AggregateType aggregateType) {
        return this.persistenceStrategy.findHighestGlobalEventOrderPersisted(this.unitOfWorkFactory.getRequiredUnitOfWork(), aggregateType);
    }

    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStore
    public <ID, AGGREGATE> Optional<AGGREGATE> inMemoryProjection(AggregateType aggregateType, ID id, Class<AGGREGATE> cls) {
        FailFast.requireNonNull(cls, "No projectionType provided");
        return inMemoryProjection(aggregateType, id, cls, this.inMemoryProjectorPerProjectionType.computeIfAbsent(cls, cls2 -> {
            return (InMemoryProjector) this.inMemoryProjectors.stream().filter(inMemoryProjector -> {
                return inMemoryProjector.supports(cls);
            }).findFirst().orElseThrow(() -> {
                return new EventStoreException(MessageFormatter.msg("Couldn't find an {} that supports projection-type '{}'", new Object[]{InMemoryProjector.class.getSimpleName(), cls.getName()}));
            });
        }));
    }

    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStore
    public <ID, AGGREGATE> Optional<AGGREGATE> inMemoryProjection(AggregateType aggregateType, ID id, Class<AGGREGATE> cls, InMemoryProjector inMemoryProjector) {
        FailFast.requireNonNull(aggregateType, "No aggregateType provided");
        FailFast.requireNonNull(id, "No aggregateId provided");
        FailFast.requireNonNull(cls, "No projectionType provided");
        FailFast.requireNonNull(inMemoryProjector, "No inMemoryProjector provided");
        if (inMemoryProjector.supports(cls)) {
            return inMemoryProjector.projectEvents(aggregateType, id, cls, this);
        }
        throw new IllegalArgumentException(MessageFormatter.msg("The provided {} '{}' does not support projection type '{}'", new Object[]{InMemoryProjector.class.getName(), inMemoryProjector.getClass().getName(), cls.getName()}));
    }

    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStore
    public Stream<PersistedEvent> loadEventsByGlobalOrder(LoadEventsByGlobalOrder loadEventsByGlobalOrder) {
        FailFast.requireNonNull(loadEventsByGlobalOrder, "You must supply an LoadEventsByGlobalOrder operation instance");
        return (Stream) EventStoreInterceptorChain.newInterceptorChainForOperation(loadEventsByGlobalOrder, this, this.eventStoreInterceptors, (eventStoreInterceptor, eventStoreInterceptorChain) -> {
            return eventStoreInterceptor.intercept(loadEventsByGlobalOrder, (EventStoreInterceptorChain<LoadEventsByGlobalOrder, Stream<PersistedEvent>>) eventStoreInterceptorChain);
        }, () -> {
            return this.persistenceStrategy.loadEventsByGlobalOrder(this.unitOfWorkFactory.getRequiredUnitOfWork(), loadEventsByGlobalOrder.aggregateType, loadEventsByGlobalOrder.getGlobalEventOrderRange(), loadEventsByGlobalOrder.getIncludeAdditionalGlobalOrders(), loadEventsByGlobalOrder.getOnlyIncludeEventIfItBelongsToTenant());
        }).proceed();
    }

    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStore
    public Flux<PersistedEvent> pollEvents(AggregateType aggregateType, long j, Optional<Integer> optional, Optional<Duration> optional2, Optional<Tenant> optional3, Optional<SubscriberId> optional4) {
        FailFast.requireNonNull(aggregateType, "You must supply an aggregateType");
        FailFast.requireNonNull(optional2, "You must supply a pollingInterval option");
        FailFast.requireNonNull(optional3, "You must supply a onlyIncludeEventIfItBelongsToTenant option");
        FailFast.requireNonNull(optional4, "You must supply a subscriberId option");
        String str = "EventStream:" + aggregateType + ":" + optional4.orElseGet(SubscriberId::random);
        Logger logger = LoggerFactory.getLogger(EventStore.class.getName() + ".PollingEventStream");
        long intValue = optional.orElse(100).intValue();
        logger.debug("[{}] Creating polling reactive '{}' EventStream with fromInclusiveGlobalOrder {} and batch size {}", new Object[]{str, aggregateType, Long.valueOf(j), Long.valueOf(intValue)});
        AtomicInteger atomicInteger = new AtomicInteger(0);
        AtomicLong atomicLong = new AtomicLong(intValue);
        AtomicLong atomicLong2 = new AtomicLong(j);
        EventStreamGapHandler<CONFIG> eventStreamGapHandler = this.eventStreamGapHandler;
        Objects.requireNonNull(eventStreamGapHandler);
        Optional<U> map = optional4.map(eventStreamGapHandler::gapHandlerFor);
        return Flux.create(fluxSink -> {
            Scheduler newSingle = Schedulers.newSingle("Publish-" + optional4.orElse(NO_SUBSCRIBER_ID) + "-" + aggregateType, true);
            fluxSink.onRequest(j2 -> {
                logger.debug("[{}] Received demand for {} events", str, Long.valueOf(j2));
                newSingle.schedule(new PollEventStoreTask(j2, fluxSink, aggregateType, optional3, str, logger, optional2, atomicInteger, intValue, atomicLong, atomicLong2, map));
            });
            fluxSink.onCancel(newSingle);
        }, FluxSink.OverflowStrategy.ERROR);
    }

    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStore
    public Flux<PersistedEvent> unboundedPollForEvents(AggregateType aggregateType, long j, Optional<Integer> optional, Optional<Duration> optional2, Optional<Tenant> optional3, Optional<SubscriberId> optional4) {
        FailFast.requireNonNull(aggregateType, "You must supply an aggregateType");
        FailFast.requireNonNull(optional2, "You must supply a pollingInterval option");
        FailFast.requireNonNull(optional3, "You must supply a onlyIncludeEventIfItBelongsToTenant option");
        FailFast.requireNonNull(optional4, "You must supply a subscriberId option");
        String str = "EventStream:" + aggregateType + ":" + optional4.orElseGet(SubscriberId::random);
        Logger logger = LoggerFactory.getLogger(EventStore.class.getName() + ".PollingEventStream");
        long intValue = optional.orElse(100).intValue();
        logger.debug("[{}] Creating polling reactive '{}' EventStream with fromInclusiveGlobalOrder {} and batch size {}", new Object[]{str, aggregateType, Long.valueOf(j), Long.valueOf(intValue)});
        AtomicInteger atomicInteger = new AtomicInteger(0);
        AtomicLong atomicLong = new AtomicLong(intValue);
        AtomicLong atomicLong2 = new AtomicLong(j);
        EventStreamGapHandler<CONFIG> eventStreamGapHandler = this.eventStreamGapHandler;
        Objects.requireNonNull(eventStreamGapHandler);
        Optional<U> map = optional4.map(eventStreamGapHandler::gapHandlerFor);
        return Flux.defer(() -> {
            try {
                EventStoreUnitOfWork orCreateNewUnitOfWork = this.unitOfWorkFactory.getOrCreateNewUnitOfWork();
                try {
                    long resolveBatchSizeForThisQuery = resolveBatchSizeForThisQuery(aggregateType, str, logger, atomicLong.get(), intValue, atomicInteger, atomicLong2, orCreateNewUnitOfWork);
                    if (resolveBatchSizeForThisQuery == 0) {
                        atomicInteger.set(0);
                        atomicLong.set(intValue);
                        logger.debug("[{}] Skipping polling as no new events have been persisted since last poll", str);
                        return Flux.empty();
                    }
                    atomicLong.set(resolveBatchSizeForThisQuery);
                    LongRange from = LongRange.from(atomicLong2.get(), resolveBatchSizeForThisQuery);
                    List<GlobalEventOrder> list = (List) map.map(subscriptionGapHandler -> {
                        return subscriptionGapHandler.findTransientGapsToIncludeInQuery(aggregateType, from);
                    }).orElse(null);
                    List list2 = (List) loadEventsByGlobalOrder(aggregateType, from, list, (Optional<Tenant>) optional3).collect(Collectors.toList());
                    map.ifPresent(subscriptionGapHandler2 -> {
                        subscriptionGapHandler2.reconcileGaps(aggregateType, from, list2, list);
                    });
                    orCreateNewUnitOfWork.commit();
                    if (list2.size() > 0) {
                        atomicInteger.set(0);
                        if (log.isTraceEnabled()) {
                            logger.debug("[{}] loadEventsByGlobalOrder using globalOrderRange {} and transientGapsToIncludeInQuery {} returned {} events: {}", new Object[]{str, from, list, Integer.valueOf(list2.size()), list2.stream().map((v0) -> {
                                return v0.globalEventOrder();
                            }).collect(Collectors.toList())});
                        } else {
                            logger.debug("[{}] loadEventsByGlobalOrder using globalOrderRange {} and transientGapsToIncludeInQuery {} returned {} events", new Object[]{str, from, list, Integer.valueOf(list2.size())});
                        }
                    } else {
                        atomicInteger.incrementAndGet();
                        logger.trace("[{}] loadEventsByGlobalOrder using globalOrderRange {} and transientGapsToIncludeInQuery {} returned no events", new Object[]{str, from, list});
                    }
                    return Flux.fromIterable(list2);
                } catch (RuntimeException e) {
                    log.error(MessageFormatter.msg("[{}] Polling failed", new Object[]{str}), e);
                    if (orCreateNewUnitOfWork != null) {
                        try {
                            orCreateNewUnitOfWork.rollback(e);
                        } catch (Exception e2) {
                            log.error(MessageFormatter.msg("[{}] Failed to rollback unit of work", new Object[]{str}), e2);
                        }
                    }
                    logger.error(MessageFormatter.msg("[{}] Returning Error for '{}' EventStream with nextFromInclusiveGlobalOrder {}", new Object[]{str, aggregateType, Long.valueOf(atomicLong2.get())}), e);
                    return Flux.error(e);
                }
            } catch (ConnectionException e3) {
                logger.debug(MessageFormatter.msg("[{}] Experienced a Postgresql Connection issue, will return an empty Flux", new Object[]{str}), e3);
                return Flux.empty();
            }
        }).doOnNext(persistedEvent -> {
            long longValue = persistedEvent.globalEventOrder().longValue() + 1;
            logger.trace("[{}] Updating nextFromInclusiveGlobalOrder from {} to {}", new Object[]{str, Long.valueOf(atomicLong2.get()), Long.valueOf(longValue)});
            atomicLong2.set(longValue);
        }).doOnError(th -> {
            logger.error(MessageFormatter.msg("[{}] Failed: {}", new Object[]{str, th.getMessage()}), th);
        }).repeatWhen(flux -> {
            return Flux.interval((Duration) optional2.orElse(Duration.ofMillis(500L))).onBackpressureDrop().publishOn(Schedulers.newSingle("Publish-" + optional4.orElse(NO_SUBSCRIBER_ID) + "-" + aggregateType, true));
        });
    }

    protected long resolveBatchSizeForThisQuery(AggregateType aggregateType, String str, Logger logger, long j, long j2, AtomicInteger atomicInteger, AtomicLong atomicLong, EventStoreUnitOfWork eventStoreUnitOfWork) {
        long j3 = j;
        int i = atomicInteger.get();
        if (i > 0 && i % 100 == 0) {
            Optional<GlobalEventOrder> findHighestGlobalEventOrderPersisted = this.persistenceStrategy.findHighestGlobalEventOrderPersisted(eventStoreUnitOfWork, aggregateType);
            if (findHighestGlobalEventOrderPersisted.isPresent()) {
                if (findHighestGlobalEventOrderPersisted.get().longValue() == atomicLong.get() - 1) {
                    logger.debug("[{}] loadEventsByGlobalOrder RESETTING query batchSize back to default {} since highestPersistedGlobalEventOrder {} is the same as nextFromInclusiveGlobalOrder {} - 1", new Object[]{str, Long.valueOf(j2), findHighestGlobalEventOrderPersisted.get(), Long.valueOf(atomicLong.get())});
                    j3 = 0;
                } else {
                    j3 = ((float) j3) + (((float) (j2 * (i / 100))) * 1.0f);
                    if (j3 > j2) {
                        logger.debug("[{}] loadEventsByGlobalOrder temporarily INCREASED query batchSize to {} from {} instead of default {} since number of consecutiveNoPersistedEventsReturned was {}", new Object[]{str, Long.valueOf(j3), Long.valueOf(j), Long.valueOf(j2), Integer.valueOf(i)});
                    }
                }
            }
        } else if (i > 0 && i % 10 == 0) {
            j3 = ((float) j3) + (((float) (j2 * (i / 10))) * 0.5f);
            if (j3 > j2) {
                logger.debug("[{}] loadEventsByGlobalOrder temporarily INCREASED query batchSize to {} from {} instead of default {} since number of consecutiveNoPersistedEventsReturned was {}", new Object[]{str, Long.valueOf(j3), Long.valueOf(j), Long.valueOf(j2), Integer.valueOf(i)});
            }
        } else if (i == 0 && j3 != j2) {
            logger.debug("[{}] loadEventsByGlobalOrder RESETTING query batchSize back to default {} from {} as new events have been received", new Object[]{str, Long.valueOf(j2), Long.valueOf(j3)});
            j3 = j2;
        }
        return j3;
    }

    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStore
    public EventStoreUnitOfWorkFactory<EventStoreUnitOfWork> getUnitOfWorkFactory() {
        return this.unitOfWorkFactory;
    }

    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.ConfigurableEventStore
    public ConfigurableEventStore<CONFIG> addAggregateEventStreamConfiguration(CONFIG config) {
        this.persistenceStrategy.addAggregateEventStreamConfiguration(config);
        return this;
    }

    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.ConfigurableEventStore
    public ConfigurableEventStore<CONFIG> addAggregateEventStreamConfiguration(AggregateType aggregateType, AggregateIdSerializer aggregateIdSerializer) {
        this.persistenceStrategy.addAggregateEventStreamConfiguration(aggregateType, aggregateIdSerializer);
        return this;
    }

    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.ConfigurableEventStore
    public ConfigurableEventStore<CONFIG> addAggregateEventStreamConfiguration(AggregateType aggregateType, Class<?> cls) {
        this.persistenceStrategy.addAggregateEventStreamConfiguration(aggregateType, cls);
        return this;
    }

    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.ConfigurableEventStore
    public Optional<CONFIG> findAggregateEventStreamConfiguration(AggregateType aggregateType) {
        return this.persistenceStrategy.findAggregateEventStreamConfiguration(aggregateType);
    }

    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.ConfigurableEventStore
    public CONFIG getAggregateEventStreamConfiguration(AggregateType aggregateType) {
        return this.persistenceStrategy.getAggregateEventStreamConfiguration(aggregateType);
    }
}
