package com.azure.messaging.servicebus;

import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.exception.SessionErrorContext;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.StringUtil;
import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.implementation.DispositionStatus;
import com.azure.messaging.servicebus.implementation.MessageUtils;
import com.azure.messaging.servicebus.implementation.Messages;
import com.azure.messaging.servicebus.implementation.MessagingEntityType;
import com.azure.messaging.servicebus.implementation.ServiceBusConnectionProcessor;
import com.azure.messaging.servicebus.implementation.ServiceBusManagementNode;
import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLink;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.reactivestreams.Publisher;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

/* loaded from: input_file:com/azure/messaging/servicebus/UnnamedSessionManager.class */
class UnnamedSessionManager implements AutoCloseable {
    private static final Duration SLEEP_DURATION_ON_ACCEPT_SESSION_EXCEPTION = Duration.ofMinutes(1);
    private final String entityPath;
    private final MessagingEntityType entityType;
    private final ReceiverOptions receiverOptions;
    private final ServiceBusConnectionProcessor connectionProcessor;
    private final Duration operationTimeout;
    private final TracerProvider tracerProvider;
    private final MessageSerializer messageSerializer;
    private final List<Scheduler> schedulers;
    private final EmitterProcessor<Flux<ServiceBusReceivedMessageContext>> processor;
    private final FluxSink<Flux<ServiceBusReceivedMessageContext>> sessionReceiveSink;
    private volatile Flux<ServiceBusReceivedMessageContext> receiveFlux;
    private final ClientLogger logger = new ClientLogger(UnnamedSessionManager.class);
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final AtomicBoolean isStarted = new AtomicBoolean();
    private final Deque<Scheduler> availableSchedulers = new ConcurrentLinkedDeque();
    private final ConcurrentHashMap<String, UnnamedSessionReceiver> sessionReceivers = new ConcurrentHashMap<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    public UnnamedSessionManager(String str, MessagingEntityType messagingEntityType, ServiceBusConnectionProcessor serviceBusConnectionProcessor, Duration duration, TracerProvider tracerProvider, MessageSerializer messageSerializer, ReceiverOptions receiverOptions) {
        this.entityPath = str;
        this.entityType = messagingEntityType;
        this.receiverOptions = receiverOptions;
        this.connectionProcessor = serviceBusConnectionProcessor;
        this.operationTimeout = duration;
        this.tracerProvider = tracerProvider;
        this.messageSerializer = messageSerializer;
        int intValue = receiverOptions.isRollingSessionReceiver() ? receiverOptions.getMaxConcurrentSessions().intValue() : 1;
        this.schedulers = Collections.unmodifiableList((List) IntStream.range(0, intValue).mapToObj(i -> {
            return Schedulers.newBoundedElastic(Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "receiver-" + i);
        }).collect(Collectors.toList()));
        this.availableSchedulers.addAll(this.schedulers);
        this.processor = EmitterProcessor.create(intValue, false);
        this.sessionReceiveSink = this.processor.sink();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public String getLinkName(String str) {
        UnnamedSessionReceiver unnamedSessionReceiver = this.sessionReceivers.get(str);
        if (unnamedSessionReceiver != null) {
            return unnamedSessionReceiver.getLinkName();
        }
        return null;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<byte[]> getSessionState(String str) {
        return validateParameter(str, "sessionId", "getSessionState").then(getManagementNode().flatMap(serviceBusManagementNode -> {
            UnnamedSessionReceiver unnamedSessionReceiver = this.sessionReceivers.get(str);
            return serviceBusManagementNode.getSessionState(str, unnamedSessionReceiver != null ? unnamedSessionReceiver.getLinkName() : null);
        }));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Flux<ServiceBusReceivedMessageContext> receive() {
        if (!this.isStarted.getAndSet(true)) {
            this.sessionReceiveSink.onRequest(this::onSessionRequest);
            if (this.receiverOptions.isRollingSessionReceiver()) {
                this.receiveFlux = Flux.merge(this.processor, this.receiverOptions.getMaxConcurrentSessions().intValue());
            } else {
                this.receiveFlux = getSession(this.schedulers.get(0), false);
            }
        }
        return this.receiveFlux;
    }

    Mono<OffsetDateTime> renewSessionLock(String str) {
        return validateParameter(str, "sessionId", "renewSessionLock").then(getManagementNode().flatMap(serviceBusManagementNode -> {
            UnnamedSessionReceiver unnamedSessionReceiver = this.sessionReceivers.get(str);
            return serviceBusManagementNode.renewSessionLock(str, unnamedSessionReceiver != null ? unnamedSessionReceiver.getLinkName() : null).handle((instant, synchronousSink) -> {
                if (unnamedSessionReceiver != null) {
                    unnamedSessionReceiver.setSessionLockedUntil(instant.atOffset(ZoneOffset.UTC));
                }
                synchronousSink.next(instant.atOffset(ZoneOffset.UTC));
            });
        }));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<Boolean> updateDisposition(String str, String str2, DispositionStatus dispositionStatus, Map<String, Object> map, String str3, String str4, ServiceBusTransactionContext serviceBusTransactionContext) {
        return Mono.when(new Publisher[]{validateParameter(str, "lockToken", "updateDisposition"), validateParameter(str, "lockToken", "updateDisposition"), validateParameter(str2, "'sessionId'", "updateDisposition")}).then(Mono.defer(() -> {
            UnnamedSessionReceiver unnamedSessionReceiver = this.sessionReceivers.get(str2);
            return (unnamedSessionReceiver == null || !unnamedSessionReceiver.containsLockToken(str)) ? Mono.just(false) : unnamedSessionReceiver.updateDisposition(str, MessageUtils.getDeliveryState(dispositionStatus, str3, str4, map, serviceBusTransactionContext)).thenReturn(true);
        }));
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        if (this.isDisposed.getAndSet(true)) {
            return;
        }
        Iterator<Scheduler> it = this.schedulers.iterator();
        while (it.hasNext()) {
            it.next().dispose();
        }
        this.sessionReceivers.values().forEach(unnamedSessionReceiver -> {
            unnamedSessionReceiver.close();
        });
        this.sessionReceiveSink.complete();
    }

    private AmqpErrorContext getErrorContext() {
        return new SessionErrorContext(this.connectionProcessor.getFullyQualifiedNamespace(), this.entityPath);
    }

    private Mono<ServiceBusReceiveLink> createSessionReceiveLink() {
        String randomString = StringUtil.getRandomString("session-");
        return this.connectionProcessor.flatMap(serviceBusAmqpConnection -> {
            return serviceBusAmqpConnection.createReceiveLink(randomString, this.entityPath, this.receiverOptions.getReceiveMode(), null, this.entityType, null);
        });
    }

    private Mono<ServiceBusReceiveLink> getActiveLink() {
        return Mono.defer(() -> {
            return createSessionReceiveLink().flatMap(serviceBusReceiveLink -> {
                return serviceBusReceiveLink.getEndpointStates().takeUntil(amqpEndpointState -> {
                    return amqpEndpointState == AmqpEndpointState.ACTIVE;
                }).timeout(this.operationTimeout).then(Mono.just(serviceBusReceiveLink));
            });
        }).retryWhen(Retry.from(flux -> {
            return flux.flatMap(retrySignal -> {
                AmqpException failure = retrySignal.failure();
                this.logger.info("entityPath[{}] attempt[{}]. Error occurred while getting unnamed session.", new Object[]{this.entityPath, Long.valueOf(retrySignal.totalRetriesInARow()), failure});
                return this.isDisposed.get() ? Mono.error(new AmqpException(false, "SessionManager is already disposed.", failure, getErrorContext())) : failure instanceof TimeoutException ? Mono.delay(SLEEP_DURATION_ON_ACCEPT_SESSION_EXCEPTION) : ((failure instanceof AmqpException) && failure.getErrorCondition() == AmqpErrorCondition.TIMEOUT_ERROR) ? Mono.delay(SLEEP_DURATION_ON_ACCEPT_SESSION_EXCEPTION) : Mono.error(failure);
            });
        }));
    }

    private Flux<ServiceBusReceivedMessageContext> getSession(Scheduler scheduler, boolean z) {
        return getActiveLink().flatMap(serviceBusReceiveLink -> {
            return serviceBusReceiveLink.getSessionId().map(str -> {
                return this.sessionReceivers.compute(str, (str, unnamedSessionReceiver) -> {
                    return unnamedSessionReceiver != null ? unnamedSessionReceiver : new UnnamedSessionReceiver(serviceBusReceiveLink, this.messageSerializer, this.connectionProcessor.getRetryOptions(), this.receiverOptions.getPrefetchCount(), z, scheduler, this::renewSessionLock);
                });
            });
        }).flatMapMany(unnamedSessionReceiver -> {
            return unnamedSessionReceiver.receive().doFinally(signalType -> {
                this.logger.verbose("Adding scheduler back to pool.");
                this.availableSchedulers.push(scheduler);
                if (this.receiverOptions.isRollingSessionReceiver()) {
                    onSessionRequest(1L);
                }
            });
        }).publishOn(scheduler);
    }

    private Mono<ServiceBusManagementNode> getManagementNode() {
        return this.connectionProcessor.flatMap(serviceBusAmqpConnection -> {
            return serviceBusAmqpConnection.getManagementNode(this.entityPath, this.entityType);
        });
    }

    private void onSessionRequest(long j) {
        if (this.isDisposed.get()) {
            this.logger.info("Session manager is disposed. Not emitting more unnamed sessions.");
            return;
        }
        this.logger.verbose("Requested {} unnamed sessions.", new Object[]{Long.valueOf(j)});
        for (int i = 0; i < j; i++) {
            Scheduler poll = this.availableSchedulers.poll();
            if (poll == null) {
                if (j != Long.MAX_VALUE) {
                    this.logger.verbose("request[{}]: There are no available schedulers to fetch.", new Object[]{Long.valueOf(j)});
                    return;
                }
                return;
            }
            this.sessionReceiveSink.next(getSession(poll, true));
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private <T> Mono<Void> validateParameter(T t, String str, String str2) {
        return this.isDisposed.get() ? FluxUtil.monoError(this.logger, new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, str2))) : t == 0 ? FluxUtil.monoError(this.logger, new NullPointerException(String.format("'%s' cannot be null.", str))) : ((t instanceof String) && ((String) t).isEmpty()) ? FluxUtil.monoError(this.logger, new IllegalArgumentException(String.format("'%s' cannot be an empty string.", str))) : Mono.empty();
    }
}
