
Reactive Programming using RTI Connext DDS and Microsoft Rx

April 9, 2014

Reactive programming is among the technologies getting the most attention in the dev world lately. See ReactConf, InfoQ, and the Gartner Hype Cycle. The titans of software technology are pushing reactive programming for mobile and cloud applications. I think this is simply an expansion, a major one, of proven techniques into new classes of applications: the applications joining the club are new, but the principles are really not.

What does it really mean to be reactive? (see video) Traditionally, reactive systems are long-running systems that must respond to external stimuli at speeds determined by the environment. The Reactive Manifesto describes four essential qualities of reactive systems: event-driven (i.e., push messages to consumers), scalable (i.e., accommodate dynamic load while maximizing resource usage), resilient (i.e., isolate faults), and responsive (i.e., high degree of predictability).

It may or may not surprise you, but RTI Connext DDS rocks in the reactive world. DDS is an excellent choice to build distributed reactive systems because it provides the necessary building blocks:

  1. Event-Driven: DDS is an asynchronous, event-based publish-subscribe middleware.
  2. Scalable: The peer-to-peer architecture of RTI Connext DDS scales well.
  3. Resilient: DDS allows very loosely coupled components, which helps isolate faults when they occur and makes systems resilient.
  4. Responsive: DDS is designed for real-time systems that often have very stringent timeliness requirements. The utility of DDS quality-of-service configuration cannot be overstated here.

So, what's the point? Let me ask you this …

What's common between spreadsheet cell formulas and a factory automation system?

Well, they are both reactive! Cells react to changes in the data and robots react to stuff rolling down an assembly line. The difference, however, is how they are built. Spreadsheet formulas are simply declarations of intent: $(C3)=$(A1)+$(B2). Most DDS-based systems, including factory automation systems, are far from that because of the tyranny of imperative programming models and callback hell: a (perhaps familiar) program structure where asynchronous callbacks, state machines, and synchronization conspire against you.
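To make the spreadsheet idea concrete, here is a minimal sketch in plain C# of a cell whose value is declared once and then kept up to date as its inputs change. The `Cell` class and its `Sum` helper are hypothetical names made up for this illustration; they are not part of Rx, DDS, or Rx4DDS.NET.

```csharp
using System;

// Hypothetical minimal "cell" type for illustration only.
public sealed class Cell
{
    private int _value;

    // Raised whenever the cell's value actually changes.
    public event Action<int> Changed;

    public Cell(int initial) { _value = initial; }

    public int Value
    {
        get { return _value; }
        set
        {
            if (value == _value) return;
            _value = value;
            Changed?.Invoke(value);
        }
    }

    // Declares a derived cell once; it is recomputed whenever either input changes.
    public static Cell Sum(Cell a, Cell b)
    {
        var result = new Cell(a.Value + b.Value);
        a.Changed += _ => result.Value = a.Value + b.Value;
        b.Changed += _ => result.Value = a.Value + b.Value;
        return result;
    }
}

public static class SpreadsheetDemo
{
    public static void Main()
    {
        var a1 = new Cell(1);
        var b2 = new Cell(2);
        var c3 = Cell.Sum(a1, b2);   // C3 = A1 + B2, stated once

        c3.Changed += v => Console.WriteLine("C3 is now " + v);

        a1.Value = 10;  // prints "C3 is now 12"
        b2.Value = 5;   // prints "C3 is now 15"
    }
}
```

The caller never writes "when A1 changes, recompute C3"; that wiring lives inside the declaration, which is the property the spreadsheet formula has and callback-heavy code lacks.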

Wouldn't it be nice to be able to develop DDS systems that interact with moving, turning, rotating and flying things in a much cleaner, more succinct, declarative way that does not degrade into callback hell? Is that even possible?

Indeed.

RTI Research is excited to announce the Rx4DDS.NET library, which integrates Microsoft Reactive Extensions (Rx) with RTI Connext DDS on the .NET platform. Rx and DDS are quite complementary because Rx is based on the Subject-Observer pattern, which is analogous to the publish-subscribe pattern of DDS. Furthermore, the core tenet of Rx, composition of operations over values that change over time, complements DDS instances, which are data objects that change over time. DDS ensures propagation of changes to the interested remote participants. Consequently, combining Rx with DDS enables a coherent end-to-end dataflow architecture for both data distribution (which is performed by DDS) and processing (which is done by Rx). Together, Rx and DDS seamlessly support location transparency of dataflow-style programs. The resulting applications dramatically simplify concurrency, to the extent that it can simply be configured.
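The analogy between the observer pattern and publish-subscribe can be sketched with nothing but the `IObservable<T>` and `IObserver<T>` interfaces from the .NET base class library, which Rx builds on. `InProcessTopic` and `RecordingObserver` below are hypothetical names for this illustration; the "topic" lives in a single process and does not use any DDS or Rx4DDS.NET API.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-process "topic"; stands in for a DDS topic for illustration only.
public sealed class InProcessTopic<T> : IObservable<T>
{
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    // Subscribing returns a handle; disposing the handle unsubscribes.
    public IDisposable Subscribe(IObserver<T> observer)
    {
        _observers.Add(observer);
        return new Unsubscriber(_observers, observer);
    }

    // Pushes a sample to every current subscriber.
    public void Publish(T sample)
    {
        foreach (var observer in _observers.ToArray())
            observer.OnNext(sample);
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<T>> _observers;
        private readonly IObserver<T> _observer;

        public Unsubscriber(List<IObserver<T>> observers, IObserver<T> observer)
        {
            _observers = observers;
            _observer = observer;
        }

        public void Dispose() { _observers.Remove(_observer); }
    }
}

// Hypothetical observer that records and prints what it receives.
public sealed class RecordingObserver : IObserver<int>
{
    public readonly List<int> Received = new List<int>();

    public void OnNext(int value)
    {
        Received.Add(value);
        Console.WriteLine("got " + value);
    }

    public void OnError(Exception error) { Console.WriteLine("error: " + error.Message); }
    public void OnCompleted() { Console.WriteLine("done"); }
}

public static class ObserverDemo
{
    public static void Main()
    {
        var topic = new InProcessTopic<int>();
        var observer = new RecordingObserver();

        using (topic.Subscribe(observer))
        {
            topic.Publish(1);   // delivered
            topic.Publish(2);   // delivered
        }

        topic.Publish(3);       // not delivered: the subscription was disposed
    }
}
```

The publisher never calls the subscriber by name; it only pushes samples to whoever is subscribed, which is the same decoupling a DDS topic gives you across processes and machines.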

So what does a reactive program using Rx4DDS.NET really look like? Here is a swap program in C#.

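// triangle_writer is assumed to be a DataWriter for the "Triangle" topic, created elsewhere.
IDisposable swap(DDS.DomainParticipant participant)
{
  return DDSObservable
        // Observe samples published on the "Square" topic.
        .FromTopic<ShapeTypeExtended>(participant, "Square")
        // Build a new sample with x and y swapped; size and color are unchanged.
        .Select(shape => new ShapeTypeExtended
        {
          x = shape.y,
          y = shape.x,
          color = shape.color,
          shapesize = shape.shapesize
        })
        // Write each result; when the square stream completes, dispose the blue triangle.
        .SubscribeAndDisposeOnCompleted(triangle_writer,
                new ShapeTypeExtended { color = "BLUE" });
}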

The above program subscribes to the "Square" topic, swaps the original x and y coordinates, and publishes triangles of the same size and color (but with x and y swapped) on the "Triangle" topic. The little arrow (=>) in the argument to Select defines a lambda (an anonymous function) with a single parameter named shape; the part after the arrow is the body of the lambda. By the way, the program also reacts to a disposed "Square" by disposing the blue triangle in response.
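If the lambda syntax is new to you, the same projection can be tried on an ordinary in-memory list with LINQ, no DDS required. In this sketch, `Shape` is a hypothetical stand-in for the generated `ShapeTypeExtended` type (its field types are assumed), and `SwapAll` is a helper name invented for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the DDS-generated ShapeTypeExtended type.
public class Shape
{
    public int x;
    public int y;
    public string color;
    public int shapesize;
}

public static class LambdaDemo
{
    // Same projection as in swap(), applied to an in-memory sequence.
    public static List<Shape> SwapAll(IEnumerable<Shape> shapes)
    {
        return shapes
            .Select(shape => new Shape
            {
                x = shape.y,
                y = shape.x,
                color = shape.color,
                shapesize = shape.shapesize
            })
            .ToList();
    }

    public static void Main()
    {
        var squares = new[]
        {
            new Shape { x = 10, y = 20, color = "BLUE", shapesize = 30 }
        };

        foreach (var s in SwapAll(squares))
            Console.WriteLine(s.x + "," + s.y + " " + s.color + " " + s.shapesize);
        // prints "20,10 BLUE 30"
    }
}
```

The only difference in the Rx4DDS.NET version is the source: there, Select runs over samples as they arrive rather than over a list that already exists.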

Complex Event Processing

With Rx4DDS.NET, Complex Event Processing (CEP) is at your fingertips, thanks to LINQ (Language INtegrated Query). CEP fans among you might be thrilled to see the following example.

IDisposable selectmany_correlator(DDS.DomainParticipant participant)
{
  var rx_circle_reader =
      DDSObservable.FromTopic<ShapeTypeExtended>(participant, "Circle");
  // Scheduler.Default delivers the square samples on the thread pool.
  var rx_square_reader =
      DDSObservable.FromTopic<ShapeTypeExtended>(participant, "Square", Scheduler.Default);

  // For each square, take one circle and keep the pair only if the colors match.
  var correlator =
      from square in rx_square_reader
      from circle in rx_circle_reader.Take(1)
      where square.color == circle.color
      select new ShapeTypeExtended
      {
        x = square.x,
        y = square.y,
        color = square.color,
        shapesize = circle.x   // triangle size follows the circle's x position
      };

  return correlator.Subscribe(triangle_writer);
}

That's right. With Rx4DDS.NET, you can write SQL-like queries on streaming DDS data. You may think of it as a "continuous query" over streaming DDS data. The above query correlates squares and circles of the same color and produces triangles with the same location and color as the squares. The size of the triangles, however, changes with the x position of the circle. This example also uses a thread pool (Scheduler.Default).
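The shape of that query can be tried out on in-memory collections with plain LINQ. This is only an analogue, under assumptions: `Shape` is a hypothetical stand-in for `ShapeTypeExtended`, `Correlate` is a name invented here, and on a list `Take(1)` always picks the first circle, whereas on a live stream it would pick a circle as it arrives.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the DDS-generated ShapeTypeExtended type.
public class Shape
{
    public int x;
    public int y;
    public string color;
    public int shapesize;
}

public static class CorrelatorDemo
{
    // Pairs each square with the first circle and keeps the pair if the colors match.
    public static List<Shape> Correlate(IEnumerable<Shape> squares, IEnumerable<Shape> circles)
    {
        var query =
            from square in squares
            from circle in circles.Take(1)
            where square.color == circle.color
            select new Shape
            {
                x = square.x,
                y = square.y,
                color = square.color,
                shapesize = circle.x   // size follows the circle's x position
            };

        return query.ToList();
    }

    public static void Main()
    {
        var squares = new[]
        {
            new Shape { x = 1, y = 2, color = "RED",  shapesize = 30 },
            new Shape { x = 3, y = 4, color = "BLUE", shapesize = 30 }
        };
        var circles = new[]
        {
            new Shape { x = 50, y = 60, color = "RED", shapesize = 30 }
        };

        foreach (var t in Correlate(squares, circles))
            Console.WriteLine(t.x + "," + t.y + " " + t.color + " " + t.shapesize);
        // prints "1,2 RED 50"
    }
}
```

The two `from` clauses compile down to SelectMany, which is why the same query text works over both enumerable collections and observable streams.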

So, what's the magic sauce here? It is called Functional Reactive Programming (FRP). FRP is a declarative approach to system development in which the program specifies "what" as opposed to "how". The reason the above programs are rather short is that FRP offers high-level abstractions that avoid the verbosity commonly seen in callback-based techniques. It allows chaining of "operations" on streaming data.

There is a lot more to Rx4DDS.NET than what you see here. In particular, it is designed to support keyed topics, discovery events, and communication status messages, and you can correlate all of them reactively and concurrently. So download Rx4DDS.NET and Go Reactive today!

If you are interested in learning about how RTI and Vanderbilt University implemented real-time analytics for a soccer game using Rx4DDS.NET, see this paper and the rticonnextdds-reactive repository that includes all the sources.
