package dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.processor;

import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.ConfigurableEventStore;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStore;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStoreSubscription;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.eventstream.AggregateType;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.eventstream.PersistedEvent;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.serializer.AggregateIdSerializer;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.subscription.EventStoreSubscriptionManager;
import dk.cloudcreate.essentials.components.foundation.Lifecycle;
import dk.cloudcreate.essentials.components.foundation.json.JSONDeserializationException;
import dk.cloudcreate.essentials.components.foundation.messaging.eip.store_and_forward.MessageHandlerInterceptor;
import dk.cloudcreate.essentials.components.foundation.messaging.eip.store_and_forward.PatternMatchingMessageHandler;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.MessageMetaData;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.OrderedMessage;
import dk.cloudcreate.essentials.components.foundation.reactive.command.DurableLocalCommandBus;
import dk.cloudcreate.essentials.components.foundation.transaction.UnitOfWork;
import dk.cloudcreate.essentials.components.foundation.types.SubscriberId;
import dk.cloudcreate.essentials.reactive.command.AnnotatedCommandHandler;
import dk.cloudcreate.essentials.shared.FailFast;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dk/cloudcreate/essentials/components/eventsourced/eventstore/postgresql/processor/InTransactionEventProcessor.class */
/**
 * Base class for processors that react to {@link PersistedEvent}s delivered through in-transaction
 * {@link EventStore} subscriptions, and that may also handle commands sent on the
 * {@link DurableLocalCommandBus}.
 * <p>
 * On construction the instance is wrapped in an {@link AnnotatedCommandHandler} and registered with the
 * command bus. On {@link #start()} one subscription is created per {@link AggregateType} returned by
 * {@link #reactsToEventsRelatedToAggregateTypes()}; every event delivered to such a subscription is
 * deserialized, wrapped in an {@link OrderedMessage} and handed to a {@link PatternMatchingMessageHandler}
 * that wraps this instance.
 * <p>
 * NOTE(review): {@code started} is a plain (non-volatile) field, so this class does not by itself
 * guarantee visibility of lifecycle changes across threads - confirm how callers drive the lifecycle.
 */
public abstract class InTransactionEventProcessor implements Lifecycle {
    protected final Logger log;
    private final EventStoreSubscriptionManager eventStoreSubscriptionManager;
    protected final DurableLocalCommandBus commandBus;
    private final EventStore eventStore;
    private final List<MessageHandlerInterceptor> messageHandlerInterceptors;
    private final boolean useExclusively;
    /**
     * Subscriptions created by the most recent {@link #start()}. Never {@code null}, so that
     * {@link #isActive()} and {@link #stop()} are safe to call before (or after a failed) start.
     */
    private final List<EventStoreSubscription> eventStoreSubscriptions = new CopyOnWriteArrayList<>();
    private boolean started;
    private AnnotatedCommandHandler commandBusHandlerDelegate;
    private PatternMatchingMessageHandler patternMatchingHandlerDelegate;

    /**
     * Creates a processor from a bundle of dependencies.
     *
     * @param eventProcessorDependencies the dependencies to unpack; must not be {@code null}
     * @param useExclusively             {@code true} to create exclusive subscriptions, {@code false} for
     *                                   non-exclusive subscriptions
     */
    protected InTransactionEventProcessor(EventProcessorDependencies eventProcessorDependencies, boolean useExclusively) {
        this(FailFast.requireNonNull(eventProcessorDependencies, "No eventProcessorDependencies provided").eventStoreSubscriptionManager,
             eventProcessorDependencies.commandBus,
             eventProcessorDependencies.messageHandlerInterceptors,
             useExclusively);
    }

    /**
     * Creates a processor from a bundle of dependencies, using non-exclusive subscriptions.
     *
     * @param eventProcessorDependencies the dependencies to unpack; must not be {@code null}
     */
    protected InTransactionEventProcessor(EventProcessorDependencies eventProcessorDependencies) {
        this(eventProcessorDependencies, false);
    }

    /**
     * Creates a processor and registers it as a command handler on the supplied command bus.
     *
     * @param eventStoreSubscriptionManager manager used to create the event subscriptions; must not be
     *                                      {@code null} and must expose a non-null {@link EventStore}
     * @param durableLocalCommandBus        command bus this processor registers itself with; must not be {@code null}
     * @param messageHandlerInterceptors    interceptors handed to the message handler delegate; must not be
     *                                      {@code null} (the list is copied)
     * @param useExclusively                {@code true} to create exclusive subscriptions
     */
    protected InTransactionEventProcessor(EventStoreSubscriptionManager eventStoreSubscriptionManager,
                                          DurableLocalCommandBus durableLocalCommandBus,
                                          List<MessageHandlerInterceptor> messageHandlerInterceptors,
                                          boolean useExclusively) {
        this.log = LoggerFactory.getLogger(getClass());
        this.eventStoreSubscriptionManager = FailFast.requireNonNull(eventStoreSubscriptionManager, "No eventStoreSubscriptionManager provided");
        this.commandBus = FailFast.requireNonNull(durableLocalCommandBus, "No commandBus provided");
        this.eventStore = FailFast.requireNonNull(eventStoreSubscriptionManager.getEventStore(), "No eventStore is associated with the eventStoreSubscriptionManager provided");
        this.messageHandlerInterceptors = new CopyOnWriteArrayList<>(FailFast.requireNonNull(messageHandlerInterceptors, "No messageHandlerInterceptors list provided"));
        this.useExclusively = useExclusively;
        // Note: 'this' is handed to the command bus before subclass constructors have finished running
        setupCommandHandler();
    }

    /** Wraps this instance in an {@link AnnotatedCommandHandler} and registers it on the command bus. */
    private void setupCommandHandler() {
        this.commandBusHandlerDelegate = new AnnotatedCommandHandler(this);
        this.commandBus.addCommandHandler(this.commandBusHandlerDelegate);
    }

    /** Lazily creates the message handler delegate; subsequent calls are no-ops. */
    private void setupMessageHandlerDelegate() {
        if (this.patternMatchingHandlerDelegate != null) {
            return;
        }
        this.patternMatchingHandlerDelegate = new PatternMatchingMessageHandler(this, getMessageHandlerInterceptors());
        this.patternMatchingHandlerDelegate.allowUnmatchedMessages();
    }

    /**
     * @return the interceptors passed to the {@link PatternMatchingMessageHandler} delegate
     */
    protected List<MessageHandlerInterceptor> getMessageHandlerInterceptors() {
        return this.messageHandlerInterceptors;
    }

    /**
     * Starts the processor by creating one subscription per aggregate type. Calling this method on an
     * already started processor does nothing.
     * <p>
     * If starting fails, every subscription created so far is stopped, the processor is reset to the
     * not-started state (so that {@code start()} can be retried) and the original failure is rethrown.
     */
    public void start() {
        if (this.started) {
            return;
        }
        this.started = true;
        try {
            String processorName = FailFast.requireNonNull(getProcessorName(), "getProcessorName() returned null");
            List<AggregateType> aggregateTypes = FailFast.requireNonNull(reactsToEventsRelatedToAggregateTypes(), "reactsToEventsRelatedToAggregateTypes() returned null");
            this.log.info("⚙️ [{}] Starting InTransactionEventProcessor - will subscribe '{}' to events related to these AggregatesType's: {}",
                          processorName,
                          this.useExclusively ? "exclusively" : "non-exclusively",
                          aggregateTypes);
            setupMessageHandlerDelegate();
            subscribeInTransaction(aggregateTypes);
        } catch (RuntimeException | Error startFailure) {
            rollbackFailedStart(startFailure);
            throw startFailure;
        }
    }

    /**
     * Stops subscriptions created by a failed {@link #start()} and resets the started flag. Failures
     * raised while stopping are attached to {@code startFailure} as suppressed exceptions so they never
     * mask the original problem.
     */
    private void rollbackFailedStart(Throwable startFailure) {
        for (EventStoreSubscription subscription : this.eventStoreSubscriptions) {
            try {
                subscription.stop();
            } catch (RuntimeException stopFailure) {
                startFailure.addSuppressed(stopFailure);
            }
        }
        this.eventStoreSubscriptions.clear();
        this.started = false;
    }

    /**
     * Stops every subscription created by {@link #start()}. Calling this method on a processor that is
     * not started does nothing.
     */
    public void stop() {
        if (!this.started) {
            return;
        }
        this.started = false;
        this.log.info("⚙️ [{}] Stopping InTransactionEventProcessor", getProcessorName());
        this.eventStoreSubscriptions.forEach(EventStoreSubscription::stop);
    }

    /**
     * @return {@code true} if {@link #start()} has completed (or is in progress) and {@link #stop()} has
     * not been called since
     */
    public boolean isStarted() {
        return this.started;
    }

    /** Replaces the tracked subscriptions with one freshly created subscription per aggregate type. */
    private void subscribeInTransaction(List<AggregateType> aggregateTypes) {
        this.eventStoreSubscriptions.clear();
        // Add one by one so that subscriptions created before a failure remain tracked and can be stopped
        for (AggregateType aggregateType : aggregateTypes) {
            this.eventStoreSubscriptions.add(createEventStoreSubscription(aggregateType));
        }
    }

    /**
     * Creates the subscription for a single aggregate type, using the subscriber id
     * {@code <processorName>:<aggregateType>:sync}.
     */
    private EventStoreSubscription createEventStoreSubscription(AggregateType aggregateType) {
        SubscriberId subscriberId = SubscriberId.of(getProcessorName() + ":" + aggregateType + ":sync");
        EventStoreSubscription subscription = createEventStoreSubscription(aggregateType, subscriberId);
        this.log.info("⚙️ [{}] InTransactionEventProcessor created subscription '{}'", getProcessorName(), subscription);
        return subscription;
    }

    /** Creates an exclusive or non-exclusive in-transaction subscription depending on {@code useExclusively}. */
    private EventStoreSubscription createEventStoreSubscription(AggregateType aggregateType, SubscriberId subscriberId) {
        if (this.useExclusively) {
            return this.eventStoreSubscriptionManager.exclusivelySubscribeToAggregateEventsInTransaction(
                    subscriberId,
                    aggregateType,
                    Optional.empty(),
                    (persistedEvent, unitOfWork) -> invokeHandler(persistedEvent, unitOfWork, aggregateType, subscriberId));
        }
        return this.eventStoreSubscriptionManager.subscribeToAggregateEventsInTransaction(
                subscriberId,
                aggregateType,
                (persistedEvent, unitOfWork) -> invokeHandler(persistedEvent, unitOfWork, aggregateType, subscriberId));
    }

    /**
     * Converts a persisted event into an {@link OrderedMessage} (payload = deserialized event,
     * key = serialized aggregate id, order = event order, metadata = deserialized event metadata) and
     * passes it to the message handler delegate.
     *
     * @throws JSONDeserializationException if deserialization fails; the failure is logged and rethrown
     */
    private void invokeHandler(PersistedEvent persistedEvent, UnitOfWork unitOfWork, AggregateType aggregateType, SubscriberId subscriberId) {
        try {
            this.log.info("[{}-{}] Processing event: {} using unit of work '{}'", subscriberId, aggregateType, persistedEvent, unitOfWork.info());
            String serializedAggregateId = resolveAggregateIdSerializer(persistedEvent.aggregateType()).serialize(persistedEvent.aggregateId());
            long eventOrder = ((Long) persistedEvent.eventOrder().value()).longValue();
            this.patternMatchingHandlerDelegate.accept(OrderedMessage.of(persistedEvent.event().deserialize(),
                                                                         serializedAggregateId,
                                                                         eventOrder,
                                                                         new MessageMetaData((Map) persistedEvent.metaData().deserialize())));
        } catch (JSONDeserializationException e) {
            this.log.error("Failed to deserialize PersistedEvent '{}'", persistedEvent.event().getEventTypeOrNamePersistenceValue(), e);
            throw e;
        }
    }

    /**
     * @return {@code true} if at least one of the subscriptions created by {@link #start()} reports
     * itself as active; {@code false} if there are none (e.g. before the first start)
     */
    public boolean isActive() {
        return this.eventStoreSubscriptions.stream().anyMatch(EventStoreSubscription::isActive);
    }

    /**
     * @return {@code true} if this processor creates exclusive subscriptions
     */
    public boolean isUseExclusively() {
        return this.useExclusively;
    }

    /**
     * Looks up the aggregate id serializer configured for the given aggregate type.
     * Assumes the event store is a {@link ConfigurableEventStore}; otherwise the cast fails with a
     * {@link ClassCastException}.
     */
    private AggregateIdSerializer resolveAggregateIdSerializer(AggregateType aggregateType) {
        return ((ConfigurableEventStore) this.eventStore).getAggregateEventStreamConfiguration(aggregateType).aggregateIdSerializer;
    }

    /**
     * @return the name of this processor; used in log messages and as the prefix of the subscriber ids.
     * Must not return {@code null}
     */
    public abstract String getProcessorName();

    /**
     * @return the aggregate types for which {@link #start()} creates subscriptions. Must not return {@code null}
     */
    protected abstract List<AggregateType> reactsToEventsRelatedToAggregateTypes();

    @Override
    public String toString() {
        return "⚙️ " + getClass().getSimpleName() + " { processorName='" + getProcessorName() + "', reactsToEventsRelatedToAggregateTypes=" + reactsToEventsRelatedToAggregateTypes() + ", started=" + this.started + " }";
    }

    /**
     * @return the event store obtained from the subscription manager at construction time
     */
    protected final EventStore getEventStore() {
        return this.eventStore;
    }

    /**
     * @return the command bus this processor is registered with
     */
    protected final DurableLocalCommandBus getCommandBus() {
        return this.commandBus;
    }
}
