package com.azure.messaging.servicebus;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.implementation.CreditFlowMode;
import com.azure.core.amqp.implementation.MessageFlux;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.handler.DeliveryNotOnLinkException;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.logging.LoggingEventBuilder;
import com.azure.messaging.servicebus.implementation.DispositionStatus;
import com.azure.messaging.servicebus.implementation.MessageUtils;
import com.azure.messaging.servicebus.implementation.ServiceBusConstants;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/azure/messaging/servicebus/ServiceBusSingleSessionManager.class */
final class ServiceBusSingleSessionManager implements IServiceBusSessionManager {
    private final ClientLogger logger;
    private final String identifier;
    private final MessageSerializer serializer;
    private final Duration operationTimeout;
    private final ServiceBusSessionReactorReceiver sessionReceiver;
    private final MessageFlux messageFlux;

    /* loaded from: input_file:com/azure/messaging/servicebus/ServiceBusSingleSessionManager$SessionReceiverStream.class */
    private static final class SessionReceiverStream extends AtomicBoolean implements Consumer<FluxSink<ServiceBusSessionReactorReceiver>> {
        private final ServiceBusSessionReactorReceiver sessionReceiver;

        SessionReceiverStream(ServiceBusSessionReactorReceiver serviceBusSessionReactorReceiver) {
            super(false);
            this.sessionReceiver = serviceBusSessionReactorReceiver;
        }

        public Flux<ServiceBusSessionReactorReceiver> flux() {
            return Flux.create(this);
        }

        @Override // java.util.function.Consumer
        public void accept(FluxSink<ServiceBusSessionReactorReceiver> fluxSink) {
            fluxSink.onRequest(j -> {
                if (j != 1) {
                    fluxSink.error(new UnsupportedOperationException("Expects one request for sessionReceiver but was " + j));
                } else if (getAndSet(true)) {
                    fluxSink.error(new UnsupportedOperationException("Cannot subscribe or request for sessionReceiver more than once."));
                } else {
                    fluxSink.next(this.sessionReceiver);
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ServiceBusSingleSessionManager(ClientLogger clientLogger, String str, ServiceBusSessionReactorReceiver serviceBusSessionReactorReceiver, int i, MessageSerializer messageSerializer, AmqpRetryOptions amqpRetryOptions) {
        this.logger = (ClientLogger) Objects.requireNonNull(clientLogger, "logger cannot be null.");
        this.identifier = str;
        this.sessionReceiver = (ServiceBusSessionReactorReceiver) Objects.requireNonNull(serviceBusSessionReactorReceiver, "sessionReceiver cannot be null.");
        this.serializer = (MessageSerializer) Objects.requireNonNull(messageSerializer, "serializer cannot be null.");
        Objects.requireNonNull(amqpRetryOptions, "retryOptions cannot be null.");
        this.operationTimeout = amqpRetryOptions.getTryTimeout();
        this.messageFlux = new MessageFlux(new SessionReceiverStream(serviceBusSessionReactorReceiver).flux(), i, CreditFlowMode.RequestDriven, MessageFlux.NULL_RETRY_POLICY);
    }

    @Override // com.azure.messaging.servicebus.IServiceBusSessionManager
    public String getIdentifier() {
        return this.identifier;
    }

    @Override // com.azure.messaging.servicebus.IServiceBusSessionManager
    public String getLinkName(String str) {
        if (this.sessionReceiver.getSessionId().equals(str)) {
            return this.sessionReceiver.getLinkName();
        }
        return null;
    }

    @Override // com.azure.messaging.servicebus.IServiceBusSessionManager
    public Flux<ServiceBusMessageContext> receive() {
        return receiveMessages().map(serviceBusReceivedMessage -> {
            return new ServiceBusMessageContext(serviceBusReceivedMessage);
        }).onErrorResume(th -> {
            return Mono.just(new ServiceBusMessageContext(this.sessionReceiver.getSessionId(), th));
        });
    }

    @Override // com.azure.messaging.servicebus.IServiceBusSessionManager
    public Mono<Boolean> updateDisposition(String str, String str2, DispositionStatus dispositionStatus, Map<String, Object> map, String str3, String str4, ServiceBusTransactionContext serviceBusTransactionContext) {
        DeliveryState deliveryState = MessageUtils.getDeliveryState(dispositionStatus, str3, str4, map, serviceBusTransactionContext);
        return this.sessionReceiver.getSessionId().equals(str2) ? this.sessionReceiver.updateDisposition(str, deliveryState).thenReturn(true) : Mono.error(DeliveryNotOnLinkException.noMatchingDelivery(str, deliveryState));
    }

    @Override // com.azure.messaging.servicebus.IServiceBusSessionManager, java.lang.AutoCloseable
    public void close() {
        this.sessionReceiver.closeAsync().block(this.operationTimeout);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Flux<ServiceBusReceivedMessage> receiveMessages() {
        return this.messageFlux.map(message -> {
            ServiceBusReceivedMessage serviceBusReceivedMessage = (ServiceBusReceivedMessage) this.serializer.deserialize(message, ServiceBusReceivedMessage.class);
            this.logger.atVerbose().addKeyValue(ServiceBusConstants.SESSION_ID_KEY, this.sessionReceiver.getSessionId()).addKeyValue(ServiceBusConstants.MESSAGE_ID_LOGGING_KEY, serviceBusReceivedMessage.getMessageId()).log("Received message.");
            return serviceBusReceivedMessage;
        }).doOnError(th -> {
            withLinkInfo(this.logger.atWarning()).log("Error occurred. Ending session.", new Object[]{th});
        });
    }

    private LoggingEventBuilder withLinkInfo(LoggingEventBuilder loggingEventBuilder) {
        return loggingEventBuilder.addKeyValue(ServiceBusConstants.SESSION_ID_KEY, this.sessionReceiver.getSessionId()).addKeyValue("entityPath", this.sessionReceiver.getEntityPath()).addKeyValue("linkName", this.sessionReceiver.getLinkName());
    }
}
