Replacing ZeroMQ with RTI Connext DDS in an Actor Based System

Scalability is one of the key properties of modern software: it allows applications to adapt to changes in the context in which they operate. Distributed architectures, cloud computing, and multicore processors have changed how concurrent programs are developed, and they help fulfill scalability requirements. In these scenarios, traditional shared-state programming has shown its limits: threads must synchronize their access to shared data to guarantee correctness. This synchronization can be complex, tricky, and expensive, and if it is not done correctly it can easily lead to race conditions, deadlocks, or starvation, possibly compromising the overall performance of the system.

A different approach to concurrency can be used to address those problems: the Actor Model. The actor model provides a lightweight and isolated way to create entities that don’t share state and that communicate only through asynchronous, non-blocking messages, both locally and in distributed systems. Since they don’t share state, they can easily be used to achieve high concurrency and scalability in systems with distributed architectures and multicore processors.

As shown in Figure 1, in an actor based system, the actor can be seen as the fundamental unit of computation that embodies:

  1. Behavior: what does the actor do
  2. State: a set of variables that can be changed by the behavior
  3. Communication: how does the actor communicate with other actors

Figure 1: an actor is defined by a state, a behavior and a mailbox

Each actor in the system has a ‘mailbox’ and becomes active only when another actor sends a message to that mailbox. Upon receipt of a message, the actor can:

  1. Create new actors
  2. Send messages
  3. Produce side-effects (e.g. write a file)

One of the most popular solutions based on the Actor Model is Akka: a toolkit and runtime framework for building highly concurrent, distributed, and fault tolerant applications on the JVM. Akka is written in Scala and has language bindings for Scala and Java. It has a modular structure that can easily be extended with the so-called ‘Extensions’.

In Akka, actors are purely reactive components: an actor is passive until a message is sent to it. When a message arrives in the actor’s mailbox, a thread is allocated to the actor, the message is extracted from the mailbox, and the actor applies its behavior. When the processing is done, the thread is returned to the pool. This way, actors don’t occupy any CPU resources while they are inactive.

The actor’s behavior is the computation logic that is executed in response to a message. The internal state of the actor can change only as a response to a message, and the only way to observe the state is via message exchange. An actor processes only one message at a time, so its internal state needs neither synchronization nor coordination.
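The one-message-at-a-time guarantee can be illustrated without Akka. What follows is only a minimal sketch, not Akka’s implementation: it assumes a hypothetical CounterActor whose mailbox is a single-threaded executor from java.util.concurrent, so every message is handled by the same thread in arrival order. The names CounterActor, CounterDemo, and stopAndGet are invented for this illustration.

```scala
import java.util.concurrent.{Callable, Executors}

// Toy actor for illustration only; this is NOT how Akka implements actors.
// A single-threaded executor acts as the mailbox and as the thread draining it,
// so messages are handled strictly one at a time, in arrival order.
final class CounterActor {
  private val mailbox = Executors.newSingleThreadExecutor()
  private var count = 0 // state is only ever touched by the mailbox thread

  // Asynchronous, non-blocking send: enqueue the message and return at once.
  def !(msg: String): Unit = mailbox.execute(() => receive(msg))

  // Behavior: runs on the mailbox thread, so no lock is needed around count.
  private def receive(msg: String): Unit = msg match {
    case "inc" => count += 1
    case _     => () // unknown messages are ignored
  }

  // Queue a final read behind all pending messages, then stop the mailbox.
  def stopAndGet(): Int = {
    val result = mailbox.submit(new Callable[Int] { def call(): Int = count })
    mailbox.shutdown()
    result.get()
  }
}

object CounterDemo {
  // Several threads send concurrently; the actor still counts every message.
  def run(senders: Int, perSender: Int): Int = {
    val actor = new CounterActor
    val threads = (1 to senders).map { _ =>
      new Thread(() => (1 to perSender).foreach(_ => actor ! "inc"))
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    actor.stopAndGet()
  }
}
```

Calling CounterDemo.run(4, 1000) sends 4000 increments from four threads. Because only the mailbox thread ever touches count, no increment is lost even though the actor contains no lock.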

To define an actor in Akka, you have to extend the Actor trait, implementing the receive method, which is never called concurrently.

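import akka.actor.{Actor, ActorLogging}

class MyActor extends Actor with ActorLogging {   // ActorLogging provides log
    def receive = {
        case "test" => println("Received test message")
        case "send" => sender() ! "reply"   // answer the actor that sent the message
        case _      => log.info("Unknown message")
    }
}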

The receive method returns a PartialFunction, i.e. a ‘match/case’ block in which the incoming message is matched against the different case clauses using Scala pattern matching.
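To see what “partial” means here, the sketch below uses only the Scala standard library. It builds two partial functions with the same input type as an actor’s receive (Any) and combines them with orElse. The names ReceiveDemo, known, and fallback are made up for the example, and the functions return a String rather than Unit so the result is easy to inspect.

```scala
object ReceiveDemo {
  // Defined only for the cases listed; other inputs are simply not handled.
  val known: PartialFunction[Any, String] = {
    case "test" => "test message"
    case n: Int => s"number $n"
  }

  // A catch-all, like the `case _` clause in an actor's receive block.
  val fallback: PartialFunction[Any, String] = {
    case _ => "unknown message"
  }

  // orElse tries `known` first and falls through to `fallback`.
  val receive: PartialFunction[Any, String] = known orElse fallback
}
```

ReceiveDemo.known.isDefinedAt(3.14) is false, because no clause matches a Double, while ReceiveDemo.receive(3.14) falls through to the catch-all clause.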

To create an actor, an ActorSystem has to be created first. Calling its actorOf method returns a reference to the new actor. Options are specified through the Props configuration object that is passed to the actorOf method.

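import akka.actor.{ActorSystem, Props}
val system = ActorSystem("mySystem")
val myActor = system.actorOf(Props[MyActor], "myactor")   // top-level actor
// Inside an actor, context.actorOf creates a child of that actor
val child = context.actorOf(Props[MyActor], name = "myChild")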

To send a message to an actor, you use the ! (bang) method, which is asynchronous and non-blocking. The delivery semantics are at-most-once for both local and remote actors. Thanks to the message-passing communication of the actor model, there is no tight coupling between sender and receiver. Actors are location transparent and distributable by design: because they share no data and communicate only by message passing, they can be mapped onto a distributed architecture (scaling out).

Remote communication can be achieved using:

  1. Configuration files: the same system can be configured as distributed without code changes.
  2. Extensions

If the target is a remote actor, Akka performs a remote call under the hood. Akka can be configured to use various transports; the default transport mechanism is a Netty-based TCP driver. Netty is an asynchronous, event-driven framework for developing maintainable, high-performance network applications. Asynchronous one-to-one channels are created between remote actors, so sending a message to a remote actor transparently serializes the message and sends it to the remote JVM. However, this solution is particularly inefficient for one-to-many communication patterns, as each message is replicated on the physical channel for each receiver.

Below is a snippet of a configuration file that uses the default Netty transport for remote communication. It specifies the hostname and port parameters (passed to the remote system) that identify the system:

akka {
    actor {
        provider = "akka.remote.RemoteActorRefProvider"
    }
    remote {
        enabled-transports = ["akka.remote.netty.tcp"]
        netty.tcp {
            hostname = "192.168.42.31"
            port = 2552
        }
    }
}

Akka offers two ways of using remote communication: looking up a remote actor and creating a remote actor.

To look up an actor on a remote node, you can use the actorSelection(path) method:

val selection =
    context.actorSelection(
        "akka.tcp://actorSystemName@10.0.0.1:2552/user/actorName")
selection ! "Hello"

To create an actor on a remote node, you can use the actorOf(Props[...], actorName) method instead, and specify in the application’s configuration file where the remote actor resides, using the address and port of the remote system:

akka {
    actor {
        deployment {
            /myActor {
                remote = "akka.tcp://sampleActorSystem@127.0.0.1:2553"
            }
        }
    }
}

The same semantics are used for sending messages to remote and local actors: there is no dedicated API for remote communication. If no deployment configuration exists, the actor is deployed locally.

Using Netty, a remote actor that wants to communicate with multiple actors has to know each receiver and replicate the message on each Netty TCP channel. This is clearly inefficient, and difficult to manage in scenarios where actors are mobile (i.e., they enter and exit the system frequently). For this reason, a PUB/SUB communication pattern is needed.
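The decoupling that PUB/SUB brings can be sketched in a few lines. The example below is an in-process toy with no networking, no Akka, and no DDS. Topic, subscribe, unsubscribe, and publish are hypothetical names chosen for the illustration. What it shows is that the publisher addresses a topic rather than a list of receivers, so subscribers can enter and leave without the publisher changing.

```scala
// In-process toy, for illustration only: no networking, no Akka, no DDS.
// The publisher names a topic, never the individual receivers.
final class Topic[A](val name: String) {
  private var subscribers = Map.empty[String, A => Unit]

  def subscribe(id: String)(handler: A => Unit): Unit =
    subscribers += (id -> handler)

  def unsubscribe(id: String): Unit =
    subscribers -= id

  // One publish call reaches every current subscriber; returns how many.
  def publish(msg: A): Int = {
    subscribers.values.foreach(handler => handler(msg))
    subscribers.size
  }
}

object TopicDemo {
  def run(): (Int, Int, List[String]) = {
    val topic = new Topic[String]("alerts")
    var log = List.empty[String]
    topic.subscribe("a")(m => log = s"a got $m" :: log)
    topic.subscribe("b")(m => log = s"b got $m" :: log)
    val first = topic.publish("one")  // reaches a and b
    topic.unsubscribe("b")            // b leaves; the publisher is unaffected
    val second = topic.publish("two") // reaches only a
    (first, second, log.reverse)
  }
}
```

With point-to-point channels, the publishing code would have to hold both receivers and drop "b" from its own list; here that bookkeeping lives in the topic.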

Akka’s modular structure makes it easy to add external features and to manage them through Akka itself.

Akka supports the PUB/SUB communication pattern through the ZeroMQ Extension. ZeroMQ is an asynchronous messaging library that does not require a dedicated message broker. It supports different transports such as IPC, multicast (PGM), and TCP. The main disadvantage of using ZeroMQ as the base for PUB/SUB in Akka is its limited QoS capabilities. In addition, ZeroMQ PUB/SUB sockets use TCP or PGM and offer no reliability guarantee, as messages destined for slow subscribers are dropped. Other kinds of ZeroMQ sockets and other patterns can be used (e.g. ROUTER-DEALER), at the cost of increased application complexity.
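To make “slow subscribers drop messages” concrete, here is a toy model that uses only java.util.concurrent and no ZeroMQ API. It assumes a per-subscriber queue with a fixed capacity (called highWaterMark here, a name chosen for the example) and a subscriber that has stalled completely. Real behavior depends on socket options and on the transport.

```scala
import java.util.concurrent.ArrayBlockingQueue

object HighWaterMarkSketch {
  // Publish `published` messages to a subscriber that never reads.
  // The per-subscriber queue holds at most `highWaterMark` messages;
  // offer() returns false instead of blocking when the queue is full,
  // which models the message being dropped for that subscriber.
  def droppedForStalledSubscriber(published: Int, highWaterMark: Int): Int = {
    val queue = new ArrayBlockingQueue[Int](highWaterMark)
    (1 to published).count(msg => !queue.offer(msg))
  }
}
```

The publisher is never slowed down in this model; it simply loses whatever does not fit. A reliability QoS is meant to let the application choose a different trade-off.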

To overcome these problems we decided to replace ZeroMQ with DDS. To do this, we developed an Akka Extension using RTI Connext DDS. Our Connext DDS extension follows Akka’s modularity principles, so it can be easily integrated into an Actor System and used to integrate actors with pre-existing systems based on DDS. The DDS Extension allows efficient one-to-many communication through the PUB/SUB model offered by DDS.

The RTI Connext DDS extension adds a wide range of QoS policies, so that the delivery semantics can be matched to application-level requirements, trading off quality guarantees against communication costs. For these reasons, we feel that the DDS extension represents an improvement in comparison with the ZeroMQ Extension.

Using DDS, actors can now exchange messages with one another, as long as they’re registered to the same DDS topic. This is accomplished using a proxy actor, called ActorDW, which encapsulates a DDS DataWriter. Other actors in the system send messages to ActorDW using the usual actor syntax. The complete interaction is shown in Figure 2.

Figure 2: Interaction between ActorDW and DDS

Loading the extension works in the same way as for the ZeroMQ extension and the other extensions available for Akka:

val system = ActorSystem("DDSAkkaSystem")
val dds = DDSExtension(system)

The DDSExtension API is higher-level and more abstract than the ZeroMQ extension’s. To publish a message, the extension offers the newDataWriter method. The user has to specify only the DDS domain ID, the topic name, and the name of the actor:

val actorDW = dds.newDataWriter(0, "My Topic", "myActorDW")

The method will return an actor that can be used with the usual bang syntax:

actorDW ! message

In its simplest form, the newDataWriter method uses default values for DataWriterQos, DomainParticipantQos, and TopicQos. However, users can specify their own custom QoS parameters:

val qosDataWriter = DataWriterQos()
qosDataWriter.reliability(Reliability = Reliable)
val qosTopic = TopicQos()
qosTopic.reliability(Reliability = Reliable)
// qosDomain (the DomainParticipantQos) is not defined in this snippet; it must be created beforehand
val actorDW = dds.newDataWriter(qosDataWriter, qosDomain, qosTopic, 0, "Test Topic", "datawriter")

Figure 3 shows the actions hidden from the user; they are executed automatically by the DDSExtension and include:

  1. Retrieve or create the DDS Domain
  2. Retrieve or create the Topic
  3. Create a DDS DataWriter
  4. Create and return the actor

Figure 3: actions performed under the hood for a DataWriter Actor

On the subscriber’s side, the extension provides the newDataReaderWS method. First, the user has to create the actor that will receive the data, and then pass it to the method:

val actorDR = system.actorOf(Props[DRActor], "MyActorDR")
dds.newDataReaderWS(0, "Test Topic", actorDR, read)

Here too, the method has two forms: the simplest one uses default values for DataReaderQos, DomainParticipantQos, and TopicQos, but users can specify their own custom QoS settings:

val qosDataReader = DataReaderQos()
qosDataReader.reliability(Reliability = Reliable)
val qosTopic = TopicQos()
qosTopic.reliability(Reliability = Reliable)

val actorDR = dds.newDataReader(qosDataReader, qosDomain, qosTopic, 0, "Test Topic", read)

The actions that the API performs are illustrated in Figure 4.

Figure 4: actions performed automatically by the ActorDR

The system creates a DDS DataReader and then registers the actor and the DataReader in a data structure held by another component, the DDSDemultiplexer; which data structure is used depends on the type of the condition.
The DDSDemultiplexer, illustrated in Figure 5, is a singleton that wraps a DDS WaitSet and offers methods to attach and detach conditions.

Figure 5: DDSDemultiplexer structure

At system startup, a thread is assigned to the DDSDemultiplexer, and it blocks waiting for samples. As shown in Figure 6, when a DataWriter writes data to the Topic, the thread wakes up and checks whether any actors are registered for that particular event. If so, they are notified.

Figure 6: the DDSDemultiplexer loop waits for data and notifies the DataReader when new data is available

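val readConditions = Map[ReadCondition, DDSNode]()
val statusConditions = Map[StatusCondition, DDSNode]()

while (running) {
    // Block until at least one attached condition triggers (or the timeout expires)
    val retcode = ws.wait(activeConditions, timeout)
    // Visit every triggered condition, not just one
    for (i <- 0 until activeConditions.size) {
        val cond = activeConditions.get(i)
        // Look up the node registered for this condition
        val node = statusConditions.get(cond)

        getSample(cond, node)
    }
}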
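The same wait-then-dispatch idea can be tried out without DDS. The sketch below is a toy stand-in and not the extension’s code: a blocking queue plays the role of the WaitSet, conditions are plain strings, and the names ToyDemultiplexer, attach, detach, signal, and DemuxDemo are all invented for the illustration.

```scala
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, CountDownLatch, LinkedBlockingQueue, TimeUnit}

// Toy stand-in for the DDSDemultiplexer; it does not use any DDS API.
// A blocking queue plays the role of the WaitSet: the dispatcher thread
// sleeps on it and wakes up when a condition is signalled.
final class ToyDemultiplexer {
  private val handlers = new ConcurrentHashMap[String, String => Unit]()
  private val triggered = new LinkedBlockingQueue[(String, String)]()
  @volatile private var running = true

  def attach(condition: String)(handler: String => Unit): Unit = { handlers.put(condition, handler); () }
  def detach(condition: String): Unit = { handlers.remove(condition); () }

  // Called by the "writer" side: marks a condition as triggered with a sample.
  def signal(condition: String, sample: String): Unit = triggered.put((condition, sample))

  private val dispatcher = new Thread(() => {
    while (running) {
      // Block until something is triggered or the timeout expires.
      val event = triggered.poll(100, TimeUnit.MILLISECONDS)
      if (event != null) {
        val handler = handlers.get(event._1)
        if (handler != null) handler(event._2) // notify the registered receiver
      }
    }
  })
  dispatcher.start()

  def stop(): Unit = { running = false; dispatcher.join() }
}

object DemuxDemo {
  def run(): List[String] = {
    val received = new ConcurrentLinkedQueue[String]()
    val done = new CountDownLatch(2)
    val demux = new ToyDemultiplexer
    demux.attach("temperature") { sample => received.add(sample); done.countDown() }
    demux.signal("temperature", "21.5")
    demux.signal("pressure", "1013") // nobody attached: silently skipped
    demux.signal("temperature", "22.0")
    done.await(5, TimeUnit.SECONDS)
    demux.stop()
    received.toArray(new Array[String](0)).toList
  }
}
```

A single dispatcher thread serves every registered receiver, which is the property the WaitSet-based design relies on: receivers do not each need a thread blocked on their own reader.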

As you can see, the Actor Model is an interesting approach to solving concurrency issues, especially in distributed systems. We explored the challenges and limitations of remote actors with regard to one-to-many communication patterns, showing how PUB/SUB is necessary to implement scalable solutions. We also illustrated the limitations of the ZeroMQ Extension for PUB/SUB, most notably concerning QoS configuration. For these reasons, we implemented an easy-to-use RTI Connext DDS extension that can be tuned and configured with minimal effort from the user. Using this approach, actors can communicate through RTI Connext DDS, taking advantage of the PUB/SUB model and its rich QoS support out of the box.

If you have any questions, feel free to comment and we will get back to you ASAP!

UPDATE:

The “DDSExtension for Akka” source code is now available on GitHub: https://github.com/marcofranzoni/Akka-DDS-Extension. Feel free to clone the repository and follow the instructions available in the README.

 

Note: This Blog post was written by Marco Franzoni (University of Bologna) and Gianpiero Napoli (RTI) and is part of Marco’s recent work for his M.Sc. Degree in Computer Engineering from the University of Bologna: “Quality-of-Service Support in Distributed and Wide-Scale Actor Systems”

8 comments

  1. Interesting integration of DDS and Actors. I looked into Actors a few years ago but abandoned the research due to lack of funding. Towards the end of my exploration, I started having doubts about how good a DDS+Actors integration can really be. I have several questions in that regard and would love to hear your thoughts.

    1. As actors may come and go very quickly, how do you address the discovery issue of something that’s so meteoric? I suppose a 1:1 mapping of actor to dw/dr will be quite expensive discovery-wise if actors are short-lived.
    2. It looks like the ActorDW is like a “broker” that multiplexes messages from other actors on the way out and an ActorDR demultiplexes the messages to recipient actors on the way in. If that’s the case, what’s the topic type between the DataReaders and the DataWriter? Perhaps some form of a generic sequence of octets?
    3. As ActorDW and ActorDR are themselves actors, they are single threaded. If they serve as some sort of brokers, is that a limiting factor in scaling concurrency?
    4. Lastly, I wonder how relaxed the communication model between actors can really be? I suppose with DDS Durable TopicQos, truly persistent message channels can be created. However, why would one revive an “old” actor instead of creating a new one? Towards the weaker side of the delivery-model spectrum, ZeroMQ offers at-most-once, which is already weaker than exactly-once (i.e., strict reliable QoS in DDS). If it gets weaker than that, such as reliable with fixed history or best-effort, what can we really say about the applications? Actors often interact with one another in asynchronous request-response style. In weaker models, either request or response may be lost. As actors themselves are stateful, sending either the request or the response again will likely not have the desired effect.

    • Hi Sumant, thank you for your questions.
      I agree, a 1:1 Actor-DataWriter mapping would be expensive in terms of both discovery and resource usage. Because of that, an ActorDW’s lifespan is usually longer than that of ordinary actors. For example, if a group of actors wants to publish data on the same topic with the same QoS characteristics, they can all send it to a single ActorDW, and actors can enter and leave the system freely. On the other side of the communication, the ActorDR is the actor that receives the message; currently a DataReader is created per actor. The role of the dispatcher is instead fulfilled by the DDSDemultiplexer component, which encapsulates a WaitSet.
      To solve scalability problems you can create actors on different nodes as Remote Actors.
      The topic type should be passed by the user to the methods of the DDS Extension, to instantiate the proper DataReader and DataWriter.
      Akka adopts at-most-once delivery mainly for high performance and reduced overhead, inspired by Erlang. The delivery-guarantee problem you raise is widely discussed; Akka “solves” it with business-level acknowledgements, because that is the only way for the sender to know whether an interaction completed successfully. The Akka documentation mentions this article, which I recommend: http://www.infoq.com/articles/no-reliable-messaging
      I hope I made myself clear; if you have any other doubts, don’t hesitate to contact me!

  2. hi,
    This is a very interesting implementation. We had a similar experience using ZeroMQ in one of our projects: limited QoS support, at-most-once delivery semantics, and no discovery of end-points. But we had to acknowledge the fact that ZMQ only provides a light-weight and fast sockets library, with some built-in messaging patterns as described in the post. It is really not a messaging middleware that can be used out of the box; we have to implement higher-level abstractions (reliable delivery, discovery, etc.) on top of it.
    But having a DDS extension will be great for people who use actors for building their applications.

  3. If QoS is the only problem (should be better explained) then one could use the NORM transport http://zeromq.org/topics:norm-protocol-transport. The NORM transport was around at the time of this post. Has this path been explored?

    DDS has better application QoS support, but to quote one of the ZMQ white-papers: “Implementing QoS on middleware level (as done in most messaging systems) proves inadequate once physical network issues are taken into account.” This means that QoS parameters in ZMQ are subject to the underlying networking protocol and the configuration of the network hardware.
