/*
 * Decompiled with CFR 0.152.
 */
package org.enodeframework.queue.domainevent;

import com.google.common.base.Strings;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.enodeframework.common.io.ReplySocketAddress;
import org.enodeframework.common.io.Task;
import org.enodeframework.common.serializing.ISerializeService;
import org.enodeframework.common.utilities.InetUtil;
import org.enodeframework.eventing.DomainEventStreamMessage;
import org.enodeframework.eventing.IEventProcessContext;
import org.enodeframework.eventing.IEventSerializer;
import org.enodeframework.eventing.IProcessingEventProcessor;
import org.enodeframework.eventing.ProcessingEvent;
import org.enodeframework.queue.IMessageContext;
import org.enodeframework.queue.IMessageHandler;
import org.enodeframework.queue.ISendReplyService;
import org.enodeframework.queue.QueueMessage;
import org.enodeframework.queue.domainevent.DomainEventHandledMessage;
import org.enodeframework.queue.domainevent.EventStreamMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link IMessageHandler} for queue messages that carry a serialized event stream.
 *
 * <p>Each incoming {@link QueueMessage} body is deserialized into an {@link EventStreamMessage},
 * converted to a {@link DomainEventStreamMessage}, wrapped in a {@link ProcessingEvent} and handed
 * to the configured {@link IProcessingEventProcessor}. The accompanying
 * {@link DomainEventStreamProcessContext} acknowledges the queue message and, when enabled, sends a
 * {@link DomainEventHandledMessage} to the reply address stored in the message items.
 */
public class DefaultDomainEventMessageHandler implements IMessageHandler {
    private static final Logger logger = LoggerFactory.getLogger(DefaultDomainEventMessageHandler.class);
    private final ISendReplyService sendReplyService;
    private final IEventSerializer eventSerializer;
    private final IProcessingEventProcessor domainEventMessageProcessor;
    private final ISerializeService serializeService;
    /** Whether {@link DomainEventStreamProcessContext#notifyEventProcessed()} sends a reply; defaults to {@code true}. */
    private boolean sendEventHandledMessage = true;

    /**
     * Creates a handler wired to its collaborators.
     *
     * @param sendReplyService            service used to send the "event handled" reply
     * @param domainEventMessageProcessor processor that receives each {@link ProcessingEvent}
     * @param eventSerializer             deserializer for the events carried by an {@link EventStreamMessage}
     * @param serializeService            serializer used for the message body and for logging
     */
    public DefaultDomainEventMessageHandler(ISendReplyService sendReplyService, IProcessingEventProcessor domainEventMessageProcessor, IEventSerializer eventSerializer, ISerializeService serializeService) {
        this.sendReplyService = sendReplyService;
        this.eventSerializer = eventSerializer;
        this.domainEventMessageProcessor = domainEventMessageProcessor;
        this.serializeService = serializeService;
    }

    /**
     * @return the reply service supplied at construction time
     */
    public ISendReplyService getSendReplyService() {
        return this.sendReplyService;
    }

    /**
     * Deserializes the queue message body into an event stream and submits it for processing.
     *
     * @param queueMessage the raw message received from the queue
     * @param context      the queue context, later notified via
     *                     {@link IMessageContext#onMessageHandled(QueueMessage)}
     */
    @Override
    public void handle(QueueMessage queueMessage, IMessageContext context) {
        // "{}" placeholders only defer string formatting; the argument expression is still
        // evaluated. Guard the call so the message is not serialized when INFO is disabled.
        if (logger.isInfoEnabled()) {
            logger.info("Received event stream message: {}", (Object) this.serializeService.serialize(queueMessage));
        }
        EventStreamMessage message = this.serializeService.deserialize(queueMessage.getBody(), EventStreamMessage.class);
        DomainEventStreamMessage domainEventStreamMessage = this.convertToDomainEventStream(message);
        DomainEventStreamProcessContext processContext = new DomainEventStreamProcessContext(this, domainEventStreamMessage, queueMessage, context);
        ProcessingEvent processingMessage = new ProcessingEvent(domainEventStreamMessage, processContext);
        this.domainEventMessageProcessor.process(processingMessage);
    }

    /**
     * Builds a {@link DomainEventStreamMessage} from the transport-level message, deserializing
     * the events and copying over the id and timestamp.
     *
     * @param message the deserialized queue payload
     * @return the equivalent domain event stream message
     */
    private DomainEventStreamMessage convertToDomainEventStream(EventStreamMessage message) {
        DomainEventStreamMessage domainEventStreamMessage = new DomainEventStreamMessage(
                message.getCommandId(),
                message.getAggregateRootId(),
                message.getVersion(),
                message.getAggregateRootTypeName(),
                this.eventSerializer.deserialize(message.getEvents()),
                message.getItems());
        domainEventStreamMessage.setId(message.getId());
        domainEventStreamMessage.setTimestamp(message.getTimestamp());
        return domainEventStreamMessage;
    }

    /**
     * @return {@code true} if a {@link DomainEventHandledMessage} reply is sent after processing
     */
    public boolean isSendEventHandledMessage() {
        return this.sendEventHandledMessage;
    }

    /**
     * @param sendEventHandledMessage {@code true} to send a reply after processing, {@code false} to skip it
     */
    public void setSendEventHandledMessage(boolean sendEventHandledMessage) {
        this.sendEventHandledMessage = sendEventHandledMessage;
    }

    /**
     * Process context attached to each {@link ProcessingEvent}; acknowledges the queue message
     * and optionally replies to the command sender.
     */
    static class DomainEventStreamProcessContext implements IEventProcessContext {
        private final DefaultDomainEventMessageHandler eventConsumer;
        private final DomainEventStreamMessage domainEventStreamMessage;
        private final QueueMessage queueMessage;
        private final IMessageContext messageContext;

        /**
         * @param eventConsumer            owning handler; supplies the reply service and the reply flag
         * @param domainEventStreamMessage the event stream being processed
         * @param queueMessage             the originating queue message, acknowledged on completion
         * @param messageContext           the queue context used for the acknowledgement
         */
        public DomainEventStreamProcessContext(DefaultDomainEventMessageHandler eventConsumer, DomainEventStreamMessage domainEventStreamMessage, QueueMessage queueMessage, IMessageContext messageContext) {
            this.eventConsumer = eventConsumer;
            this.domainEventStreamMessage = domainEventStreamMessage;
            this.queueMessage = queueMessage;
            this.messageContext = messageContext;
        }

        /**
         * Marks the queue message as handled and, if replies are enabled and the message items
         * contain a usable {@code COMMAND_REPLY_ADDRESS}, sends a {@link DomainEventHandledMessage}
         * to that address.
         *
         * @return {@code Task.completedTask} when no reply is sent (replies disabled, address
         *         missing/empty, or address not parseable); otherwise the future returned by
         *         {@link ISendReplyService#sendEventReply}
         */
        @Override
        public CompletableFuture<Boolean> notifyEventProcessed() {
            // The acknowledgement always happens first, regardless of whether a reply follows.
            this.messageContext.onMessageHandled(this.queueMessage);
            if (!this.eventConsumer.isSendEventHandledMessage()) {
                return Task.completedTask;
            }
            String address = this.readItem("COMMAND_REPLY_ADDRESS");
            if (Strings.isNullOrEmpty(address)) {
                return Task.completedTask;
            }
            ReplySocketAddress replyAddress = InetUtil.toSocketAddress(address);
            // A null result is treated as "address could not be parsed".
            if (Objects.isNull(replyAddress)) {
                logger.error("can not parse notify address, {}", this.domainEventStreamMessage);
                return Task.completedTask;
            }
            String commandResult = this.readItem("COMMAND_RESULT");
            DomainEventHandledMessage domainEventHandledMessage = new DomainEventHandledMessage();
            domainEventHandledMessage.setCommandId(this.domainEventStreamMessage.getCommandId());
            domainEventHandledMessage.setAggregateRootId(this.domainEventStreamMessage.getAggregateRootId());
            domainEventHandledMessage.setCommandResult(commandResult);
            return this.eventConsumer.getSendReplyService().sendEventReply(domainEventHandledMessage, replyAddress);
        }

        /**
         * Reads a string item from the event stream message, defaulting to the empty string.
         *
         * <p>The item map is forwarded unchecked from the deserialized payload, so it may be
         * {@code null}; that case is treated like a missing key instead of throwing after the
         * message has already been acknowledged.
         *
         * @param key the item key
         * @return the stored value, or {@code ""} when the map is {@code null} or lacks the key
         */
        private String readItem(String key) {
            if (Objects.isNull(this.domainEventStreamMessage.getItems())) {
                return "";
            }
            return (String) this.domainEventStreamMessage.getItems().getOrDefault(key, "");
        }
    }
}

