Vert.x gRPC Client
Vert.x gRPC Client is a gRPC client powered by Vert.x HTTP client.
This client provides a gRPC request/response oriented API as well as a generated stub approach with a gRPC Channel.
Using Vert.x gRPC Client
To use Vert.x gRPC Client, add the following dependency to the dependencies section of your build descriptor:
- Maven (in your pom.xml):
<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-grpc-client</artifactId>
  <version>5.0.0.CR2</version>
</dependency>
- Gradle (in your build.gradle file):
dependencies {
  implementation 'io.vertx:vertx-grpc-client:5.0.0.CR2'
}
Creating a gRPC client
You can easily create the gRPC client
GrpcClient client = GrpcClient.client(vertx);
Client request/response API
The gRPC request/response client API provides an alternative way to interact with a server without the need of a generated stub.
Request/response
Interacting with a gRPC server involves creating a request to the remote gRPC service
SocketAddress server = SocketAddress.inetSocketAddress(443, "example.com");
ServiceMethod<HelloReply, HelloRequest> sayHelloMethod = VertxGreeterGrpcClient.SayHello;
Future<GrpcClientRequest<HelloRequest, HelloReply>> fut = client.request(server, sayHelloMethod);
fut.onSuccess(request -> {
  // The end method calls the service
  request.end(HelloRequest.newBuilder().setName("Bob").build());
});
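ServiceMethod constants are generated by the Vert.x gRPC protoc plugin.
The future returned by response() completes with the response, and last() completes with its last message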
request.response().onSuccess(response -> {
  Future<HelloReply> fut = response.last();
  fut.onSuccess(reply -> {
    System.out.println("Received " + reply.getMessage());
  });
});
Future composition can combine all the previous steps together in a compact fashion
client
  .request(server, VertxGreeterGrpcClient.SayHello).compose(request -> {
    request.end(HelloRequest
      .newBuilder()
      .setName("Bob")
      .build());
    return request.response().compose(response -> response.last());
  }).onSuccess(reply -> {
    System.out.println("Received " + reply.getMessage());
  });
Streaming request
Streaming requests involve calling write for each element of the stream and using end to end the stream
client
  .request(server, VertxStreamingGrpcClient.Sink)
  .onSuccess(request -> {
    for (int i = 0; i < 10; i++) {
      request.write(Item.newBuilder().setValue("1").build());
    }
    request.end();
  });
Streaming response
You can set handlers to process response events of a streaming response
client
  .request(server, VertxStreamingGrpcClient.Source)
  .compose(request -> {
    request.end(Empty.getDefaultInstance());
    return request.response();
  })
  .onSuccess(response -> {
    response.handler(item -> {
      // Process item
    });
    response.endHandler(v -> {
      // Done
    });
    response.exceptionHandler(err -> {
      // Something went bad
    });
  });
Bidi request/response
A bidi request/response is simply the combination of a streaming request and a streaming response.
Flow control
Request and response are back pressured Vert.x streams.
You can check the writability of a request and set a drain handler
if (request.writeQueueFull()) {
  request.drainHandler(v -> {
    // Writable again
  });
} else {
  request.write(item);
}
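The check-then-write pattern above is usually applied in a loop that stops when the queue is full and resumes from the drain handler. The following is a minimal plain-JDK sketch of that loop. `ModelWriteStream` and `BackPressurePump` are hypothetical names used only for illustration: they model a bounded write queue and are not Vert.x classes, and the one-shot drain handler is a simplification of this model.

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

final class BackPressurePump {
  // Writes until the queue is full, then resumes from the drain handler.
  static void pump(Iterator<String> items, ModelWriteStream stream) {
    while (items.hasNext()) {
      if (stream.writeQueueFull()) {
        stream.drainHandler(() -> pump(items, stream));
        return;
      }
      stream.write(items.next());
    }
  }

  public static void main(String[] args) {
    ModelWriteStream stream = new ModelWriteStream(2);
    pump(List.of("a", "b", "c", "d", "e").iterator(), stream);
    while (stream.delivered() < 5) {
      stream.flush();
    }
    System.out.println("Delivered " + stream.delivered() + " items");
  }
}

// Hypothetical stand-in for a back-pressured request; not a Vert.x class.
final class ModelWriteStream {
  private final Queue<String> pending = new ArrayDeque<>();
  private final int maxQueueSize;
  private Runnable drainHandler;
  private int delivered;

  ModelWriteStream(int maxQueueSize) {
    this.maxQueueSize = maxQueueSize;
  }

  boolean writeQueueFull() { return pending.size() >= maxQueueSize; }
  void write(String item) { pending.add(item); }
  void drainHandler(Runnable handler) { this.drainHandler = handler; }
  int delivered() { return delivered; }

  // Simulates the transport sending everything queued, then signalling writability.
  void flush() {
    delivered += pending.size();
    pending.clear();
    Runnable handler = drainHandler;
    drainHandler = null; // one-shot in this model
    if (handler != null) {
      handler.run();
    }
  }
}
```

With a real request, the same loop shape applies: the producer never writes past a full queue, so memory use stays bounded when the server reads slowly.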
You can pause/resume/fetch a response
response.pause();
performAsyncOperation().onComplete(ar -> {
  // And then resume
  response.resume();
});
Timeout and deadlines
The gRPC client handles timeouts and deadlines. Setting a timeout on a gRPC request instructs the client to send the timeout information to the server, so that the server knows the client expects a response within a defined time.
In addition, the client can be configured to schedule a deadline: when a timeout is set on a request, the client schedules a local timer that cancels the request if the response has not been received in time.
GrpcClient client = GrpcClient.client(vertx, new GrpcClientOptions()
  .setTimeout(10)
  .setTimeoutUnit(TimeUnit.SECONDS)
  .setScheduleDeadlineAutomatically(true));
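The idea of a locally scheduled deadline can be illustrated without any gRPC machinery. The sketch below uses only JDK classes: a timer cancels a pending response future unless the response completes first. `LocalDeadline` and `withDeadline` are hypothetical names, and this is a model of the behaviour rather than the client's actual implementation.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Vert.x.
final class LocalDeadline {

  // Cancels 'response' if it has not completed once the timeout elapses.
  static <T> CompletableFuture<T> withDeadline(CompletableFuture<T> response,
                                               long timeout,
                                               TimeUnit unit,
                                               ScheduledExecutorService timer) {
    ScheduledFuture<?> task = timer.schedule(() -> { response.cancel(false); }, timeout, unit);
    // Once the response completes, normally or not, the timer is no longer needed.
    response.whenComplete((result, err) -> task.cancel(false));
    return response;
  }

  public static void main(String[] args) {
    ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    CompletableFuture<String> neverAnswered = new CompletableFuture<>();
    withDeadline(neverAnswered, 100, TimeUnit.MILLISECONDS, timer);
    try {
      neverAnswered.join();
    } catch (CancellationException e) {
      System.out.println("Request cancelled by the local deadline");
    }
    timer.shutdown();
  }
}
```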
The timeout can also be set on a per-request basis.
Future<GrpcClientRequest<HelloRequest, HelloReply>> fut = client.request(server, VertxGreeterGrpcClient.SayHello);
fut.onSuccess(request -> {
  request
    // Given this request, set a 10-second timeout that will be sent to the gRPC service
    .timeout(10, TimeUnit.SECONDS);
  request.end(HelloRequest.newBuilder().setName("Bob").build());
});
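For context on what is sent to the server: in the gRPC over HTTP/2 protocol, the timeout travels in a `grpc-timeout` header whose value is an integer of at most 8 digits followed by a unit letter (`H`, `M`, `S`, `m`, `u` or `n`). The sketch below shows one way to produce such a value with the JDK only. `GrpcTimeoutHeader` is a hypothetical helper, not a Vert.x class; the client performs this step internally, and its exact unit selection may differ.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Vert.x: formats a grpc-timeout header value.
final class GrpcTimeoutHeader {

  // The protocol allows at most 8 ASCII digits for the value.
  private static final long MAX_VALUE = 99_999_999L;

  private static final TimeUnit[] UNITS = {
    TimeUnit.NANOSECONDS, TimeUnit.MICROSECONDS, TimeUnit.MILLISECONDS,
    TimeUnit.SECONDS, TimeUnit.MINUTES, TimeUnit.HOURS
  };
  private static final char[] SYMBOLS = {'n', 'u', 'm', 'S', 'M', 'H'};

  static String encode(long timeout, TimeUnit unit) {
    if (timeout <= 0) {
      throw new IllegalArgumentException("timeout must be positive");
    }
    // Start from the caller's unit (DAYS has no symbol, so it starts at hours)
    // and move to coarser units until the value fits in 8 digits.
    // Conversion to a coarser unit truncates, so the encoded timeout is never longer.
    int start = Math.min(unit.ordinal(), UNITS.length - 1);
    for (int i = start; i < UNITS.length; i++) {
      long value = UNITS[i].convert(timeout, unit);
      if (value <= MAX_VALUE) {
        return value + String.valueOf(SYMBOLS[i]);
      }
    }
    throw new IllegalArgumentException("timeout too large to encode");
  }

  public static void main(String[] args) {
    System.out.println(encode(10, TimeUnit.SECONDS));        // prints 10S
    System.out.println(encode(1500, TimeUnit.MILLISECONDS)); // prints 1500m
  }
}
```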
Cancellation
You can call cancel to cancel a request
request.cancel();
Cancellation sends an HTTP/2 reset frame to the server.
Client side load balancing
The gRPC Client can be configured to perform client side load balancing.
DNS based load balancing
DNS based load balancing works with DNS queries resolving a single host to multiple IP addresses (usually A records).
You can set a load balancer to enable DNS-based load balancing
GrpcClient client = GrpcClient
  .builder(vertx)
  .withLoadBalancer(LoadBalancer.ROUND_ROBIN)
  .build();
client
  .request(SocketAddress.inetSocketAddress(port, server), VertxGreeterGrpcClient.SayHello)
  .compose(request -> {
    request.end(HelloRequest
      .newBuilder()
      .setName("Bob")
      .build());
    return request.response().compose(response -> response.last());
  }).onSuccess(reply -> {
    System.out.println("Received " + reply.getMessage());
  });
The usual load balancing strategies are available. Refer to the Vert.x HTTP client side load balancing documentation to configure them.
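As an illustration of the round-robin strategy itself, the sketch below cycles through a fixed list of resolved endpoints using only JDK classes. `RoundRobinSelector` is a hypothetical class written for this example and is not how `LoadBalancer.ROUND_ROBIN` is implemented. The addresses are placeholders created unresolved, so no DNS query is made.

```java
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of round-robin endpoint selection; not a Vert.x class.
final class RoundRobinSelector {
  private final List<InetSocketAddress> endpoints;
  private final AtomicInteger next = new AtomicInteger();

  RoundRobinSelector(List<InetSocketAddress> endpoints) {
    if (endpoints.isEmpty()) {
      throw new IllegalArgumentException("at least one endpoint is required");
    }
    this.endpoints = List.copyOf(endpoints);
  }

  // Returns the endpoints in order, wrapping around; floorMod keeps the index
  // valid even after the counter overflows to negative values.
  InetSocketAddress select() {
    int index = Math.floorMod(next.getAndIncrement(), endpoints.size());
    return endpoints.get(index);
  }

  public static void main(String[] args) {
    RoundRobinSelector selector = new RoundRobinSelector(List.of(
      InetSocketAddress.createUnresolved("10.0.0.1", 443),
      InetSocketAddress.createUnresolved("10.0.0.2", 443)));
    for (int i = 0; i < 4; i++) {
      System.out.println(selector.select().getHostString());
    }
  }
}
```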
Address based load balancing
Address based load balancing relies on the Vert.x address resolver to resolve a single address to multiple host/port socket addresses.
You can set an address resolver to enable load balancing. The Vert.x Service Resolver implements a few address resolvers, such as a Kubernetes resolver.
GrpcClient client = GrpcClient
  .builder(vertx)
  .withAddressResolver(KubeResolver.create())
  .withLoadBalancer(LoadBalancer.ROUND_ROBIN)
  .build();
Unlike DNS based load balancing, address based load balancing uses an abstract Address instead of a SocketAddress. The address resolver implementation resolves an address to a list of socket addresses.
The Vert.x Service Resolver defines a ServiceAddress.
ServiceAddress address = ServiceAddress.of("GreeterService");
client
  .request(address, VertxGreeterGrpcClient.SayHello)
  .compose(request -> {
    request.end(HelloRequest
      .newBuilder()
      .setName("Bob")
      .build());
    return request.response().compose(response -> response.last());
  }).onSuccess(reply -> {
    System.out.println("Received " + reply.getMessage());
  });
You can refer to the Vert.x Service Resolver project documentation for more details.
JSON wire format
gRPC implicitly assumes the usage of the Protobuf wire format.
The Vert.x gRPC client supports the JSON wire format as well.
You can call a JSON service method with the application/grpc+json content-type.
client
  .request(server, VertxGreeterGrpcClient.SayHello_JSON).compose(request -> {
    request.end(HelloRequest
      .newBuilder()
      .setName("Bob")
      .build());
    return request.response().compose(response -> response.last());
  }).onSuccess(reply -> {
    System.out.println("Received " + reply.getMessage());
  });
JSON encoding/decoding is performed by the com.google.protobuf:protobuf-java-util library.
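To make the difference between the two wire formats concrete, the sketch below builds the same hello request both ways using only the JDK. It assumes that `name` is a string field with field number 1, as in the commonly used helloworld example; that assumption, the `HelloRequestWire` class and its hand-written encoders are illustrative only, and real code should rely on the generated classes and protobuf-java-util.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration, not a Vert.x class: one request, two wire formats.
final class HelloRequestWire {

  // Protobuf: tag byte for field 1 with wire type 2 (length-delimited),
  // then the length, then the UTF-8 bytes. Assumes 'name' is field number 1.
  static byte[] protobuf(String name) {
    byte[] utf8 = name.getBytes(StandardCharsets.UTF_8);
    if (utf8.length > 127) {
      throw new IllegalArgumentException("this sketch only handles single-byte lengths");
    }
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    out.write((1 << 3) | 2); // 0x0A
    out.write(utf8.length);
    out.write(utf8, 0, utf8.length);
    return out.toByteArray();
  }

  // JSON: the field name becomes an object key. No escaping is done in this sketch.
  static String json(String name) {
    return "{\"name\":\"" + name + "\"}";
  }

  // Content-type announced for each wire format.
  static String contentType(boolean json) {
    return json ? "application/grpc+json" : "application/grpc";
  }

  public static void main(String[] args) {
    System.out.println(contentType(false) + ": " + protobuf("Bob").length + " bytes");
    System.out.println(contentType(true) + ": " + json("Bob"));
  }
}
```

The protobuf form is more compact, while the JSON form is readable and easier to produce from tools that have no access to the generated classes.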
Anemic JSON is also supported with Vert.x JsonObject
ServiceMethod<JsonObject, JsonObject> sayHello = ServiceMethod.client(
  ServiceName.create("helloworld", "Greeter"),
  "SayHello",
  GrpcMessageEncoder.JSON_OBJECT,
  GrpcMessageDecoder.JSON_OBJECT
);
client
  .request(server, sayHello).compose(request -> {
    request.end(new JsonObject().put("name", "Bob"));
    return request.response().compose(response -> response.last());
  }).onSuccess(reply -> {
    System.out.println("Received " + reply.getString("message"));
  });
Compression
You can compress request messages by setting the request encoding before sending any message
request.encoding("gzip");
// Write items after encoding has been defined
request.write(Item.newBuilder().setValue("item-1").build());
request.write(Item.newBuilder().setValue("item-2").build());
request.write(Item.newBuilder().setValue("item-3").build());
Decompression
Decompression is achieved transparently by the client when the server sends encoded responses.
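For readers curious about what the gzip encoding does to a message payload, the sketch below compresses and decompresses a byte array with the JDK's `java.util.zip` classes. `GzipCodec` is a hypothetical helper for illustration; the client applies the equivalent steps itself once the encoding is set, so application code does not need it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper, not part of Vert.x: gzip round trip of a message payload.
final class GzipCodec {

  static byte[] compress(byte[] payload) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    // Closing the gzip stream writes the trailer, so read the bytes only afterwards.
    try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
      gzip.write(payload);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return out.toByteArray();
  }

  static byte[] decompress(byte[] compressed) {
    try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
      return gzip.readAllBytes();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    byte[] payload = "item-1 ".repeat(200).getBytes(StandardCharsets.UTF_8);
    byte[] compressed = compress(payload);
    System.out.println(payload.length + " bytes before, " + compressed.length + " bytes after");
    System.out.println("Round trip ok: " + java.util.Arrays.equals(payload, decompress(compressed)));
  }
}
```

Compression pays off mostly for large or repetitive messages; very small payloads can grow because of the gzip header and trailer.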
Message level API
The client provides a message level API to interact directly with protobuf encoded gRPC messages.
The client message level API can be used with the server message level API to write a gRPC reverse proxy.
Such an API is useful when you are not interested in the content of the messages and instead want to forward them to another service, e.g. when you are writing a proxy.
Future<GrpcClientRequest<Buffer, Buffer>> requestFut = client.request(server);
requestFut.onSuccess(request -> {
  // Set the service name and the method to call
  request.serviceName(ServiceName.create("helloworld", "Greeter"));
  request.methodName("SayHello");
  // Send the protobuf request
  request.end(protoHello);
  // Handle the response
  Future<GrpcClientResponse<Buffer, Buffer>> responseFut = request.response();
  responseFut.onSuccess(response -> {
    response.handler(protoReply -> {
      // Handle the protobuf reply
    });
  });
});
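For background on what the client does with the service name, the method name and the raw buffer: in the gRPC over HTTP/2 protocol, the call is addressed by a path of the form `/package.Service/Method`, and each message is sent with a 5-byte prefix (a compressed flag followed by a 4-byte big-endian length). The sketch below reproduces both with the JDK only. `GrpcWire` is a hypothetical class for illustration; the client handles these details, so the buffers passed to the message level API are assumed here to carry the message payload only.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration of gRPC wire details; not a Vert.x class.
final class GrpcWire {

  // HTTP/2 :path of a call, e.g. /helloworld.Greeter/SayHello
  static String path(String packageName, String service, String method) {
    return "/" + packageName + "." + service + "/" + method;
  }

  // Length-prefixed message: 1-byte compressed flag, 4-byte big-endian length, payload.
  static byte[] frame(byte[] message, boolean compressed) {
    ByteBuffer buf = ByteBuffer.allocate(5 + message.length); // big-endian by default
    buf.put((byte) (compressed ? 1 : 0));
    buf.putInt(message.length);
    buf.put(message);
    return buf.array();
  }

  // Extracts the payload from a single complete frame.
  static byte[] unframe(byte[] frame) {
    ByteBuffer buf = ByteBuffer.wrap(frame);
    buf.get(); // skip the compressed flag
    byte[] message = new byte[buf.getInt()];
    buf.get(message);
    return message;
  }

  public static void main(String[] args) {
    System.out.println(path("helloworld", "Greeter", "SayHello"));
    byte[] framed = frame(new byte[] {0x0A, 3, 'B', 'o', 'b'}, false);
    System.out.println("Framed length: " + framed.length);
  }
}
```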
You can also set a messageHandler to handle GrpcMessage instances; such messages preserve the server encoding.
Future<GrpcClientRequest<Buffer, Buffer>> requestFut = client.request(server);
requestFut.onSuccess(request -> {
  // Set the service name and the method to call
  request.serviceName(ServiceName.create("helloworld", "Greeter"));
  request.methodName("SayHello");
  // Send the protobuf request
  request.endMessage(GrpcMessage.message("identity", protoHello));
  // Handle the response
  Future<GrpcClientResponse<Buffer, Buffer>> responseFut = request.response();
  responseFut.onSuccess(response -> {
    response.messageHandler(replyMessage -> {
      System.out.println("Got reply message encoded as " + replyMessage.encoding());
    });
  });
});
The writeMessage and endMessage methods handle the message encoding:
- when the message uses the response encoding, the message is sent as is
- when the message uses a different encoding, it will be encoded, e.g. compressed or uncompressed
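The two rules above amount to a small decision: forward the payload untouched when the encodings match, otherwise convert it. The sketch below models that decision for the `identity` and `gzip` encodings using only the JDK. `MessageEncoding` and `adapt` are hypothetical names, and the code is a model of the described behaviour, not the client's implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical model of the encoding rules; supports "identity" and "gzip" only.
final class MessageEncoding {

  // Returns the payload to send on a stream that uses 'streamEncoding'.
  static byte[] adapt(String messageEncoding, byte[] payload, String streamEncoding) {
    if (messageEncoding.equals(streamEncoding)) {
      return payload; // same encoding: sent as is
    }
    // Different encoding: bring the payload back to plain bytes, then encode for the stream.
    byte[] plain = "gzip".equals(messageEncoding) ? gunzip(payload) : payload;
    return "gzip".equals(streamEncoding) ? gzip(plain) : plain;
  }

  static byte[] gzip(byte[] plain) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (GZIPOutputStream stream = new GZIPOutputStream(out)) {
      stream.write(plain);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return out.toByteArray();
  }

  static byte[] gunzip(byte[] compressed) {
    try (GZIPInputStream stream = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
      return stream.readAllBytes();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    byte[] plain = {0x0A, 3, 'B', 'o', 'b'};
    System.out.println("Sent as is: " + (adapt("identity", plain, "identity") == plain));
    System.out.println("Re-encoded length: " + adapt("identity", plain, "gzip").length);
  }
}
```

Forwarding matching payloads untouched is what makes the message level API attractive for proxies: a message that is already compressed the right way never has to be inflated and deflated again.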
Client stub API
In addition to the request/response API, the Vert.x gRPC protoc plugin generates idiomatic service clients.
A client wraps a GrpcClient and provides a Vert.x idiomatic API to interact with the service
VertxGreeterGrpcClient client = new VertxGreeterGrpcClient(grpcClient, SocketAddress.inetSocketAddress(port, host));
You can also specify the JSON wire format when creating a stub
VertxGreeterGrpcClient client = new VertxGreeterGrpcClient(grpcClient, SocketAddress.inetSocketAddress(port, host), WireFormat.JSON);
The client will send application/grpc+json requests.
Unary services
Unary services return a Vert.x Future
Future<HelloReply> response = client.sayHello(HelloRequest.newBuilder().setName("John").build());
response.onSuccess(result -> System.out.println("Service responded: " + result.getMessage()));
response.onFailure(err -> System.out.println("Service failure: " + err.getMessage()));
Streaming requests
Streaming requests use a lambda that is passed a Vert.x WriteStream of messages sent to the service
Future<Empty> response = client.sink(stream -> {
  stream.write(Item.newBuilder().setValue("Value 1").build());
  stream.write(Item.newBuilder().setValue("Value 2").build());
  stream.end(Item.newBuilder().setValue("Value 3").build());
});
Streaming responses
Streaming responses get a Vert.x ReadStream of messages sent by the service
Future<GrpcReadStream<Item>> response = client.source(Empty.getDefaultInstance());
response.onSuccess(stream -> stream
  .handler(item -> System.out.println("Item " + item.getValue()))
  .exceptionHandler(err -> System.out.println("Stream failed " + err.getMessage()))
  .endHandler(v -> System.out.println("Stream ended")));
response.onFailure(err -> System.out.println("Service failure: " + err.getMessage()));