package dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql;

import dk.cloudcreate.essentials.components.common.transaction.UnitOfWork;
import dk.cloudcreate.essentials.components.common.transaction.UnitOfWorkFactory;
import dk.cloudcreate.essentials.components.common.types.EventId;
import dk.cloudcreate.essentials.components.common.types.SubscriberId;
import dk.cloudcreate.essentials.components.common.types.Tenant;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.bus.EventStoreLocalEventBus;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.bus.PersistedEvents;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.eventstream.AggregateEventStream;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.eventstream.AggregateType;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.eventstream.PersistedEvent;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.interceptor.EventStoreInterceptor;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.interceptor.EventStoreInterceptorChain;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.persistence.AggregateEventStreamPersistenceStrategy;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.persistence.AggregateTypeConfiguration;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.transaction.EventStoreUnitOfWork;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.transaction.EventStoreUnitOfWorkFactory;
import dk.cloudcreate.essentials.reactive.LocalEventBus;
import dk.cloudcreate.essentials.shared.FailFast;
import dk.cloudcreate.essentials.shared.MessageFormatter;
import dk.cloudcreate.essentials.types.LongRange;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jdbi.v3.core.ConnectionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:dk/cloudcreate/essentials/components/eventsourced/eventstore/postgresql/PostgresqlEventStore.class */
public class PostgresqlEventStore<CONFIG extends AggregateTypeConfiguration> implements EventStore<CONFIG> {
    private static final Logger log = LoggerFactory.getLogger(PostgresqlEventStore.class);
    private final EventStoreUnitOfWorkFactory unitOfWorkFactory;
    private final AggregateEventStreamPersistenceStrategy<CONFIG> aggregateTypeConfigurations;
    private final EventStoreLocalEventBus eventStoreLocalEventBus;
    private final List<EventStoreInterceptor> eventStoreInterceptors = new ArrayList();
    private final HashSet<InMemoryProjector> inMemoryProjectors = new HashSet<>();
    private final ConcurrentMap<Class<?>, InMemoryProjector> inMemoryProjectorPerProjectionType = new ConcurrentHashMap();

    public <STRATEGY extends AggregateEventStreamPersistenceStrategy<CONFIG>> PostgresqlEventStore(EventStoreUnitOfWorkFactory eventStoreUnitOfWorkFactory, STRATEGY strategy) {
        this.unitOfWorkFactory = (EventStoreUnitOfWorkFactory) FailFast.requireNonNull(eventStoreUnitOfWorkFactory, "No unitOfWorkFactory provided");
        this.aggregateTypeConfigurations = (AggregateEventStreamPersistenceStrategy) FailFast.requireNonNull(strategy, "No eventStreamPersistenceStrategy provided");
        this.eventStoreLocalEventBus = new EventStoreLocalEventBus(eventStoreUnitOfWorkFactory);
    }

    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStore
    public LocalEventBus<PersistedEvents> localEventBus() {
        return this.eventStoreLocalEventBus.localEventBus();
    }

    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStore
    public EventStore<CONFIG> addGenericInMemoryProjector(InMemoryProjector inMemoryProjector) {
        this.inMemoryProjectors.add((InMemoryProjector) FailFast.requireNonNull(inMemoryProjector, "No inMemoryProjection"));
        return this;
    }

    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStore
    public EventStore<CONFIG> addSpecificInMemoryProjector(Class<?> cls, InMemoryProjector inMemoryProjector) {
        this.inMemoryProjectorPerProjectionType.put((Class) FailFast.requireNonNull(cls, "No projectionType provided"), (InMemoryProjector) FailFast.requireNonNull(inMemoryProjector, "No inMemoryProjection"));
        return this;
    }

    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStore
    public EventStore<CONFIG> addEventStoreInterceptor(EventStoreInterceptor eventStoreInterceptor) {
        this.eventStoreInterceptors.add((EventStoreInterceptor) FailFast.requireNonNull(eventStoreInterceptor, "No eventStoreInterceptor provided"));
        return this;
    }

    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStore
    public <ID> AggregateEventStream<ID> appendToStream(AggregateType aggregateType, ID id, Optional<Long> optional, List<?> list) {
        EventStoreUnitOfWork requiredUnitOfWork = this.unitOfWorkFactory.getRequiredUnitOfWork();
        EventStoreInterceptor.AppendToStream appendToStream = new EventStoreInterceptor.AppendToStream(aggregateType, id, optional, list);
        AggregateEventStream<ID> aggregateEventStream = (AggregateEventStream) EventStoreInterceptorChain.newChainForOperation(appendToStream, this.eventStoreInterceptors, (eventStoreInterceptor, eventStoreInterceptorChain) -> {
            return eventStoreInterceptor.intercept(appendToStream, eventStoreInterceptorChain);
        }, () -> {
            return this.aggregateTypeConfigurations.persist(requiredUnitOfWork, aggregateType, id, optional, list);
        }).proceed();
        if (requiredUnitOfWork instanceof EventStoreUnitOfWork) {
            requiredUnitOfWork.registerEventsPersisted(aggregateEventStream.eventList());
        }
        return aggregateEventStream;
    }

    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStore
    public <ID> Optional<PersistedEvent> loadLastPersistedEventRelatedTo(AggregateType aggregateType, ID id) {
        EventStoreInterceptor.LoadLastPersistedEventRelatedTo loadLastPersistedEventRelatedTo = new EventStoreInterceptor.LoadLastPersistedEventRelatedTo(aggregateType, id);
        return (Optional) EventStoreInterceptorChain.newChainForOperation(loadLastPersistedEventRelatedTo, this.eventStoreInterceptors, (eventStoreInterceptor, eventStoreInterceptorChain) -> {
            return eventStoreInterceptor.intercept(loadLastPersistedEventRelatedTo, eventStoreInterceptorChain);
        }, () -> {
            return this.aggregateTypeConfigurations.loadLastPersistedEventRelatedTo(this.unitOfWorkFactory.getRequiredUnitOfWork(), aggregateType, id);
        }).proceed();
    }

    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStore
    public Optional<PersistedEvent> loadEvent(AggregateType aggregateType, EventId eventId) {
        EventStoreInterceptor.LoadEvent loadEvent = new EventStoreInterceptor.LoadEvent(aggregateType, eventId);
        return (Optional) EventStoreInterceptorChain.newChainForOperation(loadEvent, this.eventStoreInterceptors, (eventStoreInterceptor, eventStoreInterceptorChain) -> {
            return eventStoreInterceptor.intercept(loadEvent, (EventStoreInterceptorChain<EventStoreInterceptor.LoadEvent, Optional<PersistedEvent>>) eventStoreInterceptorChain);
        }, () -> {
            return this.aggregateTypeConfigurations.loadEvent(this.unitOfWorkFactory.getRequiredUnitOfWork(), aggregateType, eventId);
        }).proceed();
    }

    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStore
    public <ID> Optional<AggregateEventStream<ID>> fetchStream(AggregateType aggregateType, ID id, LongRange longRange, Optional<Tenant> optional) {
        EventStoreInterceptor.FetchStream fetchStream = new EventStoreInterceptor.FetchStream(aggregateType, id, longRange, optional);
        return (Optional) EventStoreInterceptorChain.newChainForOperation(fetchStream, this.eventStoreInterceptors, (eventStoreInterceptor, eventStoreInterceptorChain) -> {
            return eventStoreInterceptor.intercept(fetchStream, eventStoreInterceptorChain);
        }, () -> {
            return this.aggregateTypeConfigurations.loadAggregateEvents(this.unitOfWorkFactory.getRequiredUnitOfWork(), aggregateType, id, longRange, optional);
        }).proceed();
    }

    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStore
    public <ID, AGGREGATE> Optional<AGGREGATE> inMemoryProjection(AggregateType aggregateType, ID id, Class<AGGREGATE> cls) {
        FailFast.requireNonNull(cls, "No projectionType provided");
        return inMemoryProjection(aggregateType, id, cls, this.inMemoryProjectorPerProjectionType.computeIfAbsent(cls, cls2 -> {
            return (InMemoryProjector) this.inMemoryProjectors.stream().filter(inMemoryProjector -> {
                return inMemoryProjector.supports(cls);
            }).findFirst().orElseThrow(() -> {
                return new EventStoreException(MessageFormatter.msg("Couldn't find an {} that supports projection-type '{}'", new Object[]{InMemoryProjector.class.getSimpleName(), cls.getName()}));
            });
        }));
    }

    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStore
    public <ID, AGGREGATE> Optional<AGGREGATE> inMemoryProjection(AggregateType aggregateType, ID id, Class<AGGREGATE> cls, InMemoryProjector inMemoryProjector) {
        FailFast.requireNonNull(aggregateType, "No aggregateType provided");
        FailFast.requireNonNull(id, "No aggregateId provided");
        FailFast.requireNonNull(cls, "No projectionType provided");
        FailFast.requireNonNull(inMemoryProjector, "No inMemoryProjector provided");
        if (inMemoryProjector.supports(cls)) {
            return inMemoryProjector.projectEvents(aggregateType, id, cls, this);
        }
        throw new IllegalArgumentException(MessageFormatter.msg("The provided {} '{}' does not support projection type '{}'", new Object[]{InMemoryProjector.class.getName(), inMemoryProjector.getClass().getName(), cls.getName()}));
    }

    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStore
    public Stream<PersistedEvent> loadEventsByGlobalOrder(AggregateType aggregateType, LongRange longRange, Optional<Tenant> optional) {
        FailFast.requireNonNull(aggregateType, "No aggregateType provided");
        FailFast.requireNonNull(longRange, "You must specify a globalOrderRange");
        FailFast.requireNonNull(optional, "You must specify an onlyIncludeEventIfItBelongsToTenant option");
        EventStoreInterceptor.LoadEventsByGlobalOrder loadEventsByGlobalOrder = new EventStoreInterceptor.LoadEventsByGlobalOrder(aggregateType, longRange, optional);
        return (Stream) EventStoreInterceptorChain.newChainForOperation(loadEventsByGlobalOrder, this.eventStoreInterceptors, (eventStoreInterceptor, eventStoreInterceptorChain) -> {
            return eventStoreInterceptor.intercept(loadEventsByGlobalOrder, (EventStoreInterceptorChain<EventStoreInterceptor.LoadEventsByGlobalOrder, Stream<PersistedEvent>>) eventStoreInterceptorChain);
        }, () -> {
            return this.aggregateTypeConfigurations.loadEventsByGlobalOrder(this.unitOfWorkFactory.getRequiredUnitOfWork(), aggregateType, longRange, optional);
        }).proceed();
    }

    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStore
    public Flux<PersistedEvent> pollEvents(AggregateType aggregateType, long j, Optional<Integer> optional, Optional<Duration> optional2, Optional<Tenant> optional3, Optional<SubscriberId> optional4) {
        FailFast.requireNonNull(aggregateType, "You must supply an aggregateType");
        FailFast.requireNonNull(optional2, "You must supply a pollingInterval option");
        FailFast.requireNonNull(optional3, "You must supply a onlyIncludeEventIfItBelongsToTenant option");
        FailFast.requireNonNull(optional4, "You must supply a subscriberId option");
        String str = "EventStream:" + aggregateType + ":" + optional4.orElseGet(SubscriberId::random);
        Logger logger = LoggerFactory.getLogger(EventStore.class.getName() + ".PollingEventStream");
        long intValue = optional.orElse(100).intValue();
        logger.debug("[{}] Creating polling reactive '{}' EventStream with fromInclusiveGlobalOrder {} and batch size {}", new Object[]{str, aggregateType, Long.valueOf(j), Long.valueOf(intValue)});
        AtomicLong atomicLong = new AtomicLong(j);
        return Flux.defer(() -> {
            try {
                UnitOfWork orCreateNewUnitOfWork = this.unitOfWorkFactory.getOrCreateNewUnitOfWork();
                try {
                    List list = (List) loadEventsByGlobalOrder(aggregateType, LongRange.from(atomicLong.get(), intValue), optional3).collect(Collectors.toList());
                    orCreateNewUnitOfWork.rollback();
                    if (list.size() > 0) {
                        logger.debug("[{}] loadEventsByGlobalOrder using fromInclusiveGlobalOrder {} returned {} events", new Object[]{str, Long.valueOf(atomicLong.get()), Integer.valueOf(list.size())});
                    } else {
                        logger.trace("[{}] loadEventsByGlobalOrder using fromInclusiveGlobalOrder {} returned no events", str, Long.valueOf(atomicLong.get()));
                    }
                    return Flux.fromIterable(list);
                } catch (RuntimeException e) {
                    log.error(MessageFormatter.msg("[{}] Polling failed", new Object[]{str}), e);
                    if (orCreateNewUnitOfWork != null) {
                        try {
                            orCreateNewUnitOfWork.rollback(e);
                        } catch (Exception e2) {
                            log.error(MessageFormatter.msg("[{}] Failed to rollback unit of work", new Object[]{str}), e2);
                        }
                    }
                    logger.error(MessageFormatter.msg("[{}] Returning Error for '{}' EventStream with nextFromInclusiveGlobalOrder {}", new Object[]{str, aggregateType, Long.valueOf(atomicLong.get())}), e);
                    return Flux.error(e);
                }
            } catch (ConnectionException e3) {
                logger.debug(MessageFormatter.msg("[{}] Experienced a Postgresql Connection issue, will return an empty Flux", new Object[]{str}), e3);
                return Flux.empty();
            }
        }).doOnNext(persistedEvent -> {
            long longValue = persistedEvent.globalEventOrder().longValue() + 1;
            logger.trace("[{}] Updating nextFromInclusiveGlobalOrder from {} to {}", new Object[]{str, Long.valueOf(atomicLong.get()), Long.valueOf(longValue)});
            atomicLong.set(longValue);
        }).doOnError(th -> {
            logger.error(MessageFormatter.msg("[{}] Failed {}", new Object[]{str, th.getMessage()}), th);
        }).repeatWhen(flux -> {
            return Flux.interval((Duration) optional2.orElse(Duration.ofMillis(500L)));
        }).publishOn(Schedulers.newSingle("Publish-" + optional4 + "-" + aggregateType, true));
    }

    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStore
    public UnitOfWorkFactory getUnitOfWorkFactory() {
        return this.unitOfWorkFactory;
    }

    @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStore
    public EventStore<CONFIG> addAggregateTypeConfiguration(CONFIG config) {
        this.aggregateTypeConfigurations.addAggregateTypeConfiguration(config);
        return this;
    }
}
