package io.quarkus.reactivemessaging.http.runtime;

import io.quarkus.reactivemessaging.http.runtime.config.TlsConfig;
import io.quarkus.reactivemessaging.http.runtime.serializers.SerializerFactoryBase;
import io.quarkus.tls.TlsConfiguration;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniRetry;
import io.smallrye.mutiny.vertx.AsyncResultUni;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.WebSocket;
import io.vertx.core.http.WebSocketConnectOptions;
import java.net.URI;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.jboss.logging.Logger;

/* loaded from: input_file:io/quarkus/reactivemessaging/http/runtime/WebSocketSink.class */
class WebSocketSink {
    private static final Logger log = Logger.getLogger(WebSocketSink.class);
    private static final String WSS = "wss";
    private static final List<String> supportedSchemes = Arrays.asList("ws", WSS);
    private final URI uri;
    private final HttpClient httpClient;
    private final SubscriberBuilder<Message<?>, Void> subscriber;
    private final boolean ssl;
    private final String serializer;
    private final SerializerFactoryBase serializerFactory;
    private final AtomicReference<WebSocket> websocket = new AtomicReference<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    public WebSocketSink(Vertx vertx, URI uri, String str, SerializerFactoryBase serializerFactoryBase, int i, Optional<Duration> optional, double d, Optional<TlsConfiguration> optional2) {
        this.uri = uri;
        this.serializerFactory = serializerFactoryBase;
        this.serializer = str;
        String lowerCase = uri.getScheme().toLowerCase(Locale.getDefault());
        if (!supportedSchemes.contains(lowerCase)) {
            throw new IllegalArgumentException("Invalid scheme '" + lowerCase + "' for the websocket sink URL: " + String.valueOf(uri));
        }
        this.ssl = WSS.equals(lowerCase);
        HttpClientOptions httpClientOptions = new HttpClientOptions();
        optional2.ifPresent(tlsConfiguration -> {
            TlsConfig.configure(httpClientOptions, tlsConfiguration);
        });
        this.httpClient = vertx.createHttpClient(httpClientOptions);
        this.subscriber = ReactiveStreams.builder().flatMapCompletionStage(message -> {
            Uni<Void> send = send(message);
            log.debugf("maxRetries: %d", i);
            if (i > 0) {
                UniRetry retry = send.onFailure().retry();
                if (optional.isPresent()) {
                    retry = retry.withBackOff((Duration) optional.get()).withJitter(d);
                }
                send = retry.atMost(i);
            }
            return send.onItemOrFailure().transformToUni((r6, th) -> {
                return th != null ? Uni.createFrom().completionStage(message.nack(th).thenApply(r6 -> {
                    log.debug("error responding", th);
                    return message;
                })) : Uni.createFrom().completionStage(message.ack().thenApply(r62 -> {
                    log.debug("responded with success", th);
                    return message;
                }));
            }).subscribeAsCompletionStage();
        }).ignore();
    }

    private void connect(WebSocketConnectOptions webSocketConnectOptions, Handler<AsyncResult<WebSocket>> handler) {
        log.debug("using a new web socket connection");
        this.httpClient.webSocket(webSocketConnectOptions, asyncResult -> {
            if (!asyncResult.succeeded()) {
                handler.handle(Future.failedFuture(asyncResult.cause()));
                return;
            }
            WebSocket webSocket = (WebSocket) asyncResult.result();
            WebSocket andSet = this.websocket.getAndSet(webSocket);
            if (andSet != null) {
                log.debug("Closing previous web socket connection");
                andSet.close();
            }
            webSocket.closeHandler(r6 -> {
                log.debug("WebSocket disconnected");
                this.websocket.compareAndSet(webSocket, null);
            });
            handler.handle(asyncResult);
        });
    }

    private Uni<Void> send(Message<?> message) {
        WebSocketConnectOptions options = options();
        Buffer serialize = this.serializerFactory.getSerializer(this.serializer, message.getPayload()).serialize(message.getPayload());
        return AsyncResultUni.toUni(handler -> {
            WebSocket webSocket = this.websocket.get();
            if (webSocket == null || webSocket.isClosed()) {
                connect(options, asyncResult -> {
                    if (asyncResult.succeeded()) {
                        _send((WebSocket) asyncResult.result(), serialize, handler);
                    } else {
                        handler.handle(Future.failedFuture(asyncResult.cause()));
                    }
                });
            } else {
                log.debug("reusing a previous web socket connection");
                _send(webSocket, serialize, handler);
            }
        });
    }

    private WebSocketConnectOptions options() {
        return new WebSocketConnectOptions().setSsl(Boolean.valueOf(this.ssl)).setHost(this.uri.getHost()).setPort(Integer.valueOf(this.uri.getPort())).setURI(this.uri.getPath());
    }

    private void _send(WebSocket webSocket, Buffer buffer, Handler<AsyncResult<Void>> handler) {
        log.debug("sending out the message");
        webSocket.write(buffer, asyncResult -> {
            if (asyncResult.succeeded()) {
                log.debug("success");
            } else {
                log.debug("failure", asyncResult.cause());
            }
            handler.handle(asyncResult);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SubscriberBuilder<Message<?>, Void> sink() {
        return this.subscriber;
    }
}
