/*
 * Decompiled with CFR 0.152.
 */
package org.enodeframework.eventing;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import kotlin.Metadata;
import kotlin.ResultKt;
import kotlin.Unit;
import kotlin.collections.CollectionsKt;
import kotlin.coroutines.Continuation;
import kotlin.coroutines.CoroutineContext;
import kotlin.coroutines.intrinsics.IntrinsicsKt;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.functions.Function2;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlinx.coroutines.BuildersKt;
import kotlinx.coroutines.CoroutineScope;
import kotlinx.coroutines.CoroutineScopeKt;
import kotlinx.coroutines.Deferred;
import kotlinx.coroutines.Dispatchers;
import org.enodeframework.common.exception.DuplicateEventStreamException;
import org.enodeframework.common.extensions.SystemClock;
import org.enodeframework.common.function.Action1;
import org.enodeframework.common.io.Task;
import org.enodeframework.eventing.DomainEventMessage;
import org.enodeframework.eventing.EventCommittingContext;
import org.enodeframework.eventing.EventCommittingContextMailBox;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*
 * Illegal identifiers - consider using --renameillegalidents true
 */
@Metadata(mv={1, 6, 0}, k=1, xi=48, d1={"\u0000X\n\u0002\u0018\u0002\n\u0002\u0010\u0000\n\u0000\n\u0002\u0010\b\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0010 \n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0010\u000e\n\u0002\u0010\u0005\n\u0002\b\u0003\n\u0002\u0010\u000b\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0010\u0002\n\u0002\b\n\n\u0002\u0010\t\n\u0002\b\u0003\u0018\u0000 '2\u00020\u0001:\u0001'B)\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0003\u0012\u0012\u0010\u0005\u001a\u000e\u0012\n\u0012\b\u0012\u0004\u0012\u00020\b0\u00070\u0006\u00a2\u0006\u0002\u0010\tJ\u0006\u0010\u0019\u001a\u00020\u001aJ\u000e\u0010\u001b\u001a\u00020\u001a2\u0006\u0010\u001c\u001a\u00020\bJ\u000e\u0010\u001d\u001a\u00020\u00112\u0006\u0010\u001e\u001a\u00020\u0003J\b\u0010\u001f\u001a\u00020\u001aH\u0002J\u000e\u0010 \u001a\u00020\u001a2\u0006\u0010!\u001a\u00020\fJ\b\u0010\"\u001a\u00020\u001aH\u0002J\b\u0010#\u001a\u00020\u001aH\u0002J\b\u0010$\u001a\u00020%H\u0002J\b\u0010&\u001a\u00020\u001aH\u0002R&\u0010\n\u001a\u001a\u0012\u0004\u0012\u00020\f\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00020\f\u0012\u0004\u0012\u00020\r0\u000b0\u000bX\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u000e\u001a\u00020\u0001X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0004\u001a\u00020\u0003X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u001a\u0010\u000f\u001a\u000e\u0012\n\u0012\b\u0012\u0004\u0012\u00020\b0\u00070\u0006X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0010\u001a\u00020\u0011X\u0082\u000e\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0012\u001a\u00020\u0013X\u0082\u000e\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0014\u001a\u00020\u0001X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u0014\u0010\u0015\u001a\b\u0012\u0004\u0012\u00020\b0\u0016X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u0011\u0010\u0002\u001a\u00020\u0003\u00a2\u0006\b\n\u0000\u001a\u0004\b\u0017\u0010\u0018\u00a
8\u0006("}, d2={"Lorg/enodeframework/eventing/EventCommittingContextMailBox;", "", "number", "", "batchSize", "handleEventAction", "Lorg/enodeframework/common/function/Action1;", "", "Lorg/enodeframework/eventing/EventCommittingContext;", "(IILorg/enodeframework/common/function/Action1;)V", "aggregateDictDict", "Ljava/util/concurrent/ConcurrentHashMap;", "", "", "asyncLockObj", "handleMessageAction", "isRunning", "", "lastActiveTime", "Ljava/util/Date;", "lockObj", "messageQueue", "Ljava/util/concurrent/ConcurrentLinkedQueue;", "getNumber", "()I", "completeRun", "", "enqueueMessage", "message", "isInactive", "timeoutSeconds", "processMessages", "removeAggregateAllEventCommittingContexts", "aggregateRootId", "setAsNotRunning", "setAsRunning", "totalUnHandledMessageCount", "", "tryRun", "Companion", "enode"})
public final class EventCommittingContextMailBox {
    // Kotlin companion-object instance; exposes the shared logger through getLogger().
    @NotNull
    public static final Companion Companion = new Companion(null);
    // Identifier of this mailbox; returned by getNumber() and included in log messages.
    private final int number;
    // Upper bound on how many messages processMessages() collects into one batch.
    private final int batchSize;
    // Monitor used by enqueueMessage() and tryRun() to serialize enqueueing and run start-up.
    @NotNull
    private final Object lockObj;
    // Monitor held for the whole of processMessages(), so only one batch is assembled at a time.
    @NotNull
    private final Object asyncLockObj;
    // aggregate root id -> (event stream id -> marker byte) for event streams that are queued
    // and not yet picked up by processMessages().
    @NotNull
    private final ConcurrentHashMap<String, ConcurrentHashMap<String, Byte>> aggregateDictDict;
    // FIFO queue of contexts waiting to be batched.
    @NotNull
    private final ConcurrentLinkedQueue<EventCommittingContext> messageQueue;
    // Callback that receives each non-empty batch assembled by processMessages().
    @NotNull
    private final Action1<List<EventCommittingContext>> handleMessageAction;
    // Refreshed on enqueue, at the start of processMessages() and in completeRun(); read by isInactive().
    @NotNull
    private Date lastActiveTime;
    // True while a processing run is considered in progress; checked under lockObj in tryRun().
    private boolean isRunning;
    private static final Logger logger = LoggerFactory.getLogger(EventCommittingContextMailBox.class);
    // The byte value 1, matching the marker value stored for each queued event stream id.
    private static final byte ONE_BYTE = 1;

    /**
     * Creates an idle mailbox with an empty queue.
     *
     * @param number            identifier of this mailbox (used in log output)
     * @param batchSize         maximum number of messages handed to the handler per batch
     * @param handleEventAction callback invoked with every non-empty batch; must not be null
     */
    public EventCommittingContextMailBox(int number, int batchSize, @NotNull Action1<List<EventCommittingContext>> handleEventAction) {
        Intrinsics.checkNotNullParameter(handleEventAction, (String)"handleEventAction");
        this.number = number;
        this.batchSize = batchSize;
        this.lockObj = new Object();
        this.asyncLockObj = new Object();
        this.aggregateDictDict = new ConcurrentHashMap<>();
        this.messageQueue = new ConcurrentLinkedQueue<>();
        this.handleMessageAction = handleEventAction;
        // Assigned once; the decompiled code set this field twice in a row.
        this.lastActiveTime = new Date();
    }

    /**
     * @return the mailbox number supplied at construction time
     */
    public final int getNumber() {
        return number;
    }

    /**
     * @return the number of messages currently sitting in the queue, widened to {@code long}
     */
    private final long totalUnHandledMessageCount() {
        int queued = this.messageQueue.size();
        return queued;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Queues an event-committing context and starts a processing run if none is active.
     *
     * <p>Under {@code lockObj}, the event stream id is recorded in the per-aggregate
     * dictionary. If that id is already recorded, the message is rejected; otherwise the
     * message is bound to this mailbox, appended to the queue, the last-active time is
     * refreshed and {@code tryRun()} is called.
     *
     * @param message the context to enqueue; must not be null
     * @throws DuplicateEventStreamException if an event stream with the same id is already
     *         registered for the same aggregate root
     */
    public final void enqueueMessage(@NotNull EventCommittingContext message) {
        Intrinsics.checkNotNullParameter((Object)message, (String)"message");
        synchronized (this.lockObj) {
            // The decompiler had typed this map as Object[]; it is the per-aggregate id dictionary.
            ConcurrentHashMap<String, Byte> eventDict = this.aggregateDictDict.computeIfAbsent(message.getEventStream().getAggregateRootId(), key -> new ConcurrentHashMap<>());
            if (eventDict.putIfAbsent(message.getEventStream().getId(), ONE_BYTE) != null) {
                throw new DuplicateEventStreamException(message.getEventStream());
            }
            message.setMailBox(this);
            this.messageQueue.add(message);
            if (logger.isDebugEnabled()) {
                Object[] logArgs = new Object[7];
                logArgs[0] = this.getClass().getName();
                logArgs[1] = this.getNumber();
                logArgs[2] = message.getEventStream().getAggregateRootId();
                logArgs[3] = message.getProcessingCommand().getMessage().getId();
                logArgs[4] = message.getEventStream().getVersion();
                logArgs[5] = message.getEventStream().getId();
                List<DomainEventMessage<?>> list = message.getEventStream().getEvents();
                Intrinsics.checkNotNullExpressionValue(list, (String)"message.eventStream.events");
                // The transform argument is the decompiler's reference to a compiler-generated
                // Kotlin lambda class that is not visible in this file; it is kept verbatim.
                logArgs[6] = CollectionsKt.joinToString$default((Iterable)list, (CharSequence)"|", null, null, (int)0, null, (Function1)enqueueMessage.1.1.INSTANCE, (int)30, null);
                logger.debug("{} enqueued new message, mailboxNumber: {}, aggregateRootId: {}, commandId: {}, eventVersion: {}, eventStreamId: {}, eventIds: {}", logArgs);
            }
            this.lastActiveTime = new Date();
            // lockObj is re-entered inside tryRun(); Java monitors are reentrant.
            this.tryRun();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Starts a processing run unless one is already marked as running.
     *
     * <p>Under {@code lockObj}: returns immediately when {@code isRunning} is set; otherwise
     * sets the flag and launches a coroutine on {@code Dispatchers.getIO()} whose body calls
     * {@code processMessages()}. The returned {@code Deferred} is not used afterwards.
     *
     * <p>NOTE(review): the anonymous class below is decompiler output for a Kotlin suspend
     * lambda and contains constructs CFR could not render as valid Java; it is left as-is.
     */
    private final void tryRun() {
        Object object = this.lockObj;
        synchronized (object) {
            boolean bl = false;
            // A run is already in progress; nothing to start.
            if (this.isRunning) {
                return;
            }
            this.setAsRunning();
            if (logger.isDebugEnabled()) {
                logger.debug("{} start run, mailboxNumber: {}", (Object)this.getClass().getName(), (Object)this.getNumber());
            }
            // Launch the run asynchronously on the IO dispatcher; the result is never awaited here.
            Deferred deferred = BuildersKt.async$default((CoroutineScope)CoroutineScopeKt.CoroutineScope((CoroutineContext)((CoroutineContext)Dispatchers.getIO())), null, null, (Function2)((Function2)new Function2<CoroutineScope, Continuation<? super Unit>, Object>(this, null){
                int label;
                final /* synthetic */ EventCommittingContextMailBox this$0;
                {
                    this.this$0 = $receiver;
                    super(2, $completion);
                }

                // Coroutine body: rethrows a failed incoming result, then processes one batch.
                @Nullable
                public final Object invokeSuspend(@NotNull Object object) {
                    IntrinsicsKt.getCOROUTINE_SUSPENDED();
                    switch (this.label) {
                        case 0: {
                            ResultKt.throwOnFailure((Object)object);
                            EventCommittingContextMailBox.access$processMessages(this.this$0);
                            return Unit.INSTANCE;
                        }
                    }
                    throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
                }

                @NotNull
                public final Continuation<Unit> create(@Nullable Object value, @NotNull Continuation<?> $completion) {
                    return (Continuation)new /* invalid duplicate definition of identical inner class */;
                }

                @Nullable
                public final Object invoke(@NotNull CoroutineScope p1, @Nullable Continuation<? super Unit> p2) {
                    return (this.create(p1, p2)).invokeSuspend(Unit.INSTANCE);
                }
            }), (int)3, null);
        }
    }

    /**
     * Marks the current processing run as finished and starts a new one if messages remain.
     *
     * <p>Refreshes the last-active time, clears the running flag and, when the queue is not
     * empty, calls {@code tryRun()}.
     */
    public final void completeRun() {
        this.lastActiveTime = new Date();
        if (logger.isDebugEnabled()) {
            logger.debug("{} complete run, mailboxNumber: {}", (Object)this.getClass().getName(), (Object)this.number);
        }
        // tryRun() reads isRunning while holding lockObj, so the flag is cleared under the same
        // monitor. That way a concurrent enqueueMessage() either sees the cleared flag or has
        // already published its message before the emptiness check below.
        synchronized (this.lockObj) {
            this.setAsNotRunning();
        }
        // isEmpty() instead of size() > 0: ConcurrentLinkedQueue.size() walks the whole queue.
        if (!this.messageQueue.isEmpty()) {
            this.tryRun();
        }
    }

    /**
     * Drops the whole event-stream-id dictionary recorded for the given aggregate root.
     *
     * <p>The message queue itself is not touched here; {@code processMessages()} skips
     * polled messages whose aggregate no longer has a dictionary entry.
     *
     * @param aggregateRootId id of the aggregate root to forget; must not be null
     */
    public final void removeAggregateAllEventCommittingContexts(@NotNull String aggregateRootId) {
        Intrinsics.checkNotNullParameter(aggregateRootId, "aggregateRootId");
        aggregateDictDict.remove(aggregateRootId);
    }

    /**
     * Tells whether this mailbox has been idle for at least the given number of seconds.
     *
     * @param timeoutSeconds idle threshold, in seconds
     * @return {@code true} when the time since the last recorded activity is at least
     *         {@code timeoutSeconds}
     */
    public final boolean isInactive(int timeoutSeconds) {
        // Date.getTime() is epoch milliseconds. SystemClock.now() is subtracted from it, so it
        // is assumed to be epoch milliseconds too (TODO confirm against SystemClock).
        long idleMillis = SystemClock.now() - this.lastActiveTime.getTime();
        // Convert the threshold to milliseconds; previously seconds were compared to millis.
        return idleMillis >= timeoutSeconds * 1000L;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Assembles one batch from the queue and hands it to the handler callback.
     *
     * <p>While holding {@code asyncLockObj}: refreshes the last-active time, polls up to
     * {@code batchSize} messages, and keeps only those whose event stream id is still present
     * in (and can be removed from) the per-aggregate dictionary. An empty batch ends the run
     * via {@code completeRun()}. If the handler throws an {@code Exception}, it is logged,
     * {@code Task.sleep(1L)} is called and the run is completed; on success this method does
     * not call {@code completeRun()} itself.
     */
    private final void processMessages() {
        synchronized (this.asyncLockObj) {
            this.lastActiveTime = new Date();
            List<EventCommittingContext> batch = new ArrayList<>();
            while (batch.size() < this.batchSize) {
                EventCommittingContext next = this.messageQueue.poll();
                if (next == null) {
                    // Queue drained before the batch filled up.
                    break;
                }
                ConcurrentHashMap<String, Byte> pendingIds = this.aggregateDictDict.get(next.getEventStream().getAggregateRootId());
                // Skip messages whose aggregate entry or stream id is no longer registered.
                if (pendingIds != null && pendingIds.remove(next.getEventStream().getId()) != null) {
                    batch.add(next);
                }
            }
            if (batch.isEmpty()) {
                this.completeRun();
                return;
            }
            try {
                this.handleMessageAction.apply(batch);
            }
            catch (Exception ex) {
                // The trailing Throwable has no placeholder, so SLF4J logs it with its stack trace.
                logger.error("{} run has unknown exception, mailboxNumber: {}", this.getClass().getName(), this.getNumber(), ex);
                Task.sleep(1L);
                this.completeRun();
            }
        }
    }

    /** Sets the running flag to {@code true}. */
    private final void setAsRunning() {
        isRunning = true;
    }

    /** Sets the running flag to {@code false}. */
    private final void setAsNotRunning() {
        isRunning = false;
    }

    // Compiler-generated body of a Kotlin lambda (the name is not a legal Java identifier).
    // It null-checks its argument and returns a new, empty ConcurrentHashMap; the argument
    // value is otherwise unused.
    private static final ConcurrentHashMap enqueueMessage$lambda-1$lambda-0(String it) {
        Intrinsics.checkNotNullParameter((Object)it, (String)"it");
        return new ConcurrentHashMap();
    }

    // Synthetic accessor: lets the coroutine body created in tryRun() invoke the private
    // processMessages() method on the given mailbox instance.
    public static final /* synthetic */ void access$processMessages(EventCommittingContextMailBox $this) {
        $this.processMessages();
    }

    @Metadata(mv={1, 6, 0}, k=1, xi=48, d1={"\u0000\u001a\n\u0002\u0018\u0002\n\u0002\u0010\u0000\n\u0002\b\u0002\n\u0002\u0010\u0005\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0004\b\u0086\u0003\u0018\u00002\u00020\u0001B\u0007\b\u0002\u00a2\u0006\u0002\u0010\u0002R\u000e\u0010\u0003\u001a\u00020\u0004X\u0082T\u00a2\u0006\u0002\n\u0000R\u0019\u0010\u0005\u001a\n \u0007*\u0004\u0018\u00010\u00060\u0006\u00a2\u0006\b\n\u0000\u001a\u0004\b\b\u0010\t\u00a8\u0006\n"}, d2={"Lorg/enodeframework/eventing/EventCommittingContextMailBox$Companion;", "", "()V", "ONE_BYTE", "", "logger", "Lorg/slf4j/Logger;", "kotlin.jvm.PlatformType", "getLogger", "()Lorg/slf4j/Logger;", "enode"})
    /**
     * Java-side rendering of the Kotlin companion object; its only member is an accessor
     * for the enclosing class's shared logger.
     */
    public static final class Companion {
        /** Synthetic constructor used by the enclosing class to create the singleton. */
        public /* synthetic */ Companion(DefaultConstructorMarker $constructor_marker) {
            this();
        }

        private Companion() {
        }

        /**
         * @return the logger shared by all {@code EventCommittingContextMailBox} instances
         */
        public final Logger getLogger() {
            return EventCommittingContextMailBox.logger;
        }
    }
}

