Vert.x gRPC Server
Vert.x gRPC Server is a gRPC server powered by Vert.x HTTP server superseding the integrated Netty based gRPC client.
This server provides a gRPC request/response oriented API as well as a generated stub approach with the Vert.x gRPC Generator.
Using Vert.x gRPC Server
To use Vert.x gRPC Server, add the following dependency to the dependencies section of your build descriptor:
-
Maven (in your
pom.xml
):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-grpc-server</artifactId>
<version>5.0.0.CR1</version>
</dependency>
-
Gradle (in your
build.gradle
file):
dependencies {
compile 'io.vertx:vertx-grpc-server:5.0.0.CR1'
}
Creating a gRPC server
A GrpcServer
is a Handler<HttpServerRequest>
and can be used as an HTTP server request handler.
GrpcServer grpcServer = GrpcServer.server(vertx);
HttpServer server = vertx.createHttpServer(options);
server
.requestHandler(grpcServer)
.listen();
A
|
gRPC-Web protocol
The Vert.x gRPC Server supports the gRPC-Web protocol by default.
To disable the gRPC-Web protocol support, configure options with GrpcServerOptions#setGrpcWebEnabled(false)
and then create a server with GrpcServer#server(vertx, options)
.
If your website server and the gRPC server are different, you have to configure the gRPC server for CORS. This can be done with a Vert.x Web router and the CORS handler:
|
Server request/response API
The gRPC request/response server API provides an alternative way to interact with a client without the need of extending a Java class.
Request/response
Each service method is processed by a handler, the handler is bound using a ServiceMethod
.
ServiceMethod<HelloRequest, HelloReply> serviceMethod = VertxGreeterGrpcServer.SayHello;
server.callHandler(serviceMethod, request -> {
request.handler(hello -> {
GrpcServerResponse<HelloRequest, HelloReply> response = request.response();
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + hello.getName()).build();
response.end(reply);
});
});
ServiceMethod
constants are generated by the Vert.x gRPC protoc plugin.
Streaming request
You can set handlers to process request events
server.callHandler(VertxStreamingGrpcServer.Sink, request -> {
request.handler(item -> {
// Process item
});
request.endHandler(v ->{
// No more items
// Send the response
request.response().end(Empty.getDefaultInstance());
});
request.exceptionHandler(err -> {
// Something wrong happened
});
});
Streaming response
A streaming response involves calling write
for each element of the stream
and using end
to end the stream
server.callHandler(VertxStreamingGrpcServer.Source, request -> {
GrpcServerResponse<Empty, Item> response = request.response();
request.handler(empty -> {
for (int i = 0;i < 10;i++) {
response.write(Item.newBuilder().setValue("1").build());
}
response.end();
});
});
Bidi request/response
A bidi request/response is simply the combination of a streaming request and a streaming response
server.callHandler(VertxStreamingGrpcServer.Pipe, request -> {
request.handler(item -> request.response().write(item));
request.endHandler(v -> request.response().end());
});
The gRPC-Web protocol does not support bidirectional streaming. |
Flow control
Request and response are back pressured Vert.x streams.
You can pause/resume/fetch a request
request.pause();
performAsyncOperation().onComplete(ar -> {
// And then resume
request.resume();
});
You can check the writability of a response and set a drain handler
if (response.writeQueueFull()) {
response.drainHandler(v -> {
// Writable again
});
} else {
response.write(item);
}
Timeout and deadlines
The gRPC server handles timeout and deadlines.
Whenever the service receives a request indicating a timeout, the timeout can be retrieved.
long timeout = request.timeout();
if (timeout > 0L) {
// A timeout has been received
}
By default, the server
-
does not schedule automatically a deadline for a given request
-
does not automatically propagate the deadline to a vertx client
The server can schedule deadlines: when a request carries a timeout, the server schedules locally a timer to cancel the request when the response has not been sent in time.
The server can propagate deadlines: when a request carries a timeout, the server calculate the deadline and associate the current server request with this deadline. Vert.x gRPC client can use this deadline to compute a timeout to be sent and cascade the timeout to another gRPC server.
GrpcServer server = GrpcServer.server(vertx, new GrpcServerOptions()
.setScheduleDeadlineAutomatically(true)
.setDeadlinePropagation(true)
);
JSON wire format
gRPC implicitly assumes the usage of the Protobuf wire format.
The Vert.x gRPC server supports the JSON wire format as well.
You can use a JSON service method to bind a service method accepting requests carrying the application/grpc+json
content-type.
server.callHandler(VertxGreeterGrpcServer.SayHello_JSON, request -> {
request.last().onSuccess(helloRequest -> {
request.response().end(HelloReply.newBuilder()
.setMessage("Hello " + helloRequest.getName()).build()
);
});
});
The com.google.protobuf:protobuf-java-util
library performs the JSON encoding/decoding.
the same service method can be bound twice with Protobuf and JSON wire formats. |
Anemic JSON is also supported with Vert.x JsonObject
ServiceMethod<JsonObject, JsonObject> sayHello = ServiceMethod.server(
ServiceName.create("helloworld", "Greeter"),
"SayHello",
GrpcMessageEncoder.JSON_OBJECT,
GrpcMessageDecoder.JSON_OBJECT
);
server.callHandler(sayHello, request -> {
request.last().onSuccess(helloRequest -> {
request.response().end(new JsonObject().put("message", "Hello " + helloRequest.getString("name")));
});
});
Compression
You can compress response messages by setting the response encoding prior before sending any message
response.encoding("gzip");
// Write items after encoding has been defined
response.write(Item.newBuilder().setValue("item-1").build());
response.write(Item.newBuilder().setValue("item-2").build());
response.write(Item.newBuilder().setValue("item-3").build());
Compression is not supported over the gRPC-Web protocol. |
Decompression
Decompression is done transparently by the server when the client send encoded requests.
Decompression is not supported over the gRPC-Web protocol. |
Message level API
The server provides a message level API to interact directly with protobuf encoded gRPC messages.
the server message level API can be used with the client message level API to write a gRPC reverse proxy |
Such API is useful when you are not interested in the content of the messages, and instead you want to forward them to another service, e.g. you are writing a proxy.
ServiceName greeterServiceName = ServiceName.create("helloworld", "Greeter");
server.callHandler(request -> {
if (request.serviceName().equals(greeterServiceName) && request.methodName().equals("SayHello")) {
request.handler(protoHello -> {
// Handle protobuf encoded hello
performAsyncOperation(protoHello)
.onSuccess(protoReply -> {
// Reply with protobuf encoded reply
request.response().end(protoReply);
}).onFailure(err -> {
request.response()
.status(GrpcStatus.ABORTED)
.end();
});
});
} else {
request.response()
.status(GrpcStatus.NOT_FOUND)
.end();
}
});
You can also set a messageHandler
to handle GrpcMessage
, such messages preserve the
client encoding, which is useful the service you are forwarding to can handle compressed messages directly, in this case
the message does not need to be decompressed and compressed again.
ServiceName greeterServiceName = ServiceName.create("helloworld", "Greeter");
server.callHandler(request -> {
if (request.serviceName().equals(greeterServiceName) && request.methodName().equals("SayHello")) {
request.messageHandler(helloMessage -> {
// Can be identity or gzip
String helloEncoding = helloMessage.encoding();
// Handle hello message
handleGrpcMessage(helloMessage)
.onSuccess(replyMessage -> {
// Reply with reply message
// Can be identity or gzip
String replyEncoding = replyMessage.encoding();
// Send the reply
request.response().endMessage(replyMessage);
}).onFailure(err -> {
request.response()
.status(GrpcStatus.ABORTED)
.end();
});
});
} else {
request.response()
.status(GrpcStatus.NOT_FOUND)
.end();
}
});
The writeMessage
and endMessage
will
handle the message encoding:
-
when the message uses the response encoding, the message is sent as is
-
when the message uses a different encoding, it will be encoded, e.g. compressed or uncompressed
Server stub API
In addition to the request/response API, the Vert.x gRPC protoc plugin idiomatic service stubs.
Each service comes in two flavors, you can override the method you like depending on the style.
Unary services
Unary services can return a Vert.x Future
VertxGreeterGrpcServer.GreeterApi stub = new VertxGreeterGrpcServer.GreeterApi() {
@Override
public Future<HelloReply> sayHello(HelloRequest request) {
return Future.succeededFuture(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
}
};
stub.bindAll(server);
or process a Vert.x Promise
VertxGreeterGrpcServer.GreeterApi stub = new VertxGreeterGrpcServer.GreeterApi() {
@Override
public void sayHello(HelloRequest request, Promise<HelloReply> response) {
response.complete(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
}
};
stub.bindAll(server);
In both case you need to bind the stub to an existing GrpcServer
stub.bindAll(server);
You can also specify the JSON wire format when binding a stub.
stub.bindAll(server, WireFormat.JSON);
The server will accept the application/grpc+json
requests.
Streaming requests
Streaming requests are implemented with a ReadStream
:
VertxStreamingGrpcServer.StreamingApi stub = new VertxStreamingGrpcServer.StreamingApi() {
@Override
public void sink(GrpcReadStream<Item> stream, Promise<Empty> response) {
stream.handler(item -> {
System.out.println("Process item " + item.getValue());
});
// Send response
stream.endHandler(v -> response.complete(Empty.getDefaultInstance()));
}
};
stub.bindAll(server);
Streaming responses
Streaming responses are implemented with Vert.x streams and comes in two flavors.
You can return a Vert.x ReadStream
and let the service send it for you:
VertxStreamingGrpcServer.StreamingApi stub = new VertxStreamingGrpcServer.StreamingApi() {
@Override
public ReadStream<Item> source(Empty request) {
return streamOfItems();
}
};
or you can process a WriteStream
:
VertxStreamingGrpcServer.StreamingApi stub = new VertxStreamingGrpcServer.StreamingApi() {
@Override
public void source(Empty request, GrpcWriteStream<Item> response) {
response.write(Item.newBuilder().setValue("value-1").build());
response.end(Item.newBuilder().setValue("value-2").build());
}
};