package io.quarkus.reactivemessaging.http.runtime;

import io.quarkus.reactivemessaging.http.runtime.serializers.SerializerFactoryBase;
import io.quarkus.runtime.configuration.DurationConverter;
import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.annotations.ConnectorAttributes;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpMethod;
import java.util.Optional;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
import org.eclipse.microprofile.reactive.messaging.spi.IncomingConnectorFactory;
import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.jboss.logging.Logger;

@ApplicationScoped
@ConnectorAttributes({@ConnectorAttribute(name = "url", type = "string", direction = ConnectorAttribute.Direction.OUTGOING, description = "The target URL", mandatory = true), @ConnectorAttribute(name = "serializer", type = "string", direction = ConnectorAttribute.Direction.OUTGOING, description = "Message serializer"), @ConnectorAttribute(name = "maxPoolSize", type = "int", direction = ConnectorAttribute.Direction.OUTGOING, description = "Maximum pool size for connections"), @ConnectorAttribute(name = "maxWaitQueueSize", type = "int", direction = ConnectorAttribute.Direction.OUTGOING, description = "Maximum requests allowed in the wait queue of the underlying client.  If the value is set to a negative number then the queue will be unbounded"), @ConnectorAttribute(name = "maxRetries", type = "int", direction = ConnectorAttribute.Direction.OUTGOING, description = "The number of attempts to make for sending a request to a remote endpoint. Must not be less than zero", defaultValue = QuarkusHttpConnector.DEFAULT_MAX_ATTEMPTS_STR), @ConnectorAttribute(name = "jitter", type = "string", direction = ConnectorAttribute.Direction.OUTGOING, description = "Configures the random factor when using back-off with maxRetries > 0", defaultValue = QuarkusHttpConnector.DEFAULT_JITTER), @ConnectorAttribute(name = "delay", type = "string", direction = ConnectorAttribute.Direction.OUTGOING, description = "Configures a back-off delay between attempts to send a request. A random factor (jitter) is applied to increase the delay when several failures happen."), @ConnectorAttribute(name = "method", type = "string", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The HTTP method (either `POST` or `PUT`)", defaultValue = "POST"), @ConnectorAttribute(name = "path", type = "string", direction = ConnectorAttribute.Direction.INCOMING, description = "The path of the endpoint", mandatory = true), @ConnectorAttribute(name = "buffer-size", type = "string", direction = ConnectorAttribute.Direction.INCOMING, description = "HTTP endpoint buffers messages if a consumer is not able to keep up. This setting specifies the size of the buffer.", defaultValue = QuarkusHttpConnector.DEFAULT_SOURCE_BUFFER_STR), @ConnectorAttribute(name = "broadcast", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING, description = "Whether the messages should be dispatched to multiple consumers", defaultValue = "false")})
@Connector(QuarkusHttpConnector.NAME)
/* loaded from: input_file:io/quarkus/reactivemessaging/http/runtime/QuarkusHttpConnector.class */
public class QuarkusHttpConnector implements IncomingConnectorFactory, OutgoingConnectorFactory {
    static final String DEFAULT_JITTER = "0.5";
    static final String DEFAULT_MAX_ATTEMPTS_STR = "0";
    public static final String NAME = "quarkus-http";

    @Inject
    ReactiveHttpHandlerBean handlerBean;

    @Inject
    Vertx vertx;

    @Inject
    SerializerFactoryBase serializerFactory;
    private static final Logger log = Logger.getLogger(QuarkusHttpConnector.class);
    static final String DEFAULT_SOURCE_BUFFER_STR = "8";
    public static final Integer DEFAULT_SOURCE_BUFFER = Integer.valueOf(DEFAULT_SOURCE_BUFFER_STR);

    public PublisherBuilder<HttpMessage<?>> getPublisherBuilder(Config config) {
        QuarkusHttpConnectorIncomingConfiguration quarkusHttpConnectorIncomingConfiguration = new QuarkusHttpConnectorIncomingConfiguration(config);
        Multi<HttpMessage<?>> processor = this.handlerBean.getProcessor(quarkusHttpConnectorIncomingConfiguration.getPath(), getMethod(quarkusHttpConnectorIncomingConfiguration.getMethod()));
        return quarkusHttpConnectorIncomingConfiguration.getBroadcast().booleanValue() ? ReactiveStreams.fromPublisher(processor.broadcast().toAllSubscribers()) : ReactiveStreams.fromPublisher(processor);
    }

    private HttpMethod getMethod(String str) {
        try {
            return HttpMethod.valueOf(str);
        } catch (IllegalArgumentException e) {
            String str2 = "Unsupported HTTP method: " + str + ". The supported methods are: " + HttpMethod.values();
            log.warn(str2, e);
            throw new IllegalArgumentException(str2);
        }
    }

    public SubscriberBuilder<? extends Message<?>, Void> getSubscriberBuilder(Config config) {
        QuarkusHttpConnectorOutgoingConfiguration quarkusHttpConnectorOutgoingConfiguration = new QuarkusHttpConnectorOutgoingConfiguration(config);
        String url = quarkusHttpConnectorOutgoingConfiguration.getUrl();
        String name = getMethod(quarkusHttpConnectorOutgoingConfiguration.getMethod()).name();
        String orElse = quarkusHttpConnectorOutgoingConfiguration.getSerializer().orElse(null);
        Optional<U> map = quarkusHttpConnectorOutgoingConfiguration.getDelay().map(DurationConverter::parseDuration);
        String jitter = quarkusHttpConnectorOutgoingConfiguration.getJitter();
        Integer maxRetries = quarkusHttpConnectorOutgoingConfiguration.getMaxRetries();
        try {
            return new HttpSink(this.vertx, name, url, orElse, maxRetries.intValue(), Double.valueOf(jitter).doubleValue(), map, quarkusHttpConnectorOutgoingConfiguration.getMaxPoolSize(), quarkusHttpConnectorOutgoingConfiguration.getMaxWaitQueueSize(), this.serializerFactory).sink();
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(String.format("Failed to parse jitter value '%s' to a double.", jitter));
        }
    }
}
