Pierangelo Cecchetto

yet another blog about software (and baking)

Getting Started With Apache Camel

Any application in an enterprise context, regardless of how small this context might be, must interact with other systems. These systems might be file systems, databases, webservices, message queues, logging systems, or systems using a particular communication protocol. Moreover, data typically undergoes transformations, switching and routing logic before reaching other systems. The variety of combinations this allows is enormous, and tackling each of them in a hand-crafted, custom way can easily become an integration nightmare. Enterprise Integration Patterns (EIP) establish a standard way to describe and identify the different approaches one can follow to deal with an integration problem (see http://www.enterpriseintegrationpatterns.com). They establish a common vocabulary that can be used unambiguously when talking about integration. If we consider that integration solutions are ubiquitous in application development, we easily realize how convenient it is to have solid foundations on this subject.

Apache Camel is a framework that implements EIPs through a very expressive DSL, so one can translate almost any EIP immediately into a corresponding expression in the DSL. Moreover, Camel provides an extensible set of components that allows you to deal with basically any system that might come to hand. A key feature of Camel is that it works with a normalized message format: after the consumption point the message has a standard structure, so it can be handled identically whether it comes from a JMS queue or from a SOAP or REST webservice.

It is easier to grasp the concepts by setting up a simple Camel project and seeing these features at work.

Camel is a Spring-based framework, so the easiest way to use it is to include it in your Spring context. We will use Camel 2.15.1, which provides a way to set up a Camel-based application without the need for xml configuration.

Maven Dependencies

First of all, let’s include the dependencies in our pom.xml:

<dependencies>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>3.8.1</version>
        <scope>test</scope>
    </dependency>

    <!-- camel -->
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-core</artifactId>
        <version>${camel.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-spring</artifactId>
        <version>${camel.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-spring-javaconfig</artifactId>
        <version>${camel.version}</version>
    </dependency>
</dependencies>

camel-core provides the essence of the Camel infrastructure and the basic components; camel-spring and camel-spring-javaconfig provide the classes and annotations that allow us to configure and run Camel within a Java-configured Spring application (the ${camel.version} property resolves here to 2.15.1).

Embed Camel in a bootable application

The org.apache.camel.spring.javaconfig.CamelConfiguration abstract class can be used as a base Spring configuration class, where we can reference all the beans we might need in the standard Spring way (xml or annotation based). In addition, this class loads a CamelContext and injects into it any bean extending RouteBuilder that is available in the Spring context. More about the CamelContext follows later in the article. To give ourselves some more flexibility we will also use a standard xml Spring configuration file.

The simplest configuration of such an application boils down to:

/**
 * We extend CamelConfiguration so we can build a Camel Context using purely annotated classes
 *
 */
@Configuration
public class Boot extends CamelConfiguration {

    public static void main( String[] args ) throws Exception {
        // org.apache.camel.spring.javaconfig.Main provides the bootable application
        Main main = new Main();
        //use any route builder and components declared within this package
        main.setFileApplicationContextUri("classpath:/spring/spring.xml");
        main.setBasedPackages("nl.sytac.edu");
        main.run();
    }

    @Bean
    public RouteBuilder getSimpleRouteBuilder() {
        return new SimpleRouteBuilder();
    }
}

Integrating Camel in an existing (web) application

If you want to integrate Camel in an existing application, injecting a CamelContext into an existing Spring application context is pretty straightforward; see e.g. https://github.com/pierangeloc/webshop-camel-springmvc for an example of how to integrate Camel within an existing web application.

Simple Example

Now that we have outlined the base structure of a Camel-based application, let’s put it to work and build a simple application. The purpose is just to show how we can have a working application with minimum effort. Let us suppose we have a simple ERP application that periodically provides us with an updated stock situation for our e-shop, and we want to forward this information to a third application, e.g. an analytics application. We want to expose a simple REST service, with one URL, supporting only the POST method. The aim is to persist the body of each POST call in a file, in a configured location. A file will be created for every request.

Setup Rest Endpoint

The most convenient way to expose or consume REST or SOAP webservices in Camel is to use the CXF component. With CXF we can use JAX-RS and JAX-WS annotations to configure the service classes.

The first thing we must do is set up the service class that describes the resource we want to expose:

package io.sytac.edu.rest;

import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Response;

/**
 * Service class for our REST endpoint
 */

@Path("/inventory/status")
public class InventoryResource {

    @POST
    @Consumes("application/json")
    @Produces("application/json")
    public Response updateStatus(String status) {
        return null;
    }
}

We provided a trivial implementation of the method, as the real logic will be delegated to the route we are about to create. The JAX-RS annotations let us declare the supported content types transparently, with the implementation actually enforcing them. To make the endpoint available to our route, the most flexible way is to create a CXF bean that delegates to it, and register it in our Spring context:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:cxf="http://camel.apache.org/schema/cxf"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
              http://camel.apache.org/schema/cxf http://camel.apache.org/schema/cxf/camel-cxf.xsd
              http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

        <cxf:rsServer id="restInventory" address="http://localhost:8000/services"
                serviceClass="io.sytac.edu.rest.InventoryResource" loggingFeatureEnabled="true"/>

</beans>

Here we created a cxf:rsServer bean, as we want it to serve requests, and specified the hostname, port and base url. The serviceClass attribute points to the class we defined above. Again, the implementation of this class is ignored, as the real processing of the request is specified in the route, which we are about to explore.

Routing

Routes are specified in Camel by extending the abstract class RouteBuilder, which provides all the Camel DSL goodness. All we have to do is implement the configure() method and specify the route(s). A route has one consumer endpoint, representing the point the processing starts from, and one or more producer endpoints, representing delegation steps outside the present route, typically backed by Camel components. In our case we want to consume requests coming to our REST resource, therefore we specify this in the from clause. The cxfrs component is used to expose or consume REST services through CXF; being used in a from clause means it acts as a REST server. The transport layer is provided in our case by a Jetty server embedded in our application, but a standard Servlet transport could be used as well when running within a container.

After the from(...) we specify the EIPs that must be applied in our processing logic, and the DSL provides a natural and straightforward way to do so. A simple, dummy implementation of this concept applied to our use case results in this initial version of our route builder, which defines one route that accepts any JSON request and replies with a static JSON response. This allows us to inspect how a Processor works in Camel. All the processing steps in Camel are essentially performed by a chain of Processor implementations, each acting on a mutable instance of Exchange (we will get back to the details of the Exchange later). The DSL implicitly introduces processors in a transparent way, but in an explicit Processor we can access the structure of the Exchange directly. To some extent we can do this through the DSL as well, and in general the DSL gives us a more consistent view of the processing, without the need to inspect the details of a specific Processor implementation.

@Component
public class SimpleRouteBuilder extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("cxfrs:bean:restInventory")
            .routeId("restInventoryRoute")
            .process(
                new Processor() {
                    public void process(Exchange exchange) throws Exception {
                        exchange.getOut().setBody(
                                Response.ok().entity(
                                    "{\n" +
                                    "  \"status\": \"OK\",\n" +
                                    "  \"message\": \"stock update received\"\n" +
                                    "}").build());
                    }
                }
            );
    }
}

If you run the code at this check-point you will see that the server is running and accepts only JSON POSTs, always replying with the same message.

What happened exactly along the route?

The cxfrs component accepted the request, mapped the body of the request into the body of the In message of the Exchange, and the request headers into equivalent message headers. From this point on, every processing step deals with the normalized message structure, the Exchange, and can access and modify it while being completely agnostic of where it comes from (it could equivalently come from consuming a JMS queue) or where it will go to (file system or DB).
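
As a minimal, hypothetical sketch (the class and header names here are mine, for illustration only), an explicit Processor can read and modify the normalized message like this:

import org.apache.camel.Exchange;
import org.apache.camel.Processor;

/** Hypothetical processor inspecting the normalized message. */
public class InspectingProcessor implements Processor {
    @Override
    public void process(Exchange exchange) throws Exception {
        // the In message carries the normalized body and headers,
        // whatever component produced the exchange
        String body = exchange.getIn().getBody(String.class);
        String contentType = exchange.getIn().getHeader("Content-Type", String.class);
        // modifications are visible to the next step in the route
        exchange.getIn().setHeader("receivedContentType", contentType);
        exchange.getIn().setHeader("receivedLength", body == null ? 0 : body.length());
    }
}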

The equivalent version that uses purely DSL constructs looks like this:

@Component
public class SimpleRouteBuilder extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("cxfrs:bean:restInventory")
            .routeId("restInventoryRoute")
            .setBody(constant("{\n" +
                                "  \"status\": \"OK\",\n" +
                                "  \"message\": \"stock update received\"\n" +
                                "}"))
        ;
    }
}

or even better, using Jackson as a provider for CXF, i.e. having the whole serialization handled behind the scenes:

@Component
public class SimpleRouteBuilder extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("cxfrs:bean:restInventory")
            .routeId("restInventoryRoute")
            // Response here is a simple POJO, serialized to JSON by the Jackson provider
            .setBody(constant(new Response("Ok", "stock update received")));
    }
}

This requires only a slight addition to the configuration of the rsServer, setting its provider to a Jackson bean. In any case it pays off in terms of conciseness and clarity.

Code check-point

Persisting the message to a file

After this diversion on the routing logic, let’s focus again on our use case. We want to persist each JSON message we receive through the request body into a file, and we want a new file to be written for each request. The Camel File component provides a flexible and straightforward way to do so. We must simply introduce a step that sends the Exchange to the file component, which takes care of all the boilerplate and tricky parts we would have to deal with when writing to a file system in a concurrent environment.

@Component
public class SimpleRouteBuilder extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("cxfrs:bean:restInventory")
            .routeId("restInventoryRoute")
            .to("file:///tmp/inventory?fileName=inventory-${date:now:yyyyMMdd@HHmmssSS}.json")
            .setBody(constant(new Response("Ok", "stock update received")));
    }
}

In general, components in Camel are addressed by means of a URI where the scheme identifies the component we want to use, followed by a base path and a query string that configures and tweaks the component itself. In this case we instruct the file component on how to create the file name, by means of the simple expression language, which is very useful in Camel whenever we have to deal with dynamic content. Here we want the file name to contain a simple timestamp. If we run this example, we can see that for every request a new file is created and saved with the name we defined. We didn’t have to bother about serializing/deserializing anything, as Camel did this for us. It is even possible to configure the components to deal properly with streaming situations, e.g. when we have to upload resources.

Code check-point

Persisting the message to a file and to MongoDB

Let us suppose we want to store the incoming JSON in a MongoDB collection instead of on the file system. Of course Camel provides us with a component for this case too; it just requires an extra dependency:

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-mongodb</artifactId>
    <version>${camel.version}</version>
</dependency>

Then we must just configure a MongoClient bean in our Spring context:

@Bean(name = "mongoClient")
public Mongo mongoDb() throws UnknownHostException {
    //default port on localhost
    return new MongoClient("localhost", 27017);
}

and the ceremony is over. All that is left to do is replace our file endpoint with a mongodb endpoint, configured to use the mongoClient bean defined above, the inventory database, the updates collection, and the save operation (as we are saving our data in the collection).

@Component
public class SimpleRouteBuilder extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("cxfrs:bean:restInventory")
            .routeId("restInventoryRoute")
            .to("mongodb:mongoClient?database=inventory&collection=updates&operation=save")
            .setBody(constant(new Response("Ok", "stock update received")));
    }
}

If we run this code we can see that every POST to our REST api results in an insertion into our database.

Code check-point

We can even decide to store the message both on the file system and in Mongo; the route then looks like this:

@Component
public class SimpleRouteBuilder extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("cxfrs:bean:restInventory")
            .routeId("restInventoryRoute")
            .to("file:///tmp/inventory?fileName=inventory-${date:now:yyyyMMdd@HHmmssSS}.json")
            .to("mongodb:mongoClient?database=inventory&collection=updates&operation=save")
            .setBody(constant(new Response("Ok", "stock update received")));
    }
}

Code check-point

Adding routing logic

In the previous example we stored the same message both in the DB and on the filesystem. We might instead want to split the incoming message based on its structure, and store e.g. the jeans-related stock information in the DB and the shoes-related information on the filesystem.

For this kind of purpose an easy approach is to introduce a when clause where we can specify jsonpath expressions that our input message must match in order to be treated the way we need. For this exercise we will deal with shoes inventory updates, which will be persisted to file:

{
    "shoes": [{
                "name": "Nike AIR",
                "description": "Running shoes",
                "DC1": 13,
                "DC2": 23
            },
            {
                "name": "Adidas Ultra Boost",
                "description": "Running shoes",
                "DC1": 7,
                "DC2": 3
            },
            {
                "name": "TOD's Suede",
                "description": "Sneakers",
                "DC1": 3,
                "DC2": 9
            }
    ]
}

and jeans inventory messages that will be persisted in the DB:

{
    "jeans": [{
                "name": "Levi's 501",
                "description": "Blue jeans",
                "DC1": 32,
                "DC2": 10
            },
            {
                "name": "Diesel",
                "description": "Blue jeans",
                "DC1": 11,
                "DC2": 7
            }
    ]
}

We also want to handle messages that do not conform to either of these two options, saving them in corresponding error files (alternatively, we could send them to a dead letter queue).

The solution is very straightforward; it just might take a bit of practice if you are new to jsonpath expressions:

@Component
public class SimpleRouteBuilder extends RouteBuilder {

    public static final String FILE_INVENTORY_ENDPOINT = "file:///tmp/inventory?fileName=inventory-${date:now:yyyyMMdd@HHmmssSS}.json";
    public static final String MONGO_INVENTORY_ENDPOINT = "mongodb:mongoClient?database=inventory&collection=updates&operation=save";
    public static final String FILE_ERROR_ENDPOINT = "file:///tmp/inventory?fileName=error-${date:now:yyyyMMdd@HHmmssSS}.json";

    @Override
    public void configure() throws Exception {
        from("cxfrs:bean:restInventory")
            .routeId("restInventoryRoute")
                .choice()
                    .when().jsonpath("$[?(@.shoes)]")
                        .to(FILE_INVENTORY_ENDPOINT)
                    .when().jsonpath("$[?(@.jeans)]")
                        .to(MONGO_INVENTORY_ENDPOINT)
                    .otherwise()
                        .to(FILE_ERROR_ENDPOINT)
                .end()
            .setBody(constant(new Response("Ok", "stock update received")));
    }
}

The choice clause allows us to specify any predicate or expression we might want to evaluate on the incoming Exchange; in this case, since we know we are dealing with a JSON body, we make direct use of the jsonpath construct. The Camel expression language is very rich and might be the subject of a future post.

Code check-point

At this point the concept of Camel routing should be pretty clear, and you can appreciate how easy it is to make use of the components and of the routing logic. Just by looking at the DSL we can understand and reason about the processing steps our data goes through. In a future post we will explore further some of the DSL routing features, e.g. how to handle errors or perform testing.

I will close this article with an overview of the structural foundations of Camel and recurring concepts that will appear any time you have to deal with it.

Camel foundations

Camel is built around some key concepts that we will try to explain in this section: CamelContext, RouteBuilder, Endpoint, Exchange, Message, ProducerTemplate.

CamelContext

The foundation on top of which all the Camel machinery runs is the CamelContext, which controls the lifecycle of the routes and messages running on it. A CamelContext can be started/suspended/resumed/stopped, and it provides a number of configuration tweaks, e.g. to handle the underlying ExecutorService. A CamelContext can manage many routes, which represent processing paths for messages.
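
As a minimal sketch of this lifecycle (using camel-core’s DefaultCamelContext directly, outside the Spring setup shown earlier):

import org.apache.camel.CamelContext;
import org.apache.camel.impl.DefaultCamelContext;

public class ContextLifecycle {
    public static void main(String[] args) throws Exception {
        CamelContext context = new DefaultCamelContext();
        // register the routes that the context will manage
        context.addRoutes(new SimpleRouteBuilder());
        context.start();      // consumers start, messages begin to flow
        Thread.sleep(10000);  // let the routes run for a while
        context.stop();       // graceful shutdown of routes and services
    }
}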

RouteBuilder

A Route is a description of the processing steps a message goes through. A route starts from a consumer endpoint that consumes messages and processes them, and can end at a producer endpoint if the messages must result in an outcome of some type (e.g. a call to a webservice, or a message on a JMS queue). The standard way to create a Route in Camel is to inject a RouteBuilder into the CamelContext and override its configure() method. The RouteBuilder class provides the Camel DSL, which makes the implementation of EIPs straightforward, as the examples above showed.

Endpoint

A Route could not process messages without a source of messages (a consumer endpoint) and without a destination to which messages can be delivered after being processed (a producer endpoint). The concept of endpoint is itself an EIP (http://www.enterpriseintegrationpatterns.com/MessageEndpoint.html) and it is a cornerstone of the Camel solution to the integration problem: regardless of the nature of the producer or consumer endpoint, the intermediate steps deal with a message that has a standard structure and is in general agnostic about the nature of the source or destination of the message. The processing steps along the route deal with the same type of message, whether it came to the route through a webservice exposed by the route, or because it was delivered as a file in a directory the route is listening to. The webservice, or the file listening service in this case, are different implementations of a consumer endpoint.

Exchange

Endpoints can produce or consume Exchanges. An Exchange is structured in an In and an Out message. We can see a consumer endpoint as a source of Exchanges with an In message. The processing steps along the route can modify the In message and ultimately populate the Out message, which is used by the consumer endpoint as a response (imagine the In/Out pattern as a mapping of the Request/Response pattern).

Message

The In/Out Messages in the Exchange have a standard structure, consisting of headers, a body and attachments. The processing steps can freely read or modify each of these parts, as they are all mutable structures.

ProducerTemplate

In testing scenarios, or in situations where messages are not consumed from a Camel endpoint, you might want to create an Exchange and send it to an endpoint yourself. This can happen e.g. when you are integrating Camel in an existing web application, and you want to use it to handle only one part of the request/response cycle, delegating the final handling to the existing framework (e.g. to a Spring Controller). Moreover, you want to be completely sure of not running into thread safety problems. For these cases the CamelContext provides an instance of ProducerTemplate, which allows us to synthesize an Exchange on the fly and send it to an endpoint, all in a thread safe manner. ProducerTemplate provides a number of methods to build an Exchange.
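
A minimal sketch of such usage; the direct: endpoint URI here is just an assumption for illustration:

import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;

public class ManualSend {
    public static void send(CamelContext context) {
        // the template is created by the context and is thread safe
        ProducerTemplate template = context.createProducerTemplate();
        // one-way: synthesizes an Exchange with this body and sends it
        template.sendBody("direct:inventoryUpdates", "{\"shoes\": []}");
        // request/reply: blocks until the route populates the Out message
        Object reply = template.requestBody("direct:inventoryUpdates", "{\"jeans\": []}");
        System.out.println(reply);
    }
}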

Conclusions

In this post we just scratched the surface of Camel, and there is a lot more to discover. In future posts I will try to dive further into features of Camel that I find particularly interesting, e.g. the testing support, parallel processing features, and the integration with Akka.

The way the Camel DSL is structured lets us write code in a linear manner, making it easier to reason about and even to visualize (through the DSL and through HawtIO, which we will explore later), and it tackles the problem of modularity of an application. For example we could split any processing flow into subroutes and replace only those when necessary, provided the structure of the Exchange up- and downstream is preserved. This also fits perfectly with the need of distributing work in a team without risking stepping on each other’s toes. If we also consider that a very natural way of deploying routes is through OSGi bundles (this is the principle followed in JBoss Fuse), we can see that enabling modularity is one of the core principles at the base of this project, and it has definitely been achieved.

Scala.js for Dynamic Systems Simulation

After Scaladays 2015 in SF, the Scala.js framework has received a lot of attention. Scala.js compiles Scala code into Javascript code, which can be run in a browser but also on Javascript powered server environments, e.g. Node.js.

Scala.js is not something like GWT, which provided a whole toolkit for Javascript code generation from Java. Scala.js is simply a compiler that translates Scala code into Javascript code. This allows us to use all the great features that make Scala such a successful language, like strong typing, functional code, higher order functions, case classes, pattern matching, and even Futures and Promises (when dealing e.g. with events on the client or with Ajax calls). For some parts it is possible to develop code that is fully agnostic of the target platform, i.e. you can write Scala code that can be executed equivalently on a JVM or in a browser when compiled with Scala.js. For the parts that deal with HTML elements or with Ajax, the target platform must be taken into consideration. On top of the pleasure of using a concise and expressive language like Scala, the advantage that Scala.js offers is that, thanks to strong typing, errors are discovered already at compilation time, while when we develop logic in Javascript we usually discover errors only at runtime.

To show all the goodness of Scala.js I could have written a simple game, but there are already plenty of such examples around. This being the blog of a real nerd, in this post we will use Scala.js to build a simulator of dynamic systems running in a browser, with the aim of drawing the phase portrait of a system described by a 2-dimensional ODE. The source code used along this post can be found here.

SBT project setup

For a full introduction to Scala.js please refer to the great guide http://lihaoyi.github.io/hands-on-scala-js/. The layout of my project follows exactly the structure outlined in that guide. To set up an SBT project to use Scala.js you simply must add the corresponding plugin in the plugins.sbt file. I also added the Workbench plugin, which proved very useful to test the code in the browser:

addSbtPlugin("org.scala-js" % "sbt-scalajs" % "0.6.1")

addSbtPlugin("com.lihaoyi" % "workbench" % "0.2.3")

In the build.sbt file we must enable the ScalaJSPlugin and add the dependency on the scalajs-dom library, which allows us to access DOM elements (and events) from Scala in a typesafe fashion.

enablePlugins(ScalaJSPlugin)

libraryDependencies ++= Seq(
  "org.scala-js" %%% "scalajs-dom" % "0.8.0"
)

bootSnippet := "drawing.ScalaJSPhasePortrait().main(document.getElementById('canvas'));"

updateBrowsers <<= updateBrowsers.triggeredBy(fastOptJS in Compile)

The bootSnippet line configures Workbench to execute that piece of Javascript to start the application in the browser, and the updateBrowsers line triggers the boot at the end of each compilation.

Coding

In the project folder /src/main/scala we put the Scala code that will be compiled into equivalent Javascript. We need to define one or more exposed entry points, i.e. a Scala object that we want translated into Javascript, on which we can invoke one or more methods. Both the object and the methods must be marked for export by means of the @JSExport annotation:

package drawing

// ...

@JSExport
object ScalaJSPhasePortrait {

  @JSExport
  def realPendulumPhasePortrait(canvas: Canvas): Unit = // ...

  @JSExport
  def simplePendulumPhasePortrait(canvas: Canvas): Unit = // ...

  @JSExport
  def lotkaVolterraPhasePortrait(canvas: Canvas): Unit = // ...

Here we are exposing the drawing.ScalaJSPhasePortrait object and 3 methods, realPendulumPhasePortrait, simplePendulumPhasePortrait and lotkaVolterraPhasePortrait, each of them expecting a Canvas object as input. The compiler will start from these exposed entities and transitively include everything that is strictly required by the code in order to function. This means that if a method uses some features provided by another class, possibly coming from a separate library, only the required features are included in the generated package, not the whole library. There is no limit to the number of objects or functions you might want to expose in your application.

When we run

 sbt ~fastOptJS

we trigger the code generation (the ~ is a standard SBT feature that re-runs the task whenever something changes in the source files). If we inspect the /target folder of the project we can see that a file target/scala-2.11/phaseportraitjs-fastopt.js has been created. This is the output of the Scala.js compiler. The name of this file is determined by the SBT project name, and this is the file that must be included in our HTML page in order to have our example working. We can then invoke the drawing.ScalaJSPhasePortrait object methods from any point in our page.

<script type="text/javascript" src="../phaseportraitjs-fastopt.js"></script>
<script type="text/javascript" src="/workbench.js"></script>
<script>
    drawing.ScalaJSPhasePortrait().simplePendulumPhasePortrait(document.getElementById('simplePendulum'));
    drawing.ScalaJSPhasePortrait().realPendulumPhasePortrait(document.getElementById('realPendulum'));
    drawing.ScalaJSPhasePortrait().lotkaVolterraPhasePortrait(document.getElementById('lotkaVolterra'));
</script>

The problem

An autonomous dynamical system in $\mathbb{R}^n$ is described by a system of differential equations that can be represented as: \[ \dot x = f(x),\ x \in \mathbb{R}^n \] Being autonomous means that the field described by the function $f$ does not depend on time, which means that the evolution of the movement $x(t)$ given some initial condition $x_0$ depends only on $x_0$ and not on the particular initial time (there is an invariance under time shift). This allows us to provide a good description of the evolution of the system by describing the orbits of the system, i.e. the values that $x(t)$ assumes regardless of time. This type of qualitative analysis is important when describing systems for which we don’t know an exact solution.

In our Scala.js application we want to provide a general solution for this problem, for systems of dimension 2, so that their evolution can be easily represented on a 2-dimensional space, in our case an HTML Canvas.

We need to separate the concerns of drawing something on a given rectangle from those of calculating the evolution of a dynamic system.

Drawing points on a rectangular area

The starting point is the HTML Canvas, which has a given size, and the window of $\mathbb{R}^2$ we want to represent on it. For convenience we allow the window center and sizes to assume only integer values. The window is a simple case class:

case class DrawingWindow(canvas: Canvas, centerX: Int, centerY: Int, halfWidth: Int, halfHeight: Int) {
  val deltaX = halfWidth * 2.0 / canvas.width
  val deltaY = halfHeight * 2.0 / canvas.height
}

The deltaX and deltaY values represent the resolution of our window on the given canvas. The Plot class provides all the methods we need to deal with the window on the canvas and to plot dots and curves on it.

The mathematical points are represented by the case class Point, on which we defined the standard operations of the vector space $\mathbb{R}^2$:

case class Point (x: Double, y: Double) {
  def +(other: Point) = Point(this.x + other.x, this.y + other.y)
  def -(other: Point) = Point(this.x - other.x, this.y - other.y)
  def *(factor: Double) = Point(this.x * factor, this.y * factor)
  def abs = Math.sqrt(x * x + y * y)
}

We can appreciate how easily Scala case classes and operators allow us to represent algebraic entities and operators as we are used to in algebra. Doing the same thing in Java would have taken many more lines of code and would have lacked expressiveness. The transformation from a Point to a CanvasPoint is performed by the point2CanvasPoint(p: Point): CanvasPoint function in the Plot class. A drawGrid() function uses for comprehensions to draw the grid on the window:

[Image: the coordinate grid drawn on the canvas window]

To draw points on the canvas we simply draw little circles around each point; you can check the source code for the details.

Calculating the evolution of the system from initial conditions

In order to plot the phase portrait we begin by defining a set of initial points to start the evolution from. In our case we pick the points of the grid with integer coordinates. Once we have an initial condition $x_0$ we must simulate the evolution $x(t)$ that solves our differential equation starting from $x_0$: $x(0) = x_0$. If we do this repeatedly for each initial condition in our grid, we end up with a phase portrait.

To calculate the evolution we use the 4th order Runge-Kutta algorithm (see here for the details). We sample the solution at regular time intervals $0,\ \Delta t,\ 2\Delta t,\ $ etc. The sequence of values of $x(t)$ is represented by a Stream that we build through the iterate function, which needs the head of the Stream and a function producing the next sample from the current one, on demand, so it fits perfectly with our needs. If we represent our function $f$ as f: Point => Point, the whole evolution machinery is captured by the function below.
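
For reference, a single step of the classical 4th order Runge-Kutta method computes: \[ a_n = f(x_n),\quad b_n = f(x_n + \tfrac{\Delta t}{2} a_n),\quad c_n = f(x_n + \tfrac{\Delta t}{2} b_n),\quad d_n = f(x_n + \Delta t\, c_n),\quad x_{n+1} = x_n + \tfrac{\Delta t}{6}(a_n + 2 b_n + 2 c_n + d_n) \] The code translates these formulas almost verbatim: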

//evolution of a dynamic system in R2 (RK 4th order)
def evolution(f: Point => Point, x0: Point, deltaT: Double = 0.05): Stream[(Double, Point)] = Stream.iterate((0.0, x0)) {
  case (tn: Double, xn: Point) => {
    val an = f(xn)
    val bn = f(xn + (an * (deltaT / 2)))
    val cn = f(xn + (bn * (deltaT / 2)))
    val dn = f(xn + (cn * (deltaT)))

    (tn + deltaT, xn + (an + bn * 2 + cn * 2 + dn) * (deltaT / 6))
  }
}

Given the function f, the initial point x0 and the sampling interval deltaT, the evolution function provides us with a stream of pairs (tn, xn) representing the evolution of the system.

If we take a given number of samples and we plot them, we have one orbit for a given initial condition. If we do that for all the points with integer coordinates in our grid, we have a complete phase portrait for the system described by the function f.

 def phasePortrait(f: Point => Point) = {
      val verticalLines = (drawingWindow.centerX - drawingWindow.halfWidth) to (drawingWindow.centerX + drawingWindow.halfWidth)
      val horizontalLines = (drawingWindow.centerY - drawingWindow.halfHeight) to (drawingWindow.centerY + drawingWindow.halfHeight)

      for {
        x <- verticalLines
        y <- horizontalLines
      } {
          println(s"x0 = ${Point(x, y)}")
          //rgb to plot the orbit with 1 color
          val rgb = s"rgb(${Random.nextInt(256)},${Random.nextInt(256)},${Random.nextInt(256)})"

          evolution(f, Point(x, y)).map {
            case (t, p) => {
              plotter.draw(p, rgb)
              p
            }
          } take(3000) toList
        }
    }

You can see that we used the Scala println function here. The code generated by Scala.js has the very convenient feature that these calls are translated into browser console logs.

Now we can define the system we want to simulate simply by defining the corresponding function $f$:

val omega = 1.0

def simplePendulum = (x: Point) => Point(x.y, - omega * omega * x.x)

def realPendulum = (x: Point) => Point(x.y, - omega * omega * Math.sin(x.x))

def lotkaVolterra = (x: Point) => Point(x.x - x.x * x.y, x.x * x.y - x.y)

def vanDerPol = (x: Point) => Point(x.y, 2 * (1 - x.x * x.x) * x.y - x.x)

and pass this function to phasePortrait, which takes care of plotting the corresponding phase portrait on the given Canvas. Here are a couple of examples:

[Image: phase portrait of the pendulum]

We can clearly identify the single orbits (by their color). The further we move from the horizontal axis, the higher the speed, and therefore our samples get sparser. An adaptive sampling algorithm would have produced a better drawing, but the essence of the implementation does not change.

[Image: phase portrait of the Van der Pol oscillator]

We can see that in this system all the orbits end up overlapping with a single, globally attracting orbit. This is a clear limit cycle phenomenon, examples of which we also find in the discrete world, e.g. in digital signal processing.

Conclusion

We have seen how Scala.js is a convenient way to build code running on any Javascript powered platform, using the great features of the Scala language.

It is also a nice way to turn Scala code, which tends to be very generic and applicable to different types, into visual components easily accessible to anyone, not only to people with technical skills.

The evolution function provided above can be used to simulate any system of dimension 2, and it can easily be adapted to any dynamic system described by a differential equation, even in the time-dependent case: it is sufficient to increase the dimension by one to obtain an equivalent autonomous system, as sketched below.
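
This is the standard trick of adding time itself as an extra state variable, which turns a time-dependent system into an autonomous one of one dimension more: \[ \dot x = f(x, t) \quad\Longrightarrow\quad \frac{d}{dt}\begin{pmatrix} x \\ s \end{pmatrix} = \begin{pmatrix} f(x, s) \\ 1 \end{pmatrix},\quad s(0) = 0 \]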

The Plot class can be reused to plot any kind of curve on a given window on the Cartesian plane.

A Simple Scala Application: Calculating Integrals

I embraced functional programming through Scala, starting with Martin Odersky’s great course on Coursera, continuing with the Reactive Programming course on Coursera (a second edition of which will start shortly, and I really recommend signing up and following it), and later on working with the Play framework and Akka streams.

I spent a few years programming in Java in a purely imperative way, and as soon as I understood the functional approach, I realized it is a great way to focus on the essence of the problem, and that it provides a more organic way to decompose reasoning and handle complexity.

Of course the language itself doesn’t guarantee that the approach is functional: if you want, you can write Scala almost like Java without semicolons. But despite not being purely functional and allowing mutable variables, Scala provides all the features needed to put a purely functional approach into practice. Moreover, the syntax is similar enough to Java that the transition, from the language point of view, is pretty smooth.

When I saw the first examples of functional code, I appreciated that the language provides the tools to model mathematical functions directly. For example, a mathematical function $f$ from $A$ to $B$:

\[ f: A \rightarrow B \]

is translated into a function definition in Scala as follows (the ??? means that the function is still undefined):

scala> def f: Double => Double = ???
f: Double => Double

Functions in Scala are first class citizens, and they can be passed as arguments to other functions. This allows composing things in a very straightforward way that procedural programming simply wouldn’t allow.

Consider for example calculating the integral of the $\sin(x)$ function over the interval $[0, \pi]$, which we know from our math classes to be 2. We map real numbers to their finite precision representation as Double, and we can write a Scala definition of $\sin(x), x \in \mathbb{R}$ as

def f: Double => Double = (x: Double) => Math.sin(x)

or even more concisely

scala> import Math.sin
import Math.sin
scala> def f: Double => Double = sin(_)
f: Double => Double

Now we can approximate the integral with a finite sum: \[ \int_0^{\pi}\sin(x)dx \approx \Delta x \sum_{n=0}^{N}\sin(x_n) \] with $\Delta x = \pi / N$ and $x_n = n\Delta x$.

We know the explicit way of finding $N$ given $\Delta x$ and $\pi$; however, let’s exploit a great feature of Scala, Streams and lazy evaluation (see http://www.scala-lang.org/api/2.11.5/index.html#scala.collection.immutable.Stream). We can define the stream of $x_n$ as a potentially unbounded sequence of doubles, with the guarantee that the next number will be generated on demand, lazily, only when required:

scala> val deltaX = 0.01
deltaX: Double = 0.01
scala> def xn = Stream.iterate(0.0)(_ + deltaX)
xn: scala.collection.immutable.Stream[Double]

Here we did two things: we created a Stream, where no value is allocated yet and all values are created on demand, and we defined it with a def and not a val, so that memory can be reclaimed once the stream is consumed: we just want the $x_n$ to be produced to generate the sum, we don’t want to retain them afterwards.

The stream xn defined above is potentially unbounded: it represents the whole sequence of xn obtained as 0, deltaX, deltaX + deltaX, deltaX + deltaX + deltaX, ..., but we can easily set an upper limit in this way:

scala> def xnPi = xn.takeWhile(_ < Math.PI)
xnPi: scala.collection.immutable.Stream[Double]

We can now take the values of the f function at each of these points, simply by mapping the stream of xn through the function:

scala> def ynPi = xnPi.map(f)
ynPi: scala.collection.immutable.Stream[Double]

In the context of Streams we can see the map application as a transformation from the sequence of $x_n$ to the sequence of $f_n := f(x_n)$. This interpretation of the map function applies only to sequence-like types such as Streams and Lists; it has a very different interpretation for other types, where concepts coming from Category Theory come into play and lead to the ideas of Monoids and Monads.

Now the only thing left to do is to add up the ynPi values and multiply by $\Delta x$, which is easily achieved through a foldLeft application:

scala> deltaX * ynPi.foldLeft(0.0)(_ + _)
res1: Double = 1.9999900283082575

So, mapping the theoretical concepts directly down to language constructs, in 5 lines of code we obtained a value pretty close to the real one. Consider how much verbosity and focus on implementation details it would have taken to implement this in Java, or in any procedural way. With Scala, and in general with a functional approach, we just focus on what we want to achieve, and through which transformations, instead of focusing on the processing details. In this way we produce code that is more isolated, testable, and applicable in different contexts.

Parallelizing Costs

Why learn concurrency

Studying for the OCP exam offers a nice opportunity to dive into one of the most important (and sometimes overlooked) aspects of the Java language and the JVM: threads and concurrency frameworks. I had never delved into the details of concurrency before, because I was always working on a container (like Tomcat) or a framework (like Apache Camel) that takes care of distributing load over a thread pool, where basically every request or message processing can in 99% of cases be treated as a synchronous process; so I survived for many years without a good understanding of how concurrency works in Java.

Learning how threads and concurrency work is important, especially if we look at how hardware is evolving. Processor speed is not increasing any more (due to physical limitations), but processors are getting larger, i.e. the number of cores is increasing, and we must be able to make the best use of them.

Threads in Java

When we talk about threads, a distinction must be made between a thread as an object and a thread of execution. Java has supported threads natively since the very beginning. As everything in Java is handled as an object, threads are no exception, and a thread is represented by the java.lang.Thread class. A thread of execution instead represents a call stack being executed on the JVM; you can visualize multiple running threads as many call stacks running independently. However, threads may work on the same objects in the heap, and the language provides a number of mechanisms to handle these shared objects safely (synchronized blocks and methods, locks, read/write locks, atomic variables). I used to think that parallelizing things automatically leads to better performance, but after diving a bit into the subject, I came across a great presentation by Martin Thompson that I really recommend. This presentation is great for a couple of reasons. First, it makes clear that knowing the general structure of the machine our programs run on is paramount to writing efficient code: we don’t need to know every detail, as if we were developing firmware for a particular processor, but a general understanding of how modern multicore CPUs work is recommended (mechanical sympathy). Secondly, it conveys the important point that threads contending for a resource and swapping context take time, so it might not always be convenient to parallelize. In short: parallelize with a grain of salt.

Benchmark

To convince myself that parallelization is not necessarily good, I used the same example suggested in the Disruptor presentation, a very trivial problem: incrementing an int variable 10,000,000 times. The aim is to compare the performance of different approaches to this problem. Let’s first outline the approaches used, then we will go through the performance results. The code can be found here.

Single threaded, with a while loop

This is the most trivial approach:

while (i < times) {
    i++;
}

The while loop is executed on one single thread.

Single threaded, with a lock/unlock within the loop

To measure the price of a simple lock and unlock, without any other thread involved (so without actual contention), I used this second strategy:

while (i < times) {
    lock.lock();
    i++;
    lock.unlock();
}

Single threaded, with synchronized block within the loop

An equivalent approach to the previous one is to synchronize the whole block within the while loop:

while (i < times) {
    synchronized (lock) {
        i++;
    }
}

Then we can start to really parallelize things and see how it goes. I tested this on an Intel i7 (8 cores) machine, using 4 threads.
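
The creation of the thread pool is not shown in the snippets below; a minimal sketch of what it presumably looks like (a fixed pool of 4 threads, as stated) is:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// a fixed pool of 4 worker threads, shared by the strategies below;
// invokeAll(incrementers) distributes the callables over these threads
ExecutorService executorService = Executors.newFixedThreadPool(4);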

4 threads, without locking or synchronizing (inconsistent results)

We define a list of Callables that share the same integer holder, incrementing it concurrently:

/**
 * MultiThreadedUnlockedStrategy.java
*/
IntHolder intHolder = new IntHolder();
List<IntIncrementer> incrementers = new LinkedList<>();
for(int i = 0; i < times; i++) {
    incrementers.add(new IntIncrementer(intHolder));
}

executorService.invokeAll(incrementers);

/**
 * IntIncrementer.java
*/
@Override
public IntHolder call() throws Exception {
    LOGGER.debug("incrementing");
    holder.value++;
    return holder;
}

As expected, this strategy leads to incorrect and inconsistent results, e.g. 9978258 instead of the expected 10,000,000.

4 threads, with locking/unlocking in each incrementation

The incrementation block is synchronized on the holder object

/**
 * MultiThreadedLockedStrategy.java
*/
IntHolder intHolder = new IntHolder();
List<LockingIntIncrementer> incrementers = new LinkedList<>();
for(int i = 0; i < times; i++) {
    incrementers.add(new LockingIntIncrementer(intHolder));
}

executorService.invokeAll(incrementers);

/**
 * LockingIntIncrementer.java
 */
@Override
public IntHolder call() throws Exception {
    synchronized (holder) {
        LOGGER.debug("incrementing");
        holder.value++;
    }
    return holder;
}

This leads to consistent and correct results.

4 threads, with AtomicInteger

The incrementation is delegated to an AtomicInteger, incremented concurrently without any explicit locking:

/**
 * MultiThreadedLockedStrategy.java
*/
AtomicInteger atomicInteger = new AtomicInteger();
List<AtomicIntIncrementer> incrementers = new LinkedList<>();
for(int i = 0; i < times; i++) {
    incrementers.add(new AtomicIntIncrementer(atomicInteger));
}

executorService.invokeAll(incrementers);


/**
 * AtomicIntIncrementer.java
 */
private AtomicInteger atomicInteger;

@Override
public Integer call() throws Exception {
    LOGGER.debug("incrementing");
    return atomicInteger.incrementAndGet();
}

This strategy also leads to correct results.

4 threads, with ForkJoin

The approach followed in this case is totally different: we construct an array of 10,000,000 ones, and add up its values through a fork/join.

/**
 * MultiThreadedLockedStrategy.java
*/
int[] ones = new int[times];
AddingTask mainTask = new AddingTask(ones, 0, ones.length);
int result = forkJoinPool.invoke(mainTask);

/**
 * AddingTask.java
 */
public AddingTask(int[] array, int from, int to) {
        this.array = array;
        this.from = from;
        this.to = to;
    }

@Override
protected Integer compute() {
    if(to - from <= THRESHOLD) {
        // the array contains only ones, so the sum of this slice equals its length
        return to - from;
    } else {
        int mid = (to + from) / 2;
        AddingTask left = new AddingTask(array, from, mid);
        left.fork();
        AddingTask right = new AddingTask(array, mid, to);
        int rightComputationResult = right.compute();
        int leftComputationResult = left.join();
        return leftComputationResult + rightComputationResult;
    }
}

This strategy leads to correct results.

Performance analysis

The following table sums up the results of the various strategies and compares their performance:

+------------------------------------+----------------+---------------------+
| Strategy                           | Correct result | Execution time [ms] |
|------------------------------------|----------------|---------------------|
| Single Threaded Lock Free          | Yes            | 3                   |
| Single Threaded with lock          | Yes            | 197                 |
| Single Threaded with synchronized  | Yes            | 237                 |
| 4 Threads without lock             | No             | 25710               |
| 4 Threads with lock                | Yes            | 10498               |
| 4 Threads with AtomicInteger       | Yes            | 11121               |
| 4 Threads with Fork/Join           | Yes            | 1045                |
+------------------------------------+----------------+---------------------+

The problem we tried to solve here was trivial, but from this table we can see that the easiest approach is also the most efficient.

We can also see that just introducing a locking mechanism in a single thread, without any contention on the lock itself, makes the application about 70 times slower.

When we start working with 4 threads sharing an object, we can see that without locking the results are inconsistent, and in any case the approach is extremely inefficient. The threads must share this object, so there is continuous context switching, and the shared value must be kept up to date across the cores on which the threads are running.

When we introduce the locking mechanism, results are correct and, paradoxically, the execution time is reduced.

With AtomicInteger I was expecting better performance than with the locking mechanism, but it actually got worse. This proves that when there is high contention on an object, AtomicInteger is not a panacea either.
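
A plausible explanation is the way atomic increments work: incrementAndGet boils down to a compare-and-swap retry loop, roughly like this hypothetical sketch (not the actual JDK source), so under heavy contention threads keep retrying instead of making progress:

import java.util.concurrent.atomic.AtomicInteger;

static int casIncrement(AtomicInteger ai) {
    for (;;) {
        int current = ai.get();
        // succeeds only if no other thread changed the value in the meantime;
        // under high contention this loop retries many times
        if (ai.compareAndSet(current, current + 1)) {
            return current + 1;
        }
    }
}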

Among the parallelized approaches, the one that proved to work best is Fork/Join, but in order to apply it we had to change the approach and turn the problem into summing the elements of an array.

Some Notes About Java Certification Exam

With this post I am starting my blog, and I dedicate it (like probably the next few ones) to topics I encountered while preparing for the Java Professional Certification exam. Even if you have probably been using Java for many years, the Java certification offers an opportunity to discover details about the language and its features. In the following notes I collect a few points that I found at least non-obvious, either because I never had the chance to look into them before, or because they show some peculiar behavior worth keeping in mind. The following notes refer to Java 7.

Precise rethrow

This is a feature introduced in the Java 7 compiler, and it can be expressed in this way:
When rethrowing an exception, the rethrown type doesn’t necessarily have to be the type declared in the catch block. If e.g. the try block can throw exceptions of type E1, E2, E3, all subtypes of the type E declared in the catch clause, the compiler knows that the rethrow is restricted to the types E1, E2, E3 and not to the broader E.
This is better explained through an example:

public class PreciseRethrow {

    public void rethrowException(String exceptionOrdinal) throws ExceptionB, ExceptionA {
        try {
            if(exceptionOrdinal.equals("first"))
                throw new ExceptionA();
            else
                throw new ExceptionB();
        } catch (Exception e) {
            e.printStackTrace();
            throw e;
        }
    }

}

class ExceptionA extends Exception {}
class ExceptionB extends Exception {}

The compiler analyzes the code in the try clause and detects that only ExceptionA or ExceptionB can be thrown; therefore, although the catch block refers to the generic Exception class, it knows that the actual exception being rethrown in throw e; can only be of type ExceptionA or ExceptionB, so the rethrowException() method can declare throws ExceptionB, ExceptionA instead of throws Exception.
If the exception being rethrown is first copied to a variable local to the catch block, which is in turn rethrown, like in this version of the catch block

catch (Exception e) {
    e.printStackTrace();
    Exception f = e;
    throw f;
}

this precise analysis by the compiler ceases to apply, and the code produces a compilation error (the method would have to declare throws Exception).