Modern Asynchronous Request/Reply with DDS-RPC

Complex distributed systems often use multiple styles of communication, or interaction patterns, to implement functionality. One-to-many publish-subscribe, one-to-one request-reply, and one-to-one queuing (i.e., exactly-once delivery) are the most common. RTI Connext DDS supports all three. Two of them are available as standard specifications from the Object Management Group (OMG): the DDS specification (duh!) and the newer Remote Procedure Call (RPC) over DDS (DDS-RPC) specification.

The DDS-RPC specification is a significant generalization of RTI’s own Request-Reply API in C++ and Java. There are two major differences: (1) DDS-RPC includes code generation from the IDL interface keyword, and (2) it supports asynchronous programming using futures.

Let’s look at an example straight from the DDS-RPC specification.

module robot 
{
  exception TooFast {};
  enum Command { START_COMMAND, STOP_COMMAND };
  struct Status {
    string msg;
  };

  @DDSService
  interface RobotControl 
  {
    void  command(Command com);
    float setSpeed(float speed) raises (TooFast);
    float getSpeed();
    void  getStatus(out Status status);
  };

}; //module robot

It’s pretty obvious what’s going on here. RobotControl is an IDL interface to send commands to a robot. You can start/stop it, get its status, and control its speed. The setSpeed operation returns the old speed when successful. The robot knows its limits and if you try to go too fast, it will throw a TooFast exception right in your face.

To bring this interface to life you need a code generator. As of this writing, however, rtiddsgen coughs at the interface keyword. It does not recognize it. But sometime in the not-so-distant future it will.

For now, we have the next best thing. I’ve already created a working example of this interface using hand-written code. The API of the hand-written code matches exactly the standard bindings specified in DDS-RPC. As a matter of fact, the dds-rpc-cxx repository is an experimental implementation of the DDS-RPC API in C++. Look at the normative files to peek into all the gory API details.

Java folks… did I mention that this is a C++ post? Well, I just did… But do not despair. The dds-rpc-java repository has the equivalent Java API for the DDS-RPC specification. However, there’s no reference implementation. Sorry!

The Request/Reply Style Language Binding

DDS-RPC distinguishes between the Request/Reply and the Function-Call style language bindings. The Request-Reply language binding is nearly equivalent to RTI’s Request/Reply API with some additional asynchronous programming support. More on that later.

Here’s a client program that creates a requester to communicate with a RobotControl service. It asks for the current speed and increases it by 10 via getSpeed/setSpeed operations. The underlying request and reply topic names are “RobotControlRequest” and “RobotControlReply”. No surprise there.

RequesterParams requester_params = 
  dds::rpc::RequesterParams()
    .service_name("RobotControl")
    .domain_participant(...); 

Requester<RobotControl_Request, RobotControl_Reply> 
  requester(requester_params); 

// helper class for automatic memory management 
helper::unique_data<RobotControl_Request> request; 
dds::Sample<RobotControl_Reply> reply_sample; 
dds::Duration timeout = dds::Duration::from_seconds(1); 
float speed = 0; 

request->data._d = RobotControl_getSpeed_Hash; 
requester.send_request(*request); 

while (!requester.receive_reply( 
               reply_sample, 
               request->header.requestId, 
               timeout)); 

speed = reply_sample.data().data._u.getSpeed._u.result.return_; 
speed += 10; 
request->data._d = RobotControl_setSpeed_Hash; 
request->data._u.setSpeed.speed = speed; 

requester.send_request(*request); 

while (!requester.receive_reply( 
                reply_sample, 
                request->header.requestId, 
                timeout)); 

if(reply_sample.data().data._u.setSpeed._d == robot::TooFast_Ex_Hash) 
{
  printf("Going too fast.\n"); 
} 
else 
{ 
  speed = reply_sample.data().data._u.setSpeed._u.result.return_; 
  printf("New Speed = %f", speed); 
}

There’s quite a bit of ceremony to send just two requests to the robot. The request/reply style binding is lower-level than the function-call style binding. The responsibility of packing and unpacking data from the request and reply samples falls on the programmer. An alternative, of course, is to have the boilerplate code auto-generated. That’s where the function-call style binding comes into the picture.
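To make the packing and unpacking burden concrete, here is a minimal sketch of the kind of helper a programmer (or a code generator) ends up writing. The Request, Reply, OpTag, and ReplyTag types and both helper functions are simplified, hypothetical stand-ins. They are not the types generated for DDS-RPC.

```cpp
#include <cassert>
#include <cstdint>
#include <stdexcept>

// Hypothetical, simplified stand-ins for the request/reply sample types.
enum class OpTag : std::uint32_t { GetSpeed, SetSpeed };
enum class ReplyTag : std::uint32_t { Result, TooFast };

struct Request {
  std::uint64_t request_id;  // used to correlate the reply with the request
  OpTag op;                  // discriminator: which operation is requested
  float speed;               // only meaningful for SetSpeed
};

struct Reply {
  std::uint64_t related_request_id;
  ReplyTag tag;   // discriminator: normal result or exception
  float result;   // only meaningful when tag == ReplyTag::Result
};

struct TooFast : std::runtime_error {
  TooFast() : std::runtime_error("TooFast") {}
};

// Pack: build the request sample for a setSpeed call.
inline Request pack_set_speed(std::uint64_t id, float speed) {
  return Request{id, OpTag::SetSpeed, speed};
}

// Unpack: check correlation, then turn the exception branch of the reply
// into a C++ exception and the result branch into a return value.
inline float unpack_set_speed(const Request& req, const Reply& rep) {
  assert(req.op == OpTag::SetSpeed);
  if (rep.related_request_id != req.request_id)
    throw std::logic_error("reply does not match request");
  if (rep.tag == ReplyTag::TooFast) throw TooFast();
  return rep.result;
}
```

With helpers like these, the call site shrinks to a pack, a send/receive, and an unpack, which is roughly the shape of what a function-call style stub does for you.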

The Function-Call Style Language Binding

The same client program can be written in a much more pleasing way using the function-call style language binding.

robot::RobotControlSupport::Client 
  robot_client(rpc::ClientParams()
                 .domain_participant(...)
                 .service_name("RobotControl"));
float speed = 0;

try
{
  speed = robot_client.getSpeed();
  speed += 10;
  robot_client.setSpeed(speed);
}
catch (robot::TooFast &) {
  printf("Going too fast!\n");
}

Here, a code generator is expected to generate all the necessary boilerplate code to support natural RPC-style programming. The dds-rpc-cxx repository contains the code necessary for the RobotControl interface.

Pretty neat, isn’t it?

Not quite… (and imagine some ominous music and dark skies! Think Mordor to set the mood.)

An Abstraction that Isn’t

The premise of RPC, the concept, is that accessing a remote service can be and should be as easy as a synchronous local function call. The function-call style API tries to hide the complexity of network programming (serialization/deserialization) behind pretty looking interfaces. However, it works well only until it does not…

Synchronous RPC is a very poor abstraction of latency. Latency is a hard and insurmountable problem in network programming. For reference see the latency numbers every programmer should know.

If we slow down a computer to a time-scale that humans understand, it would take more than 10 years to complete the above synchronous program, assuming we were controlling a robot on a different continent. Consider the following table taken from the Principles of Reactive Programming course on Coursera to get an idea of what a human-friendly time-scale might look like.

[Table image: latency numbers on a human-friendly time-scale]

The problem with synchronous RPC is not only that it takes very long to execute but also that the calling thread is blocked doing nothing for that long. What a waste!

Dealing with failures of the remote service is also a closely related problem. But for now let’s focus on the latency alone.

The problem discussed here isn’t new at all. The solution, however, is new and quite exciting, IMO.

Making Latency Explicit … as an Effect

The DDS-RPC specification uses language-specific future<T> types to indicate that a particular operation is likely going to take a very long time. In fact, every IDL interface gives rise to sync and async versions of the API and lets programmers choose.

The client-side generated code for the RobotControl interface includes the following asynchronous functions.

class RobotControlAsync
{
public:

  virtual dds::rpc::future<void> command_async(const robot::Command & command) = 0;
  virtual dds::rpc::future<float> setSpeed_async(float speed) = 0;
  virtual dds::rpc::future<float> getSpeed_async() = 0;
  virtual dds::rpc::future<robot::RobotControl_getStatus_Out> getStatus_async() = 0;

  virtual ~RobotControlAsync() { }
};

Note that every operation, including those that originally returned void, returns a future object, which is a surrogate for the value that might be available in the future. If an operation reports an exception, the future object will contain the same exception and the user will be able to access it. The dds::rpc::future maps to std::future in C++11 and C++14 environments.
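Since dds::rpc::future maps to std::future, plain standard C++ is enough to show how an exception travels inside a future. The sketch below uses std::promise directly. The fake_set_speed_async function and its limit parameter are made up for illustration and stand in for the remote call.

```cpp
#include <cassert>
#include <exception>
#include <future>
#include <stdexcept>

struct TooFast : std::runtime_error {
  TooFast() : std::runtime_error("TooFast") {}
};

// Callee side (hypothetical): fulfil the promise with a value or an exception.
inline std::future<float> fake_set_speed_async(float speed, float limit) {
  std::promise<float> promise;
  std::future<float> result = promise.get_future();
  if (speed > limit)
    promise.set_exception(std::make_exception_ptr(TooFast()));
  else
    promise.set_value(speed);
  return result;
}

// Caller side: the stored exception is rethrown by get().
inline bool reports_too_fast(float speed, float limit) {
  std::future<float> result = fake_set_speed_async(speed, limit);
  assert(result.valid());
  try {
    result.get();
    return false;
  } catch (const TooFast&) {
    return true;
  }
}
```

The exception is not thrown where the operation is invoked. It surfaces only when the caller asks for the result.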

As it turns out, C++11 futures allow us to separate invocation from execution of remote operations but retrieving the result requires waiting. Therefore, the resulting program is barely an improvement.

try
{
  dds::rpc::future<float> speed_fut = 
    robot_client.getSpeed_async(); 
  // Do some other stuff  
  while(speed_fut.wait_for(std::chrono::seconds(1)) == 
        std::future_status::timeout);
  
  speed = speed_fut.get();
  speed += 10;
  
  dds::rpc::future<float> set_speed_fut = 
    robot_client.setSpeed_async(speed);
  // Do even more stuff  
  while(set_speed_fut.wait_for(std::chrono::seconds(1)) == 
        std::future_status::timeout);
  
  set_speed_fut.get();
}
catch (robot::TooFast &) {
  printf("Going too fast!\n");
}

The client program is free to invoke multiple remote operations (say, to different services) back-to-back without blocking but there are at least three problems with it.

  1. The program has to block in most cases to retrieve the results. In cases where the result is available before blocking, there’s problem #2.
  2. It is also possible that the result of the asynchronous operation becomes available while the main thread is busy doing something other than waiting. If no thread is waiting to retrieve the result, that chain of computation (i.e., adding 10 and calling setSpeed) makes no progress. This is essentially what’s known as continuation blocking because the subsequent computation is blocked due to missing resources.
  3. Finally, the programmer must also correlate requests with responses because, in general, the futures are not guaranteed to become ready in the order the requests were sent. Some remote operations may take longer than others. To resolve this non-determinism, programmers may have to implement state machines very carefully.

For the above reasons, this program is not a huge improvement over the request/reply style program at the beginning. In both cases, invocation is separate from retrieval of the result (i.e., both are asynchronous) but the program must wait to retrieve the results.
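Problem #3 is worth a small illustration. The sketch below shows the sort of bookkeeping that correlation needs when replies can arrive in any order: a map from request id to a pending promise. The PendingReplies class and is_ready helper are hypothetical names invented for this example and are not part of DDS-RPC.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <future>
#include <map>

// Hypothetical correlation table: request id -> promise awaiting the reply.
class PendingReplies {
 public:
  // Register an outstanding request and hand back the future for its reply.
  std::future<float> expect(std::uint64_t request_id) {
    auto inserted = pending_.emplace(request_id, std::promise<float>());
    assert(inserted.second && "request ids must be unique");
    return inserted.first->second.get_future();
  }

  // Called when a reply arrives, in whatever order the network delivers it.
  bool on_reply(std::uint64_t request_id, float value) {
    auto it = pending_.find(request_id);
    if (it == pending_.end()) return false;  // unknown or duplicate reply
    it->second.set_value(value);
    pending_.erase(it);
    return true;
  }

  std::size_t outstanding() const { return pending_.size(); }

 private:
  std::map<std::uint64_t, std::promise<float>> pending_;
};

// Non-blocking readiness check on a std::future.
inline bool is_ready(const std::future<float>& f) {
  return f.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
}
```

If the reply to the second request arrives first, only the second future becomes ready, and the caller still has to decide which future to look at next.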

That reminds me of the saying:

“Blocking is the goto of Modern Concurrent Programming”

—someone who knows stuff

In the multicore era, where the programs must scale to utilize the underlying parallelism, any blocking function call robs the opportunity to scale. In most cases, blocking will consume more threads than strictly necessary to execute the program. In responsive UI applications, blocking is a big no-no. Who has tolerance for unresponsive apps these days?

Composable Futures to the Rescue

Thankfully people have thought of this problem already and proposed improved futures in the Concurrency TS for the ISO C++ standard. The improved futures support

  • Serial composition via .then()
  • Parallel composition via when_all() and when_any()

A lot has been written about improved futures. See Dr. Dobb’s, Facebook’s Folly, and monadic futures. I won’t repeat that here but an example of .then() is in order. This example is also available in the dds-rpc-cxx repository.

for(int i = 0;i < 5; i++)
{
  robot_client
    .getSpeed_async()
    .then([robot_client](future<float> && speed_fut) {
          float speed = speed_fut.get();
          printf("getSpeed = %f\n", speed);
          speed += 10;
          return remove_const(robot_client).setSpeed_async(speed);
      })
    .then([](future<float> && speed_fut) {
        try {
          float speed = speed_fut.get();
          printf("speed set successfully.\n");
        }
        catch (robot::TooFast &)
        {
          printf("Going too fast!\n");
        }
      });
}

printf("Press ENTER to continue\n");
getchar();

The program with .then() sets up a chain of continuations (i.e., closures passed in as callbacks) to be executed when the dependent futures are ready with their results. As soon as the future returned by getSpeed_async is ready, the first closure passed to the .then() is executed. It invokes setSpeed_async, which returns yet another future. When that future completes, the program continues with the second callback that just prints success/failure of the call.
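To get a feel for what .then() does, here is a deliberately naive version written against plain std::future. It is only a sketch: the free function then and the demo_chain helper are invented for this example, a helper thread is parked on each future (so blocking is merely moved, not removed), and a continuation that itself returns a future is not unwrapped the way the Concurrency TS describes.

```cpp
#include <cassert>
#include <future>
#include <utility>

// Naive .then(): a helper thread waits for `fut`, then runs `cont` with it.
// This is NOT how a real composable future works; it costs one thread per
// continuation and exists only to show the shape of the API.
template <typename T, typename F>
auto then(std::future<T> fut, F cont)
    -> std::future<decltype(cont(std::move(fut)))> {
  assert(fut.valid());
  return std::async(
      std::launch::async,
      [](std::future<T> ready, F f) {
        ready.wait();                // blocks the helper thread, not the caller
        return f(std::move(ready));  // hand the ready future to the continuation
      },
      std::move(fut), std::move(cont));
}

// Usage: chain two continuations on a simulated getSpeed result.
inline float demo_chain() {
  std::future<float> speed =
      std::async(std::launch::async, [] { return 10.0f; });
  auto bumped = then(std::move(speed),
                     [](std::future<float> s) { return s.get() + 10.0f; });
  auto doubled = then(std::move(bumped),
                      [](std::future<float> s) { return s.get() * 2.0f; });
  return doubled.get();  // (10 + 10) * 2
}
```

As in the robot example, each continuation receives the ready future rather than the bare value, so an exception stored in the future reaches the continuation through get().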

Reactive Programming

This style of chaining dependent computations and constructing a dataflow graph of asynchronous operations is at the heart of reactive programming. It has a number of advantages.

  1. There’s no blocking per request. The program fires a series of requests, sets up a callback for each, and waits for everything to finish (at getchar()).
  2. There’s no continuation blocking because the implementation of future is often such that the thread that sets the value of the promise object (the callee side of the future) continues with callback execution right away.
  3. Correlation of requests with replies isn’t explicit because each async invocation produces a unique future and its result is available right in the callback. Any state needed to complete the execution of the callback can be captured in the closure object.
  4. Requires no incidental data structures, such as state machines and std::map for request/reply correlation. This benefit is a consequence of chained closures.

The advantages of this fluid style of asynchronous programming enabled by composable futures and lambdas are quite characteristic of modern asynchronous programming in languages such as JavaScript and C#. CompletableFuture in Java 8 also provides the same pattern.

The Rabbit Hole Goes Deeper

While serial composition of futures (.then) looks cool, any non-trivial asynchronous program written with futures quickly gets out of hand due to callbacks. The .then function restores some control over a series of asynchronous operations at the cost of the familiar control-flow and debugging capabilities.

Think about how you might write a program that speeds up the robot from its current speed to some MAX in increments of 10 by repeatedly calling getSpeed/setSpeed asynchronously and never blocking.

Here’s my attempt.

dds::rpc::future<float> speedup_until_maxspeed(
  robot::RobotControlSupport::Client & robot_client)
{
  static const int increment = 10;

  return
    robot_client
      .getSpeed_async()
      .then([robot_client](future<float> && speed_fut) 
      {
        float speed = speed_fut.get();
        speed += increment;
        if (speed <= MAX_SPEED)
        {
          printf("speedup_until_maxspeed: new speed = %f\n", speed);
          return remove_const(robot_client).setSpeed_async(speed);
        }
        else
          return dds::rpc::details::make_ready_future(speed);
      })
      .then([robot_client](future<float> && speed_fut) {
        float speed = speed_fut.get();
        if (speed + increment <= MAX_SPEED)
          return speedup_until_maxspeed(remove_const(robot_client));
        else
          return dds::rpc::details::make_ready_future(speed);
      });
}

// wait for the computation to finish asynchronously
speedup_until_maxspeed(robot_client).get();

This program is unusually complex for what little it achieves. The speedup_until_maxspeed function appears to be recursive as the second lambda calls the function again if the speed is not high enough. In reality the caller’s stack does not grow. Only heap allocations for the futures’ shared state are made as successive calls to getSpeed/setSpeed are issued.

The next animation might help you understand what’s actually happening during execution.

[Animation: execution of the .then() chain]

The control-flow in a program with heavy .then usage is going to be quite hard to understand, especially when there are nested callbacks, loops, and conditionals. We lose the familiar stack-trace because internally .then is stitching together many small program fragments (lambdas) that have only logical continuity but awkward physical and temporal continuity.

Debugging becomes harder too. To understand better what’s hard about it, I fired up the Visual Studio debugger and stepped the program through several iterations. The call-stack appears to grow indefinitely while the program is “recursing”. But note there are many asynchronous calls in between. So the stack isn’t really growing. I tested with 100,000 iterations and the stack did not overflow. Here’s a screenshot of the debugger.

[Screenshot: debugger call stack without await]

So, .then() seems like a mixed blessing.

Wouldn’t it be nice if we could dodge the awkwardness of continuations, write the program as if it were synchronous, but execute it fully asynchronously?

Welcome to C++ Coroutines

Microsoft has proposed a set of C++ language and library extensions called Resumable Functions that help write asynchronous code that looks synchronous with familiar loops and branches. The latest proposal as of this writing (N4402) includes a new keyword await, and its implementation relies on the improved futures we discussed already.

Update: The latest C++ standard development suggests that the accepted keyword will be co_await (for coroutine await).

The speedup_until_maxspeed function can be written naturally as follows.

dds::rpc::future<void> test_iterative_await(
  robot::RobotControlSupport::Client & robot_client)
{
  static const int inc = 10;
  float speed = 0;

  while ((speed = await robot_client.getSpeed_async())+inc <= MAX_SPEED)
  {
    await robot_client.setSpeed_async(speed + inc);
    printf("current speed = %f\n", speed + inc);
  }
}

test_iterative_await(robot_client).get();

I’m sure C# programmers will immediately recognize that it looks quite similar to the async/await in .NET. C++ coroutines bring a popular feature in .NET to native programmers. Needless to say such a capability is highly desirable in C++ especially because it makes asynchronous programming with DDS-RPC effortless.

The best part is that compiler support for await is already available! Microsoft Visual Studio 2015 includes an experimental implementation of resumable functions. I have created several working examples in the dds-rpc-cxx repository. The examples demonstrate await with both the Request-Reply style language binding and the Function-Call style language binding in the DDS-RPC specification.

Like the example before, I debugged this example too. It feels quite natural to debug because as one would expect, the stack does not appear to grow indefinitely. It’s like debugging a loop except that everything is running asynchronously. Things look pretty solid from what I could see! Here’s another screenshot.

[Screenshot: debugger call stack with await]

Concurrency

The current experimental implementation of DDS-RPC uses a thread-per-request model to implement concurrency. This is a terrible design choice but there’s a purpose: it’s very quick to implement. A much better implementation would use some sort of thread-pool and an internal queue (i.e., an executor). The Concurrency TS is considering adding executors to C++.
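For readers who haven’t built one, here is a minimal sketch of the “thread-pool plus internal queue” idea using only the standard library. SimpleExecutor and run_requests_on_pool are hypothetical names for this example. This is not the dds-rpc-cxx implementation and not the executor design under discussion for the Concurrency TS.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// A fixed set of worker threads pulling tasks from one shared queue.
class SimpleExecutor {
 public:
  explicit SimpleExecutor(std::size_t workers) {
    assert(workers > 0);
    for (std::size_t i = 0; i < workers; ++i)
      threads_.emplace_back([this] { run(); });
  }

  // Lets the workers drain the queue, then joins them.
  ~SimpleExecutor() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      stopping_ = true;
    }
    cv_.notify_all();
    for (auto& t : threads_) t.join();
  }

  SimpleExecutor(const SimpleExecutor&) = delete;
  SimpleExecutor& operator=(const SimpleExecutor&) = delete;

  // Enqueue a unit of work, e.g. "handle this request".
  void submit(std::function<void()> task) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      tasks_.push(std::move(task));
    }
    cv_.notify_one();
  }

 private:
  void run() {
    for (;;) {
      std::function<void()> task;
      {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return stopping_ || !tasks_.empty(); });
        if (tasks_.empty()) return;  // stopping and nothing left to do
        task = std::move(tasks_.front());
        tasks_.pop();
      }
      task();  // run outside the lock so other workers can dequeue
    }
  }

  std::mutex mutex_;
  std::condition_variable cv_;
  std::queue<std::function<void()>> tasks_;
  bool stopping_ = false;
  std::vector<std::thread> threads_;
};

// Usage: many "requests" handled by a small, fixed number of threads.
inline int run_requests_on_pool(int requests, std::size_t workers) {
  std::atomic<int> handled{0};
  {
    SimpleExecutor pool(workers);
    for (int i = 0; i < requests; ++i)
      pool.submit([&handled] { ++handled; });
  }  // destructor drains the queue and joins the workers
  return handled.load();
}
```

The number of threads is now bounded by the pool size rather than by the number of in-flight requests, which is the main point of moving away from thread-per-request.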

Astute readers will probably realize that the thread-per-request model implies that each request completes in its own thread and therefore a question arises regarding the thread that executes the remaining code. Is the code following await required to be thread-safe? How many threads might be executing speedup_until_maxspeed at a time?

A quick test (with rpc::future wrapping PPL tasks) of the above code revealed that the code following await is executed by two different threads. These two threads are never the ones created by the thread-per-request model. This implies that there’s a context switch from the thread that received the reply to the thread that resumed the test_iterative_await function. The same behavior is observed in the program with explicit .then calls. Perhaps the resulting behavior is dependent on the actual future/promise types in use. I also wonder if there is a way to execute code following await in parallel. Any comments, Microsoft?

A Sneak Peek into the Mechanics of await

A quick look into N4402 reveals that the await feature relies on composable futures, especially the .then (serial composition) combinator. The compiler does all the magic of transforming the asynchronous code to a state machine that manages suspension and resumption automatically. It is a great example of how compiler and libraries can work together producing a beautiful symphony.
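To give a rough idea of what such a state machine looks like, here is a hand-written version of the speed-up loop. Everything in it is a stand-in invented for this sketch: FakeRobot simulates the service with a queue of pending callbacks, and SpeedUpStateMachine plays the role of the compiler-generated code. The real transformation in N4402 differs in its details.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>

// Hypothetical in-process "robot": async calls complete when run_until_idle()
// delivers their callbacks, which stands in for replies arriving later.
struct FakeRobot {
  float speed = 0;
  std::deque<std::function<void()>> pending;

  void get_speed_async(std::function<void(float)> cb) {
    pending.push_back([this, cb] { cb(speed); });
  }
  void set_speed_async(float s, std::function<void(float)> cb) {
    pending.push_back([this, s, cb] {
      float old = speed;
      speed = s;
      cb(old);  // like setSpeed, report the old speed
    });
  }
  void run_until_idle() {
    while (!pending.empty()) {
      std::function<void()> next = std::move(pending.front());
      pending.pop_front();
      next();
    }
  }
};

// Hand-written equivalent of:
//   while ((speed = await getSpeed()) + inc <= max) await setSpeed(speed + inc);
class SpeedUpStateMachine {
 public:
  SpeedUpStateMachine(FakeRobot& robot, float max_speed)
      : robot_(robot), max_(max_speed) {}

  // Each call runs from the current suspension point to the next one.
  void resume() {
    switch (state_) {
      case State::LoopTop:
        state_ = State::AfterGetSpeed;
        robot_.get_speed_async([this](float s) { speed_ = s; resume(); });
        return;  // suspend until the reply arrives
      case State::AfterGetSpeed:
        if (speed_ + kInc <= max_) {
          state_ = State::LoopTop;  // after setSpeed completes, loop again
          robot_.set_speed_async(speed_ + kInc, [this](float) { resume(); });
          return;  // suspend until the reply arrives
        }
        state_ = State::Done;
        return;
      case State::Done:
        assert(false && "resumed after completion");
        return;
    }
  }

  bool done() const { return state_ == State::Done; }

 private:
  enum class State { LoopTop, AfterGetSpeed, Done };
  static constexpr float kInc = 10.0f;
  FakeRobot& robot_;
  float max_;
  float speed_ = 0;  // the "local variable" survives across suspensions
  State state_ = State::LoopTop;
};
```

Note how the local variable speed has to move into the object so it survives between resumptions. With await, the compiler does that bookkeeping for you.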

C++ coroutines work with any type that looks and behaves like a composable future. Such a type also needs a corresponding promise type. The requirements on the library types are quite straight-forward, especially if you have a composable future type implemented somewhere else. Specifically, three free functions, await_ready, await_suspend, and await_resume, must be available in the namespace of your future type.

In the DDS-RPC specification, dds::rpc::future<T> maps to std::future<T>. As std::future<T> does not have the necessary functionality for await to work correctly, I implemented dds::rpc::future<T> using both boost::future and concurrency::task<T> in Microsoft’s PPL. Further, the dds::rpc::future<T> type was adapted with its own await_* functions and a promise type.

  template <typename T>
  bool await_ready(dds::rpc::future<T> const & t)
  {
    return t.is_ready();
  }

  template <typename T, typename Callback>
  void await_suspend(dds::rpc::future<T> & t, Callback resume)
  {
    t.then([resume](dds::rpc::future<T> const &)
    {
      resume();
    });
  }

  template <typename T>
  T await_resume(dds::rpc::future<T> & t)
  {
    return t.get();
  }

Adapting boost::future<T> was straight-forward as the N4402 proposal includes much of the necessary code, but some tweaks were necessary because the draft and the implementation in Visual Studio 2015 appear slightly different. Implementing dds::rpc::future<T> and dds::rpc::details::promise<T> using concurrency::task<T> and concurrency::task_completion_event<T> needed a little more work as both types had to be wrapped inside what would be standard types in the near future (C++17). You can find all the details in future_adapter.hpp.

There are a number of resources available on C++ Coroutines if you want to explore further. See the video recording of the CppCon’15 session on “C++ Coroutines: A Negative Overhead Abstraction”. Slides on this topic are available here and here. The Resumable Functions proposal is not limited to just await. There are other closely related capabilities, such as the yield keyword for stackless coroutines, the await for keyword, generator, recursive_generator and more.

Indeed, this is truly an exciting time to be a C++ programmer.

Data-Centric Stream Processing in the Fog

It has been almost a year and a half since my last post on reactive stream processing with RTI Connext DDS. A lot has happened since then and I have a lot of interesting updates to share with you.

For starters, here’s a quick reason why reactive programming, and specifically stream processing, makes a lot of sense in Industrial Internet Systems. As you probably know, the Industrial Internet of Things (IIoT) has really taken off, and billions and billions of new devices will be connected to the Internet in this decade. Many of them will belong to national critical infrastructure (e.g., smart power grids, smart roads, smart hospitals, smart cities). Most of the data generated by these devices cannot and will not be sent to the cloud because of the bandwidth, latency, and exorbitant cost involved. Therefore, much of the data must be correlated, merged, filtered, and analyzed at the edge so that only summary and exceptional data is sent to the cloud. This is Edge Computing (a.k.a. Fog).

Rx4DDS is an elegant solution to this problem that is productive, composable, concurrency-friendly, and scales well. Rx4DDS is a collection of Reactive Extensions (Rx) adapters for RTI Connext DDS. Rx is a collection of open-source libraries for functional-style composable asynchronous data processing. It is available in a number of flavors including RxJava, RxJS, RxCpp, Rx.Net, RxScala, RxPy and many more. The official site of all things Rx is ReactiveX.io.

The original Rx4DDS work started in C#. Since then, Rx4DDS has been implemented in C++ and JavaScript (Node.js). Recently, I presented Rx4DDS and 3 demos at CppCon’15. The session recording, all demo videos, and the slides are now available. And yeah, there’s a research paper too.

Connext DDS Modern C++ API in Rx4DDS

The Rx4DDS C++ adapter uses the modern C++ API for DDS (DDS-C++PSM) at its core. DDS-C++PSM has its own reactive flavor to it. For instance, DDS ReadCondition accepts a lambda to process data. The user’s thread of control automatically invokes the lambda provided earlier. As a result, the dispatch logic that programmers have to write is greatly simplified (often just one line). See an example here.

Rx4DDS takes DDS reactive programming to a whole new level. The best way to learn what it can do for you is watching a demonstration and understanding the corresponding code. 

So, here is one of the demos that shows transformation of keyed data streams using GroupBy and Map.

The demo includes multiple parallel stateful data pipelines. The transformation pipeline is replicated for each key (i.e., shape color). The resulting topics (“Circle” and “Triangle”) follow the lifecycle of the original instance in the “Square” topic. I.e., when a new square arrives, a new circle and triangle orbit around it. Similarly, when a square is disposed, circle and triangle are disposed with it.

Let’s look at some code.

auto circle_writer = ...
auto triangle_writer = ... 
int circle_degree = 0;
int tri_degree = 0;

using KeyedShapeObservable = 
  rx::grouped_observable<string, LoanedSample<ShapeType>>;

rx4dds::TopicSubscription<ShapeType> 
  topic_sub(participant, "Square", waitset, worker);

auto grouped_stream =
  topic_sub.create_observable() 
           >> rx4dds::group_by_instance ([](ShapeType & shape) { 
                return shape.color(); 
              });

auto subscription = 
grouped_stream
  .flat_map([circle_writer, triangle_writer, circle_degree, tri_degree]
            (KeyedShapeObservable go) 
  {
    rx::observable<ShapeType> inner_transformed =
      go >> rx4dds::to_unkeyed()
         >> rx4dds::complete_on_dispose()
         >> rx4dds::error_on_no_alive_writers()
         >> filter([](LoanedSample<ShapeType> s) {
                     return s.info().valid();
            }) 
         >> map([](LoanedSample<ShapeType> valid) {
                  return valid.data();
            }) 
         >> map([circle_degree](ShapeType & square) mutable {
                  circle_degree = (circle_degree + 3) % 360;
                  return shape_location(square, circle_degree);
            }) 
         >> rx4dds::publish_over_dds(circle_writer, ShapeType(go.key()))
         >> map([tri_degree](ShapeType & circle) mutable {
                  tri_degree = (tri_degree + 9) % 360;
                  return shape_location(circle, tri_degree);
            }) 
         >> rx4dds::publish_over_dds(triangle_writer, ShapeType(go.key()));
      
    return inner_transformed;

  }).subscribe();

Diving Deep

There’s a lot going on in here. But the first thing you might have noticed is that there’s not a single loop. You don’t need them here. Rx4DDS lets you write more declarative code. Think Linux command-line pipes and i/o redirection.

The TopicSubscription object is the entry point. As you might have suspected, it hides the underlying DDS DataReader. The create_observable function creates an observable from the TopicSubscription. This observable is a stream of ShapeType objects.  Note that whether TopicSubscription uses a waitset or a listener to retrieve DDS data is up to the user. Just use the right constructor. In either case, create_observable returns an observable. The rest of the program structure is identical in either case. This is important.

“Rx4DDS decouples data retrieval from consumption.”

The group_by_instance is an Rx4DDS “operator” that shards the incoming ShapeType objects across “inner” keyed observables (KeyedShapeObservable) which are bound to a specific key. It uses the view state and instance handle information in SampleInfo for sharding. It also accepts a lambda to retrieve the key for each sample.

The grouped_stream object is a stream-of-streams–a natural mapping for keyed topics. After all, a keyed topic contains many instances and each instance has its own lifecycle as in Create–>Update–>Dispose. This lifecycle of instances maps naturally to observables as they have the same lifecycle expressed in terms of OnNext*[OnError|OnCompleted]. I.e., OnNext may be called zero or more times and one of OnError and OnCompleted if/when the observable ends.
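The OnNext*[OnError|OnCompleted] grammar can be captured in a few lines of plain C++. The sketch below uses an invented GuardedObserver wrapper (not an RxCpp or Rx4DDS type) that forwards any number of on_next calls and at most one terminal event, after which everything is ignored.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>

// Hypothetical observer wrapper enforcing OnNext*[OnError|OnCompleted].
template <typename T>
class GuardedObserver {
 public:
  GuardedObserver(std::function<void(const T&)> on_next,
                  std::function<void(const std::string&)> on_error,
                  std::function<void()> on_completed)
      : next_(std::move(on_next)),
        error_(std::move(on_error)),
        completed_(std::move(on_completed)) {
    assert(next_ && error_ && completed_);
  }

  void on_next(const T& value) {
    if (!terminated_) next_(value);
  }
  void on_error(const std::string& why) {
    if (terminated_) return;
    terminated_ = true;  // terminal event: nothing may follow
    error_(why);
  }
  void on_completed() {
    if (terminated_) return;
    terminated_ = true;  // terminal event: nothing may follow
    completed_();
  }

 private:
  std::function<void(const T&)> next_;
  std::function<void(const std::string&)> error_;
  std::function<void()> completed_;
  bool terminated_ = false;
};

// Usage: an instance is created, updated, then disposed.
inline std::string lifecycle_trace() {
  std::string trace;
  GuardedObserver<int> obs(
      [&trace](const int& v) { trace += "next(" + std::to_string(v) + ") "; },
      [&trace](const std::string& why) { trace += "error(" + why + ") "; },
      [&trace] { trace += "completed "; });
  obs.on_next(1);        // Create
  obs.on_next(2);        // Update
  obs.on_completed();    // Dispose
  obs.on_next(3);        // ignored: the stream has ended
  obs.on_error("late");  // ignored: only one terminal event is allowed
  return trace;
}
```

Mapped onto DDS, the on_next calls correspond to samples of one instance and the single terminal event to the end of that instance’s lifecycle.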

We call flat_map on the grouped_stream. The flat_map operator has many uses. First, when there’s a stream-of-streams, it flattens it out to just a stream. It does that by simply merging the individual streams together. Of course, sharding an incoming stream and merging the sharded streams back won’t be useful at all. That’s equivalent to a no-op. So flat_map lets you map each incoming stream to some other stream. Hence there’s a lambda that returns an inner_transformed observable, which is a transformation of every instance-specific observable (go).
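If the group-then-flat_map shape is new to you, the same idea can be shown on ordinary in-memory collections. This is only an analogy under simplifying assumptions: the Shape struct and both functions are invented for this sketch, the data is finite, and the merge is a simple concatenation in key order, whereas a real flat_map over observables interleaves items as they arrive.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <vector>

struct Shape {
  std::string color;  // the key
  int x;
  int y;
};

using Groups = std::map<std::string, std::vector<Shape>>;
using GroupTransform =
    std::function<std::vector<Shape>(const std::string&,
                                     const std::vector<Shape>&)>;

// "group_by": shard the incoming items by key.
inline Groups group_by_color(const std::vector<Shape>& input) {
  Groups groups;
  for (const Shape& s : input) groups[s.color].push_back(s);
  return groups;
}

// "flat_map": transform each per-key group, then merge the results.
inline std::vector<Shape> flat_map(const Groups& groups,
                                   const GroupTransform& transform) {
  assert(static_cast<bool>(transform));
  std::vector<Shape> merged;
  for (const auto& kv : groups) {
    std::vector<Shape> inner = transform(kv.first, kv.second);
    merged.insert(merged.end(), inner.begin(), inner.end());
  }
  return merged;
}

// Usage: shift every shape 10 units to the right, one group at a time.
inline std::vector<Shape> shift_all_right(const std::vector<Shape>& input) {
  return flat_map(group_by_color(input),
                  [](const std::string&, const std::vector<Shape>& group) {
                    std::vector<Shape> out = group;
                    for (Shape& s : out) s.x += 10;
                    return out;
                  });
}
```

With an identity transform, flat_map over the groups just reassembles the input, which is the no-op mentioned above. The transform is where the per-key pipeline goes.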

So what’s the transformation? It’s just concatenation of operators that simply describe what they do. First, we drop the “keyed” aspect and get a regular key-less ShapeType observable. We’ll use the key shortly.

The complete_on_dispose and error_on_no_alive_writers operators constitute the data-centric magic sauce. When an instance is disposed, the complete_on_dispose operator recognizes it and propagates an OnCompleted event through the instance-specific observable. Likewise, when an instance runs into some sort of error (e.g., no alive writers), the error_on_no_alive_writers operator recognizes it and propagates an OnError event through the instance-specific observable.

“Data-centric interpretation of the data and SampleInfo is wired into the Rx4DDS programming model.”

The best part is that it is quite extensible. If not-alive-writers does not mean an “error” in your application, you can either not use it or write your own “interpretations” of SampleInfo as you please. The error_on_no_alive_writers operator is very simple and gives you an idea of what’s possible.

The rest of the data pipeline is quite straight-forward. filter eliminates invalid samples and each valid sample is mapped to its data right after filter. At this point the source observable (go) is transformed to a pure observable<ShapeType>. What comes next is just business-logic.

Each ShapeType object that holds a location of the square is transformed to a circle position using basic geometry (not shown). Each circle position is also a ShapeType. It is then published over DDS via the circle_writer. A similar transformation occurs for triangles.
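The geometry itself is not shown in the demo code, so the following is only a guess at its general shape: place a point on a circle of some radius around the center, at the given angle. The Point struct, the orbit_location name, and the default radius of 30 are all assumptions made for this sketch and are not taken from Rx4DDS.

```cpp
#include <cassert>
#include <cmath>

struct Point {
  int x;
  int y;
};

// Hypothetical orbit computation: a point `radius` away from `center`,
// at `degree` degrees measured from the positive x axis.
inline Point orbit_location(Point center, int degree, double radius = 30.0) {
  assert(radius >= 0.0);
  const double pi = std::acos(-1.0);
  const double radians = degree * pi / 180.0;
  return Point{
      center.x + static_cast<int>(std::lround(radius * std::cos(radians))),
      center.y + static_cast<int>(std::lround(radius * std::sin(radians)))};
}
```

Advancing the angle by a fixed step on every sample, as the map lambdas do with circle_degree and tri_degree, makes the derived shape circle around the source shape.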

The publish_over_dds is an interesting operator. It’s one more ingredient that goes into the making of the data-centric magic sauce. Note that it accepts a ShapeType object created from the key. When OnCompleted or OnError is propagated, publish_over_dds disposes the instance selected by the key. For instance, when a blue Square is disposed, the complete_on_dispose operator recognizes it and propagates an OnCompleted event. In response to OnCompleted, the publish_over_dds operator disposes both the blue circle and the blue triangle.

In summary,

“The finalization actions are baked into each data pipeline at the time of creation.”

This is declarative programming as it completely hides explicit control flow necessary to do cleanup. This is one of the main strengths of Rx4DDS.

Finally, the flat_map operator simply merges the transformed (inner) streams together. The result of merge is ignored as there’s no subsequent user-defined stage. The final subscribe function call sets the whole machinery going by creating the only user-visible subscription that the user-code has to manage.

The CppCon’15 presentation touches upon several concepts discussed here and more.

DDS and Rx Side-by-Side

I hope you are starting to see that Rx and DDS are a great match. More compatibilities reveal themselves as you start looking deeper. Here’s how I think Rx and DDS are related architecturally.

Aspect | DDS | Rx
Design Methodology | Data-Centric | Reactive, Compositional
Architecture | Distributed Publish-subscribe | In-memory Observable-Observer. Monadic
Anonymity/loose coupling | Publisher does not know about subscribers | Observable does not know about observers
Data Streaming | A Topic<T> is a stream of data samples of type T | An Observable<T> is a stream of objects of type T
Stream lifecycle | Instances (keys) have lifecycle (New->Alive->Disposed) | Observables have lifecycle OnNext*[OnCompleted|OnError]
Data Publication | Publisher may publish without any subscriber | “Hot” Observables
Data Reception | Subscriber may read the same data over and over | “Cold” Observables

 

Rx4DDS allows you to map many DDS concepts to the declarative Rx programming model. Some of my favorite mappings are listed below. This table is based on the C# implementation of Rx4DDS.

 

DDS Concept | Rx Concept/Type/Operator
Topic of type T | An object that implements IObservable<T>, which internally creates a DataReader<T>
Communication status, Discovery event streams | IObservable<SampleLostStatus>, IObservable<SubscriptionBuiltinTopicData>, IObservable<PublicationBuiltinTopicData>
Topic of type T with key type=Key | IObservable<IGroupedObservable<Key, T>>
Detect a new instance | Notify Observers about a new IGroupedObservable<Key, T> with key==instance. Notification using IObserver<IGroupedObservable<Key, T>>.OnNext()
Dispose an instance | Notify Observers through IObserver<IGroupedObservable<Key, T>>.OnCompleted
Take an instance update of type T | Notify Observers about a new value of T using IObserver<T>.OnNext()
Read with history=N | IObservable<T>.Replay(N)
Query Conditions | IObservable<T>.Where(…) OR IObservable<T>.GroupBy(…)
SELECT in CFT expression | IObservable<T>.Select(…)
FROM in CFT expression | DDSObservable.FromTopic(“Topic1”), DDSObservable.FromKeyedTopic(“Topic2”)
WHERE in CFT expression | IObservable<T>.Where(…)
ORDER BY in CFT expression | IObservable<T>.OrderBy(…)
MultiTopic (INNER JOIN) | IObservable<T>.Join().Where().Select()
Joins between DDS and non-DDS data | Join, CombineLatest, Zip
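The keyed-topic rows of this mapping (a new instance becomes a new group, a dispose completes that group) can be illustrated with a small sketch in plain Python. It is not the Rx4DDS.NET API: demux_by_key, the event tuples, and the callback are all hypothetical, and per-key lists stand in for grouped observables.

```python
def demux_by_key(events, on_new_instance):
    """Route (kind, key, value) events into one list of updates per key.

    on_new_instance(key, updates) is called once per live key, mirroring the
    OnNext that announces a new grouped observable. A "dispose" event closes
    the group, mirroring OnCompleted on that group.
    Returns the keys whose groups were completed, in order.
    """
    groups = {}
    completed = []
    for kind, key, value in events:
        if kind == "dispose":
            if key in groups:
                del groups[key]
                completed.append(key)
            continue
        if key not in groups:
            groups[key] = []
            on_new_instance(key, groups[key])
        groups[key].append(value)
    return completed


seen = {}
events = [
    ("update", "BLUE", (1, 1)),
    ("update", "RED", (5, 5)),
    ("update", "BLUE", (2, 2)),
    ("dispose", "BLUE", None),
]
completed = demux_by_key(events, lambda key, updates: seen.setdefault(key, updates))
print(seen, completed)
```

Each key gets its own stream of updates, so per-instance logic can be written once and applied to every instance as it appears.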

 

Phew! This blog post is already much longer than expected. Yet, there’s so much more to talk about. I would encourage you to look at the other demonstration videos available here.

If JavaScript is your thing, you might also want to find out how we used DDS, RxJS, and Node.js Connector to implement Log Analytics and Anomaly Detection in Remote DataCenters. Check out these slides and a research paper.

Finally, we welcome your comments on this blog & we would be interested in discussing how this technology may fit your needs.

Reactive Programming using RTI Connext DDS and Microsoft Rx

Reactive Programming is perhaps among the top few technologies rocking the dev world recently. See ReactConf, InfoQ, and Gartner Hype Cycle. The titans of software technology are pushing reactive programming for mobile and cloud applications. I think this is simply an expansion, albeit a major one, of proven techniques into new classes of applications. That is, the applications joining the club are new but the principles are really not.

What does it really mean to be reactive? (see video) Traditionally, reactive systems are long-running systems that must respond to external stimuli at speeds determined by their environment. The Reactive Manifesto describes four essential qualities of reactive systems: event-driven (i.e., push messages to consumers), scalable (i.e., accommodate dynamic load while maximizing resource usage), resilient (i.e., isolate faults), and responsive (i.e., high degree of predictability).

It may or may not surprise you but RTI Connext DDS rocks in the reactive world. DDS is an excellent choice to build distributed reactive systems because it provides the necessary building blocks:

  1. Event-Driven: DDS is an event-based publish-subscribe middleware and it is asynchronous.
  2. Scalable: The peer-to-peer architecture of RTI Connext DDS scales.
  3. Resilient: DDS allows very loosely coupled components, which helps isolate faults when they occur, making systems resilient.
  4. Responsive: DDS is designed for real-time systems that often have very stringent timeliness requirements. The utility of DDS quality-of-service configuration cannot be overstated here.

So, what’s the point? … Let me ask you this …

What’s common between spreadsheet cell formulas and a factory automation system?

Well, they are both reactive! Cells react to changes in the data and robots react to rolling stuff on an assembly line. The difference, however, is how they are built. Spreadsheet formulas are simply declarations of intent: $(C3)=$(A1)+$(B2). Most DDS-based systems, including factory automation systems, are far from that because of the tyranny of imperative programming models and callback-hell, a (perhaps familiar) program structure where asynchronous callbacks, state machines, and synchronization conspire against you.
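As a small illustration of what “declaring intent” means, here is a sketch of the spreadsheet formula in plain Python. The Cell class and the formula helper are hypothetical and have nothing to do with DDS or Rx; they only show a value that recomputes itself whenever one of its inputs changes.

```python
class Cell:
    """A value that notifies its dependents whenever it changes."""

    def __init__(self, value=0):
        self._value = value
        self._listeners = []

    @property
    def value(self):
        return self._value

    def set(self, value):
        self._value = value
        for listener in self._listeners:
            listener()

    def subscribe(self, listener):
        self._listeners.append(listener)


def formula(compute, *inputs):
    """Create a cell whose value is recomputed when any input changes."""
    out = Cell(compute(*(c.value for c in inputs)))

    def recompute():
        out.set(compute(*(c.value for c in inputs)))

    for c in inputs:
        c.subscribe(recompute)
    return out


a1, b2 = Cell(1), Cell(2)
c3 = formula(lambda a, b: a + b, a1, b2)  # C3 = A1 + B2
a1.set(10)
print(c3.value)
```

The caller states the relationship once; the propagation of changes is handled by the cells rather than by hand-written callbacks at each call site.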

Wouldn’t it be nice to be able to develop DDS systems that interact with moving, turning, rotating, and flying things in a much cleaner, more succinct, declarative way that does not degrade into callback-hell? … Is that even possible?

Indeed.

RTI Research is excited to announce the Rx4DDS.NET library that integrates Microsoft Reactive Extensions (Rx) with RTI Connext DDS on the .NET platform. Rx and DDS are quite complementary because Rx is based on the Subject-Observer pattern, which is analogous to the publish-subscribe pattern of DDS. Furthermore, the core tenet of Rx, composition of operations over values that change over time, complements DDS instances, which are data objects that change over time. DDS ensures propagation of changes to the interested remote participants. Consequently, combining Rx with DDS enables a coherent end-to-end dataflow architecture for both data distribution (which is performed by DDS) and processing (which is done by Rx). Rx and DDS together support location transparency of dataflow-style programs seamlessly. The resulting applications dramatically simplify concurrency to the extent that it can be simply configured.

So what does a reactive program using Rx4DDS.NET really look like? Here is a swap program in C#.

[Figure: swap program in C#]

The above program subscribes to the “Square” topic, swaps the original x & y coordinates, and publishes triangles of the same size/color (but swapped x & y) on the “Triangle” topic. The little arrow (=>) that looks like a parameter to Select defines a lambda (an anonymous function) with a single parameter named shape, and the part after the arrow is the body of the lambda function. Btw, it also reacts to a disposed “Square” and disposes the blue triangle in response.
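For readers who want to see the shape of such a pipeline in text form, here is a rough analogue in plain Python. It is not the Rx4DDS.NET program: the Stream class is a hypothetical, minimal push-based stand-in for IObservable<T>, shapes are plain dictionaries, a list stands in for the “Triangle” writer, and dispose handling is left out.

```python
class Stream:
    """Tiny push-based stream, a hypothetical stand-in for IObservable<T>."""

    def __init__(self):
        self._observers = []

    def subscribe(self, on_next):
        self._observers.append(on_next)

    def emit(self, value):
        for on_next in self._observers:
            on_next(value)

    def select(self, fn):
        """Return a new stream carrying fn(value) for every value emitted here."""
        out = Stream()
        self.subscribe(lambda value: out.emit(fn(value)))
        return out


squares = Stream()
triangles = []  # stands in for the "Triangle" topic writer

# The lambda plays the role of the C# "shape => ..." expression.
swapped = squares.select(lambda shape: {**shape, "x": shape["y"], "y": shape["x"]})
swapped.subscribe(triangles.append)

squares.emit({"color": "BLUE", "x": 10, "y": 20, "shapesize": 30})
print(triangles)
```

The whole program is one chained expression plus a subscription, which is the point the swap example makes.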

Complex Event Processing

With Rx4DDS.NET, Complex Event Processing (CEP) is at your fingertips, thanks to LINQ (Language INtegrated Query). CEP fans among you might be thrilled to see the following example.

[Figure: correlate query in C#]

That’s right. With Rx4DDS.NET, you can write SQL-like queries on streaming DDS data. You may think of it as a “continuous query” over streaming DDS data. The above query correlates squares and circles of the same color and produces triangles at the same location and color as that of the squares. The size of the triangles, however, changes with the x position of the circle. This example also uses a thread-pool (Scheduler.Default).
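The correlation described above can be approximated in plain Python as a join on color over the latest square and circle. This is only a sketch under assumptions: the Correlator class is hypothetical, it uses combine-latest semantics rather than the LINQ join of the original query, the triangle size is assumed to equal the circle’s x position (the original only says the size changes with it), and the thread pool is omitted.

```python
class Correlator:
    """Pairs the latest square and circle of each color (a combine-latest join)."""

    def __init__(self, publish):
        self.publish = publish
        self.squares = {}  # color -> latest square
        self.circles = {}  # color -> latest circle

    def on_square(self, square):
        self.squares[square["color"]] = square
        self._emit(square["color"])

    def on_circle(self, circle):
        self.circles[circle["color"]] = circle
        self._emit(circle["color"])

    def _emit(self, color):
        square = self.squares.get(color)
        circle = self.circles.get(color)
        if square is None or circle is None:
            return  # nothing to correlate yet
        self.publish({
            "color": color,
            "x": square["x"],
            "y": square["y"],
            "shapesize": circle["x"],  # assumed sizing rule: size follows circle x
        })


triangles = []
correlator = Correlator(triangles.append)
correlator.on_square({"color": "BLUE", "x": 10, "y": 20})
correlator.on_circle({"color": "RED", "x": 99, "y": 0})   # different color: no output
correlator.on_circle({"color": "BLUE", "x": 40, "y": 0})  # matches the blue square
print(triangles)
```

Written by hand like this, the join needs explicit state for both sides; the query form keeps that bookkeeping out of user code.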

So, what’s the magic sauce here? It is called Functional Reactive Programming (FRP). FRP is a declarative approach to system development wherein program specification amounts to “what” as opposed to “how”. The reason the above programs are rather short is that FRP offers high-level abstractions that avoid the verbosity commonly observed in callback-based techniques. It allows chaining of “operations” on streaming data.

There is a lot more to Rx4DDS.NET than what you see here. In particular, it is designed to support keyed topics, discovery events, communication status messages, and you can correlate all of them reactively and concurrently. So download Rx4DDS.NET and Go Reactive today!

If you are interested in learning about how RTI and Vanderbilt University implemented real-time analytics for a soccer game using Rx4DDS.NET, see this paper and the rticonnextdds-reactive repository that includes all the sources.