Vert.x gRPC Server

Vert.x gRPC Server is a gRPC server powered by the Vert.x HTTP server, superseding the integrated Netty-based gRPC server.

This server provides a gRPC request/response oriented API as well as a generated stub approach with the Vert.x gRPC Generator.

Using Vert.x gRPC Server

To use Vert.x gRPC Server, add the following dependency to the dependencies section of your build descriptor:

  • Maven (in your pom.xml):

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-grpc-server</artifactId>
  <version>5.0.0.CR1</version>
</dependency>

  • Gradle (in your build.gradle file):

dependencies {
  implementation 'io.vertx:vertx-grpc-server:5.0.0.CR1'
}

Creating a gRPC server

A GrpcServer is a Handler<HttpServerRequest> and can be used as an HTTP server request handler.

GrpcServer grpcServer = GrpcServer.server(vertx);

HttpServer server = vertx.createHttpServer(options);

server
  .requestHandler(grpcServer)
  .listen();

A GrpcServer can be used within a Vert.x Web router:

router.consumes("application/grpc").handler(rc -> grpcServer.handle(rc.request()));

gRPC-Web protocol

The Vert.x gRPC Server supports the gRPC-Web protocol by default.

To disable the gRPC-Web protocol support, configure options with GrpcServerOptions#setGrpcWebEnabled(false) and then create a server with GrpcServer#server(vertx, options).

If your website and the gRPC server are served from different origins, you must configure the gRPC server for CORS. This can be done with a Vert.x Web router and the CORS handler:

CorsHandler corsHandler = CorsHandler.create()
  .addRelativeOrigin("https://www.mycompany.com")
  .allowedHeaders(Set.of("keep-alive","user-agent","cache-control","content-type","content-transfer-encoding","x-custom-key","x-user-agent","x-grpc-web","grpc-timeout"))
  .exposedHeaders(Set.of("x-custom-key","grpc-status","grpc-message"));
router.route("/com.mycompany.MyService/*").handler(corsHandler);

Server request/response API

The gRPC request/response server API provides an alternative way to interact with a client without needing to extend a Java class.

Request/response

Each service method is processed by a handler, which is bound using a ServiceMethod.

ServiceMethod<HelloRequest, HelloReply> serviceMethod = VertxGreeterGrpcServer.SayHello;

server.callHandler(serviceMethod, request -> {

  request.handler(hello -> {

    GrpcServerResponse<HelloRequest, HelloReply> response = request.response();

    HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + hello.getName()).build();

    response.end(reply);
  });
});

ServiceMethod constants are generated by the Vert.x gRPC protoc plugin.

Streaming request

You can set handlers to process request events:

server.callHandler(VertxStreamingGrpcServer.Sink, request -> {
  request.handler(item -> {
    // Process item
  });
  request.endHandler(v -> {
    // No more items
    // Send the response
    request.response().end(Empty.getDefaultInstance());
  });
  request.exceptionHandler(err -> {
    // Something wrong happened
  });
});

Streaming response

For a streaming response, call write for each element of the stream, then call end to close the stream:

server.callHandler(VertxStreamingGrpcServer.Source, request -> {
  GrpcServerResponse<Empty, Item> response = request.response();
  request.handler(empty -> {
    for (int i = 0; i < 10; i++) {
      response.write(Item.newBuilder().setValue("1").build());
    }
    response.end();
  });
});

Bidi request/response

A bidirectional (bidi) request/response simply combines a streaming request with a streaming response:

server.callHandler(VertxStreamingGrpcServer.Pipe, request -> {

  request.handler(item -> request.response().write(item));
  request.endHandler(v -> request.response().end());
});

The gRPC-Web protocol does not support bidirectional streaming.

Flow control

Requests and responses are back-pressured Vert.x streams.

You can pause, resume, or fetch a request:

request.pause();

performAsyncOperation().onComplete(ar -> {
  // And then resume
  request.resume();
});

You can check whether a response is writable and set a drain handler:

if (response.writeQueueFull()) {
  response.drainHandler(v -> {
    // Writable again
  });
} else {
  response.write(item);
}

Timeout and deadlines

The gRPC server handles timeouts and deadlines.

When a request carries a timeout, the service can retrieve it:

long timeout = request.timeout();

if (timeout > 0L) {
  // A timeout has been received
}
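
On the wire, the timeout arrives in the grpc-timeout request header, which the gRPC over HTTP/2 specification defines as at most 8 ASCII digits followed by a unit letter (H, M, S, m, u or n). The following is a minimal sketch of that decoding, assuming a hypothetical GrpcTimeoutSketch helper that is not part of the Vert.x API; in a handler you would simply read request.timeout().

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper, not part of the Vert.x API: decodes a grpc-timeout header value. */
final class GrpcTimeoutSketch {

  /** Returns the timeout in milliseconds, or 0 when the value is absent or malformed. */
  static long toMillis(String headerValue) {
    // The gRPC spec allows 1 to 8 ASCII digits followed by a single unit letter
    if (headerValue == null || headerValue.length() < 2 || headerValue.length() > 9) {
      return 0L;
    }
    int last = headerValue.length() - 1;
    for (int i = 0; i < last; i++) {
      char c = headerValue.charAt(i);
      if (c < '0' || c > '9') {
        return 0L;
      }
    }
    long amount = Long.parseLong(headerValue.substring(0, last));
    TimeUnit unit;
    switch (headerValue.charAt(last)) {
      case 'H': unit = TimeUnit.HOURS; break;
      case 'M': unit = TimeUnit.MINUTES; break;
      case 'S': unit = TimeUnit.SECONDS; break;
      case 'm': unit = TimeUnit.MILLISECONDS; break;
      case 'u': unit = TimeUnit.MICROSECONDS; break;
      case 'n': unit = TimeUnit.NANOSECONDS; break;
      default: return 0L;
    }
    // Sub-millisecond values truncate toward zero
    return unit.toMillis(amount);
  }

  public static void main(String[] args) {
    System.out.println(toMillis("5S"));   // prints 5000
    System.out.println(toMillis("100m")); // prints 100
  }
}
```

Returning 0 for a missing or malformed value mirrors the timeout > 0L check shown above, so callers can treat both cases as "no timeout".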

By default, the server:

  • does not automatically schedule a deadline for a given request

  • does not automatically propagate the deadline to a Vert.x client

The server can schedule deadlines: when a request carries a timeout, the server schedules a local timer that cancels the request if the response has not been sent in time.

The server can propagate deadlines: when a request carries a timeout, the server calculates the deadline and associates it with the current server request. A Vert.x gRPC client can use this deadline to compute the timeout to send, cascading it to another gRPC server.

GrpcServer server = GrpcServer.server(vertx, new GrpcServerOptions()
  .setScheduleDeadlineAutomatically(true)
  .setDeadlinePropagation(true)
);
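
The arithmetic behind deadline propagation can be sketched with plain java.time types. This is only an illustration under assumed names (DeadlineSketch, deadline and remaining are hypothetical, not Vert.x API): the deadline is the reception instant plus the received timeout, and the timeout cascaded to the next server is whatever time is left.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical sketch of deadline arithmetic; these names are not Vert.x API. */
final class DeadlineSketch {

  /** Deadline = instant the request was received + the timeout it carried. */
  static Instant deadline(Instant receivedAt, Duration timeout) {
    return receivedAt.plus(timeout);
  }

  /** Timeout to send downstream: the time left until the deadline, never negative. */
  static Duration remaining(Instant deadline, Instant now) {
    Duration left = Duration.between(now, deadline);
    return left.isNegative() ? Duration.ZERO : left;
  }

  public static void main(String[] args) {
    Instant receivedAt = Instant.ofEpochSecond(0);
    Instant deadline = deadline(receivedAt, Duration.ofSeconds(5));
    // Two seconds were spent locally before calling the next server
    System.out.println(remaining(deadline, receivedAt.plusSeconds(2))); // prints PT3S
  }
}
```

A remaining duration of zero means the deadline has already passed, in which case forwarding the call is pointless.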

JSON wire format

gRPC implicitly assumes the usage of the Protobuf wire format.

The Vert.x gRPC server supports the JSON wire format as well.

You can use a JSON service method to bind a service method accepting requests carrying the application/grpc+json content-type.

server.callHandler(VertxGreeterGrpcServer.SayHello_JSON, request -> {
  request.last().onSuccess(helloRequest -> {
    request.response().end(HelloReply.newBuilder()
      .setMessage("Hello " + helloRequest.getName()).build()
    );
  });
});

The com.google.protobuf:protobuf-java-util library performs the JSON encoding/decoding.

The same service method can be bound twice, once with the Protobuf wire format and once with the JSON wire format.
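
Because the same method can be bound for both wire formats, the request content type decides which binding applies. A minimal sketch of that decision, using a hypothetical WireFormatSketch class (not the Vert.x implementation) and the content types defined by the gRPC specification, could look like this; gRPC-Web content types are deliberately left out.

```java
import java.util.Locale;

/** Hypothetical sketch, not Vert.x code: maps a request content type to a wire format. */
final class WireFormatSketch {

  enum Format { PROTOBUF, JSON, UNSUPPORTED }

  static Format fromContentType(String contentType) {
    if (contentType == null) {
      return Format.UNSUPPORTED;
    }
    // Drop media type parameters such as "; charset=utf-8"
    int semi = contentType.indexOf(';');
    String mediaType = (semi >= 0 ? contentType.substring(0, semi) : contentType)
      .trim()
      .toLowerCase(Locale.ROOT);
    switch (mediaType) {
      case "application/grpc":       // Protobuf is the implicit default
      case "application/grpc+proto":
        return Format.PROTOBUF;
      case "application/grpc+json":
        return Format.JSON;
      default:
        return Format.UNSUPPORTED;
    }
  }

  public static void main(String[] args) {
    System.out.println(fromContentType("application/grpc+json")); // prints JSON
    System.out.println(fromContentType("application/grpc"));      // prints PROTOBUF
  }
}
```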

Anemic JSON is also supported through the Vert.x JsonObject:

ServiceMethod<JsonObject, JsonObject> sayHello = ServiceMethod.server(
  ServiceName.create("helloworld", "Greeter"),
  "SayHello",
  GrpcMessageEncoder.JSON_OBJECT,
  GrpcMessageDecoder.JSON_OBJECT
);

server.callHandler(sayHello, request -> {
  request.last().onSuccess(helloRequest -> {
    request.response().end(new JsonObject().put("message", "Hello " + helloRequest.getString("name")));
  });
});

Compression

You can compress response messages by setting the response encoding before sending any message:

response.encoding("gzip");

// Write items after encoding has been defined
response.write(Item.newBuilder().setValue("item-1").build());
response.write(Item.newBuilder().setValue("item-2").build());
response.write(Item.newBuilder().setValue("item-3").build());

Compression is not supported over the gRPC-Web protocol.
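
For reference, the gRPC wire format prefixes each message with a 1-byte compressed flag and a 4-byte big-endian length. The sketch below shows what a gzip-encoded message looks like on the wire using only the JDK. GzipFrameSketch and its methods are hypothetical names used for illustration; the server does this work for you once the response encoding is set.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/** Hypothetical sketch, not Vert.x code: gzip plus gRPC length-prefixed framing. */
final class GzipFrameSketch {

  static byte[] gzip(byte[] data) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
      gz.write(data);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return out.toByteArray();
  }

  static byte[] gunzip(byte[] data) {
    try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(data))) {
      return gz.readAllBytes();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Frames a payload: compressed flag (1 byte), length (4 bytes, big-endian), payload. */
  static byte[] frame(byte[] payload, boolean compressed) {
    return ByteBuffer.allocate(5 + payload.length)
      .put((byte) (compressed ? 1 : 0))
      .putInt(payload.length)
      .put(payload)
      .array();
  }

  public static void main(String[] args) {
    byte[] payload = "item-1".getBytes(StandardCharsets.UTF_8);
    // Uncompressed frame: flag 0, length 6, then the six payload bytes
    System.out.println(Arrays.toString(frame(payload, false)));
    // prints [0, 0, 0, 0, 6, 105, 116, 101, 109, 45, 49]
  }
}
```

For very small payloads such as this one, the gzip output is larger than the input, so compression mostly pays off for bigger messages.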

Decompression

Decompression is done transparently by the server when the client sends encoded requests.

Decompression is not supported over the gRPC-Web protocol.

Message level API

The server provides a message-level API to interact directly with Protobuf-encoded gRPC messages.

The server message-level API can be used with the client message-level API to write a gRPC reverse proxy.

Such an API is useful when you are not interested in the content of the messages and instead want to forward them to another service, e.g. when writing a proxy.

ServiceName greeterServiceName = ServiceName.create("helloworld", "Greeter");

server.callHandler(request -> {

  if (request.serviceName().equals(greeterServiceName) && request.methodName().equals("SayHello")) {

    request.handler(protoHello -> {
      // Handle protobuf encoded hello
      performAsyncOperation(protoHello)
        .onSuccess(protoReply -> {
          // Reply with protobuf encoded reply
          request.response().end(protoReply);
        }).onFailure(err -> {
          request.response()
            .status(GrpcStatus.ABORTED)
            .end();
        });
    });
  } else {
    request.response()
      .status(GrpcStatus.NOT_FOUND)
      .end();
  }
});
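
The service and method names tested above come from the HTTP request path, which gRPC defines as /{fully qualified service name}/{method name}. As a rough sketch, assuming a hypothetical GrpcPathSketch helper that is not part of Vert.x, the split could be written as:

```java
/** Hypothetical helper, not part of Vert.x: splits a gRPC request path. */
final class GrpcPathSketch {

  /**
   * Splits "/helloworld.Greeter/SayHello" into {"helloworld.Greeter", "SayHello"}.
   * Returns null when the path does not have exactly two non-empty segments.
   */
  static String[] parse(String path) {
    if (path == null || !path.startsWith("/")) {
      return null;
    }
    int slash = path.indexOf('/', 1);
    // Reject a missing separator, an empty service name, or an empty method name
    if (slash < 2 || slash == path.length() - 1) {
      return null;
    }
    String method = path.substring(slash + 1);
    if (method.indexOf('/') >= 0) {
      return null;
    }
    return new String[] { path.substring(1, slash), method };
  }

  public static void main(String[] args) {
    String[] parts = parse("/helloworld.Greeter/SayHello");
    System.out.println(parts[0]); // prints helloworld.Greeter
    System.out.println(parts[1]); // prints SayHello
  }
}
```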

You can also set a messageHandler to handle GrpcMessage instances. Such messages preserve the client encoding, which is useful when the service you are forwarding to can handle compressed messages directly: the message then does not need to be decompressed and compressed again.

ServiceName greeterServiceName = ServiceName.create("helloworld", "Greeter");

server.callHandler(request -> {

  if (request.serviceName().equals(greeterServiceName) && request.methodName().equals("SayHello")) {

    request.messageHandler(helloMessage -> {

      // Can be identity or gzip
      String helloEncoding = helloMessage.encoding();

      // Handle hello message
      handleGrpcMessage(helloMessage)
        .onSuccess(replyMessage -> {
          // Reply with reply message

          // Can be identity or gzip
          String replyEncoding = replyMessage.encoding();

          // Send the reply
          request.response().endMessage(replyMessage);
        }).onFailure(err -> {
          request.response()
            .status(GrpcStatus.ABORTED)
            .end();
        });
    });
  } else {
    request.response()
      .status(GrpcStatus.NOT_FOUND)
      .end();
  }
});

The writeMessage and endMessage methods handle the message encoding:

  • when the message uses the response encoding, the message is sent as is

  • when the message uses a different encoding, it is re-encoded first, i.e. compressed or decompressed as needed
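
The rule above can be sketched as follows. This is an illustration only, not the Vert.x implementation: ReencodeSketch, Message and adapt are hypothetical names, and only the identity and gzip encodings are handled.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/** Hypothetical sketch, not Vert.x code: aligns a message with the response encoding. */
final class ReencodeSketch {

  static final class Message {
    final String encoding;
    final byte[] payload;

    Message(String encoding, byte[] payload) {
      this.encoding = encoding;
      this.payload = payload;
    }
  }

  static Message adapt(Message message, String responseEncoding) {
    if (message.encoding.equals(responseEncoding)) {
      return message; // Same encoding: sent as is
    }
    // Different encoding: decode to plain bytes, then encode for the response
    byte[] plain = decode(message.encoding, message.payload);
    return new Message(responseEncoding, encode(responseEncoding, plain));
  }

  private static byte[] decode(String encoding, byte[] data) {
    switch (encoding) {
      case "identity":
        return data;
      case "gzip":
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
          return in.readAllBytes();
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      default:
        throw new IllegalArgumentException("Unsupported encoding: " + encoding);
    }
  }

  private static byte[] encode(String encoding, byte[] data) {
    switch (encoding) {
      case "identity":
        return data;
      case "gzip": {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
          gz.write(data);
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
        return out.toByteArray();
      }
      default:
        throw new IllegalArgumentException("Unsupported encoding: " + encoding);
    }
  }

  public static void main(String[] args) {
    Message plain = new Message("identity", new byte[] {1, 2, 3});
    System.out.println(adapt(plain, "gzip").encoding); // prints gzip
  }
}
```

Returning the same instance when the encodings match is what lets a proxy forward compressed messages without touching their bytes.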

Server stub API

In addition to the request/response API, the Vert.x gRPC protoc plugin generates idiomatic service stubs.

Each service comes in two flavors; override the method that matches the style you prefer.

Unary services

Unary services can return a Vert.x Future:

VertxGreeterGrpcServer.GreeterApi stub = new VertxGreeterGrpcServer.GreeterApi() {
  @Override
  public Future<HelloReply> sayHello(HelloRequest request) {
    return Future.succeededFuture(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
  }
};
stub.bindAll(server);

or process a Vert.x Promise:

VertxGreeterGrpcServer.GreeterApi stub = new VertxGreeterGrpcServer.GreeterApi() {
  @Override
  public void sayHello(HelloRequest request, Promise<HelloReply> response) {
    response.complete(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
  }
};
stub.bindAll(server);

In both cases, you need to bind the stub to an existing GrpcServer:

stub.bindAll(server);

You can also specify the JSON wire format when binding a stub.

stub.bindAll(server, WireFormat.JSON);

The server then accepts application/grpc+json requests.

Streaming requests

Streaming requests are implemented with a ReadStream:

VertxStreamingGrpcServer.StreamingApi stub = new VertxStreamingGrpcServer.StreamingApi() {
  @Override
  public void sink(GrpcReadStream<Item> stream, Promise<Empty> response) {
    stream.handler(item -> {
      System.out.println("Process item " + item.getValue());
    });
    // Send response
    stream.endHandler(v -> response.complete(Empty.getDefaultInstance()));
  }
};
stub.bindAll(server);

Streaming responses

Streaming responses are implemented with Vert.x streams and come in two flavors.

You can return a Vert.x ReadStream and let the service send it for you:

VertxStreamingGrpcServer.StreamingApi stub = new VertxStreamingGrpcServer.StreamingApi() {
  @Override
  public ReadStream<Item> source(Empty request) {
    return streamOfItems();
  }
};

or you can process a WriteStream:

VertxStreamingGrpcServer.StreamingApi stub = new VertxStreamingGrpcServer.StreamingApi() {
  @Override
  public void source(Empty request, GrpcWriteStream<Item> response) {
    response.write(Item.newBuilder().setValue("value-1").build());
    response.end(Item.newBuilder().setValue("value-2").build());
  }
};