CometD 2 Clustering with Oort

The CometD distribution ships a clustering solution called Oort that enhances the scalability of a CometD-based system.
Instead of connecting to a single node (usually represented by a virtual or physical host), clients are spread across multiple nodes, so that the processing power needed to cope with the load is shared among them, giving the whole system more scalability than a single node.

Oort clustering is not a high availability clustering solution: if one of the nodes crashes, all the clients connected to that node will be disconnected and will reconnect to other nodes (with a new CometD handshake).
All the information built by one client with its server up to that point (for example, the state of an online chess game) is generally lost, unless the application has implemented some other way to retrieve that information.

Typical Infrastructure

A typical (but not the only) infrastructure to set up an Oort cluster has a load balancer in front of the Oort nodes, so that clients can connect transparently to any node.
The load balancer should implement stickiness, which may be based on the remote IP address, on CometD's BAYEUX_BROWSER cookie (see the Bayeux Specification, section 8.1), or on some other mechanism supported by the load balancer.
DNS should be configured with a single host name / IP address pair (that of the load balancer), so that when a node crashes, clients will attempt to reconnect to the same host name, but the load balancer will notice that the node has crashed and direct the connection to another node. The second node will not know about this client, and upon receiving the connect request will send the client the advice to handshake.

Figure: Oort infrastructure setup
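
The stickiness and failover behavior described above can be pictured with a small Java model. This is a sketch under stated assumptions. StickyBalancer and its methods are hypothetical and are not part of CometD, and real load balancers implement this in their own configuration. The sketch hashes a sticky key, either the remote IP address or the BAYEUX_BROWSER cookie value, to pick a node. It then skips any node that has been marked as down.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sticky load balancer, for illustration only (not part of CometD)
final class StickyBalancer
{
    private final List<String> nodes;
    private final Set<String> down = new LinkedHashSet<>();

    StickyBalancer(List<String> nodes)
    {
        this.nodes = new ArrayList<>(nodes);
    }

    // Called when health checks detect that a node has crashed
    void markDown(String node)
    {
        down.add(node);
    }

    // Maps a sticky key (remote IP or BAYEUX_BROWSER cookie value) to a node:
    // the same key always lands on the same live node; if that node is down,
    // the next live node in the list is used instead
    String route(String stickyKey)
    {
        int start = Math.floorMod(stickyKey.hashCode(), nodes.size());
        for (int i = 0; i < nodes.size(); i++)
        {
            String candidate = nodes.get((start + i) % nodes.size());
            if (!down.contains(candidate))
                return candidate;
        }
        throw new IllegalStateException("No live nodes");
    }
}
```

Once the pinned node is marked down, route() returns a different node for the same key. That node has no session for the client, so it answers the next connect request with the advice to handshake.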

Terminology

In the following sections, an Oort cluster is also referred to as "Oort cloud" (so "cloud" is a synonym for "cluster"), and an Oort node is also referred to as "Oort comet" (so "comet" is a synonym for "node").

CometD 2 Oort Cloud

Any CometD server can become an Oort comet by configuring an instance of org.cometd.oort.Oort.
The org.cometd.oort.Oort instance is associated with the org.cometd.bayeux.server.BayeuxServer instance, and there can be one Oort instance for each BayeuxServer instance.

Oort comets need to know each other's URL in order to connect together and form a cloud.
From CometD 2.3.0 onwards, an automatic discovery mechanism based on multicast is available; for CometD versions before 2.3.0, only a static configuration mechanism exists.

Common Configuration

Both the automatic discovery and the static configuration servlets support the following common parameters to configure the Oort instance:

  • oort.url (mandatory, no default): the unique URL of the Bayeux server associated with the Oort comet
  • oort.secret (optional, default: a random string): the pre-shared secret used to authenticate connections from other Oort comets
  • oort.channels (optional, default: empty string): a comma-separated list of channels to observe at startup
  • clientDebug (optional, default: false): whether to enable debug logging in the OortComet instances
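
To show how these parameters and their defaults fit together, here is a sketch that reads them from a plain Map standing in for servlet init parameters. OortSettings is hypothetical and is not part of CometD. The way it generates the random default secret is an assumption. Only the parameter names and defaults come from the list above.

```java
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical holder for the common Oort parameters (not CometD code)
final class OortSettings
{
    final String url;
    final String secret;
    final List<String> channels;
    final boolean clientDebug;

    OortSettings(Map<String, String> params)
    {
        url = params.get("oort.url");
        if (url == null || url.trim().isEmpty())
            throw new IllegalArgumentException("Missing mandatory parameter oort.url");

        // Default secret is random, so it differs on every node unless set explicitly
        String s = params.get("oort.secret");
        secret = s != null ? s : randomSecret();

        channels = splitCommaList(params.get("oort.channels"));
        // Boolean.parseBoolean(null) is false, matching the documented default
        clientDebug = Boolean.parseBoolean(params.get("clientDebug"));
    }

    // Splits a comma-separated value, trimming items and dropping empty ones
    static List<String> splitCommaList(String value)
    {
        List<String> result = new ArrayList<>();
        if (value != null)
        {
            for (String item : value.split(","))
            {
                String trimmed = item.trim();
                if (!trimmed.isEmpty())
                    result.add(trimmed);
            }
        }
        return Collections.unmodifiableList(result);
    }

    private static String randomSecret()
    {
        byte[] bytes = new byte[16];
        new SecureRandom().nextBytes(bytes);
        return Base64.getEncoder().encodeToString(bytes);
    }
}
```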


Automatic Discovery Configuration

Configuration of the automatic discovery mechanism can be done either via code, or by configuring an org.cometd.oort.OortMulticastConfigServlet in web.xml, for example:

<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://java.sun.com/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
         version="2.5">

    <servlet>
        <servlet-name>cometd</servlet-name>
        <servlet-class>org.cometd.server.CometdServlet</servlet-class>
        <load-on-startup>1</load-on-startup>
    </servlet>
    <servlet-mapping>
        <servlet-name>cometd</servlet-name>
        <url-pattern>/cometd/*</url-pattern>
    </servlet-mapping>

    <servlet>
        <servlet-name>oort</servlet-name>
        <servlet-class>org.cometd.oort.OortMulticastConfigServlet</servlet-class>
        <load-on-startup>2</load-on-startup>
        <init-param>
            <param-name>oort.url</param-name>
            <param-value>http://host:port/context/cometd</param-value>
        </init-param>
    </servlet>
</web-app>

Since Oort depends on BayeuxServer, the load-on-startup parameter of the OortMulticastConfigServlet must be greater than that of the CometdServlet.

The mandatory oort.url init parameter must identify the URL at which this Oort comet can be contacted, and it must be the URL served by the CometdServlet of this node.
This URL will be sent to other Oort comets, so it is important that the host part of the URL does not point to "localhost" but to a resolvable host name or to an IP address, so that other comets in the cluster can contact this comet.
Likewise, the context path part of the URL must be configured properly for this web application.
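
A startup check along these lines can catch a misconfigured oort.url early. The helper below is hypothetical and is not provided by CometD. It rejects a URL whose host is missing, is "localhost", or is a loopback or wildcard IP address. It does not verify that the context path matches the web application.

```java
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;

// Hypothetical sanity check for the oort.url init parameter
final class OortUrlCheck
{
    static boolean isUsableByOtherComets(String oortUrl)
    {
        String host = URI.create(oortUrl).getHost();
        if (host == null || host.equalsIgnoreCase("localhost"))
            return false;
        try
        {
            // IP literals are parsed without a DNS lookup; host names are resolved
            InetAddress address = InetAddress.getByName(host);
            return !address.isLoopbackAddress() && !address.isAnyLocalAddress();
        }
        catch (UnknownHostException x)
        {
            return false;
        }
    }
}
```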

In addition to the common configuration init parameters, OortMulticastConfigServlet supports the configuration of these additional init parameters:

  • oort.multicast.bindAddress (optional, default: the wildcard address): the bind address of the MulticastSocket that receives the advertisements
  • oort.multicast.groupAddress (optional, default: 239.255.0.1): the multicast group address to join to receive the advertisements
  • oort.multicast.groupPort (optional, default: 5577): the port over which advertisements are sent and received
  • oort.multicast.timeToLive (optional, default: 1): the time to live of advertisement packets (1 = same subnet, 32 = same site, 255 = global)
  • oort.multicast.advertiseInterval (optional, default: 1000): the interval, in milliseconds, at which advertisements are sent

Each comet that is configured with automatic discovery will emit an advertisement (containing the comet URL) every oort.multicast.advertiseInterval milliseconds on the specified multicast address and port (oort.multicast.groupAddress and oort.multicast.groupPort) with the specified time-to-live (oort.multicast.timeToLive).
Advertisements are emitted until the web application is stopped, and only serve to advertise that a new node has appeared. Oort has a built-in mechanism that takes care of membership organization (see below for details).
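
The advertising side can be pictured with the standard java.net multicast API. This is a sketch under stated assumptions. MulticastAdvertiser is hypothetical. The payload format, the comet URL encoded as UTF-8 bytes, is an assumption and not Oort's actual wire format. The group, port, TTL and interval constants use the defaults listed above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical advertiser mirroring the idea of Oort's multicast discovery (not its code)
final class MulticastAdvertiser implements AutoCloseable
{
    static final String GROUP_ADDRESS = "239.255.0.1"; // oort.multicast.groupAddress default
    static final int GROUP_PORT = 5577;                 // oort.multicast.groupPort default
    static final int TIME_TO_LIVE = 1;                  // oort.multicast.timeToLive default
    static final long ADVERTISE_INTERVAL_MS = 1000;     // oort.multicast.advertiseInterval default

    private final MulticastSocket socket;
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    // Builds one advertisement packet carrying the comet URL (assumed payload format)
    static DatagramPacket advertisement(String cometUrl)
    {
        byte[] payload = cometUrl.getBytes(StandardCharsets.UTF_8);
        try
        {
            InetAddress group = InetAddress.getByName(GROUP_ADDRESS);
            return new DatagramPacket(payload, payload.length, group, GROUP_PORT);
        }
        catch (UnknownHostException x)
        {
            throw new UncheckedIOException(x);
        }
    }

    // Starts sending the advertisement every ADVERTISE_INTERVAL_MS until close() is called
    MulticastAdvertiser(String cometUrl) throws IOException
    {
        socket = new MulticastSocket();
        socket.setTimeToLive(TIME_TO_LIVE);
        DatagramPacket packet = advertisement(cometUrl);
        scheduler.scheduleAtFixedRate(() ->
        {
            try
            {
                socket.send(packet);
            }
            catch (IOException x)
            {
                // Ignored in this sketch; the next tick simply tries again
            }
        }, 0, ADVERTISE_INTERVAL_MS, TimeUnit.MILLISECONDS);
    }

    // Stops advertising, as happens when the web application is stopped
    @Override
    public void close()
    {
        scheduler.shutdownNow();
        socket.close();
    }
}
```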

When enabling the Oort automatic discovery mechanism, you must be sure that:

  • Multicast is enabled in the operating system of your choice
  • The network interfaces have multicast enabled
  • Multicast traffic routing is properly configured

The kernels of the most common Linux distributions are compiled with multicast support, and the ifconfig command can be used to check and configure whether a network interface has multicast enabled (look for the MULTICAST flag).
Multicast routing can be checked with the command route -n, and the output should contain a line similar to:

Destination    Gateway    Genmask      Flags    Metric    Ref    Use    Iface
224.0.0.0      0.0.0.0    240.0.0.0      U         0       0      0     eth0

You may also want to force the JVM to prefer an IPv4 stack by setting the system property -Djava.net.preferIPv4Stack=true to facilitate multicast networking.

Static Discovery Configuration

The static discovery mechanism can be used if multicast is not available on the system where CometD is deployed, or when using CometD versions before 2.3.0.

It is more cumbersome to set up and does not allow dynamic discovery of new nodes; where possible, use the automatic discovery mechanism instead.

The static discovery configuration can be done either via code, or by configuring an org.cometd.oort.OortStaticConfigServlet (or org.cometd.oort.OortServlet for CometD versions before 2.3.0) in web.xml, for example:

<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://java.sun.com/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
         version="2.5">

    <servlet>
        <servlet-name>cometd</servlet-name>
        <servlet-class>org.cometd.server.CometdServlet</servlet-class>
        <load-on-startup>1</load-on-startup>
    </servlet>
    <servlet-mapping>
        <servlet-name>cometd</servlet-name>
        <url-pattern>/cometd/*</url-pattern>
    </servlet-mapping>

    <servlet>
        <servlet-name>oort</servlet-name>
        <servlet-class>org.cometd.oort.OortStaticConfigServlet</servlet-class>
        <load-on-startup>2</load-on-startup>
        <init-param>
            <param-name>oort.url</param-name>
            <param-value>http://host:port/context/cometd</param-value>
        </init-param>
    </servlet>
</web-app>

As with automatic discovery, the load-on-startup parameter of the OortStaticConfigServlet must be greater than that of the CometdServlet.

OortStaticConfigServlet supports the common init parameters listed in the previous section, and the following additional init parameters:

  • oort.cloud (optional, default: empty string): a comma-separated list of URLs of other Oort comets to connect to at startup

Configured in this way, the Oort comet is ready to be part of the Oort cloud, but it is not part of the cloud yet, since it does not know the URLs of other comets (and there is no automatic discovery).
In order to make the Oort comet part of the Oort cloud, you can configure the oort.cloud init parameter of the OortStaticConfigServlet with a comma-separated list of other Oort comet URLs to connect to:

<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://java.sun.com/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
         version="2.5">

    <servlet>
        <servlet-name>cometd</servlet-name>
        <servlet-class>org.cometd.server.CometdServlet</servlet-class>
        <load-on-startup>1</load-on-startup>
    </servlet>
    <servlet-mapping>
        <servlet-name>cometd</servlet-name>
        <url-pattern>/cometd/*</url-pattern>
    </servlet-mapping>

    <servlet>
        <servlet-name>oort</servlet-name>
        <servlet-class>org.cometd.oort.OortStaticConfigServlet</servlet-class>
        <load-on-startup>2</load-on-startup>
        <init-param>
            <param-name>oort.url</param-name>
            <param-value>http://host1:port/context/cometd</param-value>
        </init-param>
        <init-param>
            <param-name>oort.cloud</param-name>
            <param-value>http://host2:port/context/cometd,http://host3:port/context/cometd</param-value>
        </init-param>
    </servlet>
</web-app>
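
Every comet in a static configuration like this one lists its peers in oort.cloud. One convenient pattern is to give every node the same full list of URLs and let each node drop its own URL. This is an assumption for illustration, not something the servlet requires. The hypothetical helper below shows that filtering in plain Java and calls no CometD API.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper turning an oort.cloud value into the peers to connect to
final class StaticCloud
{
    static List<String> peersToObserve(String oortCloud, String oortUrl)
    {
        // LinkedHashSet keeps the configured order and drops duplicates
        Set<String> peers = new LinkedHashSet<>();
        if (oortCloud != null)
        {
            for (String item : oortCloud.split(","))
            {
                String url = item.trim();
                // Skip blanks and this comet's own URL
                if (!url.isEmpty() && !url.equals(oortUrl))
                    peers.add(url);
            }
        }
        return new ArrayList<>(peers);
    }
}
```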

Alternatively, it is possible to write custom initialization code (see the section on service integration for suggestions on how to do it) that links the node to the Oort cloud. This may be useful if the Oort comet URLs cannot be known a priori but become known at runtime. For example:

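import java.io.IOException;
import java.util.List;
import javax.servlet.GenericServlet;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;

import org.cometd.client.BayeuxClient;
import org.cometd.oort.Oort;
import org.cometd.oort.OortComet;

public class OortConfigurationServlet extends GenericServlet
{
    @Override
    public void init() throws ServletException
    {
        // Grab the Oort object from the servlet context
        Oort oort = (Oort)getServletContext().getAttribute(Oort.OORT_ATTRIBUTE);

        // Figure out the URLs to connect to, using other discovery means
        List<String> urls = ...;

        // Connect to the other Oort comets, waiting up to 1 second for each
        for (String url : urls)
        {
            OortComet oortComet = oort.observeComet(url);
            if (!oortComet.waitFor(1000, BayeuxClient.State.CONNECTED))
                throw new ServletException("Cannot connect to Oort comet " + url);
        }
    }

    @Override
    public void service(ServletRequest request, ServletResponse response) throws ServletException, IOException
    {
        // This servlet only performs initialization; it does not serve requests
        throw new ServletException();
    }
}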

The OortComet instance returned by Oort.observeComet(url) is a specialized version of BayeuxClient.

Membership Organization

When an Oort comet is instructed to connect to another Oort comet, a bidirectional communication is established.
If cometA connects to cometB (for example via oortA.observeComet(urlB)), then an OortComet instance will be created in cometA connected to cometB, and another OortComet instance will be created in cometB connected to cometA.

After this direct bidirectional communication has been established, a special message is broadcast on the whole Oort cloud (on channel /oort/cloud) in which the two comets announce their known siblings.
Every node receiving this message that does not know about those siblings will establish a bidirectional communication with them.

For example, imagine that there are two simple Oort clouds, one made of comets A and B and the other made of comets C and D.
When A and C get connected, they broadcast their siblings (A broadcasts its siblings, now B and C, while C broadcasts its siblings, now A and D). All nodes connected, directly or indirectly, to the broadcaster will receive this message.
When C receives A's siblings it notices that one is itself (so it does nothing since it's already connected to A), but the other is the unknown sibling B, and establishes a bidirectional connection with B as well. Likewise, A receives the sibling broadcast message from C, and connects to D. Each new bidirectional connection that is established triggers a sibling broadcast message on the whole cloud, until all comets are connected to all comets.

If a comet crashes, for example D, then all other comets will detect that and disconnect from the faulty comet.

Figure: Oort cloud membership organization
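
The convergence described above can be checked with a small in-memory model. CloudModel is hypothetical and only simulates the protocol. link() stands in for the pair of OortComet instances, and the loop in connect() stands in for the /oort/cloud sibling broadcasts. No CometD classes are involved.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical model of Oort membership organization (not CometD code)
final class CloudModel
{
    // comet -> comets it has a bidirectional link with
    private final Map<String, Set<String>> links = new TreeMap<>();

    // Links a and b, then replays sibling broadcasts until no comet learns a new sibling
    void connect(String a, String b)
    {
        links.computeIfAbsent(a, k -> new TreeSet<>());
        links.computeIfAbsent(b, k -> new TreeSet<>());
        Deque<String[]> pending = new ArrayDeque<>();
        pending.add(new String[]{a, b});
        while (!pending.isEmpty())
        {
            String[] pair = pending.poll();
            if (!link(pair[0], pair[1]))
                continue; // already linked: no new broadcast
            // Both ends broadcast their siblings to every comet reachable from them
            for (String broadcaster : pair)
            {
                Set<String> siblings = new TreeSet<>(links.get(broadcaster));
                for (String receiver : reachableFrom(broadcaster))
                    for (String sibling : siblings)
                        // A receiver links to any sibling it does not know yet
                        if (!sibling.equals(receiver) && !links.get(receiver).contains(sibling))
                            pending.add(new String[]{receiver, sibling});
            }
        }
    }

    // A crashed comet disappears; every other comet drops its link to it
    void crash(String comet)
    {
        links.remove(comet);
        for (Set<String> siblings : links.values())
            siblings.remove(comet);
    }

    Set<String> siblingsOf(String comet)
    {
        return Collections.unmodifiableSet(links.get(comet));
    }

    private boolean link(String a, String b)
    {
        boolean added = links.get(a).add(b);
        links.get(b).add(a);
        return added;
    }

    private Set<String> reachableFrom(String start)
    {
        Set<String> seen = new TreeSet<>();
        Deque<String> queue = new ArrayDeque<>();
        seen.add(start);
        queue.add(start);
        while (!queue.isEmpty())
            for (String next : links.get(queue.poll()))
                if (seen.add(next))
                    queue.add(next);
        seen.remove(start);
        return seen;
    }
}
```

Connecting A-B, then C-D, then A-C leaves every comet linked to the other three. Calling crash("D") removes D from every sibling set.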

At this point the Oort cloud is aware of its members, but it does not yet do anything useful for the application.
Forwarding of broadcast messages over the entire cloud is covered in a later section.

Authentication

When an Oort comet connects to another Oort comet, it sends a handshake message containing an extension field that is specific to Oort, with the following format:

{
    "channel": "/meta/handshake",
    ... /* other usual handshake fields */
    "ext": {
        "org.cometd.oort": {
            "oortURL": "http://halley.cometd.org:8080/cometd",
            "cometURL": "http://halebopp.cometd.org:8080/cometd",
            "oortSecret": "cstw27r+l+XqE62IrNZdCDiUObA="
        }
    }
}

The oortURL field is the URL of the comet that initiates the handshake; the cometURL field is the URL of the comet that receives the handshake; the oortSecret field is the base64 encoding of the SHA-1 digest of the pre-shared secret of the initiating Oort comet (see the section on common configuration above).
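
The oortSecret computation described here can be reproduced with the JDK alone. This sketch is a hypothetical re-implementation, not Oort's code. The UTF-8 encoding of the secret is an assumption. The matches() method shows one way a receiving comet could compare the value using a constant-time comparison. It does not claim to be how Oort.isOortHandshake() works internally.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Hypothetical re-implementation of the oortSecret field computation
final class OortSecret
{
    // Base64 of the SHA-1 digest of the secret (UTF-8 encoding is an assumption)
    static String digest(String preSharedSecret)
    {
        try
        {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] hash = sha1.digest(preSharedSecret.getBytes(StandardCharsets.UTF_8));
            return Base64.getEncoder().encodeToString(hash);
        }
        catch (NoSuchAlgorithmException x)
        {
            // Every Java platform implementation is required to support SHA-1
            throw new IllegalStateException(x);
        }
    }

    // Compares a received oortSecret with the digest of the local secret
    static boolean matches(String receivedOortSecret, String localPreSharedSecret)
    {
        byte[] expected = digest(localPreSharedSecret).getBytes(StandardCharsets.US_ASCII);
        byte[] actual = receivedOortSecret.getBytes(StandardCharsets.US_ASCII);
        return MessageDigest.isEqual(expected, actual);
    }
}
```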

These extension fields provide a way for an Oort comet to distinguish a handshake from a remote client (which may be subject to authentication checks) from a handshake performed by a remote comet.
For example, let's assume that remote clients always send an extension field containing an authentication token; then it is possible to write an implementation of SecurityPolicy as follows (see also the section about authentication):

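import java.util.Map;

import org.cometd.bayeux.server.BayeuxServer;
import org.cometd.bayeux.server.ServerMessage;
import org.cometd.bayeux.server.ServerSession;
import org.cometd.oort.Oort;
import org.cometd.server.DefaultSecurityPolicy;

public abstract class OortSecurityPolicy extends DefaultSecurityPolicy
{
    private final Oort oort;

    public OortSecurityPolicy(Oort oort)
    {
        this.oort = oort;
    }

    @Override
    public boolean canHandshake(BayeuxServer server, ServerSession session, ServerMessage message)
    {
        // Local sessions can always handshake
        if (session.isLocalSession())
            return true;

        // Remote Oort comets are allowed to handshake
        if (oort.isOortHandshake(message))
            return true;

        // Remote clients must have a valid token
        Map<String, Object> ext = message.getExt();
        return ext != null && isValid(ext.get("token"));
    }

    // Application-specific token validation
    protected abstract boolean isValid(Object token);
}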

The Oort.isOortHandshake(Message) method validates the handshake message and returns true if it is a handshake from another Oort comet that has been configured with the same pre-shared secret. The pre-shared secret must therefore be set explicitly, to the same value on every comet, because it defaults to a random string that is different for each Oort comet.

Broadcast Messages Forwarding

Broadcast messages (that is, messages sent to non-meta and non-service channels) are by definition messages that should be received by all clients that subscribed to the channel the message is sent to.

In an Oort cloud, you may have clients connected to different comets but subscribed to the same channel.
If we have clientA connected to cometA, clientB connected to cometB and clientC connected to cometC, then when clientA broadcasts a message we want clientB and clientC to receive that message, and therefore the Oort cloud must forward the message (sent by clientA and received by cometA) to cometB and cometC.

This is accomplished by setting the oort.channels init parameter of the Oort configuration servlet to a comma-separated list of channels whose messages will be forwarded to the Oort cloud:

<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://java.sun.com/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
         version="2.5">

    <servlet>
        <servlet-name>cometd</servlet-name>
        <servlet-class>org.cometd.server.CometdServlet</servlet-class>
        <load-on-startup>1</load-on-startup>
    </servlet>
    <servlet-mapping>
        <servlet-name>cometd</servlet-name>
        <url-pattern>/cometd/*</url-pattern>
    </servlet-mapping>

    <servlet>
        <servlet-name>oort</servlet-name>
        <servlet-class>org.cometd.oort.OortMulticastConfigServlet</servlet-class>
        <load-on-startup>2</load-on-startup>
        <init-param>
            <param-name>oort.url</param-name>
            <param-value>http://host1:port/context/cometd</param-value>
        </init-param>
        <init-param>
            <param-name>oort.channels</param-name>
            <param-value>/stock/**,/forex/*,/alerts</param-value>
        </init-param>
    </servlet>
</web-app>
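
The patterns in oort.channels follow CometD's wildcard channel rules. A trailing /* matches exactly one more segment, and a trailing /** matches one or more segments. The matcher below is a simplified, hypothetical stand-in used only to illustrate which channels the example value would cover. It is not CometD's own channel matching code.

```java
// Hypothetical matcher for CometD-style channel patterns (illustration only)
final class ChannelPattern
{
    // "/a/*" matches "/a/b" but not "/a/b/c"; "/a/**" matches both;
    // a pattern without wildcards must equal the channel
    static boolean matches(String pattern, String channel)
    {
        String[] p = pattern.substring(1).split("/");
        String[] c = channel.substring(1).split("/");
        String last = p[p.length - 1];
        boolean deep = last.equals("**");
        boolean shallow = last.equals("*");
        int prefix = deep || shallow ? p.length - 1 : p.length;
        int extra = c.length - prefix;

        if (deep && extra < 1)
            return false;
        if (shallow && extra != 1)
            return false;
        if (!deep && !shallow && extra != 0)
            return false;

        // The non-wildcard segments must match one by one
        for (int i = 0; i < prefix; i++)
        {
            if (!p[i].equals(c[i]))
                return false;
        }
        return true;
    }

    // True if the channel matches any pattern in a comma-separated list such as oort.channels
    static boolean matchesAny(String patterns, String channel)
    {
        for (String pattern : patterns.split(","))
        {
            if (matches(pattern.trim(), channel))
                return true;
        }
        return false;
    }
}
```

With the value /stock/**,/forex/*,/alerts, messages on /stock/nasdaq/GOOG and /forex/EURUSD are forwarded. Messages on /forex/EUR/USD and /alerts/fire are not.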

Alternatively, it is possible to call Oort.observeChannel(String channelName) to instruct a comet to listen for messages on that channel that are published to any of the comets it is connected to.

When cometA observes a channel, it means that messages sent on that channel, but received by other comets, are automatically forwarded to cometA.

Note
Message forwarding is not bidirectional: if cometA forwards messages to cometB, cometB does not automatically forward messages to cometA.
However, in most cases the Oort comets are configured in the same way by the same initialization code, and therefore all comets will forward the same channels.

With the ability of observing messages published to broadcast channels, an Oort cloud can already implement a simple chat application among users connected to different nodes.
In the example below, when clientA publishes a message on channel /chat (green arrow), it arrives on cometA; since cometB and cometC have been configured to observe channel /chat, they will both receive the message from cometA (green arrows), and therefore they can deliver the chat message to clientB and clientC respectively (green arrows).

Figure: Oort chat
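
The chat scenario, together with the one-way nature of forwarding, can be modeled in a few lines. ForwardingModel is hypothetical. Comets and channels are plain strings, and only exact channel names are observed, for brevity. As in the scenario above, cometB and cometC observe /chat while cometA does not.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of Oort broadcast forwarding (not CometD code)
final class ForwardingModel
{
    // comet -> channels it observes (exact names, no wildcards)
    private final Map<String, Set<String>> observed = new LinkedHashMap<>();

    void addComet(String comet, String... observedChannels)
    {
        observed.put(comet, new TreeSet<>(Arrays.asList(observedChannels)));
    }

    // Returns the comets that end up with a message a client publishes via originComet
    Set<String> publish(String originComet, String channel)
    {
        Set<String> receivers = new TreeSet<>();
        // The origin comet always receives its own clients' messages
        receivers.add(originComet);
        for (Map.Entry<String, Set<String>> entry : observed.entrySet())
        {
            // Other comets get a copy only if they observe the channel
            if (entry.getValue().contains(channel))
                receivers.add(entry.getKey());
        }
        return receivers;
    }
}
```

Publishing on /chat through cometA reaches cometA, cometB and cometC. Publishing through cometB reaches only cometB and cometC, because cometA does not observe /chat.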

If your application only needs to broadcast messages to clients connected to other comets, an Oort instance is all you need.

If you need to send messages directly to particular clients (for example, clientA wants to send a message to clientC but not to clientB), then you need to set up an additional component of Oort clustering called Seti.

CometD 2 Oort Seti

Seti is the Oort clustering component that tracks clients connected to any comet in the cloud, and allows an application to send messages to particular client(s) in the cloud transparently, as if they were in the local comet.

Configuration

An org.cometd.oort.Seti instance must be configured with an associated org.cometd.oort.Oort instance, either via code, or by configuring an org.cometd.oort.SetiServlet in web.xml, for example:

<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://java.sun.com/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
         version="2.5">

    <servlet>
        <servlet-name>cometd</servlet-name>
        <servlet-class>org.cometd.server.CometdServlet</servlet-class>
        <load-on-startup>1</load-on-startup>
    </servlet>
    <servlet-mapping>
        <servlet-name>cometd</servlet-name>
        <url-pattern>/cometd/*</url-pattern>
    </servlet-mapping>

    <servlet>
        <servlet-name>oort</servlet-name>
        <servlet-class>org.cometd.oort.OortServlet</servlet-class>
        <load-on-startup>2</load-on-startup>
        <init-param>
            <param-name>oort.url</param-name>
            <param-value>http://host:port/context/cometd</param-value>
        </init-param>
    </servlet>

    <servlet>
        <servlet-name>seti</servlet-name>
        <servlet-class>org.cometd.oort.SetiServlet</servlet-class>
        <load-on-startup>3</load-on-startup>
    </servlet>
</web-app>

Note that the load-on-startup parameter of the SetiServlet must be greater than that of the OortServlet.
SetiServlet does not have any configuration init parameters.

Associating Users

Seti allows you to associate a unique string representation of a user with one or more ServerSessions (see the CometD concepts for more details on ServerSession).
This is normally done when the user first logs in to the application, and the unique string representation of the user can be anything that the user provides to authenticate (a user name, a token, a database id, etc). For brevity, we will call this unique string representation of the user simply userId.
Note that the same userId may log in multiple times (for example from a desktop computer and from a mobile device), so it will be associated with multiple ServerSessions.

In practice, the best way of associating a userId with a ServerSession is in a SecurityPolicy during authentication, for example:

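import org.cometd.bayeux.server.BayeuxServer;
import org.cometd.bayeux.server.ServerMessage;
import org.cometd.bayeux.server.ServerSession;
import org.cometd.oort.Seti;
import org.cometd.server.DefaultSecurityPolicy;

public abstract class MySecurityPolicy extends DefaultSecurityPolicy
{
    private final Seti seti;

    public MySecurityPolicy(Seti seti)
    {
        this.seti = seti;
    }

    @Override
    public boolean canHandshake(BayeuxServer server, ServerSession session, ServerMessage message)
    {
        // Local sessions can always handshake
        if (session.isLocalSession())
            return true;

        // Authenticate
        String userId = performAuthentication(session, message);
        if (userId == null)
            return false;

        // Associate the userId with this session, cloud-wide
        seti.associate(userId, session);

        return true;
    }

    // Application-specific: returns the userId, or null if authentication fails
    protected abstract String performAuthentication(ServerSession session, ServerMessage message);
}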

Alternatively, you can perform the association in a BayeuxServer.Extension or in a CometD service, in response to a specific message that is always sent by the client after a successful handshake.

When Seti associates a userId with a session, it broadcasts an internal message on the cloud (on channel /seti/all) that tells all the other comets where this userId is.
In this way, all the comets in the cloud know where a particular userId resides.
The same userId may be associated on different comets (for example, the desktop computer logs in, and is therefore associated, on comet1, while the mobile device is associated on comet2).

Similarly, you can disassociate a userId at any time by calling Seti.disassociate(userId, session).
If the user disconnects or "disappears" (for example, because the client crashed or its network dropped), its session will be removed or expired by the server, and Seti will automatically disassociate the userId.
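
The cloud-wide view that results from these association broadcasts can be modeled as a map from userId to the comets holding sessions for that user. SetiDirectory is hypothetical and is not Seti's implementation. The sketch counts sessions per comet, so that a comet stays listed until its last session for that user is disassociated.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of the userId -> comet view kept across the cloud (not Seti's code)
final class SetiDirectory
{
    // userId -> (comet -> number of sessions associated on that comet)
    private final Map<String, Map<String, Integer>> presence = new HashMap<>();

    // A session of userId was associated on comet (what /seti/all announces)
    void associate(String userId, String comet)
    {
        presence.computeIfAbsent(userId, k -> new HashMap<>()).merge(comet, 1, Integer::sum);
    }

    // Explicit disassociation, or the session was removed or expired
    void disassociate(String userId, String comet)
    {
        Map<String, Integer> comets = presence.get(userId);
        if (comets == null)
            return;
        // Returning null from computeIfPresent removes the comet entry
        comets.computeIfPresent(comet, (k, n) -> n > 1 ? n - 1 : null);
        if (comets.isEmpty())
            presence.remove(userId);
    }

    // The comets where userId currently has at least one session
    Set<String> locate(String userId)
    {
        Map<String, Integer> comets = presence.get(userId);
        if (comets == null)
            return Collections.emptySet();
        return new TreeSet<>(comets.keySet());
    }
}
```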

Sending Messages

After users have been associated, sending a message to a particular user in the cloud can be done via Seti.sendMessage(String userId, String channel, Object data).

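import java.util.Map;

import org.cometd.bayeux.server.ServerMessage;
import org.cometd.bayeux.server.ServerSession;
import org.cometd.java.annotation.Listener;
import org.cometd.java.annotation.Service;
import org.cometd.oort.Seti;

@Service("seti_forwarder")
public class SetiForwarder
{
    private final Seti seti;

    public SetiForwarder(Seti seti)
    {
        this.seti = seti;
    }

    @Listener("/service/forward")
    public void forward(ServerSession session, ServerMessage message)
    {
        Map<String, Object> data = message.getDataAsMap();
        String targetUserId = (String)data.get("targetUserId");
        // Seti delivers the data to the target user on whichever comet(s) it is associated
        seti.sendMessage(targetUserId, message.getChannel(), data);
    }
}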

In the example below, clientA wants to send a message to clientC but not to clientB.
Therefore clientA sends a message to the server it is connected to using a service channel (so that the message is not broadcast), and then a specialized CometD service (such as the one above) routes the message to the appropriate user using Seti. The Seti on cometA knows that the target user is on cometC (thanks to the association) and forwards the message to cometC, which in turn delivers the message to clientC.

Figure: Seti chat
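
The routing decision in this scenario can be sketched as a function of the local comet and the comets where the target user is associated. SetiRouting is hypothetical and does not reflect Seti's actual implementation. The returned strings stand in for real delivery and forwarding. This sketch does not cover what Seti does when the user is not associated anywhere; it simply returns no steps.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical sketch of the routing decision behind Seti.sendMessage()
final class SetiRouting
{
    // For each comet where the target user is associated, either deliver
    // locally (same comet) or forward the message to that comet
    static List<String> plan(String localComet, Collection<String> userComets)
    {
        List<String> steps = new ArrayList<>();
        for (String comet : userComets)
        {
            if (comet.equals(localComet))
                steps.add("deliver locally");
            else
                steps.add("forward to " + comet);
        }
        return steps;
    }
}
```

For the scenario above, plan("cometA", [cometC]) yields a single "forward to cometC" step. A user logged in on both cometA and cometB would get one local delivery and one forward.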