package org.enodeframework.queue.command;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalCause;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.bridge.BridgeOptions;
import io.vertx.ext.bridge.PermittedOptions;
import io.vertx.ext.eventbus.bridge.tcp.TcpEventBusBridge;
import io.vertx.kotlin.coroutines.CoroutineVerticle;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import kotlin.Metadata;
import kotlin.Unit;
import kotlin.coroutines.Continuation;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlin.jvm.internal.StringCompanionObject;
import org.enodeframework.commanding.CommandResult;
import org.enodeframework.commanding.CommandReturnType;
import org.enodeframework.commanding.CommandStatus;
import org.enodeframework.commanding.ICommand;
import org.enodeframework.common.exception.DuplicateCommandRegisterException;
import org.enodeframework.common.function.Action;
import org.enodeframework.common.scheduling.IScheduleService;
import org.enodeframework.common.scheduling.Worker;
import org.enodeframework.common.serializing.ISerializeService;
import org.enodeframework.common.utilities.InetUtil;
import org.enodeframework.common.utilities.ReplyMessage;
import org.enodeframework.queue.domainevent.DomainEventHandledMessage;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* compiled from: DefaultCommandResultProcessor.kt */
@Metadata(mv = {1, 4, 0}, bv = {1, 0, 3}, k = 1, d1 = {"��\u0082\u0001\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\b\n\u0002\b\u0003\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0010\u000e\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0010\u000b\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0006\u0018�� 62\u00020\u00012\u00020\u0002:\u00016B%\u0012\u0006\u0010\u0003\u001a\u00020\u0004\u0012\u0006\u0010\u0005\u001a\u00020\u0006\u0012\u0006\u0010\u0007\u001a\u00020\b\u0012\u0006\u0010\t\u001a\u00020\b¢\u0006\u0002\u0010\nJ\b\u0010\u001e\u001a\u00020\fH\u0016J\u0010\u0010\u001f\u001a\u00020 2\u0006\u0010!\u001a\u00020\u0017H\u0002J\u0010\u0010\"\u001a\u00020 2\u0006\u0010#\u001a\u00020\u000fH\u0002J\u0010\u0010$\u001a\u00020 2\u0006\u0010%\u001a\u00020&H\u0016J\u0010\u0010'\u001a\u00020 2\u0006\u0010(\u001a\u00020)H\u0002J\u001c\u0010*\u001a\u00020 2\b\u0010+\u001a\u0004\u0018\u00010\u00142\b\u0010,\u001a\u0004\u0018\u00010\u0015H\u0002J&\u0010-\u001a\u00020 2\u0006\u0010%\u001a\u00020&2\u0006\u0010.\u001a\u00020/2\f\u00100\u001a\b\u0012\u0004\u0012\u00020\u000f01H\u0016J\u0011\u00102\u001a\u00020 H\u0094@ø\u0001��¢\u0006\u0002\u00103J\u000e\u00104\u001a\u00020 2\u0006\u0010\u0007\u001a\u00020\bJ\u0011\u00105\u001a\u00020 H\u0094@ø\u0001��¢\u0006\u0002\u00103R\u000e\u0010\u000b\u001a\u00020\fX\u0082.¢\u0006\u0002\n��R\u0014\u0010\r\u001a\b\u0012\u0004\u0012\u00020\u000f0\u000eX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0010\u001a\u00020\u0011X\u0082\u0004¢\u0006\u0002\n��R\u001a\u0010\u0012\u001a\u000e\u0012\u0004\u0012\u00020\u0014\u0012\u0004\u0012\u00020\u00150\u0013X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\t\u001a\u00020\bX\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\u0016\u001a\b\u0012\u0004\u0012\u00020\u00170\u000eX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0018\u001a\u00020\u0011X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0007\u001a\u00020\bX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0019\u001a\u00020\u0014X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0003\u001a\u00020\u0004X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0005\u001a\u00020\u0006X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u001a\u001a\u00020\u001bX\u0082\u000e¢\u0006\u0002\n��R\u000e\u0010\u001c\u001a\u00020\u001dX\u0082.¢\u0006\u0002\n��\u0082\u0002\u0004\n\u0002\b\u0019¨\u00067"}, d2 = {"Lorg/enodeframework/queue/command/DefaultCommandResultProcessor;", "Lio/vertx/kotlin/coroutines/CoroutineVerticle;", "Lorg/enodeframework/queue/command/ICommandResultProcessor;", "scheduleService", "Lorg/enodeframework/common/scheduling/IScheduleService;", "serializeService", "Lorg/enodeframework/common/serializing/ISerializeService;", "port", "", "completionSourceTimeout", "(Lorg/enodeframework/common/scheduling/IScheduleService;Lorg/enodeframework/common/serializing/ISerializeService;II)V", "bindAddress", "Ljava/net/InetSocketAddress;", "commandExecutedMessageLocalQueue", "Ljava/util/concurrent/BlockingQueue;", "Lorg/enodeframework/commanding/CommandResult;", "commandExecutedMessageWorker", "Lorg/enodeframework/common/scheduling/Worker;", "commandTaskDict", "Lcom/google/common/cache/Cache;", "", "Lorg/enodeframework/queue/command/CommandTaskCompletionSource;", "domainEventHandledMessageLocalQueue", "Lorg/enodeframework/queue/domainevent/DomainEventHandledMessage;", "domainEventHandledMessageWorker", "scanExpireCommandTaskName", "started", "", "tcpEventBusBridge", "Lio/vertx/ext/eventbus/bridge/tcp/TcpEventBusBridge;", "getBindAddress", "processDomainEventHandledMessage", "", "message", "processExecutedCommandMessage", "commandResult", "processFailedSendingCommand", "command", "Lorg/enodeframework/commanding/ICommand;", "processRequestInternal", "reply", "Lorg/enodeframework/common/utilities/ReplyMessage;", "processTimeoutCommand", "commandId", "commandTaskCompletionSource", "registerProcessingCommand", "commandReturnType", "Lorg/enodeframework/commanding/CommandReturnType;", "taskCompletionSource", "Ljava/util/concurrent/CompletableFuture;", "start", "(Lkotlin/coroutines/Continuation;)Ljava/lang/Object;", "startServer", "stop", "Companion", "enode"})
/* loaded from: input_file:org/enodeframework/queue/command/DefaultCommandResultProcessor.class */
public final class DefaultCommandResultProcessor extends CoroutineVerticle implements ICommandResultProcessor {
    private final String scanExpireCommandTaskName;
    private final Cache<String, CommandTaskCompletionSource> commandTaskDict;
    private final BlockingQueue<CommandResult> commandExecutedMessageLocalQueue;
    private final BlockingQueue<DomainEventHandledMessage> domainEventHandledMessageLocalQueue;
    private final Worker commandExecutedMessageWorker;
    private final Worker domainEventHandledMessageWorker;
    private InetSocketAddress bindAddress;
    private TcpEventBusBridge tcpEventBusBridge;
    private boolean started;
    private final IScheduleService scheduleService;
    private final ISerializeService serializeService;
    private final int port;
    private final int completionSourceTimeout;
    public static final Companion Companion = new Companion(null);
    private static final Logger logger = LoggerFactory.getLogger(DefaultCommandResultProcessor.class);

    /* compiled from: DefaultCommandResultProcessor.kt */
    @Metadata(mv = {1, 4, 0}, bv = {1, 0, 3}, k = 1, d1 = {"��\u0014\n\u0002\u0018\u0002\n\u0002\u0010��\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\b\u0086\u0003\u0018��2\u00020\u0001B\u0007\b\u0002¢\u0006\u0002\u0010\u0002R\u0016\u0010\u0003\u001a\n \u0005*\u0004\u0018\u00010\u00040\u0004X\u0082\u0004¢\u0006\u0002\n��¨\u0006\u0006"}, d2 = {"Lorg/enodeframework/queue/command/DefaultCommandResultProcessor$Companion;", "", "()V", "logger", "Lorg/slf4j/Logger;", "kotlin.jvm.PlatformType", "enode"})
    /* loaded from: input_file:org/enodeframework/queue/command/DefaultCommandResultProcessor$Companion.class */
    public static final class Companion {
        private Companion() {
        }

        public /* synthetic */ Companion(DefaultConstructorMarker defaultConstructorMarker) {
            this();
        }
    }

    public final void startServer(final int i) {
        this.bindAddress = new InetSocketAddress(InetAddress.getLocalHost(), i);
        InetSocketAddress inetSocketAddress = this.bindAddress;
        if (inetSocketAddress == null) {
            Intrinsics.throwUninitializedPropertyAccessException("bindAddress");
        }
        String uri = InetUtil.toUri(inetSocketAddress);
        getVertx().eventBus().consumer(uri, new Handler<Message<JsonObject>>() { // from class: org.enodeframework.queue.command.DefaultCommandResultProcessor$startServer$1
            public final void handle(@NotNull Message<JsonObject> message) {
                Intrinsics.checkNotNullParameter(message, "msg");
                ReplyMessage replyMessage = (ReplyMessage) ((JsonObject) message.body()).mapTo(ReplyMessage.class);
                DefaultCommandResultProcessor defaultCommandResultProcessor = DefaultCommandResultProcessor.this;
                Intrinsics.checkNotNullExpressionValue(replyMessage, "replyMessage");
                defaultCommandResultProcessor.processRequestInternal(replyMessage);
            }
        });
        BridgeOptions bridgeOptions = new BridgeOptions();
        bridgeOptions.addInboundPermitted(new PermittedOptions().setAddress(uri));
        bridgeOptions.addOutboundPermitted(new PermittedOptions().setAddress(uri));
        TcpEventBusBridge listen = TcpEventBusBridge.create(getVertx(), bridgeOptions).listen(i, new Handler<AsyncResult<TcpEventBusBridge>>() { // from class: org.enodeframework.queue.command.DefaultCommandResultProcessor$startServer$2
            public final void handle(@NotNull AsyncResult<TcpEventBusBridge> asyncResult) {
                Logger logger2;
                Intrinsics.checkNotNullParameter(asyncResult, "res");
                if (asyncResult.succeeded()) {
                    return;
                }
                logger2 = DefaultCommandResultProcessor.logger;
                logger2.error("vertx netServer start failed. port: {}", Integer.valueOf(i), asyncResult.cause());
            }
        });
        Intrinsics.checkNotNullExpressionValue(listen, "TcpEventBusBridge.create…)\n            }\n        }");
        this.tcpEventBusBridge = listen;
    }

    @Override // org.enodeframework.queue.command.ICommandResultProcessor
    public void registerProcessingCommand(@NotNull ICommand iCommand, @NotNull CommandReturnType commandReturnType, @NotNull CompletableFuture<CommandResult> completableFuture) {
        Intrinsics.checkNotNullParameter(iCommand, "command");
        Intrinsics.checkNotNullParameter(commandReturnType, "commandReturnType");
        Intrinsics.checkNotNullParameter(completableFuture, "taskCompletionSource");
        if (this.commandTaskDict.asMap().containsKey(iCommand.getId())) {
            StringCompanionObject stringCompanionObject = StringCompanionObject.INSTANCE;
            Object[] objArr = {iCommand.getClass().getName(), iCommand.getId()};
            String format = String.format("Duplicate processing command registration, type:%s, id:%s", Arrays.copyOf(objArr, objArr.length));
            Intrinsics.checkNotNullExpressionValue(format, "java.lang.String.format(format, *args)");
            throw new DuplicateCommandRegisterException(format);
        }
        ConcurrentMap asMap = this.commandTaskDict.asMap();
        Intrinsics.checkNotNullExpressionValue(asMap, "commandTaskDict.asMap()");
        String id = iCommand.getId();
        String aggregateRootId = iCommand.getAggregateRootId();
        Intrinsics.checkNotNullExpressionValue(aggregateRootId, "command.aggregateRootId");
        asMap.put(id, new CommandTaskCompletionSource(aggregateRootId, commandReturnType, completableFuture));
    }

    @Nullable
    protected Object start(@NotNull Continuation<? super Unit> continuation) {
        if (this.started) {
            return Unit.INSTANCE;
        }
        startServer(this.port);
        this.commandExecutedMessageWorker.start();
        this.domainEventHandledMessageWorker.start();
        this.scheduleService.startTask(this.scanExpireCommandTaskName, new Action() { // from class: org.enodeframework.queue.command.DefaultCommandResultProcessor$start$2
            @Override // org.enodeframework.common.function.Action
            public final void apply() {
                Cache cache;
                cache = DefaultCommandResultProcessor.this.commandTaskDict;
                cache.cleanUp();
            }
        }, this.completionSourceTimeout, this.completionSourceTimeout);
        this.started = true;
        return Unit.INSTANCE;
    }

    @Nullable
    protected Object stop(@NotNull Continuation<? super Unit> continuation) {
        this.scheduleService.stopTask(this.scanExpireCommandTaskName);
        this.commandExecutedMessageWorker.stop();
        this.domainEventHandledMessageWorker.stop();
        TcpEventBusBridge tcpEventBusBridge = this.tcpEventBusBridge;
        if (tcpEventBusBridge == null) {
            Intrinsics.throwUninitializedPropertyAccessException("tcpEventBusBridge");
        }
        tcpEventBusBridge.close();
        return Unit.INSTANCE;
    }

    @Override // org.enodeframework.queue.command.ICommandResultProcessor
    @NotNull
    public InetSocketAddress getBindAddress() {
        InetSocketAddress inetSocketAddress = this.bindAddress;
        if (inetSocketAddress == null) {
            Intrinsics.throwUninitializedPropertyAccessException("bindAddress");
        }
        return inetSocketAddress;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final void processRequestInternal(ReplyMessage replyMessage) {
        if (replyMessage.getCode() == CommandReturnType.CommandExecuted.getValue()) {
            this.commandExecutedMessageLocalQueue.add(replyMessage.getCommandResult());
        } else if (replyMessage.getCode() == CommandReturnType.EventHandled.getValue()) {
            this.domainEventHandledMessageLocalQueue.add(replyMessage.getEventHandledMessage());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final void processExecutedCommandMessage(CommandResult commandResult) {
        CommandTaskCompletionSource commandTaskCompletionSource = (CommandTaskCompletionSource) this.commandTaskDict.asMap().get(commandResult.getCommandId());
        if (commandTaskCompletionSource == null) {
            Logger logger2 = logger;
            Intrinsics.checkNotNullExpressionValue(logger2, "logger");
            if (logger2.isDebugEnabled()) {
                logger.debug("Command result return, {}, but commandTaskCompletionSource maybe timeout expired.", this.serializeService.serialize(commandResult));
                return;
            }
            return;
        }
        if (commandTaskCompletionSource.getCommandReturnType() == CommandReturnType.CommandExecuted) {
            this.commandTaskDict.asMap().remove(commandResult.getCommandId());
            if (commandTaskCompletionSource.getTaskCompletionSource().complete(commandResult)) {
                Logger logger3 = logger;
                Intrinsics.checkNotNullExpressionValue(logger3, "logger");
                if (logger3.isDebugEnabled()) {
                    logger.debug("Command result return CommandExecuted, {}", this.serializeService.serialize(commandResult));
                    return;
                }
                return;
            }
            return;
        }
        if (commandTaskCompletionSource.getCommandReturnType() == CommandReturnType.EventHandled) {
            if (CommandStatus.Failed == commandResult.getStatus() || CommandStatus.NothingChanged == commandResult.getStatus()) {
                this.commandTaskDict.asMap().remove(commandResult.getCommandId());
                if (commandTaskCompletionSource.getTaskCompletionSource().complete(commandResult)) {
                    Logger logger4 = logger;
                    Intrinsics.checkNotNullExpressionValue(logger4, "logger");
                    if (logger4.isDebugEnabled()) {
                        logger.debug("Command result return EventHandled, {}", this.serializeService.serialize(commandResult));
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final void processTimeoutCommand(String str, CommandTaskCompletionSource commandTaskCompletionSource) {
        if (commandTaskCompletionSource != null) {
            logger.error("Wait command notify timeout, commandId: {}", str);
            commandTaskCompletionSource.getTaskCompletionSource().complete(new CommandResult(CommandStatus.Failed, str, commandTaskCompletionSource.getAggregateRootId(), "Wait command notify timeout.", String.class.getName()));
        }
    }

    @Override // org.enodeframework.queue.command.ICommandResultProcessor
    public void processFailedSendingCommand(@NotNull ICommand iCommand) {
        Intrinsics.checkNotNullParameter(iCommand, "command");
        CommandTaskCompletionSource commandTaskCompletionSource = (CommandTaskCompletionSource) this.commandTaskDict.asMap().remove(iCommand.getId());
        if (commandTaskCompletionSource != null) {
            commandTaskCompletionSource.getTaskCompletionSource().complete(new CommandResult(CommandStatus.Failed, iCommand.getId(), iCommand.getAggregateRootId(), "Failed to send the command.", String.class.getName()));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final void processDomainEventHandledMessage(DomainEventHandledMessage domainEventHandledMessage) {
        CommandTaskCompletionSource commandTaskCompletionSource = (CommandTaskCompletionSource) this.commandTaskDict.asMap().remove(domainEventHandledMessage.getCommandId());
        if (commandTaskCompletionSource != null) {
            CommandResult commandResult = new CommandResult(CommandStatus.Success, domainEventHandledMessage.getCommandId(), domainEventHandledMessage.getAggregateRootId(), domainEventHandledMessage.getCommandResult(), domainEventHandledMessage.getCommandResult() != null ? String.class.getName() : null);
            if (commandTaskCompletionSource.getTaskCompletionSource().complete(commandResult)) {
                Logger logger2 = logger;
                Intrinsics.checkNotNullExpressionValue(logger2, "logger");
                if (logger2.isDebugEnabled()) {
                    logger.debug("DomainEvent result return, {}", this.serializeService.serialize(commandResult));
                }
            }
        }
    }

    public DefaultCommandResultProcessor(@NotNull IScheduleService iScheduleService, @NotNull ISerializeService iSerializeService, int i, int i2) {
        Intrinsics.checkNotNullParameter(iScheduleService, "scheduleService");
        Intrinsics.checkNotNullParameter(iSerializeService, "serializeService");
        this.scheduleService = iScheduleService;
        this.serializeService = iSerializeService;
        this.port = i;
        this.completionSourceTimeout = i2;
        this.scanExpireCommandTaskName = "CleanTimeoutCommandTask_" + System.currentTimeMillis() + new Random().nextInt(10000);
        Cache<String, CommandTaskCompletionSource> build = CacheBuilder.newBuilder().removalListener(new RemovalListener<String, CommandTaskCompletionSource>() { // from class: org.enodeframework.queue.command.DefaultCommandResultProcessor.1
            public final void onRemoval(@NotNull RemovalNotification<String, CommandTaskCompletionSource> removalNotification) {
                Intrinsics.checkNotNullParameter(removalNotification, "notification");
                if (removalNotification.getCause() == RemovalCause.EXPIRED) {
                    DefaultCommandResultProcessor.this.processTimeoutCommand((String) removalNotification.getKey(), (CommandTaskCompletionSource) removalNotification.getValue());
                }
            }
        }).expireAfterWrite(this.completionSourceTimeout, TimeUnit.MILLISECONDS).build();
        Intrinsics.checkNotNullExpressionValue(build, "CacheBuilder.newBuilder(…nit.MILLISECONDS).build()");
        this.commandTaskDict = build;
        this.commandExecutedMessageLocalQueue = new LinkedBlockingQueue();
        this.domainEventHandledMessageLocalQueue = new LinkedBlockingQueue();
        this.commandExecutedMessageWorker = new Worker("ProcessExecutedCommandMessage", new Action() { // from class: org.enodeframework.queue.command.DefaultCommandResultProcessor.2
            @Override // org.enodeframework.common.function.Action
            public final void apply() {
                DefaultCommandResultProcessor defaultCommandResultProcessor = DefaultCommandResultProcessor.this;
                Object take = DefaultCommandResultProcessor.this.commandExecutedMessageLocalQueue.take();
                Intrinsics.checkNotNullExpressionValue(take, "commandExecutedMessageLocalQueue.take()");
                defaultCommandResultProcessor.processExecutedCommandMessage((CommandResult) take);
            }
        });
        this.domainEventHandledMessageWorker = new Worker("ProcessDomainEventHandledMessage", new Action() { // from class: org.enodeframework.queue.command.DefaultCommandResultProcessor.3
            @Override // org.enodeframework.common.function.Action
            public final void apply() {
                DefaultCommandResultProcessor defaultCommandResultProcessor = DefaultCommandResultProcessor.this;
                Object take = DefaultCommandResultProcessor.this.domainEventHandledMessageLocalQueue.take();
                Intrinsics.checkNotNullExpressionValue(take, "domainEventHandledMessageLocalQueue.take()");
                defaultCommandResultProcessor.processDomainEventHandledMessage((DomainEventHandledMessage) take);
            }
        });
    }
}
