
Replacing ZeroMQ with RTI Connext DDS in an Actor Based System

August 14, 2014

Scalability is one of the key properties of modern software: it allows applications to adapt to changes in the context in which they operate. Distributed architectures, cloud computing, and multicore processors have changed how concurrent programs are developed, and they all help to fulfill scalability requirements. In these scenarios, traditional shared-state programming has shown its limits: threads must synchronize their access to shared state to remain correct. Synchronization is complex, tricky, and expensive, and if it is done incorrectly it can easily lead to race conditions, deadlocks, or starvation, possibly compromising the overall performance of the system.

A different approach to concurrency can be used to address those problems: the Actor Model. The actor model provides a lightweight and isolated way to create entities that don't share state and that communicate only through asynchronous, non-blocking messages, both locally and in distributed systems. Since actors don't share state, they can easily be used to achieve high concurrency and scalability in systems with distributed architectures and multicore processors.

As shown in Figure 1, in an actor based system, the actor can be seen as the fundamental unit of computation that embodies:

  1. Behavior: what does the actor do
  2. State: a set of variables that can be changed by the behavior
  3. Communication: how does the actor communicate with other actors
Figure 1: an actor is defined by a state, a behavior and a mailbox

Each actor in the system has a 'mailbox', and it becomes active only when another actor sends a message to that mailbox. Upon receipt of a message, the actor can:

  1. Create new actors
  2. Send messages
  3. Produce side-effects (e.g. write a file)

One of the most popular solutions based on the Actor Model is Akka: a toolkit and runtime framework for building highly concurrent, distributed, and fault tolerant applications on the JVM. Akka is written in Scala and has language bindings for Scala and Java. It has a modular structure that can easily be extended with the so-called 'Extensions'.

In Akka, actors are purely reactive components: an actor is passive until a message is sent to it. When a message arrives in the actor's mailbox, a thread is allocated to the actor, the message is extracted from the mailbox, and the actor applies its behavior. When the processing is done, the thread is returned to the pool. This way, actors don't occupy any CPU resources when they are inactive.

The actor's behavior is the computation logic that is executed in response to a message. The internal state of the actor can change only in response to a message, and the only way to observe the state is via message exchange. An actor processes only one message at a time, so no synchronization or coordination is needed inside the actor.
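The scheduling described above can be imitated in a few lines of plain Scala. The following is a minimal sketch and not Akka's implementation: ToyActor, its mailbox queue, and ToyActorDemo are hypothetical names, and only the JDK's java.util.concurrent classes are used. It shows a mailbox that borrows a pool thread only while messages are pending, and a behavior that is never run concurrently.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, ExecutorService, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

// Toy actor (hypothetical, not Akka code): a mailbox plus a behavior.
final class ToyActor[A <: AnyRef](pool: ExecutorService)(behavior: A => Unit) {
  private val mailbox   = new ConcurrentLinkedQueue[A]()
  private val scheduled = new AtomicBoolean(false)

  // Asynchronous send: enqueue the message and return immediately.
  def !(msg: A): Unit = {
    mailbox.add(msg)
    trySchedule()
  }

  // Borrow a pool thread only if no thread is currently running this actor.
  private def trySchedule(): Unit =
    if (scheduled.compareAndSet(false, true)) pool.execute(() => drain())

  // Process queued messages one at a time, then give the thread back.
  private def drain(): Unit =
    try {
      var msg = mailbox.poll()
      while (msg != null) {
        behavior(msg)
        msg = mailbox.poll()
      }
    } finally {
      scheduled.set(false)
      // A message may have arrived after the last poll; reschedule if so.
      if (!mailbox.isEmpty) trySchedule()
    }
}

object ToyActorDemo {
  // Sends `messages` messages and returns how many the behavior processed.
  def run(messages: Int): Int = {
    val pool  = Executors.newFixedThreadPool(4)
    val done  = new CountDownLatch(messages)
    var count = 0 // actor state: touched only by the behavior, without locks
    val actor = new ToyActor[String](pool)(_ => { count += 1; done.countDown() })
    (1 to messages).foreach(i => actor ! s"msg-$i")
    done.await(5, TimeUnit.SECONDS) // wait until every message was handled
    pool.shutdown()
    count
  }
}
```

Calling ToyActorDemo.run(1000) sends a thousand messages; the counter is updated without any lock because the AtomicBoolean flag guarantees that at most one pool thread runs the behavior at any time.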

To define an actor in Akka, you have to extend the Actor trait, implementing the receive method, which is never called concurrently.

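import akka.actor.{Actor, ActorLogging}

// ActorLogging provides the `log` member used below.
class MyActor extends Actor with ActorLogging {
    def receive = {
        case "test" => println("Received test message")
        case "send" => sender() ! "reply" // answer whoever sent the current message
        case _      => log.info("Unknown message")
    }
}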

The receive method returns a PartialFunction, i.e. a 'match/case' block in which the message is matched against the different case clauses using Scala pattern matching.
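Partial functions are a standard Scala feature and can be tried out without Akka. In the sketch below, ReceiveSketch and its members are hypothetical names. The handlers return a String so that the result can be inspected, whereas Akka's receive returns Unit.

```scala
object ReceiveSketch {
  // Defined only for the messages listed in the case clauses.
  val receive: PartialFunction[Any, String] = {
    case "test" => "received test message"
    case n: Int => s"received number $n"
  }

  // Catch-all clause, comparable to `case _` in an actor.
  val fallback: PartialFunction[Any, String] = {
    case other => s"unknown message: $other"
  }

  // orElse tries `receive` first and `fallback` for everything else.
  val handler: PartialFunction[Any, String] = receive orElse fallback
}
```

Here receive.isDefinedAt("test") is true and receive.isDefinedAt(3.14) is false, while handler accepts any message because the fallback matches whatever the first function does not.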

To create an actor, an ActorSystem has to be created first. Calling the actorOf method then returns a reference to the new actor. Creation options are specified through the configuration object (Props) passed to actorOf.

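import akka.actor.{ActorSystem, Props}
val system = ActorSystem("mySystem")
val myActor = system.actorOf(Props[MyActor], "myactor") // top-level actor
// Inside an actor, context.actorOf creates a child of that actor:
val child = context.actorOf(Props[MyActor], name = "myChild")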

To send a message to an actor, you use the ! (bang) method, which is asynchronous and non-blocking. Delivery semantics are at-most-once for both local and remote actors. Thanks to the message-passing communication of the actor model, there is no tight coupling between sender and receiver. Actors are location transparent and distributable by design: because they share no data and communicate only by message passing, they can be mapped onto a distributed architecture (scaling out).

Remote communication can be achieved using:

  1. Configuration files: the same system can be configured as distributed without code changes.
  2. Extensions

If the actor is a remote actor, the send turns into a remote call under the hood. Akka can be configured to use various transports; the default transport mechanism is a Netty-based TCP driver. Netty is an asynchronous event-driven framework for the development of maintainable and high-performance applications. Asynchronous one-to-one channels are created between remote actors, so sending a message to a remote actor transparently serializes the message and sends it to the remote JVM. However, this solution is particularly inefficient for one-to-many communication patterns, as each message is replicated on the physical channel for each receiver.

The following snippet of a configuration file uses the default Netty transport for remote communication. It specifies the hostname and port parameters (passed to the remote system) that identify the system:

akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "192.168.42.31"
      port = 2552
    }
  }
}

Akka offers two ways of using remote communication: looking up a remote actor and creating a remote actor.

To look up an actor on a remote node, you can use the actorSelection(path) method:

val selection =
    context.actorSelection("akka.tcp://actorSystemName@10.0.0.1:2552/user/actorName")
selection ! "Hello"

To create an actor on a remote node, you use the actorOf(Props[...], actorName) method instead, and specify in the application's configuration file where the remote actor resides, using the address and the port of the remote system:

akka {
  actor {
    deployment {
      /myActor {
        remote = "akka.tcp://sampleActorSystem@127.0.0.1:2553"
      }
    }
  }
}

The same semantics are used for sending messages to remote and local actors: there is no API dedicated to remote communication. If no deployment configuration exists, the actor is deployed locally.

Using Netty, a remote actor that wants to communicate with multiple actors has to know each receiver and replicate the message on each Netty TCP channel. This is clearly inefficient, and difficult in scenarios where actors are mobile (i.e., they enter and exit the system frequently). For this reason, a PUB/SUB communication pattern is needed.
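The decoupling that PUB/SUB provides can be shown with a toy, in-memory topic bus. This is only a sketch under simplifying assumptions (single-threaded, no network, no QoS), and ToyTopicBus is a hypothetical name that belongs to neither Akka, ZeroMQ, nor DDS. The point is that the publisher names a topic rather than a list of receivers, so subscribers can join and leave without the publisher noticing.

```scala
import scala.collection.mutable

// Toy, in-memory topic bus (single-threaded, for illustration only).
final class ToyTopicBus[A] {
  private val handlers =
    mutable.Map.empty[String, mutable.ListBuffer[A => Unit]]

  // Register a handler on a topic; the returned function removes it again.
  def subscribe(topic: String)(handler: A => Unit): () => Unit = {
    val list = handlers.getOrElseUpdate(topic, mutable.ListBuffer.empty[A => Unit])
    list += handler
    () => { list -= handler; () }
  }

  // The publisher knows only the topic name.
  // Returns the number of handlers that were notified.
  def publish(topic: String, msg: A): Int = {
    val list = handlers.getOrElse(topic, mutable.ListBuffer.empty[A => Unit])
    list.foreach(h => h(msg))
    list.size
  }
}
```

A subscriber that calls the function returned by subscribe simply stops receiving messages; the publishing code does not change. A real PUB/SUB middleware additionally decides how the data travels on the wire (for example via multicast), which this sketch does not model.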

Akka's modular structure makes it easy to add external features and to manage them through Akka itself.

Akka supports the PUB/SUB communication pattern through the ZeroMQ Extension. ZeroMQ is an asynchronous messaging library that works without a dedicated message broker. It supports different transports such as IPC, multicast (PGM), and TCP. The main disadvantage of using ZeroMQ as the base for PUB/SUB in Akka is its limited QoS capabilities. In addition, ZeroMQ PUB/SUB sockets use TCP or PGM, and reliability is not guaranteed because messages to slow subscribers are dropped. Other kinds of ZeroMQ sockets and other patterns can be used (e.g. ROUTER-DEALER), with the downside of increasing the complexity of the application.

To overcome these problems, we decided to replace ZeroMQ with DDS. To do this, we developed an Akka Extension using RTI Connext DDS. Our Connext DDS extension follows Akka's modularity principles, so it can be easily integrated into an Actor System and used to integrate actors with pre-existing systems based on DDS. The DDS Extension allows efficient one-to-many communication through the PUB/SUB model offered by DDS.

The RTI Connext DDS extension adds a wide range of QoS policies, so that delivery semantics can be matched to application-level requirements by trading off quality guarantees against communication costs. For these reasons, we feel that the DDS extension represents an improvement over the ZeroMQ Extension.

Using DDS, actors can now exchange messages with one another, so long as they're registered to the same DDS topic. This is accomplished using a proxy actor, called ActorDW, which encapsulates a DDS DataWriter. Other actors in the system send messages to ActorDW using the usual actor syntax. The complete interaction is shown in Figure 2.

Figure 2: Interaction between ActorDW and DDS

Loading the extension works the same way as for the ZeroMQ extension and the rest of the extensions available for Akka:

val system = ActorSystem("DDSAkkaSystem")
val dds = DDSExtension(system)

The DDSExtension API is higher-level and more abstract than the ZeroMQ extension's. To publish a message, the extension offers the newDataWriter method. The user has to specify only the DDS DomainID, the topic name, and the name of the actor:

val actorDW = dds.newDataWriter(0, "My Topic", "myActorDW")

The method will return an actor that can be used with the usual bang syntax:

actorDW ! message

In its simplest form, the newDataWriter method uses default values for DataWriterQos, DomainParticipantQos, and TopicQos. However, users can specify their own custom QoS parameters:

val qosDataWriter = DataWriterQos()
qosDataWriter.reliability(Reliability = Reliable)
val qosTopic = TopicQos()
qosTopic.reliability(Reliability = Reliable)
// qosDomain (presumably a DomainParticipantQos) is not defined in this snippet.
val actorDW = dds.newDataWriter(qosDataWriter, qosDomain, qosTopic, 0, "Test Topic", "datawriter")

In Figure 3 we see the actions hidden from the user; these actions are executed automatically by the DDSExtension and include:

  1. Retrieve or create the DDS Domain
  2. Retrieve or create the Topic
  3. Create a DDS DataWriter
  4. Create and return the actor
Figure 3: actions performed under the hood for a DataWriter Actor

On the subscriber's side, the extension provides the newDataReaderWS method. First, the user creates the actor that will receive the data, and then passes it to the method:

val actorDR = system.actorOf(Props[DRActor], "MyActorDR")
dds.newDataReaderWS(0, "Test Topic", actorDR, read)

Here too, the method has two forms: the simplest one uses default values for DataReaderQos, DomainParticipantQos, and TopicQos, but users can specify their own custom QoS settings:

val qosDataReader = DataReaderQos()
qosDataReader.reliability(Reliability = Reliable)
val qosTopic = TopicQos()
qosTopic.reliability(Reliability = Reliable)

val actorDR = dds.newDataReader(qosDataReader, qosDomain, qosTopic, 0, "Test Topic", read)

The actions that the API performs are illustrated in Figure 4.

Figure 4: actions performed automatically by the ActorDR

The system creates a DDS DataReader and then registers the actor and the DataReader in a data structure, chosen according to the type of the condition, that is held by another component called DDSDemultiplexer. The DDSDemultiplexer, illustrated in Figure 5, is a singleton that wraps a DDS WaitSet and offers methods to attach and detach conditions.

 

Figure 5: DDSDemultiplexer structure

At system startup, a thread is assigned to the DDSDemultiplexer, and it blocks waiting for samples. As shown in Figure 6, when a DataWriter writes some data into the Topic, the thread wakes up and checks whether there are actors registered for that particular event. If so, they are notified.

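Figure 6: the DDSDemultiplexer loop waits for data and then notifies the DataReader when new data is available

// Simplified view of the loop: conditions registered by actors, keyed by condition type.
val readConditions = Map[ReadCondition, DDSNode]()
val statusConditions = Map[StatusCondition, DDSNode]()

while (running) {
    // Block until at least one attached condition triggers or the timeout expires.
    val retcode = ws.wait(activeConditions, timeout)
    // Look up the node registered for each triggered condition and fetch its samples.
    for (i <- 0 until activeConditions.size) {
        val cond = activeConditions.get(i)
        val node = statusConditions.get(cond)
        getSample(cond, node)
    }
}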
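The same wait-and-dispatch idea can be tried without DDS. The following is a toy stand-in and not the extension's code: ToyDemultiplexer and its methods are hypothetical names, a blocking queue plays the role of the WaitSet, and conditions are reduced to plain strings. It shows a single thread that blocks until something triggers and then notifies only the callbacks that were attached for that condition.

```scala
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue, TimeUnit}

// Toy stand-in for the demultiplexer: the queue replaces the DDS WaitSet.
final class ToyDemultiplexer {
  private val triggered = new LinkedBlockingQueue[String]()
  private val handlers  = new ConcurrentHashMap[String, String => Unit]()
  @volatile private var running = true

  // Register or unregister the callback of an actor for a condition.
  def attach(condition: String)(onData: String => Unit): Unit =
    handlers.put(condition, onData)
  def detach(condition: String): Unit =
    handlers.remove(condition)

  // Simulates a DataWriter making new data available for a condition.
  def trigger(condition: String): Unit = triggered.put(condition)

  private val worker = new Thread(() => {
    while (running) {
      // Block until a condition triggers or the timeout expires.
      val cond = triggered.poll(100, TimeUnit.MILLISECONDS)
      if (cond != null) {
        val handler = handlers.get(cond)
        if (handler != null) handler(cond) // notify registered actors only
      }
    }
  })
  worker.setDaemon(true)
  worker.start()

  // Ask the loop to finish and wait for the thread to exit.
  def stop(): Unit = { running = false; worker.join() }
}
```

Triggering a condition that nobody attached to is silently ignored, which mirrors the check for registered actors described above. The timeout on poll lets the loop notice the running flag and terminate.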

As you can see, the Actor Model is an interesting approach to solving concurrency issues, especially in distributed systems. We explored the challenges and limitations of remote actors with regard to one-to-many communication patterns, showing why PUB/SUB is necessary to implement scalable solutions. We also illustrated the limitations of the ZeroMQ Extension for PUB/SUB, most notably concerning QoS configuration. For these reasons, we implemented an easy-to-use RTI Connext DDS extension that can be tuned and configured with minimal effort from the user. Using this approach, actors can communicate over RTI Connext DDS and take advantage of the PUB/SUB model and its extensive QoS support out of the box.

If you have any questions, feel free to comment and we will get back to you ASAP!

UPDATE:

The "DDSExtension for Akka" source code is now available on GitHub: https://github.com/marcofranzoni/Akka-DDS-Extension. Feel free to clone the repository and follow the instructions available in the README.

 

Note: This Blog post was written by Marco Franzoni (University of Bologna) and Gianpiero Napoli (RTI) and is part of Marco's recent work for his M.Sc. Degree in Computer Engineering from the University of Bologna: "Quality-of-Service Support in Distributed and Wide-Scale Actor Systems"
