package io.vertx.ext.web.handler.graphql.impl;

import graphql.ExecutionInput;
import graphql.ExecutionResult;
import graphql.GraphQL;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.graphql.ApolloWSHandler;
import io.vertx.ext.web.handler.graphql.ApolloWSMessage;
import io.vertx.ext.web.handler.graphql.ApolloWSMessageType;
import io.vertx.ext.web.handler.graphql.ApolloWSOptions;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.dataloader.DataLoaderRegistry;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:io/vertx/ext/web/handler/graphql/impl/ApolloWSHandlerImpl.class */
/**
 * {@link ApolloWSHandler} implementation that upgrades matching HTTP requests to a WebSocket and
 * dispatches the JSON messages received on it ({@code CONNECTION_INIT}, {@code CONNECTION_TERMINATE},
 * {@code START}, {@code STOP}) to a {@link GraphQL} engine.
 *
 * <p>The user-configurable callbacks and factories are written by {@code synchronized} setters and
 * read under the same monitor before use. Active subscriptions are tracked per connection, keyed by
 * the operation id supplied by the client.
 */
public class ApolloWSHandlerImpl implements ApolloWSHandler {
    /** Default query context: the incoming message itself. */
    private static final Function<ApolloWSMessage, Object> DEFAULT_QUERY_CONTEXT_FACTORY = message -> message;
    /** Default data loader registry factory: no registry. */
    private static final Function<ApolloWSMessage, DataLoaderRegistry> DEFAULT_DATA_LOADER_REGISTRY_FACTORY = message -> null;
    // NOTE(review): not referenced anywhere in this class; kept as-is.
    private static final String HEADER_CONNECTION_UPGRADE_VALUE = "upgrade";

    private final GraphQL graphQL;
    /** Keep-alive period handed to {@link Vertx#setPeriodic}; values {@code <= 0} disable keep-alive. */
    private final long keepAlive;
    private Function<ApolloWSMessage, Object> queryContextFactory = DEFAULT_QUERY_CONTEXT_FACTORY;
    private Function<ApolloWSMessage, DataLoaderRegistry> dataLoaderRegistryFactory = DEFAULT_DATA_LOADER_REGISTRY_FACTORY;
    private Handler<ServerWebSocket> connectionHandler;
    private Handler<ServerWebSocket> endHandler;
    private Handler<ApolloWSMessage> messageHandler;

    /**
     * Creates a handler bound to the given GraphQL engine.
     *
     * @param graphQL the engine used to execute {@code START} operations; must not be {@code null}
     * @param apolloWSOptions options from which the keep-alive period is read; must not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    public ApolloWSHandlerImpl(GraphQL graphQL, ApolloWSOptions apolloWSOptions) {
        Objects.requireNonNull(graphQL, "graphQL");
        Objects.requireNonNull(apolloWSOptions, "options");
        this.graphQL = graphQL;
        this.keepAlive = apolloWSOptions.getKeepAlive();
    }

    /**
     * Sets the callback invoked with the WebSocket right after a request has been upgraded.
     *
     * @param handler the callback, or {@code null} for none
     * @return this handler
     */
    @Override
    public synchronized ApolloWSHandler connectionHandler(Handler<ServerWebSocket> handler) {
        this.connectionHandler = handler;
        return this;
    }

    /**
     * Sets the callback invoked with every message of a known type, before it is processed.
     *
     * @param handler the callback, or {@code null} for none
     * @return this handler
     */
    @Override
    public synchronized ApolloWSHandler messageHandler(Handler<ApolloWSMessage> handler) {
        this.messageHandler = handler;
        return this;
    }

    /**
     * Sets the callback invoked with the WebSocket once its end handler fires, after the
     * connection's subscriptions have been cancelled.
     *
     * @param handler the callback, or {@code null} for none
     * @return this handler
     */
    @Override
    public synchronized ApolloWSHandler endHandler(Handler<ServerWebSocket> handler) {
        this.endHandler = handler;
        return this;
    }

    /**
     * Sets the factory producing the execution context for each {@code START} message.
     *
     * @param function the factory; {@code null} restores the default (the message itself)
     * @return this handler
     */
    @Override
    public synchronized ApolloWSHandler queryContext(Function<ApolloWSMessage, Object> function) {
        this.queryContextFactory = function != null ? function : DEFAULT_QUERY_CONTEXT_FACTORY;
        return this;
    }

    /**
     * Sets the factory producing the {@link DataLoaderRegistry} for each {@code START} message.
     *
     * @param function the factory; {@code null} restores the default (no registry)
     * @return this handler
     */
    @Override
    public synchronized ApolloWSHandler dataLoaderRegistry(Function<ApolloWSMessage, DataLoaderRegistry> function) {
        this.dataLoaderRegistryFactory = function != null ? function : DEFAULT_DATA_LOADER_REGISTRY_FACTORY;
        return this;
    }

    /**
     * Upgrades the request to a WebSocket when it carries a {@code Connection} header and an
     * {@code Upgrade} header matching {@code websocket}; any other request is passed to the next
     * route handler.
     *
     * @param routingContext the routing context of the incoming request
     */
    public void handle(RoutingContext routingContext) {
        MultiMap headers = routingContext.request().headers();
        boolean upgradeRequested = headers.contains(HttpHeaders.CONNECTION)
            && headers.contains(HttpHeaders.UPGRADE, HttpHeaders.WEBSOCKET, true);
        if (upgradeRequested) {
            handleConnection(routingContext.vertx(), routingContext.request().upgrade());
        } else {
            routingContext.next();
        }
    }

    /**
     * Wires the message and end handlers of a freshly upgraded WebSocket.
     *
     * @param vertx the Vert.x instance used for keep-alive timers
     * @param serverWebSocket the upgraded socket
     */
    private void handleConnection(Vertx vertx, ServerWebSocket serverWebSocket) {
        // Per-connection registry of live subscriptions, keyed by the client-supplied operation id.
        Map<String, Subscription> subscriptions = new ConcurrentHashMap<>();

        Handler<ServerWebSocket> onConnection;
        synchronized (this) {
            onConnection = this.connectionHandler;
        }
        if (onConnection != null) {
            onConnection.handle(serverWebSocket);
        }

        serverWebSocket.handler(buffer -> {
            JsonObject json = buffer.toJsonObject();
            String opId = json.getString("id");
            ApolloWSMessageType type = ApolloWSMessageType.from(json.getString("type"));
            if (type == null) {
                sendMessage(serverWebSocket, opId, ApolloWSMessageType.ERROR, "Unknown message type: " + json.getString("type"));
                return;
            }

            ApolloWSMessageImpl message = new ApolloWSMessageImpl(serverWebSocket, type, json);
            Handler<ApolloWSMessage> onMessage;
            synchronized (this) {
                onMessage = this.messageHandler;
            }
            if (onMessage != null) {
                onMessage.handle(message);
            }

            switch (type) {
                case CONNECTION_INIT:
                    connect(vertx, serverWebSocket);
                    break;
                case CONNECTION_TERMINATE:
                    serverWebSocket.close();
                    break;
                case START:
                    start(serverWebSocket, subscriptions, message);
                    break;
                case STOP:
                    stop(serverWebSocket, subscriptions, opId);
                    break;
                default:
                    sendMessage(serverWebSocket, opId, ApolloWSMessageType.ERROR, "Unsupported message type: " + type);
                    break;
            }
        });

        serverWebSocket.endHandler(ignored -> {
            // Release every subscription still attached to this connection before notifying the user.
            subscriptions.values().forEach(Subscription::cancel);
            Handler<ServerWebSocket> onEnd;
            synchronized (this) {
                onEnd = this.endHandler;
            }
            if (onEnd != null) {
                onEnd.handle(serverWebSocket);
            }
        });
    }

    /**
     * Acknowledges a {@code CONNECTION_INIT} and, when keep-alive is enabled, sends an immediate
     * keep-alive followed by periodic ones until the socket is closed.
     *
     * @param vertx the Vert.x instance owning the periodic timer
     * @param serverWebSocket the socket to acknowledge
     */
    private void connect(Vertx vertx, ServerWebSocket serverWebSocket) {
        sendMessage(serverWebSocket, null, ApolloWSMessageType.CONNECTION_ACK, null);
        if (this.keepAlive <= 0) {
            return;
        }
        sendMessage(serverWebSocket, null, ApolloWSMessageType.CONNECTION_KEEP_ALIVE, null);
        vertx.setPeriodic(this.keepAlive, timerId -> {
            if (serverWebSocket.isClosed()) {
                // The timer cancels itself on the first tick after the socket has been closed.
                vertx.cancelTimer(timerId.longValue());
            } else {
                sendMessage(serverWebSocket, null, ApolloWSMessageType.CONNECTION_KEEP_ALIVE, null);
            }
        });
    }

    /**
     * Executes the operation carried by a {@code START} message.
     *
     * <p>If a subscription is already registered under the same id it is stopped first. A result
     * whose data is a {@link Publisher} is subscribed to; any other result is sent as a single
     * {@code DATA} message followed by {@code COMPLETE}. A failed execution is reported to the
     * client as an {@code ERROR} message.
     *
     * @param serverWebSocket the socket to reply on
     * @param map the connection's subscription registry
     * @param apolloWSMessage the {@code START} message
     */
    private void start(ServerWebSocket serverWebSocket, Map<String, Subscription> map, ApolloWSMessage apolloWSMessage) {
        String opId = apolloWSMessage.content().getString("id");
        if (map.containsKey(opId)) {
            stop(serverWebSocket, map, opId);
        }

        GraphQLQuery query = new GraphQLQuery(apolloWSMessage.content().getJsonObject("payload"));
        ExecutionInput.Builder input = ExecutionInput.newExecutionInput();
        input.query(query.getQuery());

        Function<ApolloWSMessage, Object> contextFactory;
        synchronized (this) {
            contextFactory = this.queryContextFactory;
        }
        input.context(contextFactory.apply(apolloWSMessage));

        Function<ApolloWSMessage, DataLoaderRegistry> registryFactory;
        synchronized (this) {
            registryFactory = this.dataLoaderRegistryFactory;
        }
        DataLoaderRegistry registry = registryFactory.apply(apolloWSMessage);
        if (registry != null) {
            input.dataLoaderRegistry(registry);
        }

        String operationName = query.getOperationName();
        if (operationName != null) {
            input.operationName(operationName);
        }
        Map<String, Object> variables = query.getVariables();
        if (variables != null) {
            input.variables(variables);
        }

        // whenComplete (rather than thenAccept) so an exceptional completion is reported to the
        // client instead of being dropped silently.
        this.graphQL.executeAsync(input).whenComplete((executionResult, failure) -> {
            if (failure != null) {
                sendMessage(serverWebSocket, opId, ApolloWSMessageType.ERROR, new JsonObject().put("message", failure.getMessage()));
                return;
            }
            if (executionResult.getData() instanceof Publisher) {
                subscribe(serverWebSocket, map, opId, executionResult);
            } else {
                sendMessage(serverWebSocket, opId, ApolloWSMessageType.DATA, new JsonObject(executionResult.toSpecification()));
                // A single-result operation is finished once its data has been sent.
                sendMessage(serverWebSocket, opId, ApolloWSMessageType.COMPLETE, null);
            }
        });
    }

    /**
     * Subscribes to the publisher held by {@code executionResult}, forwarding each item as a
     * {@code DATA} message and requesting items one at a time.
     *
     * @param serverWebSocket the socket to reply on
     * @param map the connection's subscription registry; the subscription is stored under {@code str}
     *     and removed again on error or completion
     * @param str the operation id
     * @param executionResult a result whose data is a {@link Publisher} of {@link ExecutionResult}
     */
    private void subscribe(final ServerWebSocket serverWebSocket, final Map<String, Subscription> map, final String str, ExecutionResult executionResult) {
        Publisher<ExecutionResult> publisher = executionResult.getData();
        final AtomicReference<Subscription> subscriptionRef = new AtomicReference<>();
        publisher.subscribe(new Subscriber<ExecutionResult>() {
            @Override
            public void onSubscribe(Subscription subscription) {
                subscriptionRef.set(subscription);
                map.put(str, subscription);
                subscription.request(1L);
            }

            @Override
            public void onNext(ExecutionResult item) {
                ApolloWSHandlerImpl.this.sendMessage(serverWebSocket, str, ApolloWSMessageType.DATA, new JsonObject(item.toSpecification()));
                // Ask for the next item only after the current one has been written.
                subscriptionRef.get().request(1L);
            }

            @Override
            public void onError(Throwable th) {
                ApolloWSHandlerImpl.this.sendMessage(serverWebSocket, str, ApolloWSMessageType.ERROR, new JsonObject().put("message", th.getMessage()));
                map.remove(str);
            }

            @Override
            public void onComplete() {
                ApolloWSHandlerImpl.this.sendMessage(serverWebSocket, str, ApolloWSMessageType.COMPLETE, null);
                map.remove(str);
            }
        });
    }

    /**
     * Cancels and unregisters the subscription stored under the given operation id, if any.
     *
     * @param serverWebSocket the socket the operation belongs to (unused)
     * @param map the connection's subscription registry
     * @param str the operation id
     */
    private void stop(ServerWebSocket serverWebSocket, Map<String, Subscription> map, String str) {
        Subscription subscription = map.remove(str);
        if (subscription != null) {
            subscription.cancel();
        }
    }

    /**
     * Serialises a protocol message as JSON and writes it to the socket as a text frame.
     *
     * <p>Visibility is kept {@code public} to leave the existing signature unchanged.
     *
     * @param serverWebSocket the socket to write to
     * @param str the operation id, omitted from the message when {@code null}
     * @param apolloWSMessageType the message type; must not be {@code null}
     * @param obj the payload, omitted from the message when {@code null}
     * @throws NullPointerException if {@code apolloWSMessageType} is {@code null}
     */
    public void sendMessage(ServerWebSocket serverWebSocket, String str, ApolloWSMessageType apolloWSMessageType, Object obj) {
        Objects.requireNonNull(apolloWSMessageType, "type is null");
        JsonObject message = new JsonObject();
        if (str != null) {
            message.put("id", str);
        }
        message.put("type", apolloWSMessageType.getText());
        if (obj != null) {
            message.put("payload", obj);
        }
        serverWebSocket.writeTextMessage(message.toString());
    }
}
