/*
 * Decompiled with CFR 0.152.
 */
package org.enodeframework.eventing;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import kotlin.Metadata;
import kotlin.ResultKt;
import kotlin.Unit;
import kotlin.collections.CollectionsKt;
import kotlin.coroutines.Continuation;
import kotlin.coroutines.CoroutineContext;
import kotlin.coroutines.intrinsics.IntrinsicsKt;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.functions.Function2;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlinx.coroutines.BuildersKt;
import kotlinx.coroutines.CoroutineDispatcher;
import kotlinx.coroutines.CoroutineScope;
import kotlinx.coroutines.CoroutineScopeKt;
import org.enodeframework.common.exception.DuplicateEventStreamException;
import org.enodeframework.common.extensions.SystemClock;
import org.enodeframework.common.function.Action1;
import org.enodeframework.common.io.Task;
import org.enodeframework.eventing.EventCommittingContext;
import org.enodeframework.eventing.EventCommittingContextMailBox;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Metadata(mv={1, 8, 0}, k=1, xi=48, d1={"\u0000b\n\u0002\u0018\u0002\n\u0002\u0010\u0000\n\u0000\n\u0002\u0010\b\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\u0010 \n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0010\u000e\n\u0002\u0010\u0005\n\u0002\b\u0003\n\u0002\u0010\u000b\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0010\u0002\n\u0002\b\u000b\n\u0002\u0010\t\n\u0002\b\u0003\u0018\u0000 *2\u00020\u0001:\u0001*B1\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0003\u0012\u0006\u0010\u0005\u001a\u00020\u0006\u0012\u0012\u0010\u0007\u001a\u000e\u0012\n\u0012\b\u0012\u0004\u0012\u00020\n0\t0\b\u00a2\u0006\u0002\u0010\u000bJ\u0006\u0010\u001b\u001a\u00020\u001cJ\u000e\u0010\u001d\u001a\u00020\u001c2\u0006\u0010\u001e\u001a\u00020\nJ\u0006\u0010\u001f\u001a\u00020\u001cJ\u000e\u0010 \u001a\u00020\u00132\u0006\u0010!\u001a\u00020\u0003J\b\u0010\"\u001a\u00020\u001cH\u0002J\u000e\u0010#\u001a\u00020\u001c2\u0006\u0010$\u001a\u00020\u000eJ\b\u0010%\u001a\u00020\u001cH\u0002J\b\u0010&\u001a\u00020\u001cH\u0002J\b\u0010'\u001a\u00020(H\u0002J\b\u0010)\u001a\u00020\u001cH\u0002R&\u0010\f\u001a\u001a\u0012\u0004\u0012\u00020\u000e\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00020\u000e\u0012\u0004\u0012\u00020\u000f0\r0\rX\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0010\u001a\u00020\u0001X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0004\u001a\u00020\u0003X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0005\u001a\u00020\u0006X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u001a\u0010\u0011\u001a\u000e\u0012\n\u0012\b\u0012\u0004\u0012\u00020\n0\t0\bX\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0012\u001a\u00020\u0013X\u0082\u000e\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0014\u001a\u00020\u0015X\u0082\u000e\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0016\u001a\u00020\u0001X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0017\u001a\u00020\u0018X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u0014\u0010\u0019\u001a\b\u0012\u0004\u0012\u00020\n0\u001aX\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0002\u001a\u00020\u0003X\u0082\u0004\u00a2\u0006\u0002\n\u0000\u00a8\u0006+"}, d2={"Lorg/enodeframework/eventing/EventCommittingContextMailBox;", "", "number", "", "batchSize", "coroutineDispatcher", "Lkotlinx/coroutines/CoroutineDispatcher;", "handleEventAction", "Lorg/enodeframework/common/function/Action1;", "", "Lorg/enodeframework/eventing/EventCommittingContext;", "(IILkotlinx/coroutines/CoroutineDispatcher;Lorg/enodeframework/common/function/Action1;)V", "aggregateDictDict", "Ljava/util/concurrent/ConcurrentHashMap;", "", "", "asyncLockObj", "handleMessageAction", "isRunning", "", "lastActiveTime", "Ljava/util/Date;", "lockObj", "logger", "Lorg/slf4j/Logger;", "messageQueue", "Ljava/util/concurrent/ConcurrentLinkedQueue;", "completeRun", "", "enqueueMessage", "message", "getNumber", "isInactive", "timeoutSeconds", "processMessages", "removeAggregateAllEventCommittingContexts", "aggregateRootId", "setAsNotRunning", "setAsRunning", "totalUnHandledMessageCount", "", "tryRun", "Companion", "enode"})
public final class EventCommittingContextMailBox {
    @NotNull
    public static final Companion Companion = new Companion(null);
    private final int number;
    private final int batchSize;
    @NotNull
    private final CoroutineDispatcher coroutineDispatcher;
    @NotNull
    private final Logger logger;
    @NotNull
    private final Object lockObj;
    @NotNull
    private final Object asyncLockObj;
    @NotNull
    private final ConcurrentHashMap<String, ConcurrentHashMap<String, Byte>> aggregateDictDict;
    @NotNull
    private final ConcurrentLinkedQueue<EventCommittingContext> messageQueue;
    @NotNull
    private final Action1<List<EventCommittingContext>> handleMessageAction;
    @NotNull
    private Date lastActiveTime;
    private boolean isRunning;
    private static final byte ONE_BYTE = 1;

    public EventCommittingContextMailBox(int number, int batchSize, @NotNull CoroutineDispatcher coroutineDispatcher, @NotNull Action1<List<EventCommittingContext>> handleEventAction) {
        Intrinsics.checkNotNullParameter((Object)coroutineDispatcher, (String)"coroutineDispatcher");
        Intrinsics.checkNotNullParameter(handleEventAction, (String)"handleEventAction");
        this.number = number;
        this.batchSize = batchSize;
        this.coroutineDispatcher = coroutineDispatcher;
        Logger logger = LoggerFactory.getLogger(EventCommittingContextMailBox.class);
        Intrinsics.checkNotNullExpressionValue((Object)logger, (String)"getLogger(EventCommittin\u2026ntextMailBox::class.java)");
        this.logger = logger;
        this.lockObj = new Object();
        this.asyncLockObj = new Object();
        this.aggregateDictDict = new ConcurrentHashMap();
        this.messageQueue = new ConcurrentLinkedQueue();
        this.handleMessageAction = handleEventAction;
        this.lastActiveTime = new Date();
        this.lastActiveTime = new Date();
    }

    private final long totalUnHandledMessageCount() {
        return ((Collection)this.messageQueue).size();
    }

    public final void getNumber() {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public final void enqueueMessage(@NotNull EventCommittingContext message) {
        Intrinsics.checkNotNullParameter((Object)message, (String)"message");
        Object object = this.lockObj;
        synchronized (object) {
            boolean bl = false;
            ConcurrentHashMap concurrentHashMap = this.aggregateDictDict.computeIfAbsent(message.getEventStream().getAggregateRootId(), arg_0 -> EventCommittingContextMailBox.enqueueMessage$lambda$1$lambda$0(enqueueMessage.1.eventDict.1.INSTANCE, arg_0));
            Intrinsics.checkNotNullExpressionValue((Object)concurrentHashMap, (String)"aggregateDictDict.comput\u2026) { ConcurrentHashMap() }");
            ConcurrentHashMap eventDict2 = concurrentHashMap;
            if (eventDict2.putIfAbsent(message.getEventStream().getId(), (byte)1) == null) {
                message.setMailBox(this);
                this.messageQueue.add(message);
                if (this.logger.isDebugEnabled()) {
                    Object[] objectArray = new Object[]{this.getClass().getName(), this.number, message.getEventStream().getAggregateRootId(), message.getProcessingCommand().getMessage().getId(), message.getEventStream().getVersion(), message.getEventStream().getId(), CollectionsKt.joinToString$default((Iterable)message.getEventStream().getEvents(), (CharSequence)"|", null, null, (int)0, null, (Function1)enqueueMessage.1.1.INSTANCE, (int)30, null)};
                    this.logger.debug("{} enqueued new message, mailboxNumber: {}, aggregateRootId: {}, commandId: {}, eventVersion: {}, eventStreamId: {}, eventIds: {}", objectArray);
                }
            } else {
                throw new DuplicateEventStreamException(message.getEventStream());
            }
            this.lastActiveTime = new Date();
            this.tryRun();
            Unit unit = Unit.INSTANCE;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private final void tryRun() {
        Object object = this.lockObj;
        synchronized (object) {
            boolean bl = false;
            if (this.isRunning) {
                return;
            }
            this.setAsRunning();
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("{} start run, mailboxNumber: {}", (Object)this.getClass().getName(), (Object)this.number);
            }
            BuildersKt.async$default((CoroutineScope)CoroutineScopeKt.CoroutineScope((CoroutineContext)((CoroutineContext)this.coroutineDispatcher)), null, null, (Function2)((Function2)new Function2<CoroutineScope, Continuation<? super Unit>, Object>(this, null){
                int label;
                final /* synthetic */ EventCommittingContextMailBox this$0;
                {
                    this.this$0 = $receiver;
                    super(2, $completion);
                }

                @Nullable
                public final Object invokeSuspend(@NotNull Object object) {
                    IntrinsicsKt.getCOROUTINE_SUSPENDED();
                    switch (this.label) {
                        case 0: {
                            ResultKt.throwOnFailure((Object)object);
                            EventCommittingContextMailBox.access$processMessages(this.this$0);
                            return Unit.INSTANCE;
                        }
                    }
                    throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
                }

                @NotNull
                public final Continuation<Unit> create(@Nullable Object value, @NotNull Continuation<?> $completion) {
                    return (Continuation)new /* invalid duplicate definition of identical inner class */;
                }

                @Nullable
                public final Object invoke(@NotNull CoroutineScope p1, @Nullable Continuation<? super Unit> p2) {
                    return (this.create(p1, p2)).invokeSuspend(Unit.INSTANCE);
                }
            }), (int)3, null);
            return;
        }
    }

    public final void completeRun() {
        this.lastActiveTime = new Date();
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("{} complete run, mailboxNumber: {}", (Object)this.getClass().getName(), (Object)this.number);
        }
        this.setAsNotRunning();
        if (this.totalUnHandledMessageCount() > 0L) {
            this.tryRun();
        }
    }

    public final void removeAggregateAllEventCommittingContexts(@NotNull String aggregateRootId) {
        Intrinsics.checkNotNullParameter((Object)aggregateRootId, (String)"aggregateRootId");
        this.aggregateDictDict.remove(aggregateRootId);
    }

    public final boolean isInactive(int timeoutSeconds) {
        return SystemClock.now() - this.lastActiveTime.getTime() >= (long)timeoutSeconds;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private final void processMessages() {
        Object object = this.asyncLockObj;
        synchronized (object) {
            EventCommittingContext message;
            boolean bl = false;
            this.lastActiveTime = new Date();
            List messageList = new ArrayList();
            while (messageList.size() < this.batchSize && (message = this.messageQueue.poll()) != null) {
                ConcurrentHashMap<String, Byte> eventDict2 = this.aggregateDictDict.get(message.getEventStream().getAggregateRootId());
                if (eventDict2 == null || eventDict2.remove(message.getEventStream().getId()) == null) continue;
                messageList.add(message);
            }
            if (messageList.isEmpty()) {
                this.completeRun();
                return;
            }
            try {
                this.handleMessageAction.apply(messageList);
            }
            catch (Exception ex) {
                Object[] objectArray = new Object[]{this.getClass().getName(), this.number, ex};
                this.logger.error("{} run has unknown exception, mailboxNumber: {}", objectArray);
                Task.sleep(1L);
                this.completeRun();
            }
            Unit unit = Unit.INSTANCE;
        }
    }

    private final void setAsRunning() {
        this.isRunning = true;
    }

    private final void setAsNotRunning() {
        this.isRunning = false;
    }

    private static final ConcurrentHashMap enqueueMessage$lambda$1$lambda$0(Function1 $tmp0, Object p0) {
        Intrinsics.checkNotNullParameter((Object)$tmp0, (String)"$tmp0");
        return (ConcurrentHashMap)$tmp0.invoke(p0);
    }

    public static final /* synthetic */ void access$processMessages(EventCommittingContextMailBox $this) {
        $this.processMessages();
    }

    @Metadata(mv={1, 8, 0}, k=1, xi=48, d1={"\u0000\u0012\n\u0002\u0018\u0002\n\u0002\u0010\u0000\n\u0002\b\u0002\n\u0002\u0010\u0005\n\u0000\b\u0086\u0003\u0018\u00002\u00020\u0001B\u0007\b\u0002\u00a2\u0006\u0002\u0010\u0002R\u000e\u0010\u0003\u001a\u00020\u0004X\u0082T\u00a2\u0006\u0002\n\u0000\u00a8\u0006\u0005"}, d2={"Lorg/enodeframework/eventing/EventCommittingContextMailBox$Companion;", "", "()V", "ONE_BYTE", "", "enode"})
    public static final class Companion {
        private Companion() {
        }

        public /* synthetic */ Companion(DefaultConstructorMarker $constructor_marker) {
            this();
        }
    }
}

