package org.axonframework.commandhandling.distributed;

import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.CommandCallback;
import org.axonframework.commandhandling.CommandDispatchInterceptor;
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.common.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/axonframework/commandhandling/distributed/DistributedCommandBus.class */
public class DistributedCommandBus implements CommandBus {
    private static final String DISPATCH_ERROR_MESSAGE = "An error occurred while trying to dispatch a command on the DistributedCommandBus";
    private static final Logger logger = LoggerFactory.getLogger(DistributedCommandBus.class);
    private final RoutingStrategy routingStrategy;
    private final CommandBusConnector connector;
    private final List<CommandDispatchInterceptor> dispatchInterceptors;

    public DistributedCommandBus(CommandBusConnector commandBusConnector) {
        this(commandBusConnector, new AnnotationRoutingStrategy());
    }

    public DistributedCommandBus(CommandBusConnector commandBusConnector, RoutingStrategy routingStrategy) {
        this.dispatchInterceptors = new CopyOnWriteArrayList();
        Assert.notNull(commandBusConnector, "connector may not be null");
        Assert.notNull(routingStrategy, "routingStrategy may not be null");
        this.connector = commandBusConnector;
        this.routingStrategy = routingStrategy;
    }

    public void dispatch(CommandMessage<?> commandMessage) {
        CommandMessage<?> intercept = intercept(commandMessage);
        try {
            this.connector.send(this.routingStrategy.getRoutingKey(intercept), intercept);
        } catch (Exception e) {
            logger.error(DISPATCH_ERROR_MESSAGE, e);
        }
    }

    public <R> void dispatch(CommandMessage<?> commandMessage, CommandCallback<R> commandCallback) {
        CommandMessage<?> intercept = intercept(commandMessage);
        try {
            this.connector.send(this.routingStrategy.getRoutingKey(intercept), intercept, commandCallback);
        } catch (Exception e) {
            commandCallback.onFailure(new CommandDispatchException("An error occurred while trying to dispatch a command on the DistributedCommandBus: " + e.getMessage(), e));
        }
    }

    private CommandMessage<?> intercept(CommandMessage<?> commandMessage) {
        Iterator<CommandDispatchInterceptor> it = this.dispatchInterceptors.iterator();
        while (it.hasNext()) {
            commandMessage = it.next().handle(commandMessage);
        }
        return commandMessage;
    }

    public <C> void subscribe(String str, CommandHandler<? super C> commandHandler) {
        this.connector.subscribe(str, commandHandler);
    }

    public <C> boolean unsubscribe(String str, CommandHandler<? super C> commandHandler) {
        return this.connector.unsubscribe(str, commandHandler);
    }

    public void setCommandDispatchInterceptors(Collection<CommandDispatchInterceptor> collection) {
        this.dispatchInterceptors.clear();
        this.dispatchInterceptors.addAll(collection);
    }
}
