package io.fluxcapacitor.axonclient.commandhandling;

import io.fluxcapacitor.axonclient.commandhandling.result.ResultService;
import io.fluxcapacitor.axonclient.common.serialization.AxonMessageSerializer;
import io.fluxcapacitor.common.ConsistentHashing;
import io.fluxcapacitor.common.api.Message;
import io.fluxcapacitor.javaclient.tracking.ProducerService;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.CommandCallback;
import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.commandhandling.MonitorAwareCallback;
import org.axonframework.commandhandling.SimpleCommandBus;
import org.axonframework.commandhandling.distributed.RoutingStrategy;
import org.axonframework.common.Registration;
import org.axonframework.messaging.MessageDispatchInterceptor;
import org.axonframework.messaging.MessageHandler;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.messaging.unitofwork.RollbackConfiguration;
import org.axonframework.monitoring.MessageMonitor;
import org.axonframework.monitoring.NoOpMessageMonitor;

/* loaded from: input_file:io/fluxcapacitor/axonclient/commandhandling/FluxCapacitorCommandBus.class */
/**
 * {@link CommandBus} that publishes commands through a Flux Capacitor {@link ProducerService}.
 *
 * <p>Outgoing commands are passed through the registered dispatch interceptors, serialized with the
 * configured {@link AxonMessageSerializer} and tagged with a segment computed from the routing key
 * supplied by the {@link RoutingStrategy}. Handler subscriptions, handler interceptors and the
 * rollback configuration are delegated to the wrapped local {@link SimpleCommandBus}.
 */
public class FluxCapacitorCommandBus implements CommandBus {
    /** Meta-data key under which this client's id is attached to commands that expect a result. */
    private static final String SENDER_METADATA_KEY = "sender";

    private final ProducerService producerService;
    private final ResultService resultService;
    private final AxonMessageSerializer serializer;
    private final RoutingStrategy routingStrategy;
    private final String clientId;
    private final SimpleCommandBus localCommandBus;
    private final List<MessageDispatchInterceptor<? super CommandMessage<?>>> dispatchInterceptors =
            new CopyOnWriteArrayList<>();
    private final MessageMonitor<? super CommandMessage<?>> messageMonitor;

    /**
     * Creates a command bus that uses {@link NoOpMessageMonitor#INSTANCE} as its message monitor.
     *
     * @param producerService service used to publish serialized commands
     * @param resultService   service used to await the result of a dispatched command
     * @param serializer      serializer used to convert commands before publication
     * @param routingStrategy strategy providing the routing key from which the segment is computed
     * @param clientId        id added as {@code "sender"} meta-data to commands dispatched with a callback
     * @param localCommandBus bus to which subscriptions, handler interceptors and rollback
     *                        configuration are delegated
     */
    public FluxCapacitorCommandBus(ProducerService producerService, ResultService resultService,
                                   AxonMessageSerializer serializer, RoutingStrategy routingStrategy,
                                   String clientId, SimpleCommandBus localCommandBus) {
        this(producerService, resultService, serializer, routingStrategy, clientId, localCommandBus,
             NoOpMessageMonitor.INSTANCE);
    }

    /**
     * Creates a command bus with an explicit message monitor.
     *
     * @param producerService service used to publish serialized commands
     * @param resultService   service used to await the result of a dispatched command
     * @param serializer      serializer used to convert commands before publication
     * @param routingStrategy strategy providing the routing key from which the segment is computed
     * @param clientId        id added as {@code "sender"} meta-data to commands dispatched with a callback
     * @param localCommandBus bus to which subscriptions, handler interceptors and rollback
     *                        configuration are delegated
     * @param messageMonitor  monitor notified of every command dispatched with a callback
     */
    public FluxCapacitorCommandBus(ProducerService producerService, ResultService resultService,
                                   AxonMessageSerializer serializer, RoutingStrategy routingStrategy,
                                   String clientId, SimpleCommandBus localCommandBus,
                                   MessageMonitor<? super CommandMessage<?>> messageMonitor) {
        this.producerService = producerService;
        this.resultService = resultService;
        this.serializer = serializer;
        this.routingStrategy = routingStrategy;
        this.clientId = clientId;
        this.localCommandBus = localCommandBus;
        this.messageMonitor = messageMonitor;
    }

    /**
     * Dispatches a command without a callback.
     *
     * <p>When the no-op monitor is configured the intercepted command is sent as-is, without
     * registering for a result. Otherwise the command goes through
     * {@link #dispatch(CommandMessage, CommandCallback)} with a {@code null} callback so that the
     * monitor is still informed of the outcome.
     *
     * @param commandMessage the command to dispatch
     */
    @Override
    public <C> void dispatch(CommandMessage<C> commandMessage) {
        if (NoOpMessageMonitor.INSTANCE.equals(this.messageMonitor)) {
            send(intercept(commandMessage));
        } else {
            dispatch(commandMessage, null);
        }
    }

    /**
     * Dispatches a command and reports its outcome to the message monitor and the given callback.
     *
     * <p>The command is first passed through the dispatch interceptors. The intercepted command is
     * the one that is monitored, awaited, handed to the callback and sent, with this client's id
     * added under the {@code "sender"} meta-data key.
     *
     * @param commandMessage  the command to dispatch
     * @param commandCallback callback to notify of success or failure; may be {@code null}
     *                        (the single-argument overload passes {@code null})
     */
    @Override
    @SuppressWarnings("unchecked") // the awaited result is untyped; the caller's callback fixes R
    public <C, R> void dispatch(CommandMessage<C> commandMessage, CommandCallback<? super C, R> commandCallback) {
        CommandMessage<? extends C> intercepted = intercept(commandMessage);
        CommandCallback<? super C, R> monitoredCallback =
                new MonitorAwareCallback<>(commandCallback, this.messageMonitor.onMessageIngested(intercepted));

        // The result handler is registered before the command is sent (same order as before).
        // NOTE(review): if send(...) throws, this handler stays registered; confirm whether
        // ResultService offers a way to cancel it.
        this.resultService.awaitResult(intercepted.getIdentifier()).handle((result, error) -> {
            if (error != null) {
                monitoredCallback.onFailure(intercepted, error);
            } else {
                monitoredCallback.onSuccess(intercepted, (R) result);
            }
            return null;
        });

        // Send the intercepted command (not the original) so interceptor changes are not lost.
        send(intercepted.andMetaData(Collections.singletonMap(SENDER_METADATA_KEY, this.clientId)));
    }

    /** Converts the command to a Flux Capacitor message and hands it to the producer service. */
    private void send(CommandMessage<?> commandMessage) {
        this.producerService.send(new Message[]{toFluxCapacitorMessage(commandMessage)});
    }

    /**
     * Builds the Flux Capacitor message for a command: serialized payload, segment computed from
     * the command's routing key, and the command name as message type.
     */
    private Message toFluxCapacitorMessage(CommandMessage<?> commandMessage) {
        String routingKey = this.routingStrategy.getRoutingKey(commandMessage);
        int segment = ConsistentHashing.computeSegment(routingKey);

        Message message = new Message(this.serializer.serializeCommand(commandMessage));
        message.setSegment(segment);
        message.setType(commandMessage.getCommandName());
        return message;
    }

    /**
     * Passes the command through every registered dispatch interceptor, in iteration order of the
     * interceptor list, feeding each interceptor the output of the previous one.
     *
     * @return the command as returned by the last interceptor, or the input when none is registered
     */
    @SuppressWarnings("unchecked") // interceptors are declared on CommandMessage<?>; payload type is kept by contract
    private <C> CommandMessage<? extends C> intercept(CommandMessage<C> commandMessage) {
        CommandMessage<? extends C> current = commandMessage;
        for (MessageDispatchInterceptor<? super CommandMessage<?>> interceptor : this.dispatchInterceptors) {
            current = (CommandMessage<? extends C>) interceptor.handle(current);
        }
        return current;
    }

    /**
     * Subscribes the handler for the given command name on the local command bus.
     *
     * @return the registration returned by the local command bus
     */
    @Override
    public Registration subscribe(String commandName, MessageHandler<? super CommandMessage<?>> messageHandler) {
        return this.localCommandBus.subscribe(commandName, messageHandler);
    }

    /**
     * Registers an interceptor that is applied to every command dispatched through this bus.
     *
     * @return a registration whose cancellation removes the interceptor again
     */
    public Registration registerDispatchInterceptor(MessageDispatchInterceptor<? super CommandMessage<?>> messageDispatchInterceptor) {
        this.dispatchInterceptors.add(messageDispatchInterceptor);
        return () -> this.dispatchInterceptors.remove(messageDispatchInterceptor);
    }

    /**
     * Registers a handler interceptor on the local command bus.
     *
     * @return the registration returned by the local command bus
     */
    public Registration registerHandlerInterceptor(MessageHandlerInterceptor<? super CommandMessage<?>> messageHandlerInterceptor) {
        return this.localCommandBus.registerHandlerInterceptor(messageHandlerInterceptor);
    }

    /** Forwards the rollback configuration to the local command bus. */
    public void setRollbackConfiguration(RollbackConfiguration rollbackConfiguration) {
        this.localCommandBus.setRollbackConfiguration(rollbackConfiguration);
    }
}
