package com.azure.messaging.servicebus;

import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer;
import java.time.Duration;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:com/azure/messaging/servicebus/NonSessionMessagePump.class */
final class NonSessionMessagePump {
    private static final AtomicLong COUNTER = new AtomicLong();
    private static final Duration CONNECTION_STATE_POLL_INTERVAL = Duration.ofSeconds(20);
    private final long pumpId = COUNTER.incrementAndGet();
    private final ServiceBusReceiverAsyncClient client;
    private final String fqdn;
    private final String entityPath;
    private final ClientLogger logger;
    private final Consumer<ServiceBusReceivedMessageContext> processMessage;
    private final Consumer<ServiceBusErrorContext> processError;
    private final int concurrency;
    private final boolean enableAutoDisposition;
    private final boolean enableAutoLockRenew;
    private final Scheduler workerScheduler;
    private final ServiceBusReceiverInstrumentation instrumentation;
    private final ServiceBusTracer tracer;

    /* loaded from: input_file:com/azure/messaging/servicebus/NonSessionMessagePump$RunOnWorker.class */
    private static final class RunOnWorker implements Function<ServiceBusReceivedMessage, Publisher<Void>> {
        private final Consumer<ServiceBusReceivedMessage> handleMessage;
        private final Scheduler workerScheduler;

        RunOnWorker(Consumer<ServiceBusReceivedMessage> consumer, Scheduler scheduler) {
            this.handleMessage = consumer;
            this.workerScheduler = scheduler;
        }

        @Override // java.util.function.Function
        public Mono<Void> apply(ServiceBusReceivedMessage serviceBusReceivedMessage) {
            return Mono.fromRunnable(() -> {
                this.handleMessage.accept(serviceBusReceivedMessage);
            }).subscribeOn(this.workerScheduler).then();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public NonSessionMessagePump(ServiceBusReceiverAsyncClient serviceBusReceiverAsyncClient, Consumer<ServiceBusReceivedMessageContext> consumer, Consumer<ServiceBusErrorContext> consumer2, int i, boolean z) {
        this.client = serviceBusReceiverAsyncClient;
        this.fqdn = this.client.getFullyQualifiedNamespace();
        this.entityPath = this.client.getEntityPath();
        HashMap hashMap = new HashMap(3);
        hashMap.put("pump-id", Long.valueOf(this.pumpId));
        hashMap.put("namespace", this.fqdn);
        hashMap.put("entityPath", this.entityPath);
        this.logger = new ClientLogger(NonSessionMessagePump.class, hashMap);
        this.processMessage = consumer;
        this.processError = consumer2;
        this.concurrency = i;
        this.enableAutoDisposition = z;
        this.enableAutoLockRenew = serviceBusReceiverAsyncClient.isAutoLockRenewRequested();
        if (i > 1) {
            this.workerScheduler = Schedulers.boundedElastic();
        } else {
            this.workerScheduler = Schedulers.immediate();
        }
        this.instrumentation = this.client.getInstrumentation();
        this.tracer = this.instrumentation.getTracer();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<Void> begin() {
        return Mono.firstWithSignal(new Mono[]{this.client.nonSessionProcessorReceiveV2().flatMap(new RunOnWorker(this::handleMessage, this.workerScheduler), this.concurrency, 1).then(), pollConnectionState()}).onErrorMap(th -> {
            return th instanceof MessagePumpTerminatedException ? th : new MessagePumpTerminatedException(this.pumpId, this.fqdn, this.entityPath, "pumping#error-map", th);
        }).then(Mono.error(() -> {
            return MessagePumpTerminatedException.forCompletion(this.pumpId, this.fqdn, this.entityPath);
        }));
    }

    private Mono<Void> pollConnectionState() {
        return Flux.interval(CONNECTION_STATE_POLL_INTERVAL).handle((l, synchronousSink) -> {
            if (this.client.isConnectionClosed()) {
                synchronousSink.error(this.logger.atInfo().log(new MessagePumpTerminatedException(this.pumpId, this.fqdn, this.entityPath, "non-session#connection-state-poll")));
            }
        }).then();
    }

    private void handleMessage(ServiceBusReceivedMessage serviceBusReceivedMessage) {
        Disposable beginLockRenewal = this.enableAutoLockRenew ? this.client.beginLockRenewal(serviceBusReceivedMessage) : Disposables.disposed();
        boolean notifyMessage = notifyMessage(serviceBusReceivedMessage);
        if (this.enableAutoDisposition) {
            if (notifyMessage) {
                complete(serviceBusReceivedMessage);
            } else {
                abandon(serviceBusReceivedMessage);
            }
        }
        beginLockRenewal.dispose();
    }

    private boolean notifyMessage(ServiceBusReceivedMessage serviceBusReceivedMessage) {
        AutoCloseable makeSpanCurrent = this.tracer.makeSpanCurrent(this.instrumentation.instrumentProcess("ServiceBus.process", serviceBusReceivedMessage, Context.NONE));
        Exception exc = null;
        try {
            this.processMessage.accept(new ServiceBusReceivedMessageContext(this.client, new ServiceBusMessageContext(serviceBusReceivedMessage)));
        } catch (Exception e) {
            exc = e;
        }
        if (exc == null) {
            this.tracer.endSpan(null, serviceBusReceivedMessage.getContext(), makeSpanCurrent);
            return true;
        }
        notifyError(new ServiceBusException(exc, ServiceBusErrorSource.USER_CALLBACK));
        this.tracer.endSpan(exc, serviceBusReceivedMessage.getContext(), makeSpanCurrent);
        return false;
    }

    private void notifyError(Throwable th) {
        try {
            this.processError.accept(new ServiceBusErrorContext(th, this.fqdn, this.entityPath));
        } catch (Exception e) {
            this.logger.atVerbose().log("Ignoring error from user processError handler.", new Object[]{e});
        }
    }

    private void complete(ServiceBusReceivedMessage serviceBusReceivedMessage) {
        try {
            this.client.complete(serviceBusReceivedMessage).block();
        } catch (Exception e) {
            this.logger.atVerbose().log("Failed to complete message", new Object[]{e});
        }
    }

    private void abandon(ServiceBusReceivedMessage serviceBusReceivedMessage) {
        try {
            this.client.abandon(serviceBusReceivedMessage).block();
        } catch (Exception e) {
            this.logger.atVerbose().log("Failed to abandon message", new Object[]{e});
        }
    }
}
