Enterprise Integration with Apache Camel

Context

Any application in an enterprise context, regardless how small this context might be, must relate with other systems. These systems might be File system, databases, webservices, message queues, logging systems, or systems using a particular communication protocol. Moreover, data typically undergo transformations, switching and routing logics before reaching other systems. The variety of combinations this allows is enormous, and tackling each of these in a hand made, custom way might easily become an integration nightmare. Enterprise Integration Patterns (EIP) establish a standard way to describe and identify the different approaches that one can follow to deal with an integration problem (see http://www.enterpriseintegrationpatterns.com). They establish a common vocabulary that can be used unambiguously when talking about integration. If we consider that integration solutions are ubuquitous in application development, we realize easily how convenient it might be to have solid foundations on this subject.

Apache Camel is a framework that implements EIPs through a very expressive DSL, so one can translate almost immediately any EIP to a corresponding expression in the DSL. Moreover, Camel provides an extensible set of components that allows you to deal with basically any system that might come at hand. A key feature of Camel is that it deals with a normalized message format, so after the consumption point the message has a standard format, e.g. it can be handled identically either if it comes from consuming from a JMS queue or from a SOAP or REST webservice.

It is easier to grasp the concepts setting up a simple Camel project and seeing these features at work.

The easiest way to use Camel is to include it in your Spring context. We will use Camel 2.15.1 that provides a way to setup a Camel-based application without the need of xml configuration.

Maven Dependencies

First of all let’s include the dependencies in our pom.xml:

    <dependencies>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>

        <!-- camel -->
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-core</artifactId>
            <version>${camel.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-spring</artifactId>
            <version>${camel.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-spring-javaconfig</artifactId>
            <version>${camel.version}</version>
        </dependency>
    </dependencies>

camel-core provides the essence of the camel infrastructure and basic components, camel-spring and camel-spring-javaconfig provide the classes and annotations that allow us to configure and run Camel within a Java-configured Spring application.

Embed Camel in a bootable application

The org.apache.camel.spring.javaconfig.CamelConfiguration abstract class can be used as a base Spring configuration class, where we can reference all the beans we might need in the standard Spring way (xml or annotations based). The additional thing that this class does is loading a CamelContext and injecting any bean that extends RouteBuilder available in the Spring context, in the CamelContext. More about the CamelContext will follow along the article. To give ourselves some more flexibility we will also use a standard xml Spring configuration file.

The simplest configuration of such an application boils down to:

/**
 * We extend CamelConfiguration so we can build a Camel Context using purely annotated classes
 *
 */
@Configuration
public class Boot extends CamelConfiguration {

    public static void main( String[] args ) throws Exception {
        Main main = new Main();
        //use any route builder and components declared within this package
        main.setFileApplicationContextUri("classpath:/spring/spring.xml");
        main.setBasedPackages("nl.sytac.edu");
        main.run();
    }

    @Bean
    public RouteBuilder getSimpleRouteBuilder() {
        return new SimpleRouteBuilder();
    }
}

Integrating Camel in an existing (web) application

If you want to integrate Camel in an existing application, injecting a CamelContext in an existing Spring application context is pretty straightforward, see e.g. https://github.com/pierangeloc/webshop-camel-springmvc for an example of how to integrate Camel within an existing web application.

Simple Example

Now that we outlined the base structure of a Camel-based application, let’s try to put this at work and build a simple application. The purpose is just to show how we can have a working application with minimum effort. Let us suppose we have a simple ERP application that provides us periodically with an updated stock situation about our e-shop, and we want to provide this information to a third application, e.g. an analytics application. We want to expose a simple REST service, with one URL and supporting only POST method. The aim is to persist the body of our POST call in a file, in a configured location. A file will be created for every request.

Setup Rest Endpoint

The most convenient way to expose or consume a REST or SOAP webservices in Camel is to use the CXF component. With CXF we can use JAX-RS and JAX-WS annotations to configure the service classes.

The first thing we must do is to setup the serving class that describes the resource we want to expose:

package io.sytac.edu.rest;

import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Response;

/**
 * Service class for our REST endpoint
 */

@Path("/inventory/status")
public class InventoryResource {

    @POST
    @Consumes("application/json")
    @Produces("application/json")
    public Response updateStatus(String status) {
        return null;
    }
}

We provided a trivial implementation of the method, as the real implementation of the logic will be delegated to the route we are about to create. The JAX-RS annotations allow us to specify in a transparent way the supported content types, having the implementation actually enforcing them. To make the endpoint available to our route, the most flexible way is to create a CXF bean that delegates to it, and to set it in our Spring context:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:cxf="http://camel.apache.org/schema/cxf"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
              http://camel.apache.org/schema/cxf http://camel.apache.org/schema/cxf/camel-cxf.xsd
              http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

        <cxf:rsServer id="restInventory" address="http://localhost:8000/services"
                serviceClass="io.sytac.edu.rest.InventoryResource" loggingFeatureEnabled="true"/>

</beans>

Here we created a cxf:rsServer bean as we want it to serve requests, and specified the hostname, port, base url. The serviceClass attribute points to the class we defined above. Again, the implementation of this class is just ignored as the real processing of the request is specified in the route, which we are about to explore.

Routing

Routes are specified in Camel by extending the abstract class RouteBuilder, which provides all the Camel DSL goodness. All we have to do is implement the configure() method and specify the route(s). A route has one consumer endpoint, representing the point the processing starts from, and one or more producer endpoints, to represent delegation steps outside the present route, typically using Camel components. In our case we want to consume requests coming to our REST resource, therefore we will specify this circumstance in the from. The cxfrs component is used to expose or consume rest services through CXF. Being used in a from clause means it is acting as a REST server. The transport layer is provided in our case by a Jetty server that we embed in our application, but a standard Servlet transport could be used as well when running within a container.

After the from(...) we specify the EIP that must be applied in our processing logic, and the DSL provides a natural and straightforward way to do so. A simple, dummy implementation of this concept applied to our use case results in this initial version of our route builder, that defines one route that accepts any JSON request and replies with a static JSON response. This allows us to inspect how a Processor works in Camel. All the processing steps in Camel are essentially done through a chain of Processor implementations, each acting on a mutable instance of Exchange (we will get back later to the details of the Exchange). The DSL implicitly introduces processors in a transparent way, however in an explicit Processor we can access directly the structure of the Exchange. To some extent we can do this also through the DSL, and in general using the DSL gives us a more consistent view of the processing, without the need to inspect the details of a specific Processor implementation.


@Component
public class SimpleRouteBuilder extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("cxfrs:bean:restInventory")
            .routeId("restInventoryRoute")
            .process(
                new Processor() {
                    public void process(Exchange exchange) throws Exception {
                        exchange.getOut().setBody(
                                Response.ok().entity(
                                    "{\n" +
                                    "  \"status\": \"OK\",\n" +
                                    "  \"message\": \"stock update received\"\n" +
                                    "}").build());
                    }
                }
            );
    }
}

If you run the code at this check-point you will see that the server is running and it accepts only JSON POST, replying always with the same message.

What happened exactly along the route?

The cxf component accepted the request, and mapped the body of the request in the body of the In message of the Exchange, and the request headers in equivalent message headers. At this point any processing step deals with the normalized message structure being the Exchange, and can access it and modify being completely agnostic of where it comes from (it could come equivalently from consuming a JMS queue) or where it will go to (file system or DB).

The equivalent version that uses purely DSL constructs looks like this:

@Component
public class SimpleRouteBuilder extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("cxfrs:bean:restInventory")
            .routeId("restInventoryRoute")
            .setBody(constant("{\n" +
                                "  \"status\": \"OK\",\n" +
                                "  \"message\": \"stock update received\"\n" +
                                "}"))
        ;
    }
}

or even better, using Jackson as a provider for CXF i.e. having the whole serialization being handled behind the scenes:

@Component
public class SimpleRouteBuilder extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("cxfrs:bean:restInventory")
            .routeId("restInventoryRoute")
            .setBody(constant(new Response("Ok", "stock update received")));
    }
}

This requires only a slight complication on the configuration of the rsServer, to set the provider to a Jackson bean. In any case this pays off in terms of conciseness and clarity.

Code check-point

Persisting the message to a file

After this diversion on the routing logic, let’s focus again on our use case. We want to persist each JSON message we receive through the request body into a file, and we want a new file for each request to be written. The camel File component provides a flexible and straightforward way to do so. We must simply introduce a step that sends the Exchange to the file component, that takes care of all the boilerplate and tricky parts we have to deal with when writing on file system in a concurrent environment.

@Component
public class SimpleRouteBuilder extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("cxfrs:bean:restInventory")
            .routeId("restInventoryRoute")
            .to("file:///tmp/inventory?fileName=inventory-${date:now:yyyyMMdd@HHmmssSS}.json")
            .setBody(constant(new Response("Ok", "stock update received")));
    }
}

In general components in Camel are addressed by means of a url where the protocol represents the component we want to use, followed by a base url and a query string to configure and tweak the component itself. In this case we instruct file on how to create the file name, by means of the simple expression language which is very useful in Camel when we have to deal with dynamic content. In this case we want the file name to contain a simple timestamp. If we run this example, we can see that for every request there is a new file created and saved with the name we defined. We didn’t have to bother about serializing/deserializing anything, as Camel did this for us. It is even possible to configure the components to deal properly with streaming situations, e.g. when we have to upload resources.

Code check-point

Persisting the message to a file and to MongoDB

Let us suppose we want to store the incoming JSON on a MongoDB collection instead of file system. Of course Camel provides us with a component for this case, just it requires us to add an extra dependency:

    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-mongodb</artifactId>
        <version>${camel.version}</version>
    </dependency>

Then we must just configure a MongoClient bean in our Spring context:

    @Bean(name = "mongoClient")
    public Mongo mongoDb() throws UnknownHostException {
        //default port on localhost
        return new MongoClient("localhost", 27017);
    }

and then the ceremony is over. All we are left to do is replacing our file component with a mongodb component, configured to use the mongoClient bean above defined, and to use the inventory database, with updates as collection, and save operation (as we are saving our data in the collection).

@Component
public class SimpleRouteBuilder extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("cxfrs:bean:restInventory")
            .routeId("restInventoryRoute")
            .to("mongodb:mongoClient?database=inventory&collection=updates&operation=save")
            .setBody(constant(new Response("Ok", "stock update received")));
    }
}

If we run this code we can see that for every POST to our REST api we have an insertion on our database. Code check-point

We can even decide to store the message both on file system and on Mongo, and the route looks like this:

@Component
public class SimpleRouteBuilder extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("cxfrs:bean:restInventory")
            .routeId("restInventoryRoute")
            .to("file:///tmp/inventory?fileName=inventory-${date:now:yyyyMMdd@HHmmssSS}.json")
            .to("mongodb:mongoClient?database=inventory&collection=updates&operation=save")
            .setBody(constant(new Response("Ok", "stock update received")));
    }
}

Code check-point

Adding routing logic

In the previous example we have stored the same message both on DB and filesystem. We might think to split the incoming message based on its structure, and store e.g. some jeans related stock information on DB and the shoes related information on filesystem.

For this kind of purposes an easy approach is to introduce a when clause where we can specify jsonpath expressions that our input message must match in order to be treated the way we need. For this exercise we will deal with shoes inventory updates that will be persisted on file:

{
    "shoes": [{
                "name": "Nike AIR",
                "description": "Running shoes",
                "DC1": 13,
                "DC2": 23
            },
            {
                "name": "Adidas Ultra Boost",
                "description": "Running shoes",
                "DC1": 7,
                "DC2": 3
            },
            {
                "name": "TOD's Suede",
                "description": "Sneakers",
                "DC1": 3,
                "DC2": 9
            }
    ]
}

and jeans inventory messages that will be persisted on DB:

{
    "jeans": [{
                "name": "Levi's 501",
                "description": "Blue jeans",
                "DC1": 32,
                "DC2": 10
            },
            {
                "name": "Diesel",
                "description": "Blue jeans",
                "DC1": 11,
                "DC2": 7
            }
    ]
}

We want also to deal with messages that do not conform to any of these 2 options, saving them in corresponding error files (we could also send them alternatively to a dead letter queue) .

The solution is very straightforward, it just might take a bit if you are new to jsonpath expressions:

@Component
public class SimpleRouteBuilder extends RouteBuilder {

    public static final String FILE_INVENTORY_ENDPOINT = "file:///tmp/inventory?fileName=inventory-${date:now:yyyyMMdd@HHmmssSS}.json";
    public static final String MONGO_INVENTORY_ENDPOINT = "mongodb:mongoClient?database=inventory&collection=updates&operation=save";
    public static final String FILE_ERROR_ENDPOINT = "file:///tmp/inventory?fileName=error-${date:now:yyyyMMdd@HHmmssSS}.json";

    @Override
    public void configure() throws Exception {
        from("cxfrs:bean:restInventory")
            .routeId("restInventoryRoute")
                .choice()
                    .when().jsonpath("$[?(@.shoes)]")
                        .to(FILE_INVENTORY_ENDPOINT)
                    .when().jsonpath("$[?(@.jeans)]")
                        .to(MONGO_INVENTORY_ENDPOINT)
                    .otherwise()
                        .to(FILE_ERROR_ENDPOINT)
                .end()
            .setBody(constant(new Response("Ok", "stock update received")));
    }
}

The choice clause allows us to specify any predicate or expression we might want to evaluate on the incoming Exchange, in this case as we know we are dealing with JSON body so we make direct use of the jsonpath construct. Camel Expression language is very rich and it might be subject of a future post.

Code check-point

At this point I think the concept about Camel routing is pretty clear, and you can appreciate how easy it is to make use of the components and of the routing logic. Just looking at the DSL we can understand and reason about the processing steps our data go through. In a future post we will explore further some of the DSL routing features, e.g. how to handle errors or perform testing.

I will close this article with an overview of the structural foundations of Camel and recurring concepts that will appear any time you have to deal with it.

Camel foundations

Camel is built around some key concepts that we will try to explain in this section: CamelContext, RouteBuilder, Endpoint Exchange, Message, ProducerTemplate

CamelContext

The foundation on top of which all the Camel machinery runs is the CamelContext, which is in control of the lifecycle of the routes and messages running on it. A CamelContext can be started/suspended/resumed/stopped, and it provides a number of configuration tweaks, e.g. to handle the underlying ExecutorService. A CamelContext can manage many routes, which represent processing paths for messages.

RouteBuilder

A Route is a description of the processing steps a message can encounter. A route starts from a consumer endpoint that consumes messages and processes them, and can end to a producer endpoint if the messages must result in an outcome of some type (e.g. a call to a webservice, or a message on a JMS queue). The standard way to create a Route in Camel is to inject a RouteBuilder in the CamelContext and override its define() method. The RouteBuilder class provides the Camel DSL which makes the implementation of EIP straightforward. We will see later a few examples of routes.

Endpoint

A Route could not process messages without a source of these messages, which is a consumer endpoint, and without a destination to which messages can be delivered after being processed, a producer endpoint. The concept of endpoint is itself an EIP http://www.enterpriseintegrationpatterns.com/MessageEndpoint.html and it is a cornerstone of the Camel solution to the integration problem: regardless of the nature of the producer or consumer endpoint, the intermediate steps deal with a message that have a standard structure and is in general agnostic about the nature of the source or destination of the message. The processing steps along the route deal with the same type of message, whether it came to the route through a webservice exposed by the route, or because it has been delivered as a file on a directory the route is listening to. The webservice, or the file listening service in this case, are different implementations of a consuming endpoint.

Exchange

Endpoints can produce or consume Exchanges. An Exchange is structured in an IN and an OUT message. When dealing with a consumer endpoint, we can see it as a source of Exchanges with IN message. The processing steps along the route can modify the IN message and ultimately populate the OUT message that is used by the consumer endpoint as a response (imagine the IN/OUT pattern as a mapping of the Request/Response pattern)

Message

The IN/OUT Messages in the exchange have a standard structure, being constituted of header, body and attachments. The processing steps can read or modify freely each of these parts, being all these mutable structures.

ProducerTemplate

In testing scenarios, or in situations where messages are not consumed from a Camel endpoint, you might want to create an Exchange and send it to an endpoint. This can happen e.g. when you are integrating Camel in an existing web application, and you want to use it to handle only one part of the request/response cycle, delegating the final handling to the existing framework (e.g. to a Spring Controller). Moreover, you want to be completely sure of not ending up in thread safety problems. For these cases the CamelContext provides with an instance of ProducerTemplate which allows to ultimately synthesize on the flight an Exchange and send it to an endpoint, all in a thread safe manner. ProducerTemplate provides a number of methods to build an Exchange.

Conclusions

In this post we just scraped the surface of Camel, and there is a lot more to discover. I will try in future posts to dive further into features of Camel that I find particularly interesting, e.g. the testing support, parallel processing features, integration with Akka.

The way the Camel DSL is structured provides a way to write code in linear manner, making it easier to reason about, and even visualize it (through the DSL and through HawtIO which we will explore later), and it tackles the problems of modularity of an application. For example we could just split any processing flow in subroutes and replace only those when necessary, provided the structure of the Exchange up and downstream is preserverd. This also fits perfectly with the need of distributing work in a team without risking of stepping in each others’ toes. If we consider also that a very natural way of deploying routes is through OSGi bundles (this is the principle followed in JBoss Fuse), we can see how allowing modularity is one of the core principles at the base of this project, and it has definitely been achieved.