This component facilitates AMQP integrations for Vert.x by providing a thin wrapper around the Apache Qpid Proton-J AMQP 1.0 protocol engine.
Note
|
This module exposes certain Proton-J classes directly, as it is an integration layer to integrate the Proton-J engine with the Vert.x threading model and networking layers, rather than a high level client or server abstraction. |
Using Vert.x Proton
To use Vert.x Proton, add the following dependency to the dependencies section of your build descriptor:
-
Maven (in your
pom.xml
):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-proton</artifactId>
<version>${maven.version}</version>
</dependency>
-
Gradle (in your
build.gradle
file):
compile io.vertx:vertx-proton:${maven.version}
Creating a connection
Here is an example of connecting and then opening a connection, which can then be used to create senders and receivers.
ProtonClient client = ProtonClient.create(vertx);
// Connect, then use the event loop thread to process things thereafter
client.connect("hostname", 5672, "username", "password", connectResult -> {
if (connectResult.succeeded()) {
connectResult.result().setContainer("my-container/client-id").openHandler(openResult -> {
if (openResult.succeeded()) {
ProtonConnection conn = openResult.result();
// Create senders, receivers etc..
}
}).open();
}
});
Creating a sender
Here is an example of creating a sender and sending a message with it. The onUpdated handler provided in the send call is invoked when disposition updates are received for the delivery, with the example using this to print the delivery state and whether the delivery was settled.
connection.createSender("myQueue").openHandler(openResult -> {
if (openResult.succeeded()) {
ProtonSender sender = openResult.result();
Message message = message();
message.setBody(new AmqpValue("Hello World"));
// Send message, providing an onUpdated delivery handler that prints updates
sender.send(message, delivery -> {
System.out.println(String.format("Message received by server: remote state=%s, remotely settled=%s",
delivery.getRemoteState(), delivery.remotelySettled()));
});
}
}).open();
Creating a receiver
Here is an example of creating a receiver, and setting a message handler to process the incoming messages and their related delivery.
connection.createReceiver("myQueue").handler((delivery, msg) -> {
Section body = msg.getBody();
if (body instanceof AmqpValue) {
System.out.println("Received message with content: " + ((AmqpValue) body).getValue());
}
// By default, the receiver automatically accepts (and settles) the delivery
// when the handler returns if no other disposition has already been applied.
}).open();
Threading Considerations
The Proton protocol engine is inherently single threaded, so a given engine and any related connection etc objects must only be used by a single thread at a time. To satisfy this, vertx-proton requires that a connection and related objects are only used by a single Vert.x Context, the one associated with the underlying socket during creation of the connection, with result that only a single thread uses the protocol engine.
In the case of a ProtonClient connection, this is always the Context used (or created automatically if there wasn’t one present) while calling the connect() methods. The connect handler, and any subsequent callbacks for the related connection, will be fired using this context. The application must use the same context to interact with the connection and related objects outwith any callbacks.
For example, consider the following code:
client.connect("hostname", 5672, connectResult -> {
// In this case the context will be either the one used to call connect
// or one created during the process if there was none originally.
Context connectionCtx = Vertx.currentContext();
});
If the above were to be run within a Verticle initialisation for example, then the verticle Context would be used by the connection for I/O and any callbacks, as would calls by the verticle to interact with the connection outwith such callbacks.
If however the above snippet were run outwith a Verticle, e.g. embedded in a simple main() method, then no Context would be present during the connect() call and a new one would have to be automatically created during the call. This Context would then be used for the connect callback and any subsequent callbacks. The application code would need to ensure it also used this context to run any interactions with the connection outside of callbacks. One option would be to capture the context in a callback as shown above and then later use it to run tasks. Another would be to explicitly create a Context if needed before calling connect, and then run the connect() operation on it, thus ensuring it becomes the Context associated with the connection. This can be seen in the example below:
Context myContext = vertx.getOrCreateContext();
myContext.runOnContext(x -> {
client.connect("hostname", 5672, connectResult -> {
// In this case the context will be 'myContext' from earlier
});
});
In the case of a ProtonServer connection, a Context is associated with the underlying socket for the connection when the server accepts it. This will be used in any callbacks such as those when the connection initially opens and later when sessions etc are opened on it and used. Any usage of a given connection outside of its own callbacks, including any cross-connection interactions, should again be ensured to run on the connections own Context as outlined above.