Apache Camel (http://camel.apache.org) is an open source Java framework that focuses on making integration easier and more accessible to developers. This bridge lets Vert.x applications interact with Camel endpoints:
-
the application can send messages to Camel.
-
the application can receive message from Camel.
The bridge relies on the Vert.x event bus and associate an event bus address to a Camel endpoint.
Caution
|
This component is not polyglot as it requires some classes from Camel that can only be used in Java. |
Using vertx-camel-bridge
To use the Vert.x Camel Bridge, add the following dependency to the dependencies section of your build descriptor:
-
Maven (in your
pom.xml
):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-camel-bridge</artifactId>
<version>3.6.0.CR1</version>
</dependency>
-
Gradle (in your
build.gradle
file):
compile 'io.vertx:vertx-camel-bridge:3.6.0.CR1'
Bridge configuration
Before being used, the bridge needs to be configured and started:
CamelContext camel = new DefaultCamelContext();
CamelBridge.create(vertx,
new CamelBridgeOptions(camel)
.addInboundMapping(InboundMapping.fromCamel("direct:stuff").toVertx("eventbus-address"))
.addOutboundMapping(OutboundMapping.fromVertx("eventbus-address").toCamel("stream:out"))
).start();
The bridge requires a CamelContext
. It will find the endpoint from the context. The bridge needs to be started
before being used. Be aware the the start
method is asynchronous. You can use
`start`to be notified when the bridge has been started.
Inbound mapping
Inbound mapping associates a Camel endpoint to an event bus address. Messages received on this endpoint are transformed to event bus messages.
Endpoint endpoint = camel.getEndpoint("direct:foo");
CamelBridge.create(vertx,
new CamelBridgeOptions(camel)
.addInboundMapping(InboundMapping.fromCamel("direct:stuff").toVertx("eventbus-address"))
.addInboundMapping(InboundMapping.fromCamel(endpoint).toVertx("eventbus-address"))
.addInboundMapping(InboundMapping.fromCamel(endpoint).toVertx("eventbus-address")
.withoutHeadersCopy())
.addInboundMapping(InboundMapping.fromCamel(endpoint).toVertx("eventbus-address")
.usePublish())
.addInboundMapping(InboundMapping.fromCamel(endpoint).toVertx("eventbus-address")
.withBodyType(String.class))
);
The snippet above shows different ways to configure an inbound mapping:
-
you can configure the Camel endpoint either using the
Endpoint
object or its uri -
you can disables the header copy (Camel message headers are copied to the event bus message)
-
you can uses
publish
instead ofsend
to broadcast the message to all event bus consumers -
you can configures the type of the event bus message body. If not set it uses the Camel message payload. If sets, it looks in the Camel context for a converter between the Camel message payload and the desired type.
Note: org.fusesource.hawtbuf.Buffer
are automatically converted to Buffer
.
If send
is used (so not publish
), and when the Camel exchange expect a reply (In Out exchange), the Vert.x
code expect as reply to the sent message. When the reply arrives it is propagated to the exchange:
Endpoint endpoint = camel.getEndpoint("direct:stuff");
CamelBridge bridge = CamelBridge.create(vertx, new CamelBridgeOptions(camel)
.addInboundMapping(new InboundMapping().setAddress("test-reply").setEndpoint(endpoint)));
vertx.eventBus().consumer("with-reply", message -> {
message.reply("How are you ?");
});
camel.start();
bridge.start();
ProducerTemplate template = camel.createProducerTemplate();
Future<Object> future = template.asyncRequestBody(endpoint, "hello");
String response = template.extractFutureBody(future, String.class);
You can also configure the reply {@code timeout} using setTimeout
.
Outbound mapping
Outbound mapping associates an event bus address to a Camel endpoint. Messages received on this event bus address are transformed to Camel messages and sent to the endpoint.
Endpoint endpoint = camel.getEndpoint("stream:out");
CamelBridge.create(vertx,
new CamelBridgeOptions(camel)
.addOutboundMapping(OutboundMapping.fromVertx("eventbus-address").toCamel("stream:out"))
.addOutboundMapping(OutboundMapping.fromVertx("eventbus-address").toCamel(endpoint))
.addOutboundMapping(OutboundMapping.fromVertx("eventbus-address").toCamel(endpoint)
.withoutHeadersCopy())
.addOutboundMapping(OutboundMapping.fromVertx("eventbus-address").toCamel(endpoint))
);
The snippet above shows different ways to configure an outbound mapping.
You can connect your outbound mapping to a Camel route:
camel.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:start")
.transform(constant("OK"));
}
});
CamelBridge bridge = CamelBridge.create(vertx, new CamelBridgeOptions(camel)
.addOutboundMapping(OutboundMapping.fromVertx("test").toCamel("direct:start")));
camel.start();
bridge.start();
vertx.eventBus().send("test", "hello", reply -> {
// Reply from the route (here it's "OK")
});
If when you send the message on the event bus you register a reply handler, it configures the Camel exchange to expect a response (it uses the request-reply pattern of the EIP). The response is passed in the reply body. If the route fails, you get a reply failure (recipient failure), with the message as cause:
camel.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:my-route")
.to("http://localhost:8080");
}
});
CamelBridge bridge = CamelBridge.create(vertx, new CamelBridgeOptions(camel)
.addOutboundMapping(OutboundMapping.fromVertx("camel-route").toCamel("direct:my-route")));
camel.start();
bridge.start();
vertx.eventBus().send("camel-route", "hello", reply -> {
if (reply.succeeded()) {
Object theResponse = reply.result().body();
} else {
Throwable theCause = reply.cause();
}
});
If the processing you apply is blocking, you*must** set blocking to true
. This avoid executing the
processing on the event loop thread:
camel.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:my-route")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
// Do something blocking...
}
})
.to("http://localhost:8080");
}
});
CamelBridge bridge = CamelBridge.create(vertx, new CamelBridgeOptions(camel)
.addOutboundMapping(OutboundMapping.fromVertx("camel-route").toCamel("direct:my-route").setBlocking(true)));
camel.start();
bridge.start();
vertx.eventBus().send("camel-route", "hello", reply -> {
if (reply.succeeded()) {
Object theResponse = reply.result().body();
} else {
Throwable theCause = reply.cause();
}
});
By default it uses the default worker thread pool, this is customizable using the `setWorkerExecutor`method.
Stopping the bridge
Don’t forget to stop the bridge using the stop
method. The stop
method is asynchronous. You can use
`stop`to be notified when the bridge has been stopped.
Exchanging custom object
If you want to send and receive custom objects, you need to register a codec on the event bus:
vertx.eventBus().registerDefaultCodec(Person.class, codec);