package com.azure.messaging.servicebus;

import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.implementation.ServiceBusProcessorClientOptions;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:com/azure/messaging/servicebus/ServiceBusProcessorClient.class */
public final class ServiceBusProcessorClient implements AutoCloseable {
    private static final int SCHEDULER_INTERVAL_IN_SECONDS = 10;
    private static final ClientLogger LOGGER = new ClientLogger(ServiceBusProcessorClient.class);
    private final ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder sessionReceiverBuilder;
    private final ServiceBusClientBuilder.ServiceBusReceiverClientBuilder receiverBuilder;
    private final Consumer<ServiceBusReceivedMessageContext> processMessage;
    private final Consumer<ServiceBusErrorContext> processError;
    private final ServiceBusProcessorClientOptions processorOptions;
    private final Map<Subscription, Subscription> receiverSubscriptions;
    private final AtomicReference<ServiceBusReceiverAsyncClient> asyncClient;
    private final AtomicBoolean isRunning;
    private final String queueName;
    private final String topicName;
    private final String subscriptionName;
    private final ServiceBusTracer tracer;
    private Disposable monitorDisposable;
    private boolean wasStopped;
    private final NonSessionProcessor nonSessionProcessorV2;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ServiceBusProcessorClient(ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder serviceBusSessionReceiverClientBuilder, String str, String str2, String str3, Consumer<ServiceBusReceivedMessageContext> consumer, Consumer<ServiceBusErrorContext> consumer2, ServiceBusProcessorClientOptions serviceBusProcessorClientOptions) {
        this.receiverSubscriptions = new ConcurrentHashMap();
        this.asyncClient = new AtomicReference<>();
        this.isRunning = new AtomicBoolean();
        this.wasStopped = false;
        this.sessionReceiverBuilder = (ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder) Objects.requireNonNull(serviceBusSessionReceiverClientBuilder, "'sessionReceiverBuilder' cannot be null");
        this.processMessage = (Consumer) Objects.requireNonNull(consumer, "'processMessage' cannot be null");
        this.processError = (Consumer) Objects.requireNonNull(consumer2, "'processError' cannot be null");
        this.processorOptions = (ServiceBusProcessorClientOptions) Objects.requireNonNull(serviceBusProcessorClientOptions, "'processorOptions' cannot be null");
        ServiceBusReceiverAsyncClient buildAsyncClientForProcessor = serviceBusSessionReceiverClientBuilder.buildAsyncClientForProcessor();
        this.asyncClient.set(buildAsyncClientForProcessor);
        this.receiverBuilder = null;
        this.queueName = str;
        this.topicName = str2;
        this.subscriptionName = str3;
        this.tracer = buildAsyncClientForProcessor.getInstrumentation().getTracer();
        this.nonSessionProcessorV2 = null;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ServiceBusProcessorClient(ServiceBusClientBuilder.ServiceBusReceiverClientBuilder serviceBusReceiverClientBuilder, String str, String str2, String str3, Consumer<ServiceBusReceivedMessageContext> consumer, Consumer<ServiceBusErrorContext> consumer2, ServiceBusProcessorClientOptions serviceBusProcessorClientOptions) {
        this.receiverSubscriptions = new ConcurrentHashMap();
        this.asyncClient = new AtomicReference<>();
        this.isRunning = new AtomicBoolean();
        this.wasStopped = false;
        this.receiverBuilder = (ServiceBusClientBuilder.ServiceBusReceiverClientBuilder) Objects.requireNonNull(serviceBusReceiverClientBuilder, "'receiverBuilder' cannot be null");
        this.processMessage = (Consumer) Objects.requireNonNull(consumer, "'processMessage' cannot be null");
        this.processError = (Consumer) Objects.requireNonNull(consumer2, "'processError' cannot be null");
        this.processorOptions = (ServiceBusProcessorClientOptions) Objects.requireNonNull(serviceBusProcessorClientOptions, "'processorOptions' cannot be null");
        this.sessionReceiverBuilder = null;
        this.queueName = str;
        this.topicName = str2;
        this.subscriptionName = str3;
        if (this.processorOptions.isNonSessionProcessorV2()) {
            this.nonSessionProcessorV2 = new NonSessionProcessor(serviceBusReceiverClientBuilder, consumer, consumer2, this.processorOptions.getMaxConcurrentCalls(), !this.processorOptions.isDisableAutoComplete());
            this.tracer = null;
        } else {
            this.nonSessionProcessorV2 = null;
            ServiceBusReceiverAsyncClient buildAsyncClient = serviceBusReceiverClientBuilder.buildAsyncClient();
            this.asyncClient.set(buildAsyncClient);
            this.tracer = buildAsyncClient.getInstrumentation().getTracer();
        }
    }

    public synchronized void start() {
        if (isNonSessionProcessorV2()) {
            this.nonSessionProcessorV2.start();
            return;
        }
        if (this.isRunning.getAndSet(true)) {
            LOGGER.info("Processor is already running");
            return;
        }
        if (this.wasStopped) {
            this.wasStopped = false;
            LOGGER.warning("Starting Processor that was stopped before is not recommended, and this feature may be deprecated in the future. Please close this processor instance and create a new one to restart processing. Refer to the GitHub issue https://github.com/Azure/azure-sdk-for-java/issues/34464 for more details");
        }
        if (this.asyncClient.get() == null) {
            this.asyncClient.set(this.receiverBuilder == null ? this.sessionReceiverBuilder.buildAsyncClientForProcessor() : this.receiverBuilder.buildAsyncClient());
        }
        receiveMessages();
        if (this.monitorDisposable == null) {
            this.monitorDisposable = Schedulers.boundedElastic().schedulePeriodically(() -> {
                if (this.asyncClient.get().isConnectionClosed()) {
                    restartMessageReceiver(null);
                }
            }, 10L, 10L, TimeUnit.SECONDS);
        }
    }

    public synchronized void stop() {
        if (isNonSessionProcessorV2()) {
            this.nonSessionProcessorV2.stop();
        } else {
            this.wasStopped = true;
            this.isRunning.set(false);
        }
    }

    @Override // java.lang.AutoCloseable
    public synchronized void close() {
        if (isNonSessionProcessorV2()) {
            this.nonSessionProcessorV2.close();
            return;
        }
        this.isRunning.set(false);
        this.receiverSubscriptions.keySet().forEach((v0) -> {
            v0.cancel();
        });
        this.receiverSubscriptions.clear();
        if (this.monitorDisposable != null) {
            this.monitorDisposable.dispose();
            this.monitorDisposable = null;
        }
        if (this.asyncClient.get() != null) {
            this.asyncClient.get().close();
            this.asyncClient.set(null);
        }
    }

    public synchronized boolean isRunning() {
        return isNonSessionProcessorV2() ? this.nonSessionProcessorV2.isRunning() : this.isRunning.get();
    }

    public String getQueueName() {
        return this.queueName;
    }

    public String getTopicName() {
        return this.topicName;
    }

    public String getSubscriptionName() {
        return this.subscriptionName;
    }

    public synchronized String getIdentifier() {
        if (isNonSessionProcessorV2()) {
            return this.nonSessionProcessorV2.getIdentifier();
        }
        if (this.asyncClient.get() == null) {
            this.asyncClient.set(this.receiverBuilder == null ? this.sessionReceiverBuilder.buildAsyncClientForProcessor() : this.receiverBuilder.buildAsyncClient());
        }
        return this.asyncClient.get().getIdentifier();
    }

    private synchronized void receiveMessages() {
        if (this.receiverSubscriptions.size() > 0) {
            this.receiverSubscriptions.keySet().forEach(subscription -> {
                subscription.request(1L);
            });
            return;
        }
        final ServiceBusReceiverAsyncClient serviceBusReceiverAsyncClient = this.asyncClient.get();
        CoreSubscriber[] coreSubscriberArr = new CoreSubscriber[this.processorOptions.getMaxConcurrentCalls()];
        for (int i = 0; i < this.processorOptions.getMaxConcurrentCalls(); i++) {
            coreSubscriberArr[i] = new CoreSubscriber<ServiceBusMessageContext>() { // from class: com.azure.messaging.servicebus.ServiceBusProcessorClient.1
                private Subscription subscription = null;

                public void onSubscribe(Subscription subscription2) {
                    this.subscription = subscription2;
                    ServiceBusProcessorClient.this.receiverSubscriptions.put(subscription2, subscription2);
                    subscription2.request(1L);
                }

                public void onNext(ServiceBusMessageContext serviceBusMessageContext) {
                    try {
                        AutoCloseable makeSpanCurrent = ServiceBusProcessorClient.this.tracer.makeSpanCurrent(serviceBusMessageContext.getMessage().getContext());
                        try {
                            if (serviceBusMessageContext.hasError()) {
                                ServiceBusProcessorClient.this.handleError(serviceBusMessageContext.getThrowable());
                            } else {
                                try {
                                    ServiceBusProcessorClient.this.processMessage.accept(new ServiceBusReceivedMessageContext(serviceBusReceiverAsyncClient, serviceBusMessageContext));
                                } catch (Exception e) {
                                    serviceBusMessageContext.getMessage().setContext(serviceBusMessageContext.getMessage().getContext().addData("process-error", e));
                                    ServiceBusProcessorClient.this.handleError(new ServiceBusException(e, ServiceBusErrorSource.USER_CALLBACK));
                                    if (!ServiceBusProcessorClient.this.processorOptions.isDisableAutoComplete()) {
                                        ServiceBusProcessorClient.LOGGER.warning("Error when processing message. Abandoning message.", new Object[]{e});
                                        ServiceBusProcessorClient.this.abandonMessage(serviceBusMessageContext, serviceBusReceiverAsyncClient);
                                    }
                                }
                            }
                            if (ServiceBusProcessorClient.this.isRunning.get()) {
                                ServiceBusProcessorClient.LOGGER.verbose("Requesting 1 more message from upstream");
                                this.subscription.request(1L);
                            }
                            if (makeSpanCurrent != null) {
                                makeSpanCurrent.close();
                            }
                        } finally {
                        }
                    } catch (Exception e2) {
                        ServiceBusProcessorClient.LOGGER.verbose("Error disposing scope", new Object[]{e2});
                    }
                }

                public void onError(Throwable th) {
                    ServiceBusProcessorClient.LOGGER.info("Error receiving messages.", new Object[]{th});
                    ServiceBusProcessorClient.this.handleError(th);
                    if (ServiceBusProcessorClient.this.isRunning.get()) {
                        ServiceBusProcessorClient.this.restartMessageReceiver(this.subscription);
                    }
                }

                public void onComplete() {
                    ServiceBusProcessorClient.LOGGER.info("Completed receiving messages.");
                    if (ServiceBusProcessorClient.this.isRunning.get()) {
                        ServiceBusProcessorClient.this.restartMessageReceiver(this.subscription);
                    }
                }
            };
        }
        if (this.processorOptions.getMaxConcurrentCalls() > 1) {
            serviceBusReceiverAsyncClient.receiveMessagesWithContext().parallel(this.processorOptions.getMaxConcurrentCalls(), 1).runOn(Schedulers.boundedElastic(), 1).subscribe(coreSubscriberArr);
        } else {
            serviceBusReceiverAsyncClient.receiveMessagesWithContext().subscribe(coreSubscriberArr[0]);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void abandonMessage(ServiceBusMessageContext serviceBusMessageContext, ServiceBusReceiverAsyncClient serviceBusReceiverAsyncClient) {
        try {
            serviceBusReceiverAsyncClient.abandon(serviceBusMessageContext.getMessage()).block();
        } catch (Exception e) {
            LOGGER.verbose("Failed to abandon message", new Object[]{e});
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleError(Throwable th) {
        try {
            ServiceBusReceiverAsyncClient serviceBusReceiverAsyncClient = this.asyncClient.get();
            this.processError.accept(new ServiceBusErrorContext(th, serviceBusReceiverAsyncClient.getFullyQualifiedNamespace(), serviceBusReceiverAsyncClient.getEntityPath()));
        } catch (Exception e) {
            LOGGER.verbose("Error from error handler. Ignoring error.", new Object[]{e});
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void restartMessageReceiver(Subscription subscription) {
        if (isRunning()) {
            if (subscription == null || this.receiverSubscriptions.containsKey(subscription)) {
                this.receiverSubscriptions.keySet().forEach((v0) -> {
                    v0.cancel();
                });
                this.receiverSubscriptions.clear();
                this.asyncClient.get().close();
                this.asyncClient.set(this.receiverBuilder == null ? this.sessionReceiverBuilder.buildAsyncClientForProcessor() : this.receiverBuilder.buildAsyncClient());
                receiveMessages();
            }
        }
    }

    private boolean isNonSessionProcessorV2() {
        return this.processorOptions.isNonSessionProcessorV2();
    }
}
