package com.azure.messaging.servicebus;

import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.exception.SessionErrorContext;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.StringUtil;
import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.implementation.DispositionStatus;
import com.azure.messaging.servicebus.implementation.MessageUtils;
import com.azure.messaging.servicebus.implementation.Messages;
import com.azure.messaging.servicebus.implementation.MessagingEntityType;
import com.azure.messaging.servicebus.implementation.ServiceBusConnectionProcessor;
import com.azure.messaging.servicebus.implementation.ServiceBusConstants;
import com.azure.messaging.servicebus.implementation.ServiceBusManagementNode;
import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLink;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.reactivestreams.Publisher;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

/* loaded from: input_file:com/azure/messaging/servicebus/ServiceBusSessionManager.class */
/**
 * Manages the session receivers of a session-enabled Service Bus entity.
 *
 * <p>Depending on {@link ReceiverOptions#isRollingSessionReceiver()}, the manager either receives from a single
 * session or merges messages from up to {@code getMaxConcurrentSessions()} sessions, each one bound to its own
 * {@link Scheduler}. Active receivers are tracked by session id so that management operations (session state,
 * lock renewal, disposition) can be routed to the link that owns the session.</p>
 */
class ServiceBusSessionManager implements AutoCloseable {
    /** Delay applied before retrying after a timeout while acquiring a session link. */
    private static final Duration SLEEP_DURATION_ON_ACCEPT_SESSION_EXCEPTION = Duration.ofMinutes(1);
    private static final ClientLogger LOGGER = new ClientLogger(ServiceBusSessionManager.class);

    private final String entityPath;
    private final MessagingEntityType entityType;
    private final ReceiverOptions receiverOptions;
    /** Optional pre-established link; when non-null, {@link #getActiveLink()} returns it instead of creating one. */
    private final ServiceBusReceiveLink receiveLink;
    private final ServiceBusConnectionProcessor connectionProcessor;
    private final Duration operationTimeout;
    private final TracerProvider tracerProvider;
    private final MessageSerializer messageSerializer;
    private final AtomicBoolean isDisposed;
    private final AtomicBoolean isStarted;
    /** Every scheduler created by this manager; all of them are disposed in {@link #close()}. */
    private final List<Scheduler> schedulers;
    /** Schedulers that are not currently bound to a session receiver. */
    private final Deque<Scheduler> availableSchedulers;
    private final Duration maxSessionLockRenewDuration;
    /** Active session receivers keyed by session id. */
    private final ConcurrentHashMap<String, ServiceBusSessionReceiver> sessionReceivers;
    private final EmitterProcessor<Flux<ServiceBusMessageContext>> processor;
    private final FluxSink<Flux<ServiceBusMessageContext>> sessionReceiveSink;
    private volatile Flux<ServiceBusMessageContext> receiveFlux;

    /**
     * Creates a session manager.
     *
     * @param str Path of the entity; used when creating receive links and resolving the management node.
     * @param messagingEntityType Type of the entity.
     * @param serviceBusConnectionProcessor Connection source; its retry options supply the operation timeout.
     * @param tracerProvider Tracer provider (stored; not otherwise used by this class).
     * @param messageSerializer Serializer handed to every session receiver.
     * @param receiverOptions Receive options (session id, receive mode, prefetch, concurrency, lock renewal).
     * @param serviceBusReceiveLink Existing link to reuse, or {@code null} to create links on demand.
     */
    public ServiceBusSessionManager(String str, MessagingEntityType messagingEntityType, ServiceBusConnectionProcessor serviceBusConnectionProcessor, TracerProvider tracerProvider, MessageSerializer messageSerializer, ReceiverOptions receiverOptions, ServiceBusReceiveLink serviceBusReceiveLink) {
        this.isDisposed = new AtomicBoolean();
        this.isStarted = new AtomicBoolean();
        this.availableSchedulers = new ConcurrentLinkedDeque<>();
        this.sessionReceivers = new ConcurrentHashMap<>();
        this.entityPath = str;
        this.entityType = messagingEntityType;
        this.receiverOptions = receiverOptions;
        this.connectionProcessor = serviceBusConnectionProcessor;
        this.operationTimeout = serviceBusConnectionProcessor.getRetryOptions().getTryTimeout();
        this.tracerProvider = tracerProvider;
        this.messageSerializer = messageSerializer;
        this.maxSessionLockRenewDuration = receiverOptions.getMaxLockRenewDuration();

        // One scheduler per concurrently processed session; a non-rolling receiver handles exactly one session.
        final int concurrentSessions = receiverOptions.isRollingSessionReceiver()
            ? receiverOptions.getMaxConcurrentSessions().intValue()
            : 1;
        final List<Scheduler> createdSchedulers = IntStream.range(0, concurrentSessions)
            .mapToObj(i -> Schedulers.newBoundedElastic(Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
                Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "receiver-" + i))
            .collect(Collectors.toList());
        this.schedulers = Collections.unmodifiableList(createdSchedulers);
        this.availableSchedulers.addAll(this.schedulers);

        this.processor = EmitterProcessor.create(concurrentSessions, false);
        this.sessionReceiveSink = this.processor.sink();
        this.receiveLink = serviceBusReceiveLink;
    }

    /**
     * Creates a session manager that establishes its receive links on demand.
     *
     * @param str Path of the entity.
     * @param messagingEntityType Type of the entity.
     * @param serviceBusConnectionProcessor Connection source.
     * @param tracerProvider Tracer provider.
     * @param messageSerializer Serializer handed to every session receiver.
     * @param receiverOptions Receive options.
     */
    public ServiceBusSessionManager(String str, MessagingEntityType messagingEntityType, ServiceBusConnectionProcessor serviceBusConnectionProcessor, TracerProvider tracerProvider, MessageSerializer messageSerializer, ReceiverOptions receiverOptions) {
        this(str, messagingEntityType, serviceBusConnectionProcessor, tracerProvider, messageSerializer, receiverOptions, null);
    }

    /**
     * Gets the name of the link that currently serves a session.
     *
     * @param str Session id to look up.
     * @return The link name, or {@code null} when no receiver is registered for that session id.
     */
    public String getLinkName(String str) {
        final ServiceBusSessionReceiver receiver = this.sessionReceivers.get(str);
        return receiver != null ? receiver.getLinkName() : null;
    }

    /**
     * Gets the state of a session through the management node.
     *
     * @param str Session id.
     * @return A Mono emitting the session state, or failing if the manager is disposed or the id is null/empty.
     */
    public Mono<byte[]> getSessionState(String str) {
        return validateParameter(str, ServiceBusConstants.SESSION_ID_KEY, "getSessionState")
            .then(getManagementNode().flatMap(managementNode -> {
                final ServiceBusSessionReceiver receiver = this.sessionReceivers.get(str);
                // Associate the request with the owning link when this manager has the session open.
                final String linkName = receiver != null ? receiver.getLinkName() : null;
                return managementNode.getSessionState(str, linkName);
            }));
    }

    /**
     * Gets the stream of messages received from the managed session(s).
     *
     * <p>The stream is built on the first call and the same instance is returned afterwards.</p>
     *
     * @return Messages from the merged rolling sessions, or from the single session otherwise.
     */
    public Flux<ServiceBusMessageContext> receive() {
        // NOTE(review): a caller racing with the very first invocation can observe receiveFlux before it has
        // been assigned (isStarted is flipped first). Behaviour intentionally left as-is; confirm whether
        // concurrent first calls are possible.
        if (!this.isStarted.getAndSet(true)) {
            this.sessionReceiveSink.onRequest(this::onSessionRequest);
            if (this.receiverOptions.isRollingSessionReceiver()) {
                this.receiveFlux = Flux.merge(this.processor, this.receiverOptions.getMaxConcurrentSessions().intValue());
            } else {
                this.receiveFlux = getSession(this.schedulers.get(0), false);
            }
        }
        return this.receiveFlux;
    }

    /**
     * Renews the lock of a session and, when this manager holds a receiver for it, records the new expiry on
     * that receiver.
     *
     * @param str Session id.
     * @return A Mono emitting the new lock expiration time.
     */
    Mono<OffsetDateTime> renewSessionLock(String str) {
        return validateParameter(str, ServiceBusConstants.SESSION_ID_KEY, "renewSessionLock")
            .then(getManagementNode().flatMap(managementNode -> {
                final ServiceBusSessionReceiver receiver = this.sessionReceivers.get(str);
                final String linkName = receiver != null ? receiver.getLinkName() : null;
                return managementNode.renewSessionLock(str, linkName)
                    .<OffsetDateTime>handle((lockedUntil, sink) -> {
                        if (receiver != null) {
                            receiver.setSessionLockedUntil(lockedUntil);
                        }
                        sink.next(lockedUntil);
                    });
            }));
    }

    /**
     * Settles a message on the session receiver that owns its lock token.
     *
     * @param str Lock token of the message.
     * @param str2 Session id the message belongs to.
     * @param dispositionStatus Disposition to apply.
     * @param map Forwarded to {@code MessageUtils.getDeliveryState} (presumably properties to modify; confirm).
     * @param str3 Forwarded to {@code MessageUtils.getDeliveryState} (presumably dead-letter reason; confirm).
     * @param str4 Forwarded to {@code MessageUtils.getDeliveryState} (presumably dead-letter description; confirm).
     * @param serviceBusTransactionContext Transaction to associate with the disposition; forwarded as-is.
     * @return A Mono emitting {@code true} when the disposition was sent through a receiver of this manager,
     *     {@code false} when no receiver for the session holds the lock token.
     */
    public Mono<Boolean> updateDisposition(String str, String str2, DispositionStatus dispositionStatus, Map<String, Object> map, String str3, String str4, ServiceBusTransactionContext serviceBusTransactionContext) {
        final String operation = "updateDisposition";
        // Each argument is validated once; the session id uses the same parameter name constant as the other
        // session operations so the error message is not double-quoted.
        return Mono.when(
                validateParameter(str, ServiceBusConstants.LOCK_TOKEN_KEY, operation),
                validateParameter(str2, ServiceBusConstants.SESSION_ID_KEY, operation))
            .then(Mono.defer(() -> {
                final ServiceBusSessionReceiver receiver = this.sessionReceivers.get(str2);
                if (receiver == null || !receiver.containsLockToken(str)) {
                    return Mono.just(false);
                }
                return receiver
                    .updateDisposition(str, MessageUtils.getDeliveryState(dispositionStatus, str3, str4, map, serviceBusTransactionContext))
                    .thenReturn(true);
            }));
    }

    /**
     * Closes every active session receiver, completes the session sink and disposes all schedulers.
     *
     * <p>Only the first call has an effect. Closing the receivers blocks for at most the operation timeout; if
     * that wait fails, the exception is propagated but the sink and schedulers are still released.</p>
     */
    @Override
    public void close() {
        if (this.isDisposed.getAndSet(true)) {
            return;
        }

        final List<? extends Publisher<?>> closeOperations = this.sessionReceivers.values().stream()
            .map(receiver -> receiver.closeAsync())
            .collect(Collectors.toList());
        try {
            Mono.when(closeOperations).block(this.operationTimeout);
        } finally {
            // isDisposed is already set, so a second close() cannot retry this cleanup; it must always run.
            this.sessionReceiveSink.complete();
            for (Scheduler scheduler : this.schedulers) {
                scheduler.dispose();
            }
        }
    }

    /** Builds the error context (namespace and entity path) attached to exceptions raised by this manager. */
    private AmqpErrorContext getErrorContext() {
        return new SessionErrorContext(this.connectionProcessor.getFullyQualifiedNamespace(), this.entityPath);
    }

    /**
     * Creates a receive link for the configured session id, or for an unnamed session when none is configured.
     *
     * @return A Mono emitting the newly created link.
     */
    private Mono<ServiceBusReceiveLink> createSessionReceiveLink() {
        final String sessionId = this.receiverOptions.getSessionId();
        // The link is named after the session id when one is configured, otherwise it gets a random name.
        final String linkName = sessionId != null ? sessionId : StringUtil.getRandomString("session-");
        return this.connectionProcessor.flatMap(connection ->
            connection.createReceiveLink(linkName, this.entityPath, this.receiverOptions.getReceiveMode(), null, this.entityType, sessionId));
    }

    /**
     * Gets a receive link whose endpoint is active.
     *
     * <p>When a link was supplied at construction it is returned directly. Otherwise a link is created and
     * awaited until it reports {@link AmqpEndpointState#ACTIVE} within the operation timeout. Timeouts are
     * retried after {@link #SLEEP_DURATION_ON_ACCEPT_SESSION_EXCEPTION}; any other failure, or disposal of the
     * manager, terminates the retry loop with an error.</p>
     *
     * @return A Mono emitting an active receive link.
     */
    public Mono<ServiceBusReceiveLink> getActiveLink() {
        if (this.receiveLink != null) {
            return Mono.just(this.receiveLink);
        }

        return Mono.defer(() -> createSessionReceiveLink()
                .flatMap(link -> link.getEndpointStates()
                    .takeUntil(state -> state == AmqpEndpointState.ACTIVE)
                    .timeout(this.operationTimeout)
                    .then(Mono.just(link))))
            .retryWhen(Retry.from(retrySignals -> retrySignals.flatMap(signal -> {
                // failure() yields an arbitrary Throwable (for example a TimeoutException), not only AmqpException.
                final Throwable failure = signal.failure();
                LOGGER.atInfo()
                    .addKeyValue("entityPath", this.entityPath)
                    .addKeyValue("attempt", signal.totalRetriesInARow())
                    .log("Error occurred while getting unnamed session.", new Object[] {failure});

                if (this.isDisposed.get()) {
                    return Mono.<Long>error(new AmqpException(false, "SessionManager is already disposed.", failure, getErrorContext()));
                }
                if (failure instanceof TimeoutException) {
                    return Mono.delay(SLEEP_DURATION_ON_ACCEPT_SESSION_EXCEPTION);
                }
                if (failure instanceof AmqpException
                    && ((AmqpException) failure).getErrorCondition() == AmqpErrorCondition.TIMEOUT_ERROR) {
                    return Mono.delay(SLEEP_DURATION_ON_ACCEPT_SESSION_EXCEPTION);
                }
                return Mono.<Long>error(failure);
            })));
    }

    /**
     * Acquires a session and returns the messages received from it.
     *
     * <p>When the message stream terminates for any reason, the scheduler is returned to the available pool,
     * the receiver is unregistered and closed, and, for rolling receivers, one more session is requested.</p>
     *
     * @param scheduler Scheduler handed to the session receiver.
     * @param z Flag forwarded to the {@code ServiceBusSessionReceiver} constructor; {@code false} for the single
     *     fixed session and {@code true} for rolled sessions (presumably "dispose on idle"; confirm).
     * @return Messages received from the acquired session.
     */
    private Flux<ServiceBusMessageContext> getSession(Scheduler scheduler, boolean z) {
        return getActiveLink()
            .flatMap(link -> link.getSessionId()
                // Reuse the receiver already registered for this session id, if any.
                .map(sessionId -> this.sessionReceivers.computeIfAbsent(sessionId,
                    key -> new ServiceBusSessionReceiver(link, this.messageSerializer, this.connectionProcessor.getRetryOptions(), this.receiverOptions.getPrefetchCount(), z, scheduler, this::renewSessionLock, this.maxSessionLockRenewDuration))))
            .flatMapMany(receiver -> receiver.receive().doFinally(signalType -> {
                LOGGER.atVerbose()
                    .addKeyValue(ServiceBusConstants.SESSION_ID_KEY, receiver.getSessionId())
                    .log("Closing session receiver.");
                this.availableSchedulers.push(scheduler);
                this.sessionReceivers.remove(receiver.getSessionId());
                receiver.closeAsync().subscribe();
                if (this.receiverOptions.isRollingSessionReceiver()) {
                    onSessionRequest(1L);
                }
            }));
    }

    /** Resolves the management node of the entity from the current connection. */
    private Mono<ServiceBusManagementNode> getManagementNode() {
        return this.connectionProcessor.flatMap(connection ->
            connection.getManagementNode(this.entityPath, this.entityType));
    }

    /**
     * Emits up to {@code j} new session streams, one per available scheduler.
     *
     * @param j Number of sessions requested downstream; {@code Long.MAX_VALUE} denotes an unbounded request.
     */
    private void onSessionRequest(long j) {
        if (this.isDisposed.get()) {
            LOGGER.info("Session manager is disposed. Not emitting more unnamed sessions.");
            return;
        }
        LOGGER.atVerbose()
            .addKeyValue(ServiceBusConstants.NUMBER_OF_REQUESTED_MESSAGES_KEY, j)
            .log("Requested unnamed sessions.");

        // The scheduler pool is finite, so the loop always ends once it is drained, even for unbounded requests.
        for (long emitted = 0; emitted < j; emitted++) {
            final Scheduler scheduler = this.availableSchedulers.poll();
            if (scheduler == null) {
                // Running out of schedulers is expected for an unbounded request, so it is only logged otherwise.
                if (j != Long.MAX_VALUE) {
                    LOGGER.atVerbose()
                        .addKeyValue(ServiceBusConstants.NUMBER_OF_REQUESTED_MESSAGES_KEY, j)
                        .log("There are no available schedulers to fetch.");
                }
                return;
            }
            this.sessionReceiveSink.next(getSession(scheduler, true));
        }
    }

    /**
     * Validates the manager state and a single argument of a public operation.
     *
     * @param t Argument value to validate.
     * @param str Argument name used in the error message.
     * @param str2 Operation name used in the "disposed" error message.
     * @param <T> Type of the argument.
     * @return An empty Mono when valid; otherwise a Mono failing with {@link IllegalStateException} (manager
     *     disposed), {@link NullPointerException} (null argument) or {@link IllegalArgumentException} (empty
     *     string argument).
     */
    private <T> Mono<Void> validateParameter(T t, String str, String str2) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError(LOGGER, new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, str2)));
        }
        if (t == null) {
            return FluxUtil.monoError(LOGGER, new NullPointerException(String.format("'%s' cannot be null.", str)));
        }
        if (t instanceof String && ((String) t).isEmpty()) {
            return FluxUtil.monoError(LOGGER, new IllegalArgumentException(String.format("'%s' cannot be an empty string.", str)));
        }
        return Mono.empty();
    }
}
