package dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.subscription;

import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStore;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStoreException;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStoreSubscription;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.bus.CommitStage;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.bus.PersistedEvents;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.eventstream.AggregateType;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.eventstream.PersistedEvent;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.types.GlobalEventOrder;
import dk.cloudcreate.essentials.components.foundation.Lifecycle;
import dk.cloudcreate.essentials.components.foundation.fencedlock.FencedLock;
import dk.cloudcreate.essentials.components.foundation.fencedlock.FencedLockManager;
import dk.cloudcreate.essentials.components.foundation.fencedlock.LockCallback;
import dk.cloudcreate.essentials.components.foundation.fencedlock.LockName;
import dk.cloudcreate.essentials.components.foundation.types.SubscriberId;
import dk.cloudcreate.essentials.components.foundation.types.Tenant;
import dk.cloudcreate.essentials.shared.FailFast;
import dk.cloudcreate.essentials.shared.MessageFormatter;
import dk.cloudcreate.essentials.shared.concurrent.ThreadFactoryBuilder;
import dk.cloudcreate.essentials.shared.functional.tuple.Pair;
import java.time.Duration;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;

/* loaded from: input_file:dk/cloudcreate/essentials/components/eventsourced/eventstore/postgresql/subscription/EventStoreSubscriptionManager.class */
public interface EventStoreSubscriptionManager extends Lifecycle {

    /* loaded from: input_file:dk/cloudcreate/essentials/components/eventsourced/eventstore/postgresql/subscription/EventStoreSubscriptionManager$DefaultEventStoreSubscriptionManager.class */
    public static class DefaultEventStoreSubscriptionManager implements EventStoreSubscriptionManager {
        // Logger shared by the manager and its nested subscription classes; registered under the interface's name.
        private static final Logger log = LoggerFactory.getLogger(EventStoreSubscriptionManager.class);
        // Event store that the nested subscriptions poll (asynchronous) or attach to via its local event bus (in-transaction).
        private final EventStore<?> eventStore;
        // Batch size handed to EventStore#pollEvents by the asynchronous subscriptions.
        private final int eventStorePollingBatchSize;
        // Polling interval handed to EventStore#pollEvents by the asynchronous subscriptions.
        private final Duration eventStorePollingInterval;
        // Lock manager used by exclusive subscriptions.
        private final FencedLockManager fencedLockManager;
        // Store for durable subscription resume points.
        private final DurableSubscriptionRepository durableSubscriptionRepository;
        // NOTE(review): presumably the interval between periodic saves of resume points - its usage is outside this block, confirm.
        private final Duration snapshotResumePointsEvery;
        // Subscriptions keyed by the (SubscriberId, AggregateType) pair. Uses the diamond operator instead of a raw ConcurrentHashMap.
        private final ConcurrentMap<Pair<SubscriberId, AggregateType>, EventStoreSubscription> subscribers = new ConcurrentHashMap<>();
        // Lifecycle flag of the manager; volatile so writes are visible across threads.
        private volatile boolean started;
        // NOTE(review): presumably the handle of the scheduled resume point save task - its usage is outside this block, confirm.
        private ScheduledFuture<?> saveResumePointsFuture;

        /* loaded from: input_file:dk/cloudcreate/essentials/components/eventsourced/eventstore/postgresql/subscription/EventStoreSubscriptionManager$DefaultEventStoreSubscriptionManager$ExclusiveAsynchronousSubscription.class */
        /**
         * Asynchronous subscription that only polls events while the {@link FencedLock} named
         * {@code [<subscriberId>-<aggregateType>]} is held.
         * <p>
         * {@link #start()} requests the lock asynchronously. When the lock is acquired the resume point is looked up
         * in the {@link DurableSubscriptionRepository}, the {@link FencedLockAwareSubscriber} is notified and polling
         * of the {@link EventStore} begins. When the lock is released polling is stopped and the resume point is saved.
         * <p>
         * Resume point contract: {@code resumePoint.getResumeFromAndIncluding()} always names the NEXT event to
         * deliver. It is advanced past an event as soon as that event has been given to the handler (successfully or
         * not), so saving it never skips an event that was not delivered - not even when the lock is released before
         * any event was received.
         */
        private class ExclusiveAsynchronousSubscription implements EventStoreSubscription {
            private final EventStore<?> eventStore;
            private final FencedLockManager fencedLockManager;
            private final DurableSubscriptionRepository durableSubscriptionRepository;
            private final AggregateType aggregateType;
            private final SubscriberId subscriberId;
            private final GlobalEventOrder onFirstSubscriptionSubscribeFromAndIncludingGlobalOrder;
            private final Optional<Tenant> onlyIncludeEventsForTenant;
            private final FencedLockAwareSubscriber fencedLockAwareSubscriber;
            private final PersistedEventHandler eventHandler;
            private final LockName lockName;
            // null until the lock has been acquired for the first time (or until resetFrom loads it)
            private SubscriptionResumePoint resumePoint;
            // null until the lock has been acquired for the first time
            private Disposable subscription;
            // true between start() and stop(): the subscription is competing for (or holding) the lock
            private volatile boolean started;
            // true while the lock is held by this subscription
            private volatile boolean active;

            /**
             * Creates the subscription; every argument is required (a null argument is rejected by {@link FailFast}).
             *
             * @param eventStore                    the event store to poll
             * @param fencedLockManager             the lock manager used to acquire the exclusive lock
             * @param durableSubscriptionRepository the repository where the resume point is stored
             * @param aggregateType                 the aggregate type to subscribe to
             * @param subscriberId                  the id of the subscriber
             * @param globalEventOrder              the global order to start from on the very first subscription
             * @param optional                      optional tenant filter passed on to the event store
             * @param fencedLockAwareSubscriber     callback notified when the lock is acquired/released
             * @param persistedEventHandler         the handler that receives each polled event
             */
            public ExclusiveAsynchronousSubscription(EventStore<?> eventStore, FencedLockManager fencedLockManager, DurableSubscriptionRepository durableSubscriptionRepository, AggregateType aggregateType, SubscriberId subscriberId, GlobalEventOrder globalEventOrder, Optional<Tenant> optional, FencedLockAwareSubscriber fencedLockAwareSubscriber, PersistedEventHandler persistedEventHandler) {
                this.eventStore = (EventStore) FailFast.requireNonNull(eventStore, "No eventStore provided");
                this.fencedLockManager = (FencedLockManager) FailFast.requireNonNull(fencedLockManager, "No fencedLockManager provided");
                this.durableSubscriptionRepository = (DurableSubscriptionRepository) FailFast.requireNonNull(durableSubscriptionRepository, "No durableSubscriptionRepository provided");
                this.aggregateType = (AggregateType) FailFast.requireNonNull(aggregateType, "No aggregateType provided");
                this.subscriberId = (SubscriberId) FailFast.requireNonNull(subscriberId, "No subscriberId provided");
                this.onFirstSubscriptionSubscribeFromAndIncludingGlobalOrder = (GlobalEventOrder) FailFast.requireNonNull(globalEventOrder, "No onFirstSubscriptionSubscribeFromAndIncludingGlobalOrder provided");
                this.onlyIncludeEventsForTenant = (Optional) FailFast.requireNonNull(optional, "No onlyIncludeEventsForTenant provided");
                this.fencedLockAwareSubscriber = (FencedLockAwareSubscriber) FailFast.requireNonNull(fencedLockAwareSubscriber, "No fencedLockAwareSubscriber provided");
                this.eventHandler = (PersistedEventHandler) FailFast.requireNonNull(persistedEventHandler, "No eventHandler provided");
                this.lockName = LockName.of(MessageFormatter.msg("[{}-{}]", new Object[]{subscriberId, aggregateType}));
            }

            @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStoreSubscription
            public SubscriberId subscriberId() {
                return this.subscriberId;
            }

            @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStoreSubscription
            public AggregateType aggregateType() {
                return this.aggregateType;
            }

            /**
             * Starts competing for the lock. Does nothing (besides a debug log) when already started.
             * Polling does not begin here; it begins in the lock callback once the lock has been acquired.
             */
            public void start() {
                if (this.started) {
                    DefaultEventStoreSubscriptionManager.log.debug("[{}-{}] Subscription was already started", this.subscriberId, this.aggregateType);
                    return;
                }
                this.started = true;
                DefaultEventStoreSubscriptionManager.log.info("[{}-{}] Started subscriber", this.subscriberId, this.aggregateType);
                this.fencedLockManager.acquireLockAsync(this.lockName, new LockCallback() {
                    public void lockAcquired(FencedLock fencedLock) {
                        activate(fencedLock);
                    }

                    public void lockReleased(FencedLock fencedLock) {
                        deactivate(fencedLock);
                    }
                });
            }

            /**
             * Lock acquired: looks up the resume point, notifies the {@link FencedLockAwareSubscriber} and starts polling.
             * A failure in the subscriber callback is logged and does not prevent polling from starting.
             */
            private void activate(FencedLock fencedLock) {
                DefaultEventStoreSubscriptionManager.log.info("[{}-{}] Acquired lock. Looking up subscription resumePoint", this.subscriberId, this.aggregateType);
                this.active = true;
                this.resumePoint = this.durableSubscriptionRepository.getOrCreateResumePoint(this.subscriberId, this.aggregateType, this.onFirstSubscriptionSubscribeFromAndIncludingGlobalOrder);
                DefaultEventStoreSubscriptionManager.log.info("[{}-{}] Starting subscription from globalEventOrder: {}", this.subscriberId, this.aggregateType, this.resumePoint.getResumeFromAndIncluding());
                try {
                    this.fencedLockAwareSubscriber.onLockAcquired(fencedLock, this.resumePoint);
                } catch (Exception e) {
                    DefaultEventStoreSubscriptionManager.log.error(MessageFormatter.msg("FencedLockAwareSubscriber#onLockAcquired failed for lock {} and resumePoint {}", new Object[]{fencedLock.getName(), this.resumePoint}), e);
                }
                this.subscription = this.eventStore.pollEvents(this.aggregateType,
                                                               this.resumePoint.getResumeFromAndIncluding(),
                                                               Optional.of(Integer.valueOf(DefaultEventStoreSubscriptionManager.this.eventStorePollingBatchSize)),
                                                               Optional.of(DefaultEventStoreSubscriptionManager.this.eventStorePollingInterval),
                                                               this.onlyIncludeEventsForTenant,
                                                               Optional.of(this.subscriberId))
                                                   .subscribe(persistedEvent -> {
                    DefaultEventStoreSubscriptionManager.log.trace("[{}-{}] (#{}) Received {} event with eventId '{}', aggregateId: '{}', eventOrder: {}", this.subscriberId, this.aggregateType, persistedEvent.globalEventOrder(), persistedEvent.event().getEventTypeOrName().toString(), persistedEvent.eventId(), persistedEvent.aggregateId(), persistedEvent.eventOrder());
                    try {
                        this.eventHandler.handle(persistedEvent);
                    } catch (Exception e2) {
                        onErrorHandlingEvent(persistedEvent, e2);
                    } finally {
                        // Point at the event AFTER the one just delivered. Advancing here (instead of
                        // unconditionally when the lock is released) ensures an undelivered event is never skipped.
                        this.resumePoint.setResumeFromAndIncluding(persistedEvent.globalEventOrder().increment());
                    }
                });
            }

            /**
             * Lock released: notifies the {@link FencedLockAwareSubscriber}, stops polling and saves the resume point.
             * Does nothing unless the subscription is active. The subscription is marked inactive even if saving
             * the resume point fails, since the lock is no longer held at that point.
             */
            private void deactivate(FencedLock fencedLock) {
                if (!this.active) {
                    return;
                }
                DefaultEventStoreSubscriptionManager.log.info("[{}-{}] Lock Released. Stopping subscription", this.subscriberId, this.aggregateType);
                try {
                    this.fencedLockAwareSubscriber.onLockReleased(fencedLock);
                } catch (Exception e) {
                    DefaultEventStoreSubscriptionManager.log.error(MessageFormatter.msg("FencedLockAwareSubscriber#onLockReleased failed for lock {}", new Object[]{fencedLock.getName()}), e);
                }
                disposeSubscriptionFlux();
                try {
                    // resumePoint is null if the resume point lookup failed right after the lock was acquired
                    if (this.resumePoint != null) {
                        DefaultEventStoreSubscriptionManager.log.debug("[{}-{}] Saving ResumePoint {}", this.subscriberId, this.aggregateType, this.resumePoint.getResumeFromAndIncluding());
                        this.durableSubscriptionRepository.saveResumePoint(this.resumePoint);
                    }
                } finally {
                    this.active = false;
                }
                DefaultEventStoreSubscriptionManager.log.info("[{}-{}] Stopped subscription", this.subscriberId, this.aggregateType);
            }

            /**
             * Disposes the polling subscription, if one exists. A failure to dispose is logged and swallowed.
             */
            private void disposeSubscriptionFlux() {
                Disposable currentSubscription = this.subscription;
                if (currentSubscription == null) {
                    // The lock was never acquired (or polling never got started), so there is nothing to dispose
                    return;
                }
                try {
                    DefaultEventStoreSubscriptionManager.log.debug("[{}-{}] Stopping subscription flux", this.subscriberId, this.aggregateType);
                    currentSubscription.dispose();
                } catch (Exception e) {
                    DefaultEventStoreSubscriptionManager.log.error(MessageFormatter.msg("[{}-{}] Failed to dispose subscription flux", new Object[]{this.subscriberId, this.aggregateType}), e);
                }
            }

            /**
             * Logs an event handler failure. The failing event is skipped: polling continues with the next event.
             */
            protected void onErrorHandlingEvent(PersistedEvent persistedEvent, Exception exc) {
                DefaultEventStoreSubscriptionManager.log.error(MessageFormatter.msg("[{}-{}] (#{}) Skipping {} event because of error", new Object[]{this.subscriberId, this.aggregateType, persistedEvent.globalEventOrder(), persistedEvent.event().getEventTypeOrName().getValue()}), exc);
            }

            public boolean isStarted() {
                return this.started;
            }

            /**
             * Stops competing for the lock by cancelling the asynchronous lock acquiring. No-op when not started.
             */
            public void stop() {
                if (this.started) {
                    this.fencedLockManager.cancelAsyncLockAcquiring(this.lockName);
                    this.started = false;
                }
            }

            @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStoreSubscription
            public void unsubscribe() {
                DefaultEventStoreSubscriptionManager.log.info("[{}-{}] Initiating unsubscription", this.subscriberId, this.aggregateType);
                stop();
                DefaultEventStoreSubscriptionManager.this.unsubscribe(this);
            }

            /**
             * Overrides the resume point. If the subscription is started, polling is stopped first and the
             * subscription is started again afterwards; otherwise only the stored resume point is changed.
             *
             * @param globalEventOrder the global order to resume from (inclusive); must not be null
             */
            @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStoreSubscription
            public void resetFrom(GlobalEventOrder globalEventOrder) {
                if (!this.started) {
                    overrideResumePoint(globalEventOrder);
                    return;
                }
                DefaultEventStoreSubscriptionManager.log.info("[{}-{}] Resetting resume point and re-starts the subscriber from and including globalOrder {}", this.subscriberId, this.aggregateType, globalEventOrder);
                disposeSubscriptionFlux();
                overrideResumePoint(globalEventOrder);
                this.started = false;
                start();
            }

            /**
             * Sets the resume point to {@code globalEventOrder} and saves it. If no resume point has been loaded yet
             * it is first looked up (or created) through the repository instead of failing on a null reference.
             */
            private void overrideResumePoint(GlobalEventOrder globalEventOrder) {
                FailFast.requireNonNull(globalEventOrder, "No subscribeFromAndIncludingGlobalOrder value provided");
                DefaultEventStoreSubscriptionManager.log.info("[{}-{}] Overriding resume point to start from and include globalOrder {}", this.subscriberId, this.aggregateType, globalEventOrder);
                if (this.resumePoint == null) {
                    this.resumePoint = this.durableSubscriptionRepository.getOrCreateResumePoint(this.subscriberId, this.aggregateType, globalEventOrder);
                }
                this.resumePoint.setResumeFromAndIncluding(globalEventOrder);
                this.durableSubscriptionRepository.saveResumePoint(this.resumePoint);
            }

            /**
             * @return the current resume point, or {@link Optional#empty()} if none has been loaded yet
             * (i.e. the lock has never been acquired by this subscription)
             */
            @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStoreSubscription
            public Optional<SubscriptionResumePoint> currentResumePoint() {
                return Optional.ofNullable(this.resumePoint);
            }

            @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStoreSubscription
            public Optional<Tenant> onlyIncludeEventsForTenant() {
                return this.onlyIncludeEventsForTenant;
            }

            /**
             * @return true while this subscription holds the lock
             */
            @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStoreSubscription
            public boolean isActive() {
                return this.active;
            }
        }

        /* loaded from: input_file:dk/cloudcreate/essentials/components/eventsourced/eventstore/postgresql/subscription/EventStoreSubscriptionManager$DefaultEventStoreSubscriptionManager$NonExclusiveAsynchronousSubscription.class */
        private class NonExclusiveAsynchronousSubscription implements EventStoreSubscription {
            private final EventStore<?> eventStore;
            private final DurableSubscriptionRepository durableSubscriptionRepository;
            private final AggregateType aggregateType;
            private final SubscriberId subscriberId;
            private final GlobalEventOrder onFirstSubscriptionSubscribeFromAndIncludingGlobalOrder;
            private final Optional<Tenant> onlyIncludeEventsForTenant;
            private final PersistedEventHandler eventHandler;
            private SubscriptionResumePoint resumePoint;
            private Disposable subscription;
            private volatile boolean started;

            public NonExclusiveAsynchronousSubscription(EventStore<?> eventStore, DurableSubscriptionRepository durableSubscriptionRepository, AggregateType aggregateType, SubscriberId subscriberId, GlobalEventOrder globalEventOrder, Optional<Tenant> optional, PersistedEventHandler persistedEventHandler) {
                this.eventStore = (EventStore) FailFast.requireNonNull(eventStore, "No eventStore provided");
                this.durableSubscriptionRepository = (DurableSubscriptionRepository) FailFast.requireNonNull(durableSubscriptionRepository, "No durableSubscriptionRepository provided");
                this.aggregateType = (AggregateType) FailFast.requireNonNull(aggregateType, "No aggregateType provided");
                this.subscriberId = (SubscriberId) FailFast.requireNonNull(subscriberId, "No subscriberId provided");
                this.onFirstSubscriptionSubscribeFromAndIncludingGlobalOrder = (GlobalEventOrder) FailFast.requireNonNull(globalEventOrder, "No onFirstSubscriptionSubscribeFromAndIncludingGlobalOrder provided");
                this.onlyIncludeEventsForTenant = (Optional) FailFast.requireNonNull(optional, "No onlyIncludeEventsForTenant provided");
                this.eventHandler = (PersistedEventHandler) FailFast.requireNonNull(persistedEventHandler, "No eventHandler provided");
            }

            @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStoreSubscription
            public SubscriberId subscriberId() {
                return this.subscriberId;
            }

            @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStoreSubscription
            public AggregateType aggregateType() {
                return this.aggregateType;
            }

            public void start() {
                if (this.started) {
                    DefaultEventStoreSubscriptionManager.log.debug("[{}-{}] Subscription was already started", this.subscriberId, this.aggregateType);
                    return;
                }
                this.started = true;
                DefaultEventStoreSubscriptionManager.log.info("[{}-{}] Looking up subscription resumePoint", this.subscriberId, this.aggregateType);
                this.resumePoint = this.durableSubscriptionRepository.getOrCreateResumePoint(this.subscriberId, this.aggregateType, this.onFirstSubscriptionSubscribeFromAndIncludingGlobalOrder);
                DefaultEventStoreSubscriptionManager.log.info("[{}-{}] Starting subscription from globalEventOrder: {}", new Object[]{this.subscriberId, this.aggregateType, this.resumePoint.getResumeFromAndIncluding()});
                this.subscription = this.eventStore.pollEvents(this.aggregateType, this.resumePoint.getResumeFromAndIncluding(), Optional.of(Integer.valueOf(DefaultEventStoreSubscriptionManager.this.eventStorePollingBatchSize)), Optional.of(DefaultEventStoreSubscriptionManager.this.eventStorePollingInterval), this.onlyIncludeEventsForTenant, Optional.of(this.subscriberId)).subscribe(persistedEvent -> {
                    DefaultEventStoreSubscriptionManager.log.trace("[{}-{}] (#{}) Received {} event with eventId '{}', aggregateId: '{}', eventOrder: {}", new Object[]{this.subscriberId, this.aggregateType, persistedEvent.globalEventOrder(), persistedEvent.event().getEventTypeOrName().toString(), persistedEvent.eventId(), persistedEvent.aggregateId(), persistedEvent.eventOrder()});
                    try {
                        this.eventHandler.handle(persistedEvent);
                    } catch (Exception e) {
                        onErrorHandlingEvent(persistedEvent, e);
                    } finally {
                        this.resumePoint.setResumeFromAndIncluding(persistedEvent.globalEventOrder());
                    }
                });
            }

            protected void onErrorHandlingEvent(PersistedEvent persistedEvent, Exception exc) {
                DefaultEventStoreSubscriptionManager.log.error(MessageFormatter.msg("[{}-{}] (#{}) Skipping {} event because of error", new Object[]{this.subscriberId, this.aggregateType, persistedEvent.globalEventOrder(), persistedEvent.event().getEventTypeOrName().getValue()}), exc);
            }

            public boolean isStarted() {
                return this.started;
            }

            public void stop() {
                if (this.started) {
                    DefaultEventStoreSubscriptionManager.log.info("[{}-{}] Stopping subscription", this.subscriberId, this.aggregateType);
                    try {
                        DefaultEventStoreSubscriptionManager.log.debug("[{}-{}] Stopping subscription flux", this.subscriberId, this.aggregateType);
                        this.subscription.dispose();
                    } catch (Exception e) {
                        DefaultEventStoreSubscriptionManager.log.error(MessageFormatter.msg("[{}-{}] Failed to dispose subscription flux", new Object[]{this.subscriberId, this.aggregateType}), e);
                    }
                    DefaultEventStoreSubscriptionManager.log.debug("[{}-{}] Advancing ResumePoint from {} to {}", new Object[]{this.subscriberId, this.aggregateType, this.resumePoint.getResumeFromAndIncluding(), this.resumePoint.getResumeFromAndIncluding().increment()});
                    this.resumePoint.setResumeFromAndIncluding(this.resumePoint.getResumeFromAndIncluding().increment());
                    this.durableSubscriptionRepository.saveResumePoint(this.resumePoint);
                    this.started = false;
                    DefaultEventStoreSubscriptionManager.log.info("[{}-{}] Stopped subscription", this.subscriberId, this.aggregateType);
                }
            }

            @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStoreSubscription
            public void unsubscribe() {
                DefaultEventStoreSubscriptionManager.log.info("[{}-{}] Initiating unsubscription", this.subscriberId, this.aggregateType);
                stop();
                DefaultEventStoreSubscriptionManager.this.unsubscribe(this);
            }

            @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStoreSubscription
            public void resetFrom(GlobalEventOrder globalEventOrder) {
                if (!this.started) {
                    overrideResumePoint(globalEventOrder);
                    return;
                }
                DefaultEventStoreSubscriptionManager.log.info("[{}-{}] Resetting resume point and re-starts the subscriber from and including globalOrder {}", new Object[]{this.subscriberId, this.aggregateType, globalEventOrder});
                try {
                    DefaultEventStoreSubscriptionManager.log.debug("[{}-{}] Stopping subscription flux", this.subscriberId, this.aggregateType);
                    this.subscription.dispose();
                } catch (Exception e) {
                    DefaultEventStoreSubscriptionManager.log.error(MessageFormatter.msg("[{}-{}] Failed to dispose subscription flux", new Object[]{this.subscriberId, this.aggregateType}), e);
                }
                overrideResumePoint(globalEventOrder);
                this.started = false;
                start();
            }

            private void overrideResumePoint(GlobalEventOrder globalEventOrder) {
                FailFast.requireNonNull(globalEventOrder, "No subscribeFromAndIncludingGlobalOrder value provided");
                DefaultEventStoreSubscriptionManager.log.info("[{}-{}] Overriding resume point to start from and include globalOrder {}", new Object[]{this.subscriberId, this.aggregateType, globalEventOrder});
                this.resumePoint.setResumeFromAndIncluding(globalEventOrder);
                this.durableSubscriptionRepository.saveResumePoint(this.resumePoint);
            }

            @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStoreSubscription
            public Optional<SubscriptionResumePoint> currentResumePoint() {
                return Optional.of(this.resumePoint);
            }

            @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStoreSubscription
            public Optional<Tenant> onlyIncludeEventsForTenant() {
                return this.onlyIncludeEventsForTenant;
            }

            @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStoreSubscription
            public boolean isActive() {
                return this.started;
            }
        }

        /* loaded from: input_file:dk/cloudcreate/essentials/components/eventsourced/eventstore/postgresql/subscription/EventStoreSubscriptionManager$DefaultEventStoreSubscriptionManager$NonExclusiveInTransactionSubscription.class */
        private class NonExclusiveInTransactionSubscription implements EventStoreSubscription {
            private final EventStore<?> eventStore;
            private final AggregateType aggregateType;
            private final SubscriberId subscriberId;
            private final Optional<Tenant> onlyIncludeEventsForTenant;
            private final TransactionalPersistedEventHandler eventHandler;
            private volatile boolean started;

            public NonExclusiveInTransactionSubscription(EventStore<?> eventStore, AggregateType aggregateType, SubscriberId subscriberId, Optional<Tenant> optional, TransactionalPersistedEventHandler transactionalPersistedEventHandler) {
                this.eventStore = (EventStore) FailFast.requireNonNull(eventStore, "No eventStore provided");
                this.aggregateType = (AggregateType) FailFast.requireNonNull(aggregateType, "No aggregateType provided");
                this.subscriberId = (SubscriberId) FailFast.requireNonNull(subscriberId, "No subscriberId provided");
                this.onlyIncludeEventsForTenant = (Optional) FailFast.requireNonNull(optional, "No onlyIncludeEventsForTenant provided");
                this.eventHandler = (TransactionalPersistedEventHandler) FailFast.requireNonNull(transactionalPersistedEventHandler, "No eventHandler provided");
            }

            @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStoreSubscription
            public SubscriberId subscriberId() {
                return this.subscriberId;
            }

            @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStoreSubscription
            public AggregateType aggregateType() {
                return this.aggregateType;
            }

            public void start() {
                if (this.started) {
                    DefaultEventStoreSubscriptionManager.log.debug("[{}-{}] Subscription was already started", this.subscriberId, this.aggregateType);
                    return;
                }
                this.started = true;
                DefaultEventStoreSubscriptionManager.log.info("[{}-{}] Starting subscription", this.subscriberId, this.aggregateType);
                this.eventStore.localEventBus().addSyncSubscriber(this::onEvent);
            }

            private void onEvent(PersistedEvents persistedEvents) {
                if (persistedEvents.commitStage == CommitStage.BeforeCommit) {
                    persistedEvents.events.stream().filter(persistedEvent -> {
                        return persistedEvent.aggregateType().equals(this.aggregateType);
                    }).forEach(persistedEvent2 -> {
                        DefaultEventStoreSubscriptionManager.log.trace("[{}-{}] (#{}) Received {} event with eventId '{}', aggregateId: '{}', eventOrder: {}", new Object[]{this.subscriberId, this.aggregateType, persistedEvent2.globalEventOrder(), persistedEvent2.event().getEventTypeOrName().toString(), persistedEvent2.eventId(), persistedEvent2.aggregateId(), persistedEvent2.eventOrder()});
                        try {
                            this.eventHandler.handle(persistedEvent2, persistedEvents.unitOfWork);
                        } catch (Exception e) {
                            onErrorHandlingEvent(persistedEvent2, e);
                        }
                    });
                }
            }

            protected void onErrorHandlingEvent(PersistedEvent persistedEvent, Exception exc) {
                DefaultEventStoreSubscriptionManager.log.error(MessageFormatter.msg("[{}-{}] (#{}) Skipping {} event because of error", new Object[]{this.subscriberId, this.aggregateType, persistedEvent.globalEventOrder(), persistedEvent.event().getEventTypeOrName().getValue()}), exc);
            }

            public boolean isStarted() {
                return this.started;
            }

            public void stop() {
                if (this.started) {
                    DefaultEventStoreSubscriptionManager.log.info("[{}-{}] Stopping subscription", this.subscriberId, this.aggregateType);
                    try {
                        DefaultEventStoreSubscriptionManager.log.debug("[{}-{}] Stopping subscription flux", this.subscriberId, this.aggregateType);
                        this.eventStore.localEventBus().removeSyncSubscriber(this::onEvent);
                    } catch (Exception e) {
                        DefaultEventStoreSubscriptionManager.log.error(MessageFormatter.msg("[{}-{}] Failed to dispose subscription flux", new Object[]{this.subscriberId, this.aggregateType}), e);
                    }
                    this.started = false;
                    DefaultEventStoreSubscriptionManager.log.info("[{}-{}] Stopped subscription", this.subscriberId, this.aggregateType);
                }
            }

            /**
             * Stops this subscription and then asks the enclosing manager to remove it from its
             * registered subscriptions.
             */
            @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStoreSubscription
            public void unsubscribe() {
                DefaultEventStoreSubscriptionManager.log.info("[{}-{}] Initiating unsubscription", this.subscriberId, this.aggregateType);
                // Stop first so the subscription is no longer started when the manager removes it
                stop();
                DefaultEventStoreSubscriptionManager.this.unsubscribe(this);
            }

            /**
             * Not supported by this subscription: always throws.
             *
             * @param globalEventOrder ignored
             * @throws EventStoreException always
             */
            @Override
            public void resetFrom(GlobalEventOrder globalEventOrder) {
                String reason = MessageFormatter.msg("[{}-{}] Reset of ResumePoint isn't support for an In-Transaction subscription",
                                                     this.subscriberId,
                                                     this.aggregateType);
                throw new EventStoreException(reason);
            }

            /**
             * @return always {@link Optional#empty()}; this subscription exposes no resume point
             */
            @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStoreSubscription
            public Optional<SubscriptionResumePoint> currentResumePoint() {
                return Optional.empty();
            }

            /**
             * @return the {@code onlyIncludeEventsForTenant} option held by this subscription
             */
            @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStoreSubscription
            public Optional<Tenant> onlyIncludeEventsForTenant() {
                return this.onlyIncludeEventsForTenant;
            }

            /**
             * @return the {@code started} flag; for this subscription "active" and "started" are the same value
             */
            @Override // dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStoreSubscription
            public boolean isActive() {
                return this.started;
            }
        }

        /**
         * Creates a subscription manager. All arguments are validated before any state is assigned.
         *
         * @param eventStore                    the event store to subscribe to (required)
         * @param i                             the eventStorePollingBatchSize; must be &gt;= 1
         * @param duration                      the eventStorePollingInterval (required)
         * @param fencedLockManager             the fenced lock manager (required)
         * @param duration2                     the snapshotResumePointsEvery interval (required)
         * @param durableSubscriptionRepository the repository used for resume points (required)
         */
        public DefaultEventStoreSubscriptionManager(EventStore<?> eventStore, int i, Duration duration, FencedLockManager fencedLockManager, Duration duration2, DurableSubscriptionRepository durableSubscriptionRepository) {
            // Argument validation
            FailFast.requireTrue(i >= 1, "eventStorePollingBatchSize must be >= 1");
            FailFast.requireNonNull(eventStore, "No eventStore provided");
            FailFast.requireNonNull(duration, "No eventStorePollingInterval provided");
            FailFast.requireNonNull(fencedLockManager, "No fencedLockManager provided");
            FailFast.requireNonNull(durableSubscriptionRepository, "No durableSubscriptionRepository provided");
            FailFast.requireNonNull(duration2, "No snapshotResumePointsEvery provided");

            // State assignment
            this.eventStore = eventStore;
            this.eventStorePollingBatchSize = i;
            this.eventStorePollingInterval = duration;
            this.fencedLockManager = fencedLockManager;
            this.durableSubscriptionRepository = durableSubscriptionRepository;
            this.snapshotResumePointsEvery = duration2;

            log.info("[{}] Using {} with snapshotResumePointsEvery {} using {}",
                     fencedLockManager.getLockManagerInstanceId(),
                     fencedLockManager,
                     duration2,
                     durableSubscriptionRepository);
        }

        /**
         * Starts the manager unless it is already started: starts the fenced lock manager if needed,
         * schedules the periodic saving of resume points (initial delay and period both equal to
         * {@code snapshotResumePointsEvery}) and starts every registered subscription.
         */
        public void start() {
            if (this.started) {
                log.debug("[{}] EventStore Subscription Manager was already started", this.fencedLockManager.getLockManagerInstanceId());
                return;
            }

            log.info("[{}] Starting EventStore Subscription Manager", this.fencedLockManager.getLockManagerInstanceId());
            if (!this.fencedLockManager.isStarted()) {
                this.fencedLockManager.start();
            }

            String threadNameFormat = "EventStoreSubscriptionManager-" + this.fencedLockManager.getLockManagerInstanceId() + "-%d";
            long snapshotIntervalMillis = this.snapshotResumePointsEvery.toMillis();
            // Only the ScheduledFuture is retained; the executor created here is not stored anywhere.
            this.saveResumePointsFuture = Executors
                    .newSingleThreadScheduledExecutor(ThreadFactoryBuilder.builder()
                                                                          .nameFormat(threadNameFormat)
                                                                          .daemon(true)
                                                                          .build())
                    .scheduleAtFixedRate(this::saveResumePointsForAllSubscribers,
                                         snapshotIntervalMillis,
                                         snapshotIntervalMillis,
                                         TimeUnit.MILLISECONDS);

            this.started = true;
            for (EventStoreSubscription subscription : this.subscribers.values()) {
                subscription.start();
            }
        }

        /**
         * Stops the manager unless it is already stopped: cancels the scheduled saving of resume points,
         * stops every registered subscription and stops the fenced lock manager if it is started.
         */
        public void stop() {
            if (!this.started) {
                log.debug("[{}] EventStore Subscription Manager was already stopped", this.fencedLockManager.getLockManagerInstanceId());
                return;
            }

            log.info("[{}] Stopping EventStore Subscription Manager", this.fencedLockManager.getLockManagerInstanceId());
            this.saveResumePointsFuture.cancel(true);

            for (EventStoreSubscription subscription : this.subscribers.values()) {
                subscription.stop();
            }

            if (this.fencedLockManager.isStarted()) {
                this.fencedLockManager.stop();
            }

            this.started = false;
            log.info("[{}] Stopped EventStore Subscription Manager", this.fencedLockManager.getLockManagerInstanceId());
        }

        /**
         * @return the current value of the manager's {@code started} flag
         */
        public boolean isStarted() {
            return this.started;
        }

        /**
         * Saves the current resume point of every active subscription that has one.
         * <p>
         * This method is the task handed to {@code scheduleAtFixedRate}. A periodic task that throws
         * has all of its subsequent executions suppressed, so any failure is caught and logged here to
         * keep the periodic saving alive; the next run simply tries again.
         */
        private void saveResumePointsForAllSubscribers() {
            try {
                // Read each resume point exactly once, so a subscription cannot lose its resume point
                // between a presence check and a later get()
                Collection<SubscriptionResumePoint> resumePoints = this.subscribers.values()
                                                                                   .stream()
                                                                                   .filter(EventStoreSubscription::isActive)
                                                                                   .map(EventStoreSubscription::currentResumePoint)
                                                                                   .filter(Optional::isPresent)
                                                                                   .map(Optional::get)
                                                                                   .collect(Collectors.toList());
                this.durableSubscriptionRepository.saveResumePoints(resumePoints);
            } catch (Exception e) {
                // Must not propagate: an exception escaping a fixed-rate task cancels all future runs
                log.error(MessageFormatter.msg("[{}] Failed to save resume points for the subscribers",
                                               this.fencedLockManager.getLockManagerInstanceId()),
                          e);
            }
        }

        /**
         * Registers a subscription under the (subscriberId, aggregateType) key unless one is already
         * registered for that key. A newly registered subscription is started right away when the
         * manager is started and the subscription itself isn't.
         *
         * @param subscriberId          the subscriber id (required)
         * @param aggregateType         the aggregate type (required)
         * @param eventStoreSubscription the subscription to register (required)
         * @return the already registered subscription for the key if there is one, otherwise {@code eventStoreSubscription}
         */
        private EventStoreSubscription addEventStoreSubscription(SubscriberId subscriberId, AggregateType aggregateType, EventStoreSubscription eventStoreSubscription) {
            FailFast.requireNonNull(subscriberId, "No subscriberId provided");
            FailFast.requireNonNull(aggregateType, "No forAggregateType provided");
            FailFast.requireNonNull(eventStoreSubscription, "No eventStoreSubscription provided");

            EventStoreSubscription existing = this.subscribers.putIfAbsent(Pair.of(subscriberId, aggregateType), eventStoreSubscription);
            if (existing == null) {
                log.info("[{}-{}] Added {} event store subscription", subscriberId, aggregateType, eventStoreSubscription.getClass().getSimpleName());
                boolean startNow = this.started && !eventStoreSubscription.isStarted();
                if (startNow) {
                    eventStoreSubscription.start();
                }
                return eventStoreSubscription;
            }

            log.info("[{}-{}] Event Store subscription was already added", subscriberId, aggregateType);
            return existing;
        }

        /**
         * Creates a {@code NonExclusiveAsynchronousSubscription} and registers it for the given
         * subscriber and aggregate type.
         *
         * @param subscriberId          the subscriber id
         * @param aggregateType         the aggregate type
         * @param globalEventOrder      the onFirstSubscriptionSubscribeFromAndIncludingGlobalOrder value (required)
         * @param optional              the onlyIncludeEventsForTenant option (required)
         * @param persistedEventHandler the event handler (required)
         * @return the registered subscription (an already registered one for the same key wins)
         */
        @Override
        public EventStoreSubscription subscribeToAggregateEventsAsynchronously(SubscriberId subscriberId, AggregateType aggregateType, GlobalEventOrder globalEventOrder, Optional<Tenant> optional, PersistedEventHandler persistedEventHandler) {
            FailFast.requireNonNull(globalEventOrder, "No onFirstSubscriptionSubscribeFromAndIncludingGlobalOrder provided");
            FailFast.requireNonNull(optional, "No onlyIncludeEventsForTenant option provided");
            FailFast.requireNonNull(persistedEventHandler, "No eventHandler provided");

            EventStoreSubscription subscription = new NonExclusiveAsynchronousSubscription(this.eventStore,
                                                                                           this.durableSubscriptionRepository,
                                                                                           aggregateType,
                                                                                           subscriberId,
                                                                                           globalEventOrder,
                                                                                           optional,
                                                                                           persistedEventHandler);
            return addEventStoreSubscription(subscriberId, aggregateType, subscription);
        }

        /**
         * Creates an {@code ExclusiveAsynchronousSubscription} (which is also handed the fenced lock
         * manager) and registers it for the given subscriber and aggregate type.
         *
         * @param subscriberId              the subscriber id
         * @param aggregateType             the aggregate type
         * @param globalEventOrder          the onFirstSubscriptionSubscribeFromAndIncludingGlobalOrder value (required)
         * @param optional                  the onlyIncludeEventsForTenant option (required)
         * @param fencedLockAwareSubscriber the lock-aware subscriber passed on to the subscription
         * @param persistedEventHandler     the event handler (required)
         * @return the registered subscription (an already registered one for the same key wins)
         */
        @Override
        public EventStoreSubscription exclusivelySubscribeToAggregateEventsAsynchronously(SubscriberId subscriberId, AggregateType aggregateType, GlobalEventOrder globalEventOrder, Optional<Tenant> optional, FencedLockAwareSubscriber fencedLockAwareSubscriber, PersistedEventHandler persistedEventHandler) {
            FailFast.requireNonNull(globalEventOrder, "No onFirstSubscriptionSubscribeFromAndIncludingGlobalOrder provided");
            FailFast.requireNonNull(optional, "No onlyIncludeEventsForTenant option provided");
            FailFast.requireNonNull(persistedEventHandler, "No eventHandler provided");

            EventStoreSubscription subscription = new ExclusiveAsynchronousSubscription(this.eventStore,
                                                                                        this.fencedLockManager,
                                                                                        this.durableSubscriptionRepository,
                                                                                        aggregateType,
                                                                                        subscriberId,
                                                                                        globalEventOrder,
                                                                                        optional,
                                                                                        fencedLockAwareSubscriber,
                                                                                        persistedEventHandler);
            return addEventStoreSubscription(subscriberId, aggregateType, subscription);
        }

        /**
         * Creates a {@code NonExclusiveInTransactionSubscription} and registers it for the given
         * subscriber and aggregate type.
         *
         * @param subscriberId                       the subscriber id
         * @param aggregateType                      the aggregate type
         * @param optional                           the onlyIncludeEventsForTenant option (required)
         * @param transactionalPersistedEventHandler the event handler (required)
         * @return the registered subscription (an already registered one for the same key wins)
         */
        @Override
        public EventStoreSubscription subscribeToAggregateEventsInTransaction(SubscriberId subscriberId, AggregateType aggregateType, Optional<Tenant> optional, TransactionalPersistedEventHandler transactionalPersistedEventHandler) {
            FailFast.requireNonNull(optional, "No onlyIncludeEventsForTenant option provided");
            FailFast.requireNonNull(transactionalPersistedEventHandler, "No eventHandler provided");

            EventStoreSubscription subscription = new NonExclusiveInTransactionSubscription(this.eventStore,
                                                                                            aggregateType,
                                                                                            subscriberId,
                                                                                            optional,
                                                                                            transactionalPersistedEventHandler);
            return addEventStoreSubscription(subscriberId, aggregateType, subscription);
        }

        /**
         * Removes the subscription registered under the given subscription's
         * (subscriberId, aggregateType) key, if any. This method only removes the registration;
         * it does not itself stop the subscription.
         *
         * @param eventStoreSubscription the subscription to remove (required)
         */
        @Override
        public void unsubscribe(EventStoreSubscription eventStoreSubscription) {
            FailFast.requireNonNull(eventStoreSubscription, "No eventStoreSubscription provided");

            EventStoreSubscription removed = this.subscribers.remove(Pair.of(eventStoreSubscription.subscriberId(),
                                                                             eventStoreSubscription.aggregateType()));
            if (removed == null) {
                // Nothing was registered for this key
                return;
            }
            log.info("[{}-{}] Unsubscribing", removed.subscriberId(), removed.aggregateType());
        }

        /**
         * @param subscriberId  the subscriber id
         * @param aggregateType the aggregate type
         * @return {@code true} if a subscription is registered under the (subscriberId, aggregateType) key
         */
        @Override
        public boolean hasSubscription(SubscriberId subscriberId, AggregateType aggregateType) {
            Pair<SubscriberId, AggregateType> subscriptionKey = Pair.of(subscriberId, aggregateType);
            return this.subscribers.containsKey(subscriptionKey);
        }
    }

    /**
     * Subscribes the given handler asynchronously to events of an aggregate type.
     * In {@code DefaultEventStoreSubscriptionManager} this registers a non-exclusive asynchronous
     * subscription keyed by (subscriberId, aggregateType); if a subscription already exists for that
     * key, the existing one is returned instead.
     *
     * @param subscriberId          the subscriber id
     * @param aggregateType         the aggregate type to subscribe to
     * @param globalEventOrder      validated by the default implementation as
     *                              "onFirstSubscriptionSubscribeFromAndIncludingGlobalOrder" - presumably the global
     *                              order to start from on the very first subscription (confirm against the subscription class)
     * @param optional              the "onlyIncludeEventsForTenant" option
     * @param persistedEventHandler the event handler
     * @return the subscription
     */
    EventStoreSubscription subscribeToAggregateEventsAsynchronously(SubscriberId subscriberId, AggregateType aggregateType, GlobalEventOrder globalEventOrder, Optional<Tenant> optional, PersistedEventHandler persistedEventHandler);

    /**
     * Exclusively subscribes the given handler asynchronously to events of an aggregate type.
     * In {@code DefaultEventStoreSubscriptionManager} this registers an exclusive asynchronous
     * subscription, which is also given the manager's fenced lock manager, keyed by
     * (subscriberId, aggregateType); if a subscription already exists for that key, the existing one
     * is returned instead.
     *
     * @param subscriberId              the subscriber id
     * @param aggregateType             the aggregate type to subscribe to
     * @param globalEventOrder          validated by the default implementation as
     *                                  "onFirstSubscriptionSubscribeFromAndIncludingGlobalOrder"
     * @param optional                  the "onlyIncludeEventsForTenant" option
     * @param fencedLockAwareSubscriber the lock-aware subscriber handed to the subscription
     *                                  (presumably notified about lock changes - confirm against the subscription class)
     * @param persistedEventHandler     the event handler
     * @return the subscription
     */
    EventStoreSubscription exclusivelySubscribeToAggregateEventsAsynchronously(SubscriberId subscriberId, AggregateType aggregateType, GlobalEventOrder globalEventOrder, Optional<Tenant> optional, FencedLockAwareSubscriber fencedLockAwareSubscriber, PersistedEventHandler persistedEventHandler);

    /**
     * Convenience overload for a handler that is both the {@link PersistedEventHandler} and the
     * {@link FencedLockAwareSubscriber}: the same object is passed in both roles to the six-argument
     * variant.
     *
     * @param subscriberId     the subscriber id
     * @param aggregateType    the aggregate type to subscribe to
     * @param globalEventOrder forwarded unchanged
     * @param optional         forwarded unchanged
     * @param handler          the combined event handler / lock-aware subscriber
     * @param <HANDLER>        a type implementing both handler interfaces
     * @return the subscription returned by the six-argument variant
     */
    default <HANDLER extends PersistedEventHandler & FencedLockAwareSubscriber> EventStoreSubscription exclusivelySubscribeToAggregateEventsAsynchronously(SubscriberId subscriberId, AggregateType aggregateType, GlobalEventOrder globalEventOrder, Optional<Tenant> optional, HANDLER handler) {
        FencedLockAwareSubscriber lockAwareSubscriber = handler;
        PersistedEventHandler eventHandler = handler;
        return exclusivelySubscribeToAggregateEventsAsynchronously(subscriberId,
                                                                   aggregateType,
                                                                   globalEventOrder,
                                                                   optional,
                                                                   lockAwareSubscriber,
                                                                   eventHandler);
    }

    /**
     * Subscribes the given handler to events of an aggregate type using an in-transaction subscription.
     * In {@code DefaultEventStoreSubscriptionManager} this registers a non-exclusive in-transaction
     * subscription keyed by (subscriberId, aggregateType); if a subscription already exists for that
     * key, the existing one is returned instead.
     *
     * @param subscriberId                       the subscriber id
     * @param aggregateType                      the aggregate type to subscribe to
     * @param optional                           the "onlyIncludeEventsForTenant" option
     * @param transactionalPersistedEventHandler the event handler
     * @return the subscription
     */
    EventStoreSubscription subscribeToAggregateEventsInTransaction(SubscriberId subscriberId, AggregateType aggregateType, Optional<Tenant> optional, TransactionalPersistedEventHandler transactionalPersistedEventHandler);

    /**
     * Factory for the default manager implementation; every argument is forwarded unchanged to the
     * {@code DefaultEventStoreSubscriptionManager} constructor.
     *
     * @param eventStore                    the event store
     * @param i                             the eventStorePollingBatchSize (must be &gt;= 1)
     * @param duration                      the eventStorePollingInterval
     * @param fencedLockManager             the fenced lock manager
     * @param duration2                     the snapshotResumePointsEvery interval
     * @param durableSubscriptionRepository the durable subscription repository
     * @return a new {@code DefaultEventStoreSubscriptionManager}
     */
    static EventStoreSubscriptionManager createFor(EventStore<?> eventStore, int i, Duration duration, FencedLockManager fencedLockManager, Duration duration2, DurableSubscriptionRepository durableSubscriptionRepository) {
        EventStoreSubscriptionManager manager = new DefaultEventStoreSubscriptionManager(eventStore,
                                                                                         i,
                                                                                         duration,
                                                                                         fencedLockManager,
                                                                                         duration2,
                                                                                         durableSubscriptionRepository);
        return manager;
    }

    /**
     * Removes the given subscription from this manager.
     * In {@code DefaultEventStoreSubscriptionManager} the subscription is looked up by its
     * (subscriberId, aggregateType) key and removed if present.
     *
     * @param eventStoreSubscription the subscription to remove
     */
    void unsubscribe(EventStoreSubscription eventStoreSubscription);

    /**
     * Checks whether a subscription is registered for the subscriber id and aggregate type of the
     * given subscription.
     *
     * @param eventStoreSubscription the subscription whose key is looked up (required)
     * @return the result of {@link #hasSubscription(SubscriberId, AggregateType)} for that key
     */
    default boolean hasSubscription(EventStoreSubscription eventStoreSubscription) {
        FailFast.requireNonNull(eventStoreSubscription, "No subscription provided");
        SubscriberId subscriberId = eventStoreSubscription.subscriberId();
        AggregateType aggregateType = eventStoreSubscription.aggregateType();
        return hasSubscription(subscriberId, aggregateType);
    }

    /**
     * Checks whether a subscription is registered for the given subscriber id and aggregate type.
     *
     * @param subscriberId  the subscriber id
     * @param aggregateType the aggregate type
     * @return {@code true} if such a subscription is registered
     */
    boolean hasSubscription(SubscriberId subscriberId, AggregateType aggregateType);
}
