package io.quarkus.reactivemessaging.http.runtime;

import io.quarkus.reactivemessaging.http.runtime.ReactiveHandlerBeanBase;
import io.quarkus.reactivemessaging.http.runtime.config.HttpStreamConfig;
import io.quarkus.reactivemessaging.http.runtime.config.ReactiveHttpConfig;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.subscription.MultiEmitter;
import io.vertx.core.http.HttpMethod;
import io.vertx.ext.web.RoutingContext;
import java.util.Collection;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.jboss.logging.Logger;

@Singleton
/* loaded from: input_file:io/quarkus/reactivemessaging/http/runtime/ReactiveHttpHandlerBean.class */
public class ReactiveHttpHandlerBean extends ReactiveHandlerBeanBase<HttpStreamConfig, HttpMessage<?>> {
    private static final Logger log = Logger.getLogger(ReactiveHttpHandlerBean.class);

    @Inject
    ReactiveHttpConfig config;

    /* JADX INFO: Access modifiers changed from: package-private */
    public Multi<HttpMessage<?>> getProcessor(String str, HttpMethod httpMethod) {
        return ((ReactiveHandlerBeanBase.Bundle) this.processors.get(key(str, httpMethod))).getProcessor();
    }

    @Override // io.quarkus.reactivemessaging.http.runtime.ReactiveHandlerBeanBase
    protected Collection<HttpStreamConfig> configs() {
        return this.config.getHttpConfigs();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.quarkus.reactivemessaging.http.runtime.ReactiveHandlerBeanBase
    public String key(HttpStreamConfig httpStreamConfig) {
        return key(httpStreamConfig.path, httpStreamConfig.method);
    }

    @Override // io.quarkus.reactivemessaging.http.runtime.ReactiveHandlerBeanBase
    protected String key(RoutingContext routingContext) {
        return key(routingContext.currentRoute().getPath(), routingContext.request().method());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.quarkus.reactivemessaging.http.runtime.ReactiveHandlerBeanBase
    public String description(HttpStreamConfig httpStreamConfig) {
        return String.format("path: %s, method %s", httpStreamConfig.path, httpStreamConfig.method);
    }

    @Override // io.quarkus.reactivemessaging.http.runtime.ReactiveHandlerBeanBase
    protected void handleRequest(RoutingContext routingContext, MultiEmitter<? super HttpMessage<?>> multiEmitter, StrictQueueSizeGuard strictQueueSizeGuard, String str) {
        if (multiEmitter == null) {
            onUnexpectedError(routingContext, null, "No consumer subscribed for messages sent to Reactive Messaging HTTP endpoint on path: " + str);
            return;
        }
        if (!strictQueueSizeGuard.prepareToEmit()) {
            routingContext.response().setStatusCode(503).end();
            return;
        }
        try {
            multiEmitter.emit(new HttpMessage(routingContext.getBody(), new IncomingHttpMetadata(routingContext), () -> {
                if (routingContext.response().ended()) {
                    return;
                }
                routingContext.response().setStatusCode(202).end();
            }, th -> {
                onUnexpectedError(routingContext, th, "Failed to process message.");
            }));
        } catch (Exception e) {
            strictQueueSizeGuard.dequeue();
            onUnexpectedError(routingContext, e, "Emitting message failed");
        }
    }

    private void onUnexpectedError(RoutingContext routingContext, Throwable th, String str) {
        if (routingContext.response().ended()) {
            return;
        }
        routingContext.response().setStatusCode(500).end("Unexpected error while processing the message");
        log.error(str, th);
    }

    private String key(String str, HttpMethod httpMethod) {
        return String.format("%s:%s", str, httpMethod);
    }
}
