package com.azure.messaging.servicebus.implementation;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpSession;
import com.azure.core.amqp.implementation.AmqpSendLink;
import com.azure.core.amqp.implementation.AzureTokenManagerProvider;
import com.azure.core.amqp.implementation.ConnectionOptions;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.ReactorConnection;
import com.azure.core.amqp.implementation.ReactorHandlerProvider;
import com.azure.core.amqp.implementation.ReactorProvider;
import com.azure.core.amqp.implementation.RetryUtil;
import com.azure.core.amqp.implementation.TokenManager;
import com.azure.core.amqp.implementation.TokenManagerProvider;
import com.azure.core.amqp.implementation.handler.SessionHandler;
import com.azure.core.amqp.models.CbsAuthorizationType;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Session;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

/* loaded from: input_file:com/azure/messaging/servicebus/implementation/ServiceBusReactorAmqpConnection.class */
/**
 * A {@link ReactorConnection} specialised for Service Bus. On top of the base connection it hands out
 * per-entity management nodes (cached in {@link #managementNodes}), send links and receive links, and it
 * creates {@link ServiceBusReactorSession} instances for the sessions opened on this connection.
 */
public class ServiceBusReactorAmqpConnection extends ReactorConnection implements ServiceBusAmqpConnection {
    private static final String MANAGEMENT_SESSION_NAME = "mgmt-session";
    private static final String MANAGEMENT_LINK_NAME = "mgmt";
    private static final String MANAGEMENT_ADDRESS = "$management";
    private static final String CROSS_ENTITY_TRANSACTIONS_LINK_NAME = "crossentity-coordinator";
    private static final ClientLogger LOGGER = new ClientLogger(ServiceBusReactorAmqpConnection.class);

    /** Management nodes created so far, keyed by {@code "<entityType>-<entityPath>"}. */
    private final ConcurrentHashMap<String, ServiceBusManagementNode> managementNodes;
    private final String connectionId;
    private final ReactorProvider reactorProvider;
    private final ReactorHandlerProvider handlerProvider;
    private final ServiceBusAmqpLinkProvider linkProvider;
    private final TokenManagerProvider tokenManagerProvider;
    private final AmqpRetryOptions retryOptions;
    private final MessageSerializer messageSerializer;
    private final Scheduler scheduler;
    private final String fullyQualifiedNamespace;
    private final CbsAuthorizationType authorizationType;
    /** When {@code true}, every session request is routed to the shared cross-entity session name. */
    private final boolean distributedTransactionsSupport;

    /**
     * Creates the connection. The base connection is configured with {@link SenderSettleMode#SETTLED} and
     * {@link ReceiverSettleMode#FIRST}.
     *
     * @param str identifier of this connection; used in error messages.
     * @param connectionOptions supplies the authorization type, retry options, scheduler and fully-qualified
     *     namespace used by this connection.
     * @param reactorProvider passed to the base connection and to every session created here.
     * @param reactorHandlerProvider passed to the base connection and to every session created here.
     * @param serviceBusAmqpLinkProvider passed to the base connection and to every session created here.
     * @param tokenManagerProvider passed to the base connection and to every session created here.
     * @param messageSerializer serializer handed to management channels and sessions.
     * @param z whether distributed (cross-entity) transactions are enabled for this connection.
     */
    public ServiceBusReactorAmqpConnection(String str, ConnectionOptions connectionOptions, ReactorProvider reactorProvider, ReactorHandlerProvider reactorHandlerProvider, ServiceBusAmqpLinkProvider serviceBusAmqpLinkProvider, TokenManagerProvider tokenManagerProvider, MessageSerializer messageSerializer, boolean z) {
        super(str, connectionOptions, reactorProvider, reactorHandlerProvider, serviceBusAmqpLinkProvider, tokenManagerProvider, messageSerializer, SenderSettleMode.SETTLED, ReceiverSettleMode.FIRST);
        this.managementNodes = new ConcurrentHashMap<>();
        this.connectionId = str;
        this.reactorProvider = reactorProvider;
        this.handlerProvider = reactorHandlerProvider;
        this.linkProvider = serviceBusAmqpLinkProvider;
        this.tokenManagerProvider = tokenManagerProvider;
        this.authorizationType = connectionOptions.getAuthorizationType();
        this.retryOptions = connectionOptions.getRetry();
        this.messageSerializer = messageSerializer;
        this.scheduler = connectionOptions.getScheduler();
        this.fullyQualifiedNamespace = connectionOptions.getFullyQualifiedNamespace();
        this.distributedTransactionsSupport = z;
    }

    /**
     * Gets the management node for an entity, creating and caching it on first use.
     *
     * @param str path of the entity the management node is for.
     * @param messagingEntityType type of the entity; combined with {@code str} to form the cache key.
     * @return a Mono that emits the cached or newly created management node, or errors with an
     *     {@link IllegalStateException} when this connection is already disposed.
     */
    @Override
    public Mono<ServiceBusManagementNode> getManagementNode(String str, MessagingEntityType messagingEntityType) {
        if (isDisposed()) {
            return Mono.error(LOGGER.logExceptionAsError(new IllegalStateException(String.format("connectionId[%s]: Connection is disposed. Cannot get management instance for '%s'", this.connectionId, str))));
        }

        final String nodeKey = String.join("-", messagingEntityType.toString(), str);
        final ServiceBusManagementNode cachedNode = this.managementNodes.get(nodeKey);
        if (cachedNode != null) {
            return Mono.just(cachedNode);
        }

        return getReactorConnection().then(Mono.defer(() -> {
            final TokenManager tokenManager = new AzureTokenManagerProvider(
                this.authorizationType, this.fullyQualifiedNamespace, ServiceBusConstants.AZURE_ACTIVE_DIRECTORY_SCOPE)
                .getTokenManager(getClaimsBasedSecurityNode(), str);

            // NOTE(review): the argument of thenReturn(...) is evaluated while this pipeline is assembled,
            // i.e. compute(...) runs (and may cache a new node) before authorize() has completed.
            // Behaviour is intentionally left as-is; confirm whether node creation should be deferred.
            return tokenManager.authorize().thenReturn(this.managementNodes.compute(nodeKey, (unusedKey, existingNode) -> {
                if (existingNode != null) {
                    LOGGER.info("A management node exists already, returning it.");
                    // Another caller created the node first; the token manager built above is not needed.
                    tokenManager.close();
                    return existingNode;
                }

                final String sessionName = str + "-" + MANAGEMENT_SESSION_NAME;
                final String linkName = str + "-" + MANAGEMENT_LINK_NAME;
                final String address = str + "/" + MANAGEMENT_ADDRESS;
                LOGGER.atInfo()
                    .addKeyValue("linkName", linkName)
                    .addKeyValue("entityPath", str)
                    .addKeyValue("address", address)
                    .log("Creating management node.");
                return new ManagementChannel(createRequestResponseChannel(sessionName, linkName, address),
                    this.fullyQualifiedNamespace, str, tokenManager, this.messageSerializer,
                    this.retryOptions.getTryTimeout());
            }));
        }));
    }

    /**
     * Creates a send link on the session obtained from {@link #createSession(String)}.
     *
     * @param str session name to request; also logged as the link name and used as the prefix of the
     *     producer's link name ({@code str + str2}).
     * @param str2 entity path passed to the producer.
     * @param amqpRetryOptions supplies the try timeout and the retry policy for the producer.
     * @param str3 forwarded unchanged to {@code ServiceBusSession.createProducer}.
     * @param str4 forwarded unchanged to {@code ServiceBusSession.createProducer}.
     * @return a Mono that emits the producer cast to {@link AmqpSendLink}.
     */
    @Override
    public Mono<AmqpSendLink> createSendLink(String str, String str2, AmqpRetryOptions amqpRetryOptions, String str3, String str4) {
        return createSession(str).cast(ServiceBusSession.class).flatMap(session -> {
            LOGGER.atVerbose().addKeyValue("linkName", str).log("Get or create sender link.");
            return session.createProducer(str + str2, str2, amqpRetryOptions.getTryTimeout(),
                RetryUtil.getRetryPolicy(amqpRetryOptions), str3, str4).cast(AmqpSendLink.class);
        });
    }

    /**
     * Creates a receive link on the session obtained from {@link #createSession(String)} for {@code str2}.
     *
     * @param str first argument forwarded to {@code ServiceBusSession.createConsumer}.
     * @param str2 entity path; used as the requested session name and forwarded to the consumer.
     * @param serviceBusReceiveMode receive mode forwarded to the consumer.
     * @param str3 not used by this implementation.
     * @param messagingEntityType entity type forwarded to the consumer.
     * @param str4 last argument forwarded to {@code ServiceBusSession.createConsumer}.
     * @return a Mono that emits the created receive link.
     */
    @Override
    public Mono<ServiceBusReceiveLink> createReceiveLink(String str, String str2, ServiceBusReceiveMode serviceBusReceiveMode, String str3, MessagingEntityType messagingEntityType, String str4) {
        return createSession(str2).cast(ServiceBusSession.class).flatMap(session -> {
            LOGGER.atVerbose().addKeyValue("entityPath", str2).log("Get or create consumer.");
            return session.createConsumer(str, str2, messagingEntityType, this.retryOptions.getTryTimeout(),
                RetryUtil.getRetryPolicy(this.retryOptions), serviceBusReceiveMode, str4);
        });
    }

    /**
     * Creates (or gets) a session on this connection. When distributed transactions are enabled the
     * requested name is ignored and the fixed name {@code "crossentity-coordinator"} is used instead, so
     * that all callers are directed to the same session name.
     *
     * @param str requested session name.
     * @return the Mono produced by the base connection's {@code createSession}.
     */
    @Override
    public Mono<AmqpSession> createSession(String str) {
        final String sessionName = this.distributedTransactionsSupport ? CROSS_ENTITY_TRANSACTIONS_LINK_NAME : str;
        return super.createSession(sessionName);
    }

    /**
     * Same as the six-argument overload, with one additional value ({@code str5}) forwarded to
     * {@code ServiceBusSession.createConsumer}.
     *
     * @param str first argument forwarded to {@code ServiceBusSession.createConsumer}.
     * @param str2 entity path; used as the requested session name and forwarded to the consumer.
     * @param serviceBusReceiveMode receive mode forwarded to the consumer.
     * @param str3 not used by this implementation.
     * @param messagingEntityType entity type forwarded to the consumer.
     * @param str4 forwarded unchanged to {@code ServiceBusSession.createConsumer}.
     * @param str5 forwarded unchanged to {@code ServiceBusSession.createConsumer} as its last argument.
     * @return a Mono that emits the created receive link.
     */
    @Override
    public Mono<ServiceBusReceiveLink> createReceiveLink(String str, String str2, ServiceBusReceiveMode serviceBusReceiveMode, String str3, MessagingEntityType messagingEntityType, String str4, String str5) {
        return createSession(str2).cast(ServiceBusSession.class).flatMap(session -> {
            LOGGER.atVerbose().addKeyValue("entityPath", str2).log("Get or create consumer.");
            return session.createConsumer(str, str2, messagingEntityType, this.retryOptions.getTryTimeout(),
                RetryUtil.getRetryPolicy(this.retryOptions), serviceBusReceiveMode, str4, str5);
        });
    }

    /**
     * Builds the Service Bus session wrapper for a proton-j session.
     *
     * @param str name of the session.
     * @param session underlying proton-j session.
     * @param sessionHandler handler associated with {@code session}.
     * @return a new {@link ServiceBusReactorSession} configured with this connection's providers, serializer,
     *     retry options and distributed-transactions setting.
     */
    protected AmqpSession createSession(String str, Session session, SessionHandler sessionHandler) {
        return new ServiceBusReactorSession(this, session, sessionHandler, str, this.reactorProvider,
            this.handlerProvider, this.linkProvider, getClaimsBasedSecurityNode(), this.tokenManagerProvider,
            this.messageSerializer, this.retryOptions,
            new ServiceBusCreateSessionOptions(this.distributedTransactionsSupport));
    }
}
