package com.netflix.graphql.dgs.client;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.kotlin.ExtensionsKt;
import com.netflix.graphql.types.subscription.OperationMessage;
import graphql.GraphQLException;
import java.net.URI;
import java.util.function.Consumer;
import java.util.function.Function;
import kotlin.Metadata;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Publisher;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.client.WebSocketClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.concurrent.Queues;

/* compiled from: WebSocketGraphQLClient.kt */
@Metadata(mv = {1, 5, 1}, k = 1, xi = 48, d1 = {"��R\n\u0002\u0018\u0002\n\u0002\u0010��\n��\n\u0002\u0010\u000e\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0004\n\u0002\u0018\u0002\n��\n\u0002\u0010\u0002\n\u0002\b\u0002\u0018�� \u001c2\u00020\u0001:\u0001\u001cB\u0015\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005¢\u0006\u0002\u0010\u0006J\f\u0010\u000e\u001a\b\u0012\u0004\u0012\u00020\u00100\u000fJ\u0018\u0010\u0011\u001a\u00020\u00122\u0006\u0010\u0013\u001a\u00020\u00142\u0006\u0010\u0015\u001a\u00020\fH\u0002J\u0010\u0010\u0016\u001a\u00020\f2\u0006\u0010\u0015\u001a\u00020\u0012H\u0002J\u0016\u0010\u0017\u001a\b\u0012\u0004\u0012\u00020\u00100\u000f2\u0006\u0010\u0013\u001a\u00020\u0014H\u0002J\f\u0010\u0018\u001a\b\u0012\u0004\u0012\u00020\f0\u0019J\u000e\u0010\u001a\u001a\u00020\u001b2\u0006\u0010\u0015\u001a\u00020\fR\u000e\u0010\u0004\u001a\u00020\u0005X\u0082\u0004¢\u0006\u0002\n��R2\u0010\u0007\u001a&\u0012\f\u0012\n \n*\u0004\u0018\u00010\t0\t \n*\u0012\u0012\f\u0012\n \n*\u0004\u0018\u00010\t0\t\u0018\u00010\b0\bX\u0082\u0004¢\u0006\u0002\n��R2\u0010\u000b\u001a&\u0012\f\u0012\n \n*\u0004\u0018\u00010\f0\f \n*\u0012\u0012\f\u0012\n \n*\u0004\u0018\u00010\f0\f\u0018\u00010\b0\bX\u0082\u0004¢\u0006\u0002\n��R2\u0010\r\u001a&\u0012\f\u0012\n \n*\u0004\u0018\u00010\f0\f \n*\u0012\u0012\f\u0012\n \n*\u0004\u0018\u00010\f0\f\u0018\u00010\b0\bX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0002\u001a\u00020\u0003X\u0082\u0004¢\u0006\u0002\n��¨\u0006\u001d"}, d2 = {"Lcom/netflix/graphql/dgs/client/OperationMessageWebSocketClient;", "", "url", "", "client", "Lorg/springframework/web/reactive/socket/client/WebSocketClient;", "(Ljava/lang/String;Lorg/springframework/web/reactive/socket/client/WebSocketClient;)V", "errorSink", 
"Lreactor/core/publisher/Sinks$Many;", "Lgraphql/GraphQLException;", "kotlin.jvm.PlatformType", "incomingSink", "Lcom/netflix/graphql/types/subscription/OperationMessage;", "outgoingSink", "connect", "Lreactor/core/publisher/Mono;", "Ljava/lang/Void;", "createMessage", "Lorg/springframework/web/reactive/socket/WebSocketMessage;", "session", "Lorg/springframework/web/reactive/socket/WebSocketSession;", "message", "decodeMessage", "exchange", "receive", "Lreactor/core/publisher/Flux;", "send", "", "Companion", "graphql-dgs-client"})
/* loaded from: input_file:com/netflix/graphql/dgs/client/OperationMessageWebSocketClient.class */
/*
 * Reactive WebSocket client that exchanges JSON-encoded OperationMessage frames with the
 * server identified by the configured URL.
 *
 * Usage, as visible in this class:
 *   - connect() returns a Mono that opens the socket when subscribed,
 *   - send(...) queues an outbound message on the outgoing sink,
 *   - receive() exposes inbound messages and fails when a connection error is reported.
 */
public final class OperationMessageWebSocketClient {

    /** Endpoint address; parsed into a {@link URI} each time a connection is opened. */
    @NotNull
    private final String url;

    /** Underlying Spring WebSocket client used to open the session. */
    @NotNull
    private final WebSocketClient client;

    /** Sink handed to the inbound-message callback created in {@code exchange}. */
    private final Sinks.Many<OperationMessage> incomingSink;

    /** Messages queued by {@link #send(OperationMessage)} and written to the socket by {@code exchange}. */
    private final Sinks.Many<OperationMessage> outgoingSink;

    /** Failures reported by {@code exchange}; {@link #receive()} rethrows them to its subscribers. */
    private final Sinks.Many<GraphQLException> errorSink;

    @NotNull
    public static final Companion Companion = new Companion(null);

    /** Shared Jackson mapper (Kotlin-module flavoured) used for both encoding and decoding. */
    @NotNull
    private static final ObjectMapper MAPPER = ExtensionsKt.jacksonObjectMapper();

    /** Kotlin companion-object holder; it carries no state or behavior of its own in this file. */
    /* compiled from: WebSocketGraphQLClient.kt */
    @Metadata(mv = {1, 5, 1}, k = 1, xi = 48, d1 = {"��\u0012\n\u0002\u0018\u0002\n\u0002\u0010��\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\b\u0086\u0003\u0018��2\u00020\u0001B\u0007\b\u0002¢\u0006\u0002\u0010\u0002R\u000e\u0010\u0003\u001a\u00020\u0004X\u0082\u0004¢\u0006\u0002\n��¨\u0006\u0005"}, d2 = {"Lcom/netflix/graphql/dgs/client/OperationMessageWebSocketClient$Companion;", "", "()V", "MAPPER", "Lcom/fasterxml/jackson/databind/ObjectMapper;", "graphql-dgs-client"})
    /* loaded from: input_file:com/netflix/graphql/dgs/client/OperationMessageWebSocketClient$Companion.class */
    public static final class Companion {
        private Companion() {
        }

        public /* synthetic */ Companion(DefaultConstructorMarker defaultConstructorMarker) {
            this();
        }
    }

    /**
     * Creates a client for the given endpoint. No connection is opened here.
     *
     * @param str             WebSocket endpoint URL; must not be null
     * @param webSocketClient Spring client used to open the session; must not be null
     */
    public OperationMessageWebSocketClient(@NotNull String str, @NotNull WebSocketClient webSocketClient) {
        Intrinsics.checkNotNullParameter(str, "url");
        Intrinsics.checkNotNullParameter(webSocketClient, "client");
        this.url = str;
        this.client = webSocketClient;
        // All three sinks are multicast buffers of SMALL_BUFFER_SIZE; the second argument
        // (autoCancel in Reactor's API) is false.
        this.incomingSink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
        this.outgoingSink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
        this.errorSink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
    }

    /**
     * Builds a deferred connection: the URL is parsed and the socket opened only when the
     * returned {@link Mono} is subscribed.
     *
     * @return a Mono produced by {@code client.execute(URI(url), this::exchange)}
     */
    @NotNull
    public final Mono<Void> connect() {
        // Fix: the decompiler left an undefined register placeholder here; the captured
        // receiver of the original Kotlin lambda is this instance.
        Mono<Void> defer = Mono.defer(() -> m4connect$lambda0(this));
        Intrinsics.checkNotNullExpressionValue(defer, "defer { client.execute(URI(url), this::exchange) }");
        return defer;
    }

    /**
     * Queues a message for transmission.
     *
     * @param operationMessage message to send; must not be null
     * @throws RuntimeException via {@code orThrow()} when the outgoing sink rejects the emission
     */
    public final void send(@NotNull OperationMessage operationMessage) {
        Intrinsics.checkNotNullParameter(operationMessage, "message");
        this.outgoingSink.tryEmitNext(operationMessage).orThrow();
    }

    /**
     * Returns the stream of inbound messages, merged with the error sink so that any
     * {@link GraphQLException} emitted there is thrown into (and terminates) the stream.
     *
     * @return flux of received messages
     */
    @NotNull
    public final Flux<OperationMessage> receive() {
        // Fix: the decompiled adapter was a raw Function returning Void, which neither
        // implements Function.apply(Object) nor merges into Flux<OperationMessage>. The
        // mapper never returns normally, so typing its result as OperationMessage is safe.
        Function<GraphQLException, OperationMessage> rethrow = new Function<GraphQLException, OperationMessage>() { // from class: com.netflix.graphql.dgs.client.OperationMessageWebSocketClient$receive$1
            @Override // java.util.function.Function
            public OperationMessage apply(GraphQLException graphQLException) {
                Intrinsics.checkNotNullExpressionValue(graphQLException, "it");
                throw graphQLException;
            }
        };
        Flux<OperationMessage> mergeWith = this.incomingSink.asFlux().mergeWith(this.errorSink.asFlux().map(rethrow));
        Intrinsics.checkNotNullExpressionValue(mergeWith, "incomingSink\n            .asFlux()\n            .mergeWith(errorSink.asFlux().map { throw it })");
        return mergeWith;
    }

    /**
     * Session handler passed to the WebSocket client: decodes inbound frames and hands them to
     * the incoming callback, serializes and sends everything emitted on the outgoing sink, and
     * reports failures and termination through the error sink.
     *
     * @param webSocketSession the open session
     * @return a Mono that terminates when the merged inbound/outbound pipelines terminate
     */
    private final Mono<Void> exchange(WebSocketSession webSocketSession) {
        Flux<OperationMessage> incomingDecoded = webSocketSession.receive().map(this::decodeMessage);
        // Synthetic Kotlin callable-reference class constructed around incomingSink; its body is
        // not visible in this file (presumably it forwards each message into the sink — confirm).
        final OperationMessageWebSocketClient$exchange$incomingDeserialized$2 operationMessageWebSocketClient$exchange$incomingDeserialized$2 = new OperationMessageWebSocketClient$exchange$incomingDeserialized$2(this.incomingSink);
        Flux<OperationMessage> incomingDeserialized = incomingDecoded.doOnNext(new Consumer<Object>() { // from class: com.netflix.graphql.dgs.client.OperationMessageWebSocketClient$sam$java_util_function_Consumer$0
            @Override // java.util.function.Consumer
            public void accept(Object obj) {
                operationMessageWebSocketClient$exchange$incomingDeserialized$2.invoke(obj);
            }
        });
        // Fix: the three lambdas below referenced undefined decompiler placeholders; they capture
        // this instance and, for serialization, the current session.
        Mono<Void> outgoingSerialized = webSocketSession.send(
                this.outgoingSink.asFlux().map(message -> m5exchange$lambda1(this, webSocketSession, message)));
        Mono<Void> doAfterTerminate = Flux.merge(incomingDeserialized, outgoingSerialized)
                .then()
                // Ensure the output flux collapses neatly if an error occurs
                .doOnError(error -> m6exchange$lambda2(this, error))
                // Reactor runs doAfterTerminate after completion and after error alike.
                .doAfterTerminate(() -> m7exchange$lambda3(this));
        Intrinsics.checkNotNullExpressionValue(doAfterTerminate, "merge(incomingDeserialized, outgoingSerialized)\n            .then()\n            // Ensure the output flux collapses neatly if an error occurs\n            .doOnError { errorSink.tryEmitNext(GraphQLException(it)).orThrow() }\n            .doAfterTerminate {\n                errorSink.tryEmitNext(GraphQLException(\"Server closed the connection unexpectedly\")).orThrow()\n            }");
        return doAfterTerminate;
    }

    /**
     * Serializes a message to JSON and wraps it in a text frame for the given session.
     *
     * @param webSocketSession session used as the frame factory
     * @param operationMessage message to encode
     * @return the text frame
     */
    private final WebSocketMessage createMessage(WebSocketSession webSocketSession, OperationMessage operationMessage) {
        // NOTE(review): Jackson's writeValueAsString declares a checked exception that the Kotlin
        // original propagated unchecked; a Java build needs a throws clause or wrapper here.
        WebSocketMessage textMessage = webSocketSession.textMessage(MAPPER.writeValueAsString(operationMessage));
        Intrinsics.checkNotNullExpressionValue(textMessage, "session.textMessage(MAPPER.writeValueAsString(message))");
        return textMessage;
    }

    /**
     * Parses the text payload of a frame as an {@link OperationMessage}.
     *
     * @param webSocketMessage inbound frame
     * @return the decoded message
     */
    private final OperationMessage decodeMessage(WebSocketMessage webSocketMessage) {
        // NOTE(review): readValue declares checked Jackson exceptions that the Kotlin original
        // propagated unchecked; a Java build needs a throws clause or wrapper here.
        Object readValue = MAPPER.readValue(webSocketMessage.getPayloadAsText(), new TypeReference<OperationMessage>() { // from class: com.netflix.graphql.dgs.client.OperationMessageWebSocketClient$decodeMessage$type$1
        });
        Intrinsics.checkNotNullExpressionValue(readValue, "MAPPER.readValue(messageText, type)");
        return (OperationMessage) readValue;
    }

    /** Body of the {@code connect} lambda: opens the socket with {@code exchange} as handler. */
    /* renamed from: connect$lambda-0, reason: not valid java name */
    private static final Mono m4connect$lambda0(OperationMessageWebSocketClient operationMessageWebSocketClient) {
        Intrinsics.checkNotNullParameter(operationMessageWebSocketClient, "this$0");
        // NOTE(review): URI's constructor declares the checked URISyntaxException, which the
        // Kotlin original propagated unchecked; a Java build needs a throws clause or wrapper.
        return operationMessageWebSocketClient.client.execute(new URI(operationMessageWebSocketClient.url), operationMessageWebSocketClient::exchange);
    }

    /** Body of the outbound {@code map} lambda in {@code exchange}: encodes one message. */
    /* renamed from: exchange$lambda-1, reason: not valid java name */
    private static final WebSocketMessage m5exchange$lambda1(OperationMessageWebSocketClient operationMessageWebSocketClient, WebSocketSession webSocketSession, OperationMessage operationMessage) {
        Intrinsics.checkNotNullParameter(operationMessageWebSocketClient, "this$0");
        Intrinsics.checkNotNullParameter(webSocketSession, "$session");
        Intrinsics.checkNotNullExpressionValue(operationMessage, "it");
        return operationMessageWebSocketClient.createMessage(webSocketSession, operationMessage);
    }

    /** Body of the {@code doOnError} lambda: publishes the failure wrapped in a GraphQLException. */
    /* renamed from: exchange$lambda-2, reason: not valid java name */
    private static final void m6exchange$lambda2(OperationMessageWebSocketClient operationMessageWebSocketClient, Throwable th) {
        Intrinsics.checkNotNullParameter(operationMessageWebSocketClient, "this$0");
        operationMessageWebSocketClient.errorSink.tryEmitNext(new GraphQLException(th)).orThrow();
    }

    /** Body of the {@code doAfterTerminate} lambda: publishes a "connection closed" error. */
    /* renamed from: exchange$lambda-3, reason: not valid java name */
    private static final void m7exchange$lambda3(OperationMessageWebSocketClient operationMessageWebSocketClient) {
        Intrinsics.checkNotNullParameter(operationMessageWebSocketClient, "this$0");
        operationMessageWebSocketClient.errorSink.tryEmitNext(new GraphQLException("Server closed the connection unexpectedly")).orThrow();
    }
}
