package org.enodeframework.eventing.impl;

import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import org.enodeframework.common.io.IOHelper;
import org.enodeframework.common.io.Task;
import org.enodeframework.common.scheduling.IScheduleService;
import org.enodeframework.eventing.EnqueueMessageResult;
import org.enodeframework.eventing.IEventStore;
import org.enodeframework.eventing.IProcessingEventProcessor;
import org.enodeframework.eventing.ProcessingEvent;
import org.enodeframework.eventing.ProcessingEventMailBox;
import org.enodeframework.messaging.IMessageDispatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/enodeframework/eventing/impl/DefaultProcessingEventProcessor.class */
public class DefaultProcessingEventProcessor implements IProcessingEventProcessor {
    private static final Logger logger = LoggerFactory.getLogger(DefaultProcessingEventProcessor.class);
    private final IScheduleService scheduleService;
    private final IMessageDispatcher messageDispatcher;
    private final IEventStore eventStore;
    private final Executor executor;
    private int timeoutSeconds = 259200;
    private int scanExpiredAggregateIntervalMilliseconds = 5000;
    private int processTryToRefreshAggregateIntervalMilliseconds = 1000;
    private final ConcurrentHashMap<String, ProcessingEventMailBox> mailboxDict = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, ProcessingEventMailBox> toRefreshAggregateRootMailBoxDict = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, Boolean> refreshingAggregateRootDict = new ConcurrentHashMap<>();
    private final String scanInactiveMailBoxTaskName = "CleanInactiveProcessingEventMailBoxes_" + System.currentTimeMillis() + new Random().nextInt(10000);
    private final String processTryToRefreshAggregateTaskName = "ProcessTryToRefreshAggregate_" + System.currentTimeMillis() + new Random().nextInt(10000);

    public DefaultProcessingEventProcessor(IScheduleService iScheduleService, IMessageDispatcher iMessageDispatcher, IEventStore iEventStore, Executor executor) {
        this.scheduleService = iScheduleService;
        this.messageDispatcher = iMessageDispatcher;
        this.eventStore = iEventStore;
        this.executor = executor;
    }

    @Override // org.enodeframework.eventing.IProcessingEventProcessor
    public void process(ProcessingEvent processingEvent) {
        String aggregateRootId = processingEvent.getMessage().getAggregateRootId();
        if (Strings.isNullOrEmpty(aggregateRootId)) {
            throw new IllegalArgumentException("aggregateRootId of domain event stream cannot be null or empty, domainEventStreamId:" + processingEvent.getMessage().getId());
        }
        ProcessingEventMailBox computeIfAbsent = this.mailboxDict.computeIfAbsent(aggregateRootId, str -> {
            return buildProcessingEventMailBox(processingEvent);
        });
        long j = 0;
        while (!computeIfAbsent.tryUsing()) {
            Task.sleep(1L);
            j++;
            if (j % 10000 == 0) {
                logger.warn("Event mailbox try using count: {}, aggregateRootId: {}, aggregateRootTypeName: {}", new Object[]{Long.valueOf(j), computeIfAbsent.getAggregateRootId(), computeIfAbsent.getAggregateRootTypeName()});
            }
        }
        if (computeIfAbsent.isRemoved()) {
            computeIfAbsent = buildProcessingEventMailBox(processingEvent);
            this.mailboxDict.putIfAbsent(aggregateRootId, computeIfAbsent);
        }
        EnqueueMessageResult enqueueMessage = computeIfAbsent.enqueueMessage(processingEvent);
        if (enqueueMessage == EnqueueMessageResult.Ignored) {
            processingEvent.getProcessContext().notifyEventProcessed();
        } else if (enqueueMessage == EnqueueMessageResult.AddToWaitingList) {
            addToRefreshAggregateMailBoxToDict(computeIfAbsent);
        }
        computeIfAbsent.exitUsing();
    }

    private void addToRefreshAggregateMailBoxToDict(ProcessingEventMailBox processingEventMailBox) {
        if (this.toRefreshAggregateRootMailBoxDict.putIfAbsent(processingEventMailBox.getAggregateRootId(), processingEventMailBox) == null) {
            logger.info("Added toRefreshPublishedVersion aggregate mailbox, aggregateRootTypeName: {}, aggregateRootId: {}", processingEventMailBox.getAggregateRootTypeName(), processingEventMailBox.getAggregateRootId());
            tryToRefreshAggregateMailBoxNextExpectingEventVersion(processingEventMailBox);
        }
    }

    private ProcessingEventMailBox buildProcessingEventMailBox(ProcessingEvent processingEvent) {
        return new ProcessingEventMailBox(processingEvent.getMessage().getAggregateRootTypeName(), processingEvent.getMessage().getAggregateRootId(), processingEvent2 -> {
            dispatchProcessingMessageAsync(processingEvent2, 0);
        }, this.executor);
    }

    private void tryToRefreshAggregateMailBoxNextExpectingEventVersion(ProcessingEventMailBox processingEventMailBox) {
        if (this.refreshingAggregateRootDict.putIfAbsent(processingEventMailBox.getAggregateRootId(), true) == null) {
            getAggregateRootLatestPublishedEventVersion(processingEventMailBox, 0);
        }
    }

    private void getAggregateRootLatestPublishedEventVersion(ProcessingEventMailBox processingEventMailBox, int i) {
        IOHelper.tryAsyncActionRecursively("GetAggregateRootLatestPublishedEventVersion", () -> {
            return this.eventStore.getPublishedVersionAsync(processingEventMailBox.getAggregateRootTypeName(), processingEventMailBox.getAggregateRootId());
        }, num -> {
            processingEventMailBox.setNextExpectingEventVersion(num.intValue() + 1);
            this.refreshingAggregateRootDict.remove(processingEventMailBox.getAggregateRootId());
        }, () -> {
            return String.format("publishedVersionStore.GetPublishedVersionAsync has unknown exception, aggregateRootTypeName: %s, aggregateRootId: %s", processingEventMailBox.getAggregateRootTypeName(), processingEventMailBox.getAggregateRootId());
        }, null, i, true);
    }

    @Override // org.enodeframework.eventing.IProcessingEventProcessor
    public void start() {
        this.scheduleService.startTask(this.scanInactiveMailBoxTaskName, this::cleanInactiveMailbox, this.scanExpiredAggregateIntervalMilliseconds, this.scanExpiredAggregateIntervalMilliseconds);
        this.scheduleService.startTask(this.processTryToRefreshAggregateTaskName, this::processToRefreshAggregateRootMailBoxs, this.processTryToRefreshAggregateIntervalMilliseconds, this.processTryToRefreshAggregateIntervalMilliseconds);
    }

    @Override // org.enodeframework.eventing.IProcessingEventProcessor
    public void stop() {
        this.scheduleService.stopTask(this.scanInactiveMailBoxTaskName);
        this.scheduleService.stopTask(this.processTryToRefreshAggregateTaskName);
    }

    private void dispatchProcessingMessageAsync(ProcessingEvent processingEvent, int i) {
        IOHelper.tryAsyncActionRecursivelyWithoutResult("DispatchProcessingMessageAsync", () -> {
            return this.messageDispatcher.dispatchMessagesAsync(processingEvent.getMessage().getEvents());
        }, r3 -> {
            processingEvent.complete();
        }, () -> {
            return String.format("sequence message [messageId:%s, messageType:%s, aggregateRootId:%s, aggregateRootVersion:%s]", processingEvent.getMessage().getId(), processingEvent.getMessage().getClass().getName(), processingEvent.getMessage().getAggregateRootId(), Integer.valueOf(processingEvent.getMessage().getVersion()));
        }, null, i, true);
    }

    private void processToRefreshAggregateRootMailBoxs() {
        ArrayList newArrayList = Lists.newArrayList();
        ArrayList newArrayList2 = Lists.newArrayList();
        this.toRefreshAggregateRootMailBoxDict.values().forEach(processingEventMailBox -> {
            if (processingEventMailBox.getWaitingMessageCount() > 0) {
                newArrayList.add(processingEventMailBox);
            } else {
                newArrayList2.add(processingEventMailBox);
            }
        });
        Iterator it = newArrayList.iterator();
        while (it.hasNext()) {
            tryToRefreshAggregateMailBoxNextExpectingEventVersion((ProcessingEventMailBox) it.next());
        }
        Iterator it2 = newArrayList2.iterator();
        while (it2.hasNext()) {
            ProcessingEventMailBox remove = this.toRefreshAggregateRootMailBoxDict.remove(((ProcessingEventMailBox) it2.next()).getAggregateRootId());
            if (remove != null) {
                logger.info("Removed healthy aggregate mailbox, aggregateRootTypeName: {}, aggregateRootId: {}", remove.getAggregateRootTypeName(), remove.getAggregateRootId());
            }
        }
    }

    private void cleanInactiveMailbox() {
        ((List) this.mailboxDict.entrySet().stream().filter(entry -> {
            return isMailBoxAllowRemove((ProcessingEventMailBox) entry.getValue());
        }).collect(Collectors.toList())).forEach(entry2 -> {
            ProcessingEventMailBox remove;
            if (((ProcessingEventMailBox) entry2.getValue()).tryUsing() && isMailBoxAllowRemove((ProcessingEventMailBox) entry2.getValue()) && (remove = this.mailboxDict.remove(entry2.getKey())) != null) {
                remove.markAsRemoved();
                logger.info("Removed inactive domain event stream mailbox, aggregateRootTypeName: {}, aggregateRootId: {}", remove.getAggregateRootTypeName(), remove.getAggregateRootId());
            }
        });
    }

    private boolean isMailBoxAllowRemove(ProcessingEventMailBox processingEventMailBox) {
        return processingEventMailBox.isInactive(this.timeoutSeconds) && !processingEventMailBox.isRunning() && processingEventMailBox.getTotalUnHandledMessageCount() == 0 && processingEventMailBox.getWaitingMessageCount() == 0;
    }

    public int getTimeoutSeconds() {
        return this.timeoutSeconds;
    }

    public void setTimeoutSeconds(int i) {
        this.timeoutSeconds = i;
    }

    public int getScanExpiredAggregateIntervalMilliseconds() {
        return this.scanExpiredAggregateIntervalMilliseconds;
    }

    public void setScanExpiredAggregateIntervalMilliseconds(int i) {
        this.scanExpiredAggregateIntervalMilliseconds = i;
    }

    public int getProcessTryToRefreshAggregateIntervalMilliseconds() {
        return this.processTryToRefreshAggregateIntervalMilliseconds;
    }

    public void setProcessTryToRefreshAggregateIntervalMilliseconds(int i) {
        this.processTryToRefreshAggregateIntervalMilliseconds = i;
    }
}
