Apache Camel (http://camel.apache.org) is an open source Java framework that focuses on making integration easier and more accessible to developers. This bridge lets Vert.x applications interact with Camel endpoints:

  • the application can send messages to Camel.

  • the application can receive message from Camel.

The bridge relies on the Vert.x event bus and associate an event bus address to a Camel endpoint.

Caution
This component is not polyglot as it requires some classes from Camel that can only be used in Java.

Using vertx-camel-bridge

To use the Vert.x Camel Bridge, add the following dependency to the dependencies section of your build descriptor:

  • Maven (in your pom.xml):

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-camel-bridge</artifactId>
 <version>3.6.0.CR1</version>
</dependency>
  • Gradle (in your build.gradle file):

compile 'io.vertx:vertx-camel-bridge:3.6.0.CR1'

Bridge configuration

Before being used, the bridge needs to be configured and started:

CamelContext camel = new DefaultCamelContext();
CamelBridge.create(vertx,
    new CamelBridgeOptions(camel)
        .addInboundMapping(InboundMapping.fromCamel("direct:stuff").toVertx("eventbus-address"))
        .addOutboundMapping(OutboundMapping.fromVertx("eventbus-address").toCamel("stream:out"))
).start();

The bridge requires a CamelContext. It will find the endpoint from the context. The bridge needs to be started before being used. Be aware the the start method is asynchronous. You can use `start`to be notified when the bridge has been started.

Inbound mapping

Inbound mapping associates a Camel endpoint to an event bus address. Messages received on this endpoint are transformed to event bus messages.

Endpoint endpoint = camel.getEndpoint("direct:foo");

CamelBridge.create(vertx,
    new CamelBridgeOptions(camel)
        .addInboundMapping(InboundMapping.fromCamel("direct:stuff").toVertx("eventbus-address"))
        .addInboundMapping(InboundMapping.fromCamel(endpoint).toVertx("eventbus-address"))
        .addInboundMapping(InboundMapping.fromCamel(endpoint).toVertx("eventbus-address")
            .withoutHeadersCopy())
        .addInboundMapping(InboundMapping.fromCamel(endpoint).toVertx("eventbus-address")
            .usePublish())
        .addInboundMapping(InboundMapping.fromCamel(endpoint).toVertx("eventbus-address")
            .withBodyType(String.class))
);

The snippet above shows different ways to configure an inbound mapping:

  • you can configure the Camel endpoint either using the Endpoint object or its uri

  • you can disables the header copy (Camel message headers are copied to the event bus message)

  • you can uses publish instead of send to broadcast the message to all event bus consumers

  • you can configures the type of the event bus message body. If not set it uses the Camel message payload. If sets, it looks in the Camel context for a converter between the Camel message payload and the desired type.

Note: org.fusesource.hawtbuf.Buffer are automatically converted to Buffer.

If send is used (so not publish), and when the Camel exchange expect a reply (In Out exchange), the Vert.x code expect as reply to the sent message. When the reply arrives it is propagated to the exchange:

Endpoint endpoint = camel.getEndpoint("direct:stuff");

CamelBridge bridge = CamelBridge.create(vertx, new CamelBridgeOptions(camel)
    .addInboundMapping(new InboundMapping().setAddress("test-reply").setEndpoint(endpoint)));

vertx.eventBus().consumer("with-reply", message -> {
  message.reply("How are you ?");
});

camel.start();
bridge.start();

ProducerTemplate template = camel.createProducerTemplate();
Future<Object> future = template.asyncRequestBody(endpoint, "hello");
String response = template.extractFutureBody(future, String.class);

You can also configure the reply {@code timeout} using setTimeout.

Outbound mapping

Outbound mapping associates an event bus address to a Camel endpoint. Messages received on this event bus address are transformed to Camel messages and sent to the endpoint.

Endpoint endpoint = camel.getEndpoint("stream:out");

CamelBridge.create(vertx,
    new CamelBridgeOptions(camel)
        .addOutboundMapping(OutboundMapping.fromVertx("eventbus-address").toCamel("stream:out"))
        .addOutboundMapping(OutboundMapping.fromVertx("eventbus-address").toCamel(endpoint))
        .addOutboundMapping(OutboundMapping.fromVertx("eventbus-address").toCamel(endpoint)
            .withoutHeadersCopy())
        .addOutboundMapping(OutboundMapping.fromVertx("eventbus-address").toCamel(endpoint))
);

The snippet above shows different ways to configure an outbound mapping.

You can connect your outbound mapping to a Camel route:

camel.addRoutes(new RouteBuilder() {
  @Override
  public void configure() throws Exception {
    from("direct:start")
        .transform(constant("OK"));
  }
});

CamelBridge bridge = CamelBridge.create(vertx, new CamelBridgeOptions(camel)
    .addOutboundMapping(OutboundMapping.fromVertx("test").toCamel("direct:start")));

camel.start();
bridge.start();


vertx.eventBus().send("test", "hello", reply -> {
  // Reply from the route (here it's "OK")
});

If when you send the message on the event bus you register a reply handler, it configures the Camel exchange to expect a response (it uses the request-reply pattern of the EIP). The response is passed in the reply body. If the route fails, you get a reply failure (recipient failure), with the message as cause:

camel.addRoutes(new RouteBuilder() {
  @Override
  public void configure() throws Exception {
    from("direct:my-route")
        .to("http://localhost:8080");
  }
});

CamelBridge bridge = CamelBridge.create(vertx, new CamelBridgeOptions(camel)
    .addOutboundMapping(OutboundMapping.fromVertx("camel-route").toCamel("direct:my-route")));

camel.start();
bridge.start();

vertx.eventBus().send("camel-route", "hello", reply -> {
  if (reply.succeeded()) {
    Object theResponse = reply.result().body();
  } else {
    Throwable theCause = reply.cause();
  }
});

If the processing you apply is blocking, you*must** set blocking to true. This avoid executing the processing on the event loop thread:

camel.addRoutes(new RouteBuilder() {
  @Override
  public void configure() throws Exception {
    from("direct:my-route")
      .process(new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
          // Do something blocking...
        }
      })
      .to("http://localhost:8080");
  }
});

CamelBridge bridge = CamelBridge.create(vertx, new CamelBridgeOptions(camel)
  .addOutboundMapping(OutboundMapping.fromVertx("camel-route").toCamel("direct:my-route").setBlocking(true)));

camel.start();
bridge.start();

vertx.eventBus().send("camel-route", "hello", reply -> {
  if (reply.succeeded()) {
    Object theResponse = reply.result().body();
  } else {
    Throwable theCause = reply.cause();
  }
});

By default it uses the default worker thread pool, this is customizable using the `setWorkerExecutor`method.

Stopping the bridge

Don’t forget to stop the bridge using the stop method. The stop method is asynchronous. You can use `stop`to be notified when the bridge has been stopped.

Exchanging custom object

If you want to send and receive custom objects, you need to register a codec on the event bus:

vertx.eventBus().registerDefaultCodec(Person.class, codec);