package dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.processor;

import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.ConfigurableEventStore;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStore;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.EventStoreSubscription;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.eventstream.AggregateEventStream;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.eventstream.AggregateType;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.eventstream.PersistedEvent;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.serializer.AggregateIdSerializer;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.subscription.EventStoreSubscriptionManager;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.subscription.FencedLockAwareSubscriber;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.subscription.SubscriptionResumePoint;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.types.EventOrder;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.types.GlobalEventOrder;
import dk.cloudcreate.essentials.components.foundation.Lifecycle;
import dk.cloudcreate.essentials.components.foundation.fencedlock.FencedLock;
import dk.cloudcreate.essentials.components.foundation.messaging.RedeliveryPolicy;
import dk.cloudcreate.essentials.components.foundation.messaging.eip.store_and_forward.Inbox;
import dk.cloudcreate.essentials.components.foundation.messaging.eip.store_and_forward.InboxConfig;
import dk.cloudcreate.essentials.components.foundation.messaging.eip.store_and_forward.InboxName;
import dk.cloudcreate.essentials.components.foundation.messaging.eip.store_and_forward.Inboxes;
import dk.cloudcreate.essentials.components.foundation.messaging.eip.store_and_forward.MessageConsumptionMode;
import dk.cloudcreate.essentials.components.foundation.messaging.eip.store_and_forward.MessageHandlerInterceptor;
import dk.cloudcreate.essentials.components.foundation.messaging.eip.store_and_forward.PatternMatchingMessageHandler;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.Message;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.MessageMetaData;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.OrderedMessage;
import dk.cloudcreate.essentials.components.foundation.reactive.command.DurableLocalCommandBus;
import dk.cloudcreate.essentials.components.foundation.types.SubscriberId;
import dk.cloudcreate.essentials.reactive.command.AnnotatedCommandHandler;
import dk.cloudcreate.essentials.shared.FailFast;
import dk.cloudcreate.essentials.shared.MessageFormatter;
import dk.cloudcreate.essentials.types.LongRange;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dk/cloudcreate/essentials/components/eventsourced/eventstore/postgresql/processor/EventProcessor.class */
/**
 * Base class for components that react to events persisted in an {@link EventStore}.
 * <p>
 * When {@link #start() started} the processor:
 * <ul>
 *     <li>gets or creates an {@link Inbox} named after {@link #getProcessorName()}</li>
 *     <li>creates one exclusive, asynchronous event subscription per {@link AggregateType} returned from
 *     {@link #reactsToEventsRelatedToAggregateTypes()}</li>
 *     <li>forwards every subscribed event, that the processor has a matching message handler for, to the inbox
 *     as an {@link EventReferenceOrderedMessage} (a reference to the event, not the event itself)</li>
 * </ul>
 * When the inbox delivers an event reference, the referenced event is loaded from the {@link EventStore} and handed to a
 * {@link PatternMatchingMessageHandler} that wraps this instance. Any other inbox message is handed to the same handler as-is.
 * <p>
 * During construction the processor also registers itself, wrapped in an {@link AnnotatedCommandHandler},
 * on the provided {@link DurableLocalCommandBus}.
 */
public abstract class EventProcessor implements Lifecycle {
    /** Logger named after the concrete subclass. */
    protected final Logger log = LoggerFactory.getLogger(getClass());
    private final EventStoreSubscriptionManager eventStoreSubscriptionManager;
    private final Inboxes inboxes;
    protected final DurableLocalCommandBus commandBus;
    private final EventStore eventStore;
    private boolean started;
    private List<EventStoreSubscription> eventStoreSubscriptions;
    private Consumer<Message> inboxMessageHandlerDelegate;
    private AnnotatedCommandHandler commandBusHandlerDelegate;
    private Inbox inbox;
    private PatternMatchingMessageHandler patternMatchingInboxMessageHandlerDelegate;

    /**
     * An {@link OrderedMessage} that references a persisted event instead of carrying the event payload.
     * <p>
     * The message payload is the {@link AggregateType}, the message key is the {@code toString()} value of the
     * aggregate id and the message order is the {@link EventOrder} of the referenced event.
     * The message is marked with the {@link #EVENT_REFERENCE_METADATA_KEY} metadata entry.
     */
    public static class EventReferenceOrderedMessage extends OrderedMessage {
        /** Metadata key which, when mapped to {@code "true"}, marks a message as an event reference. */
        public static final String EVENT_REFERENCE_METADATA_KEY = "EVENT_REFERENCE";

        /**
         * Create an event reference with fresh (otherwise empty) metadata.
         *
         * @param aggregateType the type of aggregate the referenced event relates to
         * @param obj           the aggregate id; its {@code toString()} value becomes the message key
         * @param eventOrder    the order of the referenced event
         */
        public EventReferenceOrderedMessage(AggregateType aggregateType, Object obj, EventOrder eventOrder) {
            this(aggregateType, obj, eventOrder, MessageMetaData.of());
        }

        /**
         * Create an event reference using the supplied metadata.
         *
         * @param aggregateType   the type of aggregate the referenced event relates to
         * @param obj             the aggregate id; its {@code toString()} value becomes the message key
         * @param eventOrder      the order of the referenced event
         * @param messageMetaData the message metadata. Note: this instance is modified, as the
         *                        {@link #EVENT_REFERENCE_METADATA_KEY} marker is added to it
         */
        public EventReferenceOrderedMessage(AggregateType aggregateType, Object obj, EventOrder eventOrder, MessageMetaData messageMetaData) {
            super(aggregateType, obj.toString(), eventOrder.longValue(), messageMetaData);
            messageMetaData.put(EVENT_REFERENCE_METADATA_KEY, "true");
        }

        /**
         * Check whether a message carries the {@link #EVENT_REFERENCE_METADATA_KEY} marker.
         *
         * @param orderedMessage the message to inspect (must not be null)
         * @return true if the message metadata maps {@link #EVENT_REFERENCE_METADATA_KEY} to {@code "true"}
         */
        public static boolean isEventReference(OrderedMessage orderedMessage) {
            FailFast.requireNonNull(orderedMessage, "No msg provided");
            return "true".equals(orderedMessage.getMetaData().get(EVENT_REFERENCE_METADATA_KEY));
        }
    }

    /**
     * Create a new processor and wire up its command and inbox message handlers.
     * <p>
     * Note: {@link #getMessageHandlerInterceptors()} is invoked from this constructor, i.e. before any subclass
     * constructor body has run, so overrides must not depend on subclass instance state.
     *
     * @param eventStoreSubscriptionManager the subscription manager used for creating the event subscriptions;
     *                                      it must have an associated {@link EventStore}
     * @param inboxes                       the factory used for getting/creating the processor's {@link Inbox}
     * @param durableLocalCommandBus        the command bus this processor registers itself on as a command handler
     */
    protected EventProcessor(EventStoreSubscriptionManager eventStoreSubscriptionManager, Inboxes inboxes, DurableLocalCommandBus durableLocalCommandBus) {
        this.eventStoreSubscriptionManager = FailFast.requireNonNull(eventStoreSubscriptionManager, "No eventStoreSubscriptionManager provided");
        this.inboxes = FailFast.requireNonNull(inboxes, "No inboxes instance provided");
        this.commandBus = FailFast.requireNonNull(durableLocalCommandBus, "No commandBus provided");
        this.eventStore = FailFast.requireNonNull(eventStoreSubscriptionManager.getEventStore(), "No eventStore is associated with the eventStoreSubscriptionManager provided");
        setupEventAndMessageHandlers();
    }

    /**
     * Register this instance as a command handler on the command bus and create the inbox message handler delegates.
     */
    private void setupEventAndMessageHandlers() {
        this.commandBusHandlerDelegate = new AnnotatedCommandHandler(this);
        this.commandBus.addCommandHandler(this.commandBusHandlerDelegate);
        this.patternMatchingInboxMessageHandlerDelegate = new PatternMatchingMessageHandler(this, getMessageHandlerInterceptors());
        this.patternMatchingInboxMessageHandlerDelegate.allowUnmatchedMessages();
        this.inboxMessageHandlerDelegate = this::handleInboxMessage;
    }

    /**
     * Handle a message delivered by the inbox: event references are resolved to the event they reference before being
     * dispatched to the pattern matching handler, all other messages are dispatched unchanged.
     *
     * @param message the message delivered by the inbox
     */
    private void handleInboxMessage(Message message) {
        if (message instanceof OrderedMessage) {
            OrderedMessage orderedMessage = (OrderedMessage) message;
            if (EventReferenceOrderedMessage.isEventReference(orderedMessage)) {
                this.patternMatchingInboxMessageHandlerDelegate.accept(resolveReferencedEvent(orderedMessage));
                return;
            }
        }
        this.patternMatchingInboxMessageHandlerDelegate.accept(message);
    }

    /**
     * Load the event an event reference points to and wrap the deserialized event in a new ordered message
     * that keeps the key, order and metadata of the reference.
     *
     * @param eventReference a message for which {@link EventReferenceOrderedMessage#isEventReference(OrderedMessage)} is true
     * @return an ordered message with the deserialized event as payload
     * @throws IllegalArgumentException if the event stream doesn't exist or doesn't contain exactly one event with the referenced event-order
     */
    private Message resolveReferencedEvent(OrderedMessage eventReference) {
        AggregateType aggregateType = (AggregateType) eventReference.getPayload();
        String serializedAggregateId = eventReference.getKey();
        // The aggregate id is passed on as-is (typed as Object); it must NOT be cast to AggregateType
        Object aggregateId = resolveAggregateIdSerializer(aggregateType).deserialize(serializedAggregateId);
        long eventOrder = eventReference.order;

        this.log.trace("Looking up event for aggregate '{}' with id '{}' and event-order {}", aggregateType, aggregateId, eventOrder);
        List<PersistedEvent> matchingEvents = this.eventStore.fetchStream(aggregateType, aggregateId, LongRange.only(eventOrder))
                                                             .orElseThrow(() -> noMatchingEventFound(aggregateType, aggregateId, eventOrder))
                                                             .eventList();
        if (matchingEvents.size() != 1) {
            throw noMatchingEventFound(aggregateType, aggregateId, eventOrder);
        }
        return OrderedMessage.of(matchingEvents.get(0).event().deserialize(), serializedAggregateId, eventOrder, eventReference.getMetaData());
    }

    /**
     * Create the exception used when a referenced event cannot be resolved.
     */
    private static IllegalArgumentException noMatchingEventFound(AggregateType aggregateType, Object aggregateId, long eventOrder) {
        return new IllegalArgumentException(MessageFormatter.msg("Couldn't find a matching event for aggregate '{}' with id '{}' and event-order {}", aggregateType, aggregateId, eventOrder));
    }

    /**
     * The interceptors handed to the {@link PatternMatchingMessageHandler} that dispatches inbox messages.
     * <p>
     * Note: called from the constructor - overrides must not depend on subclass instance state.
     *
     * @return the interceptors to use; default is an empty list
     */
    protected List<MessageHandlerInterceptor> getMessageHandlerInterceptors() {
        return List.of();
    }

    /**
     * Start the processor: get/create the inbox and create one exclusive subscription per aggregate type.
     * Calling this method on an already started processor does nothing.
     *
     * @throws IllegalArgumentException presumably thrown by {@link FailFast} if {@link #getProcessorName()} or
     *                                  {@link #reactsToEventsRelatedToAggregateTypes()} returns null (TODO confirm exception type)
     */
    @Override
    public void start() {
        if (this.started) {
            return;
        }
        this.started = true;
        String processorName = FailFast.requireNonNull(getProcessorName(), "getProcessorName() returned null");
        List<AggregateType> aggregateTypes = FailFast.requireNonNull(reactsToEventsRelatedToAggregateTypes(), "reactsToEventsRelatedToAggregateTypes() returned null");
        this.log.info("⚙️ [{}] Starting EventProcessor - will subscribe to events related to these AggregatesType's: {}", processorName, aggregateTypes);

        this.inbox = this.inboxes.getOrCreateInbox(InboxConfig.builder()
                                                              .inboxName(InboxName.of(processorName))
                                                              .messageConsumptionMode(getInboxMessageConsumptionMode())
                                                              .numberOfParallelMessageConsumers(getNumberOfParallelInboxMessageConsumers())
                                                              .redeliveryPolicy(getInboxRedeliveryPolicy())
                                                              .build(),
                                                   this.inboxMessageHandlerDelegate);

        this.eventStoreSubscriptions = aggregateTypes.stream()
                                                     .map(aggregateType -> subscribeTo(processorName, aggregateType))
                                                     .collect(Collectors.toList());
        this.log.info("⚙️ [{}] Started. # of undelivered Inbox messages: {}", processorName, this.inbox.getNumberOfUndeliveredMessages());
    }

    /**
     * Create the exclusive, asynchronous subscription for a single aggregate type.
     * The subscriber id is {@code <processorName>:<aggregateType>} and every received event
     * is passed to {@link #forwardEventToInbox(PersistedEvent, Inbox)}.
     *
     * @param processorName the name of this processor
     * @param aggregateType the aggregate type to subscribe to events for
     * @return the created subscription
     */
    private EventStoreSubscription subscribeTo(String processorName, AggregateType aggregateType) {
        final SubscriberId subscriberId = SubscriberId.of(processorName + ":" + aggregateType);
        EventStoreSubscription subscription = this.eventStoreSubscriptionManager.exclusivelySubscribeToAggregateEventsAsynchronously(
                subscriberId,
                aggregateType,
                GlobalEventOrder.FIRST_GLOBAL_EVENT_ORDER,
                new FencedLockAwareSubscriber() {
                    @Override
                    public void onLockAcquired(FencedLock fencedLock, SubscriptionResumePoint subscriptionResumePoint) {
                        EventProcessor.this.log.info("⚙️ [{}] Subscriber '{}' acquired lock. Will resumeFromAndIncluding: {}", processorName, subscriberId, subscriptionResumePoint);
                    }

                    @Override
                    public void onLockReleased(FencedLock fencedLock) {
                        EventProcessor.this.log.info("⚙️ [{}] Subscriber '{}''s lock was released", processorName, subscriberId);
                    }
                },
                persistedEvent -> forwardEventToInbox(persistedEvent, this.inbox));
        this.log.info("⚙️ [{}] Created exclusive '{}' subscription: {}", processorName, aggregateType, subscription);
        return subscription;
    }

    /**
     * Stop all event subscriptions and stop consuming messages from the inbox.
     * Calling this method on a processor that isn't started does nothing.
     */
    @Override
    public void stop() {
        if (!this.started) {
            return;
        }
        this.started = false;
        this.log.info("⚙️ [{}] Stopping EventProcessor", getProcessorName());
        // start() flags the processor as started before it has created the subscriptions and the inbox,
        // so both can still be null here if start() failed part way through
        if (this.eventStoreSubscriptions != null) {
            this.eventStoreSubscriptions.forEach(EventStoreSubscription::stop);
        }
        if (this.inbox != null) {
            this.inbox.stopConsuming();
        }
    }

    /**
     * @return true if {@link #start()} has been called without a subsequent {@link #stop()}
     */
    @Override
    public boolean isStarted() {
        return this.started;
    }

    /**
     * Add an {@link EventReferenceOrderedMessage} for the event to the inbox, provided the pattern matching
     * message handler reports that it handles messages with the event's Java type as payload.
     * Events without a matching handler are ignored.
     *
     * @param persistedEvent the event received from the event subscription
     * @param inbox          the inbox the event reference is added to
     */
    protected void forwardEventToInbox(PersistedEvent persistedEvent, Inbox inbox) {
        // NOTE(review): getEventType().get() is unchecked - presumably this fails for events persisted without a Java event type; confirm
        if (this.patternMatchingInboxMessageHandlerDelegate.handlesMessageWithPayload(persistedEvent.event().getEventType().get().toJavaClass())) {
            AggregateType aggregateType = persistedEvent.aggregateType();
            Object serializedAggregateId = resolveAggregateIdSerializer(aggregateType).serialize(persistedEvent.aggregateId());
            inbox.addMessageReceived(new EventReferenceOrderedMessage(aggregateType,
                                                                      serializedAggregateId,
                                                                      persistedEvent.eventOrder(),
                                                                      new MessageMetaData((Map) persistedEvent.metaData().deserialize())));
        }
    }

    /**
     * Look up the {@link AggregateIdSerializer} configured for the aggregate type.
     * Requires the {@link EventStore} to be a {@link ConfigurableEventStore}.
     */
    private AggregateIdSerializer resolveAggregateIdSerializer(AggregateType aggregateType) {
        return ((ConfigurableEventStore) this.eventStore).getAggregateEventStreamConfiguration(aggregateType).aggregateIdSerializer;
    }

    /**
     * The name of this processor. It is used as the inbox name, as prefix for the subscriber ids and in log statements.
     *
     * @return the processor name (must not be null)
     */
    public abstract String getProcessorName();

    /**
     * The aggregate types this processor subscribes to events for. One exclusive subscription is created per aggregate type.
     *
     * @return the aggregate types (must not be null)
     */
    protected abstract List<AggregateType> reactsToEventsRelatedToAggregateTypes();

    /**
     * @return the consumption mode the inbox is configured with; default is {@link MessageConsumptionMode#SingleGlobalConsumer}
     */
    protected MessageConsumptionMode getInboxMessageConsumptionMode() {
        return MessageConsumptionMode.SingleGlobalConsumer;
    }

    /**
     * @return the number of parallel message consumers the inbox is configured with;
     * default is the number of processors available to the JVM
     */
    protected int getNumberOfParallelInboxMessageConsumers() {
        return Runtime.getRuntime().availableProcessors();
    }

    /**
     * @return the redelivery policy the inbox is configured with; default is an exponential backoff policy with an initial and
     * follow-up delay of 200 ms, a follow-up delay multiplier of 1.1, a maximum follow-up delay threshold of 3 seconds
     * and a maximum of 20 redeliveries
     */
    protected RedeliveryPolicy getInboxRedeliveryPolicy() {
        return RedeliveryPolicy.exponentialBackoff()
                               .setInitialRedeliveryDelay(Duration.ofMillis(200L))
                               .setFollowupRedeliveryDelay(Duration.ofMillis(200L))
                               .setFollowupRedeliveryDelayMultiplier(1.1d)
                               .setMaximumFollowupRedeliveryDelayThreshold(Duration.ofSeconds(3L))
                               .setMaximumNumberOfRedeliveries(20)
                               .build();
    }

    @Override
    public String toString() {
        return "⚙️ " + getClass().getSimpleName() + " { processorName='" + getProcessorName() + "', reactsToEventsRelatedToAggregateTypes=" + reactsToEventsRelatedToAggregateTypes() + ", started=" + this.started + " }";
    }

    /**
     * @return the inbox used by this processor; null until {@link #start()} has created it
     */
    protected final Inbox getInbox() {
        return this.inbox;
    }

    /**
     * @return the event store associated with the subscription manager this processor was created with
     */
    protected final EventStore getEventStore() {
        return this.eventStore;
    }

    /**
     * @return the command bus this processor is registered on as a command handler
     */
    protected final DurableLocalCommandBus getCommandBus() {
        return this.commandBus;
    }
}
