Data-Centric Stream Processing in the Fog

It has been almost a year and a half since my last post on reactive stream processing with RTI Connext DDS. A lot has happened since then, and I have a lot of interesting updates to share with you.

For starters, here’s a quick reason why reactive programming, and specifically stream processing, makes a lot of sense in Industrial Internet Systems. As you probably know, the Industrial Internet of Things (IIoT) has really taken off, and billions and billions of new devices will be connected to the Internet in this decade. Many of them will belong to national critical infrastructure (e.g., smart power grids, smart roads, smart hospitals, smart cities). Most of the data generated by these devices cannot and will not be sent to the cloud because of bandwidth limits, latency, and the exorbitant cost of doing so. Therefore, much of the data must be correlated, merged, filtered, and analyzed at the edge so that only summary and exceptional data are sent to the cloud. This is Edge Computing (a.k.a. Fog).

Rx4DDS is an elegant solution to this problem that is productive, composable, concurrency-friendly, and scales well. Rx4DDS is a collection of Reactive Extensions (Rx) adapters for RTI Connext DDS. Rx is a collection of open-source libraries for functional-style composable asynchronous data processing. It is available in a number of flavors including RxJava, RxJS, RxCpp, Rx.Net, RxScala, RxPy and many more. The official site of all things Rx is ReactiveX.io.

The original Rx4DDS work started in C#. Since then, Rx4DDS has been implemented in C++ and JavaScript (Node.js). Recently, I presented Rx4DDS and 3 demos at CppCon’15. The session recording, all demo videos, and the slides are now available. And yeah, there’s a research paper too.

Connext DDS Modern C++ API in Rx4DDS

The Rx4DDS C++ adapter uses the modern C++ API for DDS (DDS-C++PSM) at its core. DDS-C++PSM has its own reactive flavor. For instance, a DDS ReadCondition accepts a lambda to process data. The user’s thread of control then automatically invokes the lambda provided earlier. As a result, the dispatch logic that programmers have to write is greatly simplified (often just one line). See an example here.

Rx4DDS takes DDS reactive programming to a whole new level. The best way to learn what it can do for you is to watch a demonstration and understand the corresponding code.
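
To make the "handler attached to a condition" idea concrete, here is a toy model that uses only the standard library. ToyCondition and ToyWaitSet are hypothetical stand-ins invented for this sketch; they are not the DDS ReadCondition and WaitSet types, and unlike a real waitset the dispatch call here never blocks.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Toy model of handler-based dispatch. Hypothetical classes, not the DDS API.
class ToyCondition {
public:
    explicit ToyCondition(std::function<void()> handler)
        : handler_(std::move(handler)) {}
    void trigger() { triggered_ = true; }
    bool triggered() const { return triggered_; }
    // Clears the trigger and runs the handler supplied at construction.
    void run() { triggered_ = false; handler_(); }
private:
    std::function<void()> handler_;
    bool triggered_ = false;
};

class ToyWaitSet {
public:
    void attach(ToyCondition& condition) { conditions_.push_back(&condition); }
    // Runs the handler of every triggered condition on the caller's thread
    // and returns how many handlers ran.
    int dispatch() {
        int ran = 0;
        for (ToyCondition* condition : conditions_) {
            if (condition->triggered()) {
                condition->run();
                ++ran;
            }
        }
        return ran;
    }
private:
    std::vector<ToyCondition*> conditions_;
};
```

With this shape, the application loop shrinks to a single call to dispatch(), because the per-condition logic already lives in the lambdas.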

So, here is one of the demos that shows transformation of keyed data streams using GroupBy and Map.

The demo includes multiple parallel stateful data pipelines. The transformation pipeline is replicated for each key (i.e., shape color). The resulting topics (“Circle” and “Triangle”) follow the lifecycle of the original instance in the “Square” topic. That is, when a new square arrives, a new circle and triangle orbit around it. Similarly, when a square is disposed, its circle and triangle are disposed with it.

Let’s look at some code.

auto circle_writer = ...
auto triangle_writer = ... 
int circle_degree = 0;
int tri_degree = 0;

using KeyedShapeObservable = 
  rx::grouped_observable<string, LoanedSample<ShapeType>>;

rx4dds::TopicSubscription<ShapeType> 
  topic_sub(participant, "Square", waitset, worker);

auto grouped_stream =
  topic_sub.create_observable() 
           >> rx4dds::group_by_instance ([](ShapeType & shape) { 
                return shape.color(); 
              });

auto subscription = 
grouped_stream
  .flat_map([circle_writer, triangle_writer, circle_degree, tri_degree]
            (KeyedShapeObservable go) 
  {
    rx::observable<ShapeType> inner_transformed =
      go >> rx4dds::to_unkeyed()
         >> rx4dds::complete_on_dispose()
         >> rx4dds::error_on_no_alive_writers()
         >> filter([](LoanedSample<ShapeType> s) {
                     return s.info().valid();
            }) 
         >> map([](LoanedSample<ShapeType> valid) {
                  return valid.data();
            }) 
         >> map([circle_degree](ShapeType & square) mutable {
                  circle_degree = (circle_degree + 3) % 360;
                  return shape_location(square, circle_degree);
            }) 
         >> rx4dds::publish_over_dds(circle_writer, ShapeType(go.key()))
         >> map([tri_degree](ShapeType & circle) mutable {
                  tri_degree = (tri_degree + 9) % 360;
                  return shape_location(circle, tri_degree);
            }) 
         >> rx4dds::publish_over_dds(triangle_writer, ShapeType(go.key()));
      
    return inner_transformed;

  }).subscribe();

Diving Deep

There’s a lot going on in here. But the first thing you might have noticed is that there’s not a single loop. You don’t need them here. Rx4DDS lets you write more declarative code. Think Linux command-line pipes and I/O redirection.

The TopicSubscription object is the entry point. As you might have suspected, it hides the underlying DDS DataReader. The create_observable function creates an observable from the TopicSubscription. This observable is a stream of ShapeType objects.  Note that whether TopicSubscription uses a waitset or a listener to retrieve DDS data is up to the user. Just use the right constructor. In either case, create_observable returns an observable. The rest of the program structure is identical in either case. This is important.

“Rx4DDS decouples data retrieval from consumption.”

The group_by_instance is an Rx4DDS “operator” that shards the incoming ShapeType objects across “inner” keyed observables (KeyedShapeObservable), each of which is bound to a specific key. It uses the view state and instance handle information in SampleInfo for sharding. It also accepts a lambda to retrieve the key for each sample.
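
The sharding idea can be sketched without any DDS or Rx dependency. The following is only a toy model over a finished vector, whereas the real operator works on a live stream and uses SampleInfo. The names Sample and shard_by_key are hypothetical and not part of Rx4DDS.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-in for a ShapeType sample; not the real DDS type.
struct Sample {
    std::string color;  // plays the role of the instance key
    int x;
    int y;
};

// Shards a flat sequence of samples into one sequence per key.
// key_of mirrors the key-selector lambda passed to group_by_instance.
template <typename KeyFn>
std::map<std::string, std::vector<Sample>>
shard_by_key(const std::vector<Sample>& samples, KeyFn key_of) {
    std::map<std::string, std::vector<Sample>> shards;
    for (const Sample& sample : samples) {
        // operator[] creates the shard the first time a key is seen.
        shards[key_of(sample)].push_back(sample);
    }
    return shards;
}
```

Each entry of the returned map corresponds to one "inner" stream: all samples in it share a key and keep their arrival order.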

The grouped_stream object is a stream-of-streams, which is a natural mapping for keyed topics. After all, a keyed topic contains many instances and each instance has its own lifecycle, as in Create -> Update -> Dispose. This lifecycle of instances maps naturally to observables because they have the same lifecycle, expressed as OnNext*[OnError|OnCompleted]. That is, OnNext may be called zero or more times, followed by at most one of OnError or OnCompleted if and when the observable ends.

We call flat_map on the grouped_stream. The flat_map operator has many uses. First, when there’s a stream-of-streams, it flattens it out to just a stream. It does that by simply merging the individual streams together. Of course, sharding an incoming stream and then merging the sharded streams back together wouldn’t be useful at all. That’s equivalent to a no-op. So flat_map lets you map each incoming stream to some other stream. Hence there’s a lambda that returns an inner_transformed observable, which is a transformation of every instance-specific observable (go).
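
The OnNext*[OnError|OnCompleted] grammar can be made explicit with a small guard. This is an illustrative sketch only: GuardedObserver is a hypothetical class written for this post, and RxCpp's own observer types look different.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>

// Toy observer that enforces the Rx grammar OnNext*[OnError|OnCompleted]:
// any number of values, then at most one terminal event, then silence.
template <typename T>
class GuardedObserver {
public:
    GuardedObserver(std::function<void(const T&)> on_next,
                    std::function<void(const std::string&)> on_error,
                    std::function<void()> on_completed)
        : on_next_(std::move(on_next)),
          on_error_(std::move(on_error)),
          on_completed_(std::move(on_completed)) {}

    void on_next(const T& value) {
        if (!terminated_) on_next_(value);
    }
    void on_error(const std::string& reason) {
        if (terminated_) return;
        terminated_ = true;
        on_error_(reason);
    }
    void on_completed() {
        if (terminated_) return;
        terminated_ = true;
        on_completed_();
    }
    bool terminated() const { return terminated_; }

private:
    std::function<void(const T&)> on_next_;
    std::function<void(const std::string&)> on_error_;
    std::function<void()> on_completed_;
    bool terminated_ = false;
};
```

Anything delivered after the first terminal event is dropped, which mirrors a disposed DDS instance: once it is gone, no further updates for it are expected.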
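
Here is a rough in-memory analogy for what flat_map does, with vectors standing in for streams. It is a simplification under stated assumptions: flat_map_vectors is a hypothetical helper, and it concatenates the inner results one after another, while a real Rx flat_map merges live streams and may interleave their items.

```cpp
#include <cassert>
#include <vector>

// Toy flat_map over in-memory "streams" (vectors): apply transform to each
// inner stream, then merge the transformed results into one flat stream.
template <typename T, typename Fn>
std::vector<T> flat_map_vectors(const std::vector<std::vector<T>>& streams,
                                Fn transform) {
    std::vector<T> merged;
    for (const std::vector<T>& inner : streams) {
        std::vector<T> mapped = transform(inner);
        merged.insert(merged.end(), mapped.begin(), mapped.end());
    }
    return merged;
}
```

Passing an identity transform simply undoes the sharding, which is the no-op case mentioned above. The interesting work happens when the transform does something per stream.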

So what’s the transformation? It’s just concatenation of operators that simply describe what they do. First, we drop the “keyed” aspect and get a regular key-less ShapeType observable. We’ll use the key shortly.

The complete_on_dispose and error_on_no_alive_writers operators constitute the data-centric magic sauce. When an instance is disposed, the complete_on_dispose operator recognizes it and propagates an OnCompleted event through the instance-specific observable. Likewise, when an instance runs into some sort of error (e.g., no alive writers), the error_on_no_alive_writers operator recognizes it and propagates an OnError event through the instance-specific observable.

“Data-centric interpretation of the data and SampleInfo is wired into the Rx4DDS programming model.”

The best part is that it is quite extensible. If having no alive writers does not mean an “error” in your application, you can either leave the operator out or write your own “interpretations” of SampleInfo as you please. The error_on_no_alive_writers operator is very simple and gives you an idea of what’s possible.

The rest of the data pipeline is quite straightforward. filter eliminates invalid samples, and each valid sample is mapped to its data right after filter. At this point the source observable (go) is transformed to a pure observable<ShapeType>. What comes next is just business logic.

Each ShapeType object that represents the location of a square is transformed to a circle position using basic geometry (not shown). Each circle position is also a ShapeType. It is then published over DDS via the circle_writer. A similar transformation occurs for triangles.

The publish_over_dds is an interesting operator. It’s one more ingredient that goes into the data-centric magic sauce. Note that it accepts a ShapeType object created from the key. When OnCompleted or OnError is propagated, publish_over_dds disposes the instance selected by the key. For instance, when a blue Square is disposed, the complete_on_dispose operator recognizes it and propagates an OnCompleted event. In response to OnCompleted, the publish_over_dds operators dispose both the blue circle and the blue triangle.
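
One way to picture such an “interpretation” is as a plain function from instance state to Rx event. The sketch below is an assumption-laden illustration, not the Rx4DDS implementation: the enum names are shortened versions of the DDS instance states, and the no_writers_is_error flag is a hypothetical knob standing in for the choice of whether to put error_on_no_alive_writers in the pipeline.

```cpp
#include <cassert>

// Instance states as defined by DDS (names shortened for this sketch).
enum class InstanceState { Alive, NotAliveDisposed, NotAliveNoWriters };

// The three observer notifications in Rx.
enum class Event { Next, Completed, Error };

// Maps the instance state carried in SampleInfo to an Rx event.
Event interpret(InstanceState state, bool no_writers_is_error) {
    switch (state) {
        case InstanceState::Alive:
            return Event::Next;
        case InstanceState::NotAliveDisposed:
            return Event::Completed;
        case InstanceState::NotAliveNoWriters:
            return no_writers_is_error ? Event::Error : Event::Next;
    }
    assert(false && "unreachable: all instance states are handled above");
    return Event::Next;
}
```

An application with different semantics would swap in its own mapping and leave the rest of the pipeline unchanged.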
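
The geometry itself is not shown in the post, so the following is only a guess at the kind of computation such a helper might perform: a point on a circle around a center, at an angle given in degrees. Point, orbit_position, advance_angle, and the radius parameter are all hypothetical; the demo's actual shape_location function takes a ShapeType and may differ.

```cpp
#include <cassert>
#include <cmath>

struct Point {
    double x;
    double y;
};

// Point at distance radius from center, at the given angle measured in
// degrees from the positive x-axis.
Point orbit_position(Point center, double radius, int degrees) {
    assert(radius >= 0.0);
    const double pi = std::acos(-1.0);
    const double radians = degrees * pi / 180.0;
    return Point{center.x + radius * std::cos(radians),
                 center.y + radius * std::sin(radians)};
}

// Advances an angle by step degrees, wrapping at 360 as the demo code does.
int advance_angle(int degrees, int step) {
    return (degrees + step) % 360;
}
```

Feeding each square position through orbit_position with a slowly advancing angle produces the orbiting effect; using the circle as the center for a second call with a faster angle gives the triangle.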

In summary,

“The finalization actions are baked into each data pipeline at the time of creation.”

This is declarative programming as it completely hides explicit control flow necessary to do cleanup. This is one of the main strengths of Rx4DDS.
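
To illustrate what “baked in at creation” can look like, here is a small standard-library sketch. It assumes nothing about how publish_over_dds is really written: FakeWriter and PublishStage are hypothetical classes that only model the behavior described above, where the key to dispose is fixed when the stage is built.

```cpp
#include <cassert>
#include <set>
#include <string>
#include <utility>

// Hypothetical stand-in for a DDS DataWriter that tracks alive instances.
class FakeWriter {
public:
    void write(const std::string& key) { alive_.insert(key); }
    void dispose(const std::string& key) { alive_.erase(key); }
    bool is_alive(const std::string& key) const { return alive_.count(key) != 0; }
private:
    std::set<std::string> alive_;
};

// Toy publishing stage bound to one key at construction time. The cleanup
// action (dispose) is decided here, once, not by whoever drives the stage.
class PublishStage {
public:
    PublishStage(FakeWriter& writer, std::string key)
        : writer_(writer), key_(std::move(key)) {}

    void on_next() {
        if (!done_) writer_.write(key_);
    }
    void on_completed() { finish(); }
    void on_error() { finish(); }

private:
    // Both terminal events lead to the same finalization: dispose the instance.
    void finish() {
        if (done_) return;
        done_ = true;
        writer_.dispose(key_);
    }
    FakeWriter& writer_;
    std::string key_;
    bool done_ = false;
};
```

The code that drives the stage never mentions dispose; it only forwards events, and the cleanup follows from how the stage was constructed.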

Finally, the flat_map operator simply merges the transformed (inner) streams together. The result of merge is ignored as there’s no subsequent user-defined stage. The final subscribe function call sets the whole machinery going by creating the only user-visible subscription that the user-code has to manage.

The CppCon’15 presentation touches upon several concepts discussed here and more.

DDS and Rx Side-by-Side

I hope you are starting to see that Rx and DDS are a great match. More compatibilities reveal themselves as you start looking deeper. Here’s how I think Rx and DDS are related architecturally.

| | DDS | Rx |
|---|---|---|
| Design Methodology | Data-Centric | Reactive, Compositional |
| Architecture | Distributed Publish-subscribe | In-memory Observable-Observer. Monadic |
| Anonymity/loose coupling | Publisher does not know about subscribers | Observable does not know about observers |
| Data Streaming | A Topic<T> is a stream of data samples of type T | An Observable<T> is a stream of objects of type T |
| Stream lifecycle | Instances (keys) have lifecycle (New->Alive->Disposed) | Observables have lifecycle (OnNext*, then OnCompleted or OnError) |
| Data Publication | Publisher may publish without any subscriber | “Hot” Observables |
| Data Reception | Subscriber may read the same data over and over | “Cold” Observables |
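
The hot and cold rows are easier to see in code. The sketch below uses only the standard library; HotSource and ColdSource are hypothetical toy classes that model the two behaviors and are not RxCpp types.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Hot source: values are pushed whether or not anyone listens, so a late
// subscriber misses whatever was published before it subscribed.
template <typename T>
class HotSource {
public:
    void subscribe(std::function<void(const T&)> observer) {
        observers_.push_back(std::move(observer));
    }
    void publish(const T& value) {
        for (const auto& observer : observers_) observer(value);
    }
private:
    std::vector<std::function<void(const T&)>> observers_;
};

// Cold source: every subscriber receives the full stored sequence from the
// start, so the same data can be consumed again and again.
template <typename T>
class ColdSource {
public:
    explicit ColdSource(std::vector<T> values) : values_(std::move(values)) {}
    void subscribe(const std::function<void(const T&)>& observer) const {
        for (const T& value : values_) observer(value);
    }
private:
    std::vector<T> values_;
};
```

A DDS publisher writing with no subscribers behaves like HotSource::publish with an empty observer list, while a subscriber re-reading its cache resembles subscribing to a ColdSource more than once.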

 

Rx4DDS allows you to map many DDS concepts to the declarative Rx programming model. Some of my favorite mappings are listed below. This table is based on the C# implementation of Rx4DDS.

 

| DDS Concept | Rx Concept/Type/Operator |
|---|---|
| Topic of type T | An object that implements IObservable<T>, which internally creates a DataReader<T> |
| Communication status, Discovery event streams | IObservable<SampleLostStatus>, IObservable<SubscriptionBuiltinTopicData>, IObservable<PublicationBuiltinTopicData> |
| Topic of type T with key type=Key | IObservable<IGroupedObservable<Key, T>> |
| Detect a new instance | Notify Observers about a new IGroupedObservable<Key, T> with key==instance. Notification using IObserver<IGroupedObservable<Key, T>>.OnNext() |
| Dispose an instance | Notify Observers through IObserver<IGroupedObservable<Key,T>>.OnCompleted |
| Take an instance update of type T | Notify Observers about a new value of T using IObserver<T>.OnNext() |
| Read with history=N | IObservable<T>.Replay(N) |
| Query Conditions | IObservable<T>.Where(…) OR IObservable<T>.GroupBy(…) |
| SELECT in CFT expression | IObservable<T>.Select(…) |
| FROM in CFT expression | DDSObservable.FromTopic(“Topic1”), DDSObservable.FromKeyedTopic(“Topic2”) |
| WHERE in CFT expression | IObservable<T>.Where(…) |
| ORDER BY in CFT expression | IObservable<T>.OrderBy(…) |
| MultiTopic (INNER JOIN) | IObservable<T>.Join().Where().Select() |
| Joins between DDS and non-DDS data | Join, CombineLatest, Zip |

 

Phew! This blog post is already much longer than expected. Yet, there’s so much more to talk about. I would encourage you to look at the other demonstration videos available here.

If JavaScript is your thing, you might also want to find out how we used DDS, RxJS, and Node.js Connector to implement Log Analytics and Anomaly Detection in Remote DataCenters. Check out these slides and a research paper.

Finally, we welcome your comments on this blog & we would be interested in discussing how this technology may fit your needs.

Modern C++ is here!

We are thrilled to announce that the Modern C++ API for RTI Connext DDS is complete and publicly available now with RTI Connext 5.2 (data sheet). A lot of our customers have already experienced a new way to write DDS code through our preview version—we hope you’ll enjoy it too!

This brand-new C++ programming API, based on the ISO/IEC C++ 2003 Language DDS PSM (DDS-PSM-Cxx) specification, brings modern C++ techniques and patterns to DDS, most notably:

  • Generic programming
  • Integration with the standard library
  • Automatic object lifecycle management, with value types and reference types
  • C++11 support: move constructors, initializer lists, lambdas, range for-loops, and more
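
The distinction between value types and reference types in the list above can be sketched with the standard library alone. This is a toy model under stated assumptions: ToyHistoryQos and ToyReader are hypothetical names, and the real API classes carry far more behavior. The point is only the copy semantics, where copying a value type yields an independent object and copying a reference type yields another handle to the same underlying entity.

```cpp
#include <cassert>
#include <memory>

// Value type: a copy is a fully independent object.
struct ToyHistoryQos {
    int depth;
};

// Reference type: copies share one underlying state, which is released
// automatically when the last handle goes away.
class ToyReader {
public:
    ToyReader() : samples_read_(std::make_shared<int>(0)) {}
    void record_sample() { ++(*samples_read_); }
    int samples_read() const { return *samples_read_; }
    // Number of handles currently sharing the underlying state.
    long holders() const { return samples_read_.use_count(); }
private:
    std::shared_ptr<int> samples_read_;
};
```

Because the shared state is reference counted, there is no explicit delete or factory-based destroy call in user code, which is what automatic lifecycle management refers to.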

We’ve also updated the code that rtiddsgen generates for your IDL types.

Where can you start?

Ah! If your system is using the previous C++ API and you still want to take advantage of all the other great features and bug fixes in 5.2, don’t worry. It’s still fully supported—now we call it the Traditional C++ API.

Stay tuned for more Modern C++ here at the RTI blogs, coming soon!

The Attack of the DDS C++ APIs

Connext DDS C++ API

If you are currently developing Connext DDS applications in C++, you have a single API option: a “traditional” DDS API derived from the DDS interface definition written in OMG IDL.

Thankfully, things are about to change and you have two more options to choose from:

  1. A new standard DDS C++ API, which is defined by the OMG DDS-C++ PSM standard (DDS-PSM-Cxx), and
  2. the C++ API obtained by applying a new OMG IDL to C++11 standard mapping to the IDL DDS interfaces, see Johnny’s post on the subject here.

Curious yet confused? This post will give you some historical context, explain why it makes sense to have both, and where you would use each one.

Context & Rationale.

First the basics: The OMG Interface Definition Language (IDL) provides a way to define interfaces and data-types independently of a specific programming language. This means that you can define an interface in IDL just once and then have it automatically mapped to C, C++, Java, C#/.NET, Ada, Python, Ruby, etc. These transformations are defined in the so-called IDL to Language mappings. See the OMG specifications page for a list of the current ones.

Define the interface once, and get it in all the popular languages automatically. It sounds like a great idea, doesn’t it? Yes, it is a very nice feature and the reason the OMG DDS Specification defined the DDS functionality at a higher level, using UML, and then the DDS APIs using OMG IDL instead of taking the time to define the API separately for each programming language.

However there is a price to be paid for the convenience of using IDL. Because IDL needs to be mappable to all the programming languages, it provides a “minimal common denominator” and lacks many features that are specific to the different programming languages. When you use a programming language you want to leverage its features and idioms to make the code as clean, readable, and robust as the language allows.  If the idioms are absent the code seems clunky.

For example, IDL lacks generics, so the IDL mappings to C++, Java, and C# do not use templates/generics even in the places where they would make the most sense. IDL interfaces also cannot have overloaded operations and cannot define operators. The list goes on.

For this reason the DDS SIG decided that the best approach was to create new specifications that define the DDS APIs in the most popular languages, starting with C++ and Java. It is the same DDS specification (same classes, operations, and behaviors defined by the UML model in the DDS Specification) but mapped directly to each programming language, leveraging the features and idioms natural in that language. Apply some elbow grease, meetings, reviews, and votes and you get the DDS C++ API and the DDS Java API specifications.
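
As a small illustration of what is lost, here is the kind of interface that C++ can express directly but IDL cannot: a class template with an overloaded operator. TypedWriter is a hypothetical toy that only stores what is written to it; it is not the DDS DataWriter.

```cpp
#include <cassert>
#include <string>
#include <vector>

// One generic definition covers every sample type, and the compiler rejects
// a write of the wrong type. IDL has no way to describe this template.
template <typename T>
class TypedWriter {
public:
    void write(const T& sample) { written_.push_back(sample); }

    // Operator overloading is another C++ idiom that IDL cannot express.
    TypedWriter& operator<<(const T& sample) {
        write(sample);
        return *this;
    }

    const std::vector<T>& written() const { return written_; }

private:
    std::vector<T> written_;
};
```

An API generated from IDL typically needs a separate writer class per data type instead, which is part of why it can feel clunky in C++.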

Choosing Your API & Having the Best Possible Experience.

Defining the DDS API directly on a programming language gives the best possible experience to the programmer, as Alex eloquently showed in his recent blog post “Create a P2P Distributed Application In Under 35 Lines Of C++11 Code!”. So this is typically the best choice.

Why, you may ask, use the DDS API derived from the IDL to C++11 mapping?

It turns out that defining the APIs in IDL is very useful for automated tools and an important capability for component-based programming.

If an application developer uses a component-based programming framework or some other code-generation framework, the developer is isolated from the middleware APIs. The developer programs to the “framework APIs”, and the mapping to the “technical middleware layer” is handled by the code generation and the tooling. The IDL provides a nice intermediate representation for the framework, which can then generate code that is not tied to a programming language, and the IDL to language mapping handles the rest. In this scenario the IDL to C++11 mapping may be the best approach. The tools can keep using IDL and yet the resulting code is cleaner, more efficient, and more robust than the code that would be generated from the “classic” IDL to C++ mapping.

There are other situations where using IDL-derived APIs may be advantageous to the application, for example if they are integrating other technologies and frameworks that also use IDL. In this case the IDL to C++11 mapping may also be the best approach.

What about the tried-and-true (classic) IDL to C++ API? This also makes sense for people who do not particularly like or cannot use some of the “modern” C++ magic (auto pointers, STL, template metaprogramming, etc.). For example, some compilers do not support these advanced features, or the extra libraries would make the code too complex or expensive to certify.

In the end, it is all about choice and ensuring that you have the best tool to do the job. One of the great things about DDS is that it allows applications written in different programming languages to communicate and share information. Stated another way, DDS gives you a way to have these applications integrate and interoperate. The DDS concepts (DomainParticipant, Topic, DataWriter, DataReader, QoS, Content Filters) are the same across these options, while the specific language APIs can differ. Therefore, using a specific C++ language binding is a matter of choice and convenience, much like deciding to use Java, C, or C#.