package dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.bus;

import dk.cloudcreate.essentials.components.common.transaction.UnitOfWork;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.eventstream.PersistedEvent;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.transaction.EventStoreUnitOfWorkFactory;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.transaction.PersistedEventsCommitLifecycleCallback;
import dk.cloudcreate.essentials.reactive.LocalEventBus;
import dk.cloudcreate.essentials.shared.FailFast;
import dk.cloudcreate.essentials.shared.MessageFormatter;
import java.util.List;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dk/cloudcreate/essentials/components/eventsourced/eventstore/postgresql/bus/EventStoreLocalEventBus.class */
/**
 * Publishes the events persisted within a {@link UnitOfWork} onto an in-process
 * {@link LocalEventBus}.
 *
 * <p>On construction, an instance registers a {@link PersistedEventsCommitLifecycleCallback} with the
 * supplied {@link EventStoreUnitOfWorkFactory}. Each callback invocation is wrapped in a
 * {@link PersistedEvents} instance, tagged with the matching {@link CommitStage}
 * ({@code BeforeCommit}, {@code AfterCommit} or {@code AfterRollback}), and published on the bus.
 *
 * <p>Subscribers are managed either through the convenience methods on this class or directly on the
 * bus returned by {@link #localEventBus()}. Failures reported by the bus for an individual consumer
 * are logged at error level by this class.
 */
public class EventStoreLocalEventBus {
    // The logger name is a plain string (not the class) - kept as-is so existing logging
    // configuration that targets this name keeps matching.
    private static final Logger log = LoggerFactory.getLogger("EventStoreLocalEventBus");

    /** The underlying bus; assigned exactly once in the constructor. */
    private final LocalEventBus<PersistedEvents> localEventBus;

    /**
     * Creates the bus and hooks it into the commit lifecycle of the given factory.
     *
     * @param eventStoreUnitOfWorkFactory the factory whose commit lifecycle should be published on the bus;
     *                                    must not be {@code null}
     */
    public EventStoreLocalEventBus(EventStoreUnitOfWorkFactory eventStoreUnitOfWorkFactory) {
        // Validate before allocating the bus, so a rejected argument does not leave behind
        // a bus instance that nobody can reach.
        FailFast.requireNonNull(eventStoreUnitOfWorkFactory, "No unitOfWorkFactory was supplied");

        // NOTE(review): the meaning of the second argument (3) is defined by LocalEventBus and is not
        // visible from this file - presumably a parallelism/thread setting; confirm against LocalEventBus.
        this.localEventBus = new LocalEventBus<>("EventStoreLocalBus", 3, this::onErrorHandler);

        eventStoreUnitOfWorkFactory.registerPersistedEventsCommitLifeCycleCallback(new PersistedEventsCommitLifecycleCallback() {
            @Override
            public void beforeCommit(UnitOfWork unitOfWork, List<PersistedEvent> persistedEvents) {
                publish(CommitStage.BeforeCommit, unitOfWork, persistedEvents);
            }

            @Override
            public void afterCommit(UnitOfWork unitOfWork, List<PersistedEvent> persistedEvents) {
                publish(CommitStage.AfterCommit, unitOfWork, persistedEvents);
            }

            @Override
            public void afterRollback(UnitOfWork unitOfWork, List<PersistedEvent> persistedEvents) {
                publish(CommitStage.AfterRollback, unitOfWork, persistedEvents);
            }
        });
    }

    /**
     * Wraps the events of a unit of work in a {@link PersistedEvents} instance for the given stage
     * and publishes it on the bus.
     */
    private void publish(CommitStage commitStage, UnitOfWork unitOfWork, List<PersistedEvent> persistedEvents) {
        localEventBus.publish(new PersistedEvents(commitStage, unitOfWork, persistedEvents));
    }

    /**
     * Gives direct access to the underlying bus.
     *
     * @return the {@link LocalEventBus} that this instance publishes {@link PersistedEvents} on
     */
    public LocalEventBus<PersistedEvents> localEventBus() {
        return localEventBus;
    }

    /**
     * Error handler handed to the bus at construction time: logs the failure, including the
     * class name of the consumer involved, at error level. The events themselves are not logged.
     */
    private void onErrorHandler(Consumer<PersistedEvents> consumer, PersistedEvents persistedEvents, Exception exc) {
        log.error(MessageFormatter.msg("Failed to publish PersistedEvents to consumer {}", consumer.getClass().getName()), exc);
    }

    /**
     * Delegates to {@link LocalEventBus#addAsyncSubscriber} on the underlying bus.
     *
     * @param consumer the subscriber to add
     * @return the value returned by the underlying bus
     */
    public LocalEventBus<PersistedEvents> addAsyncSubscriber(Consumer<PersistedEvents> consumer) {
        return localEventBus.addAsyncSubscriber(consumer);
    }

    /**
     * Delegates to {@link LocalEventBus#removeAsyncSubscriber} on the underlying bus.
     *
     * @param consumer the subscriber to remove
     * @return the value returned by the underlying bus
     */
    public LocalEventBus<PersistedEvents> removeAsyncSubscriber(Consumer<PersistedEvents> consumer) {
        return localEventBus.removeAsyncSubscriber(consumer);
    }

    /**
     * Delegates to {@link LocalEventBus#addSyncSubscriber} on the underlying bus.
     *
     * @param consumer the subscriber to add
     * @return the value returned by the underlying bus
     */
    public LocalEventBus<PersistedEvents> addSyncSubscriber(Consumer<PersistedEvents> consumer) {
        return localEventBus.addSyncSubscriber(consumer);
    }

    /**
     * Delegates to {@link LocalEventBus#removeSyncSubscriber} on the underlying bus.
     *
     * @param consumer the subscriber to remove
     * @return the value returned by the underlying bus
     */
    public LocalEventBus<PersistedEvents> removeSyncSubscriber(Consumer<PersistedEvents> consumer) {
        return localEventBus.removeSyncSubscriber(consumer);
    }
}
