/*
 * Decompiled with CFR 0.152.
 */
package org.enodeframework.eventing.impl;

import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.enodeframework.common.io.IOHelper;
import org.enodeframework.common.io.Task;
import org.enodeframework.common.scheduling.IScheduleService;
import org.enodeframework.common.serializing.ISerializeService;
import org.enodeframework.common.utilities.SystemClock;
import org.enodeframework.eventing.DomainEventStreamMessage;
import org.enodeframework.eventing.EnqueueMessageResult;
import org.enodeframework.eventing.IProcessingEventProcessor;
import org.enodeframework.eventing.IPublishedVersionStore;
import org.enodeframework.eventing.ProcessingEvent;
import org.enodeframework.eventing.ProcessingEventMailBox;
import org.enodeframework.messaging.IMessageDispatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultProcessingEventProcessor
implements IProcessingEventProcessor {
    private static final Logger logger = LoggerFactory.getLogger(DefaultProcessingEventProcessor.class);
    private final String scanInactiveMailBoxTaskName;
    private final String processTryToRefreshAggregateTaskName;
    private final String name = "DefaultEventProcessor";
    private final ConcurrentHashMap<String, ProcessingEventMailBox> toRefreshAggregateRootMailBoxDict;
    private final ConcurrentHashMap<String, ProcessingEventMailBox> mailboxDict;
    private final ConcurrentHashMap<String, Boolean> refreshingAggregateRootDict;
    private final IScheduleService scheduleService;
    private final ISerializeService serializeService;
    private final IMessageDispatcher messageDispatcher;
    private final IPublishedVersionStore publishedVersionStore;
    private int timeoutSeconds = 259200;
    private int scanExpiredAggregateIntervalMilliseconds = 5000;
    private int processTryToRefreshAggregateIntervalMilliseconds = 1000;

    public DefaultProcessingEventProcessor(IScheduleService scheduleService, ISerializeService serializeService, IMessageDispatcher messageDispatcher, IPublishedVersionStore publishedVersionStore) {
        this.scheduleService = scheduleService;
        this.serializeService = serializeService;
        this.messageDispatcher = messageDispatcher;
        this.publishedVersionStore = publishedVersionStore;
        this.mailboxDict = new ConcurrentHashMap();
        this.toRefreshAggregateRootMailBoxDict = new ConcurrentHashMap();
        this.refreshingAggregateRootDict = new ConcurrentHashMap();
        this.scanInactiveMailBoxTaskName = "CleanInactiveProcessingEventMailBoxes_" + SystemClock.now() + new Random().nextInt(10000);
        this.processTryToRefreshAggregateTaskName = "ProcessTryToRefreshAggregate_" + SystemClock.now() + new Random().nextInt(10000);
    }

    @Override
    public void process(ProcessingEvent processingMessage) {
        EnqueueMessageResult enqueueResult;
        String aggregateRootId = processingMessage.getMessage().getAggregateRootId();
        if (Strings.isNullOrEmpty((String)aggregateRootId)) {
            throw new IllegalArgumentException("aggregateRootId of domain event stream cannot be null or empty, domainEventStreamId:" + processingMessage.getMessage().getId());
        }
        ProcessingEventMailBox mailbox2 = this.mailboxDict.computeIfAbsent(aggregateRootId, key2 -> this.buildProcessingEventMailBox(processingMessage));
        long mailboxTryUsingCount = 0L;
        while (!mailbox2.tryUsing()) {
            Task.sleep(1L);
            if (++mailboxTryUsingCount % 10000L != 0L) continue;
            logger.warn("Event mailbox try using count: {}, aggregateRootId: {}, aggregateRootTypeName: {}", new Object[]{mailboxTryUsingCount, mailbox2.getAggregateRootId(), mailbox2.getAggregateRootTypeName()});
        }
        if (mailbox2.isRemoved()) {
            mailbox2 = this.mailboxDict.computeIfAbsent(aggregateRootId, key2 -> this.buildProcessingEventMailBox(processingMessage));
        }
        if ((enqueueResult = mailbox2.enqueueMessage(processingMessage)) == EnqueueMessageResult.Ignored) {
            processingMessage.getProcessContext().notifyEventProcessed();
        } else if (enqueueResult == EnqueueMessageResult.AddToWaitingList) {
            this.addToRefreshAggregateMailBoxToDict(mailbox2);
        }
        mailbox2.exitUsing();
    }

    private void addToRefreshAggregateMailBoxToDict(ProcessingEventMailBox mailbox2) {
        if (this.toRefreshAggregateRootMailBoxDict.putIfAbsent(mailbox2.getAggregateRootId(), mailbox2) == null) {
            logger.info("Added toRefreshPublishedVersion aggregate mailbox, aggregateRootTypeName: {}, aggregateRootId: {}", (Object)mailbox2.getAggregateRootTypeName(), (Object)mailbox2.getAggregateRootId());
            this.tryToRefreshAggregateMailBoxNextExpectingEventVersion(mailbox2);
        }
    }

    private ProcessingEventMailBox buildProcessingEventMailBox(ProcessingEvent processingMessage) {
        return new ProcessingEventMailBox(processingMessage.getMessage().getAggregateRootTypeName(), processingMessage.getMessage().getAggregateRootId(), y -> this.dispatchProcessingMessageAsync((ProcessingEvent)y, 0));
    }

    private void tryToRefreshAggregateMailBoxNextExpectingEventVersion(ProcessingEventMailBox processingEventMailBox) {
        if (this.refreshingAggregateRootDict.putIfAbsent(processingEventMailBox.getAggregateRootId(), true) == null) {
            this.getAggregateRootLatestPublishedEventVersion(processingEventMailBox, 0);
        }
    }

    private void getAggregateRootLatestPublishedEventVersion(ProcessingEventMailBox processingEventMailBox, int retryTimes) {
        IOHelper.tryAsyncActionRecursively("GetAggregateRootLatestPublishedEventVersion", () -> this.publishedVersionStore.getPublishedVersionAsync("DefaultEventProcessor", processingEventMailBox.getAggregateRootTypeName(), processingEventMailBox.getAggregateRootId()), result -> {
            processingEventMailBox.setNextExpectingEventVersion(result + 1);
            this.refreshingAggregateRootDict.remove(processingEventMailBox.getAggregateRootId());
        }, () -> String.format("publishedVersionStore.GetPublishedVersionAsync has unknown exception, aggregateRootTypeName: %s, aggregateRootId: %s", processingEventMailBox.getAggregateRootTypeName(), processingEventMailBox.getAggregateRootId()), null, retryTimes, true);
    }

    @Override
    public void start() {
        this.scheduleService.startTask(this.scanInactiveMailBoxTaskName, this::cleanInactiveMailbox, this.scanExpiredAggregateIntervalMilliseconds, this.scanExpiredAggregateIntervalMilliseconds);
        this.scheduleService.startTask(this.processTryToRefreshAggregateTaskName, this::processToRefreshAggregateRootMailBoxs, this.processTryToRefreshAggregateIntervalMilliseconds, this.processTryToRefreshAggregateIntervalMilliseconds);
    }

    @Override
    public void stop() {
        this.scheduleService.stopTask(this.scanInactiveMailBoxTaskName);
        this.scheduleService.stopTask(this.processTryToRefreshAggregateTaskName);
    }

    private void dispatchProcessingMessageAsync(ProcessingEvent processingEvent, int retryTimes) {
        IOHelper.tryAsyncActionRecursivelyWithoutResult("DispatchProcessingMessageAsync", () -> this.messageDispatcher.dispatchMessagesAsync(processingEvent.getMessage().getEvents()), result -> {
            if (logger.isDebugEnabled()) {
                logger.debug("dispatch messages success, msg: {}", (Object)this.serializeService.serialize(processingEvent.getMessage()));
            }
            this.updatePublishedVersionAsync(processingEvent, 0);
        }, () -> String.format("sequence message [messageId:%s, messageType:%s, aggregateRootId:%s, aggregateRootVersion:%s]", processingEvent.getMessage().getId(), processingEvent.getMessage().getClass().getName(), processingEvent.getMessage().getAggregateRootId(), processingEvent.getMessage().getVersion()), null, retryTimes, true);
    }

    @Override
    public String getName() {
        return "DefaultEventProcessor";
    }

    private void updatePublishedVersionAsync(ProcessingEvent processingEvent, int retryTimes) {
        DomainEventStreamMessage message = processingEvent.getMessage();
        IOHelper.tryAsyncActionRecursivelyWithoutResult("UpdatePublishedVersionAsync", () -> this.publishedVersionStore.updatePublishedVersionAsync("DefaultEventProcessor", message.getAggregateRootTypeName(), message.getAggregateRootId(), message.getVersion()), result -> {
            if (logger.isDebugEnabled()) {
                logger.debug("update published version success, message ack: {}", (Object)this.serializeService.serialize(message));
            }
            processingEvent.complete();
        }, () -> String.format("DomainEventStreamMessage [messageId:%s, messageType:%s, aggregateRootId:%s, aggregateRootVersion:%s]", message.getId(), message.getClass().getName(), message.getAggregateRootId(), message.getVersion()), null, retryTimes, true);
    }

    private void processToRefreshAggregateRootMailBoxs() {
        ArrayList remainingMailboxList = Lists.newArrayList();
        ArrayList recoveredMailboxList = Lists.newArrayList();
        this.toRefreshAggregateRootMailBoxDict.values().forEach(aggregateRootMailBox -> {
            if (aggregateRootMailBox.getWaitingMessageCount() > 0) {
                remainingMailboxList.add(aggregateRootMailBox);
            } else {
                recoveredMailboxList.add(aggregateRootMailBox);
            }
        });
        for (ProcessingEventMailBox mailBox2 : remainingMailboxList) {
            this.tryToRefreshAggregateMailBoxNextExpectingEventVersion(mailBox2);
        }
        for (ProcessingEventMailBox mailBox2 : recoveredMailboxList) {
            ProcessingEventMailBox removed = this.toRefreshAggregateRootMailBoxDict.remove(mailBox2.getAggregateRootId());
            if (removed == null) continue;
            logger.info("Removed healthy aggregate mailbox, aggregateRootTypeName: {}, aggregateRootId: {}", (Object)removed.getAggregateRootTypeName(), (Object)removed.getAggregateRootId());
        }
    }

    private void cleanInactiveMailbox() {
        List<Map.Entry> inactiveList2 = this.mailboxDict.entrySet().stream().filter(x -> this.isMailBoxAllowRemove((ProcessingEventMailBox)x.getValue())).collect(Collectors.toList());
        inactiveList2.forEach(entry -> {
            ProcessingEventMailBox removed;
            if (((ProcessingEventMailBox)entry.getValue()).tryUsing() && this.isMailBoxAllowRemove((ProcessingEventMailBox)entry.getValue()) && (removed = this.mailboxDict.remove(entry.getKey())) != null) {
                removed.markAsRemoved();
                logger.info("Removed inactive domain event stream mailbox, aggregateRootTypeName: {}, aggregateRootId: {}", (Object)removed.getAggregateRootTypeName(), (Object)removed.getAggregateRootId());
            }
        });
    }

    private boolean isMailBoxAllowRemove(ProcessingEventMailBox mailbox2) {
        return mailbox2.isInactive(this.timeoutSeconds) && !mailbox2.isRunning() && mailbox2.getTotalUnHandledMessageCount() == 0 && mailbox2.getWaitingMessageCount() == 0;
    }

    public int getTimeoutSeconds() {
        return this.timeoutSeconds;
    }

    public void setTimeoutSeconds(int timeoutSeconds) {
        this.timeoutSeconds = timeoutSeconds;
    }

    public int getScanExpiredAggregateIntervalMilliseconds() {
        return this.scanExpiredAggregateIntervalMilliseconds;
    }

    public void setScanExpiredAggregateIntervalMilliseconds(int scanExpiredAggregateIntervalMilliseconds) {
        this.scanExpiredAggregateIntervalMilliseconds = scanExpiredAggregateIntervalMilliseconds;
    }

    public int getProcessTryToRefreshAggregateIntervalMilliseconds() {
        return this.processTryToRefreshAggregateIntervalMilliseconds;
    }

    public void setProcessTryToRefreshAggregateIntervalMilliseconds(int processTryToRefreshAggregateIntervalMilliseconds) {
        this.processTryToRefreshAggregateIntervalMilliseconds = processTryToRefreshAggregateIntervalMilliseconds;
    }
}

