Data-Centric Stream Processing in the Fog

It has been almost a year and a half since my last post on reactive stream processing with RTI Connext DDS. A lot has happened since then, and I have a lot of interesting updates to share with you.

For starters, here's a quick reason why reactive programming, and specifically stream processing, makes a lot of sense in Industrial Internet systems. As you probably know, the Industrial Internet of Things (IIoT) has really taken off, and billions of new devices will be connected to the Internet in this decade. Many of them will belong to national critical infrastructure (e.g., smart power grids, smart roads, smart hospitals, smart cities). Most of the data generated by these devices cannot and will not be sent to the cloud because of the bandwidth, latency, and exorbitant cost involved. Therefore, much of the data must be correlated, merged, filtered, and analyzed at the edge so that only summary and exceptional data is sent to the cloud. This is Edge Computing (a.k.a. Fog).

Rx4DDS is an elegant solution to this problem: it is productive, composable, concurrency-friendly, and scalable. Rx4DDS is a collection of Reactive Extensions (Rx) adapters for RTI Connext DDS. Rx is a collection of open-source libraries for functional-style, composable, asynchronous data processing. It is available in a number of flavors including RxJava, RxJS, RxCpp, Rx.NET, RxScala, RxPy, and many more. The official site of all things Rx is ReactiveX.io.

The original Rx4DDS work started in C#. Since then, Rx4DDS has been implemented in C++ and JavaScript (Node.js). Recently, I presented Rx4DDS and 3 demos at CppCon'15. The session recording, all demo videos, and the slides are now available. And yeah, there's a research paper too.

Connext DDS Modern C++ API in Rx4DDS

The Rx4DDS C++ adapter uses the modern C++ API for DDS (DDS-C++PSM) at its core. DDS-C++PSM has its own reactive flavor. For instance, a DDS ReadCondition accepts a lambda to process data. The user's thread of control automatically invokes that lambda when the condition triggers. As a result, the dispatch logic that programmers have to write is greatly simplified (often to just one line). See an example here.
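To make the "handler travels with the condition" idea concrete, here is a minimal sketch in plain standard C++. The Condition and WaitSet classes below are hypothetical stand-ins that I made up for illustration; they are not the DDS-C++PSM types and they do not block or wait on anything. The point is only that once each condition carries its own lambda, the dispatch code shrinks to a single call.

```cpp
#include <functional>
#include <utility>
#include <vector>

// Hypothetical stand-in (not the DDS API): a condition that owns its handler.
class Condition {
public:
    explicit Condition(std::function<void()> handler)
        : handler_(std::move(handler)) {}

    void trigger() { triggered_ = true; }  // e.g. "data is available"
    bool triggered() const { return triggered_; }

    // Clear the trigger and run the handler supplied at construction.
    void dispatch() {
        triggered_ = false;
        handler_();
    }

private:
    std::function<void()> handler_;
    bool triggered_ = false;
};

// Hypothetical stand-in (not the DDS API): runs the handlers of all
// triggered conditions on the caller's thread.
class WaitSet {
public:
    void attach(Condition& condition) { conditions_.push_back(&condition); }

    // Returns how many handlers ran.
    int dispatch() {
        int handled = 0;
        for (Condition* condition : conditions_) {
            if (condition->triggered()) {
                condition->dispatch();
                ++handled;
            }
        }
        return handled;
    }

private:
    std::vector<Condition*> conditions_;
};
```

With this in place, the user's loop body is just waitset.dispatch(); there is no switch over condition types and no manual lookup of which reader became active.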

Rx4DDS takes DDS reactive programming to a whole new level. The best way to learn what it can do for you is watching a demonstration and understanding the corresponding code.

So, here is one of the demos that shows transformation of keyed data streams using GroupBy and Map.

The demo includes multiple parallel stateful data pipelines. The transformation pipeline is replicated for each key (i.e., shape color). The resulting topics ("Circle" and "Triangle") follow the lifecycle of the original instance in the "Square" topic. I.e., when a new square arrives, a new circle and triangle orbit around it. Similarly, when a square is disposed, circle and triangle are disposed with it.

Let's look at some code.

auto circle_writer = ...
auto triangle_writer = ...
int circle_degree = 0;
int tri_degree = 0;

using KeyedShapeObservable =
  rx::grouped_observable<string, LoanedSample<ShapeType>>;

rx4dds::TopicSubscription<ShapeType>
  topic_sub(participant, "Square", waitset, worker);

// Shard the "Square" stream into one inner observable per color (the key).
auto grouped_stream =
  topic_sub.create_observable()
           >> rx4dds::group_by_instance([](ShapeType & shape) {
                return shape.color();
              });

// The degree counters are captured by the outer lambda so that the
// inner (mutable) lambdas can take their own per-instance copies.
auto subscription =
  grouped_stream
    .flat_map([circle_writer, triangle_writer, circle_degree, tri_degree]
              (KeyedShapeObservable go)
    {
      rx::observable<ShapeType> inner_transformed =
        go >> rx4dds::to_unkeyed()
           >> rx4dds::complete_on_dispose()
           >> rx4dds::error_on_no_alive_writers()
           >> filter([](LoanedSample<ShapeType> s) {
                return s.info().valid();
              })
           >> map([](LoanedSample<ShapeType> valid) {
                return valid.data();
              })
           // Square position -> circle position, 3 degrees further per sample.
           >> map([circle_degree](ShapeType & square) mutable {
                circle_degree = (circle_degree + 3) % 360;
                return shape_location(square, circle_degree);
              })
           >> rx4dds::publish_over_dds(circle_writer, ShapeType(go.key()))
           // Circle position -> triangle position, 9 degrees further per sample.
           >> map([tri_degree](ShapeType & circle) mutable {
                tri_degree = (tri_degree + 9) % 360;
                return shape_location(circle, tri_degree);
              })
           >> rx4dds::publish_over_dds(triangle_writer, ShapeType(go.key()));

      return inner_transformed;
    }).subscribe();

Diving Deep

There's a lot going on here. But the first thing you might have noticed is that there's not a single loop. You don't need them here. Rx4DDS lets you write more declarative code. Think Linux command-line pipes and I/O redirection.

The TopicSubscription object is the entry point. As you might have suspected, it hides the underlying DDS DataReader. The create_observable function creates an observable from the TopicSubscription. This observable is a stream of ShapeType objects. Note that whether TopicSubscription uses a waitset or a listener to retrieve DDS data is up to the user. Just use the right constructor. In either case, create_observable returns an observable. The rest of the program structure is identical in either case. This is important.

"Rx4DDS decouples data retrieval from consumption."

group_by_instance is an Rx4DDS "operator" that shards the incoming ShapeType objects across "inner" keyed observables (KeyedShapeObservable), each of which is bound to a specific key. It uses the view state and instance handle information in SampleInfo for sharding. It also accepts a lambda to retrieve the key for each sample.
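If sharding by key sounds abstract, the following sketch shows the bare idea in standard C++ over an in-memory vector. Everything here (Sample, Groups, shard_by_key) is a hypothetical name invented for illustration, and the sketch shards purely on the value returned by the key lambda; the real operator also consults SampleInfo, which this toy version does not model.

```cpp
#include <functional>
#include <map>
#include <string>
#include <vector>

// Hypothetical sample type: a color (the key) and one coordinate.
struct Sample {
    std::string color;
    int x;
};

// Result of sharding: which keys appeared (in order of first appearance)
// and the values that landed in each key's "inner" stream.
struct Groups {
    std::vector<std::string> creation_order;
    std::map<std::string, std::vector<int>> by_key;
};

// Route every sample to the inner stream selected by key_of(sample).
// The first sample of a key creates that key's inner stream.
Groups shard_by_key(const std::vector<Sample>& stream,
                    const std::function<std::string(const Sample&)>& key_of) {
    Groups groups;
    for (const Sample& sample : stream) {
        const std::string key = key_of(sample);
        auto it = groups.by_key.find(key);
        if (it == groups.by_key.end()) {
            groups.creation_order.push_back(key);  // "new inner observable"
            it = groups.by_key.emplace(key, std::vector<int>{}).first;
        }
        it->second.push_back(sample.x);
    }
    return groups;
}
```

Calling shard_by_key(stream, [](const Sample& s) { return s.color; }) mirrors the shape of the group_by_instance call in the demo: the caller only says how to obtain the key, and the routing is done for them.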

The grouped_stream object is a stream-of-streams, which is a natural mapping for keyed topics. After all, a keyed topic contains many instances and each instance has its own lifecycle as in Create-->Update-->Dispose. This lifecycle of instances maps naturally to observables as they have the same lifecycle expressed in terms of OnNext*[OnError|OnCompleted]. I.e., OnNext may be called zero or more times, followed by at most one of OnError or OnCompleted if and when the observable ends.

We call flat_map on the grouped_stream. The flat_map operator has many uses. First, when there's a stream-of-streams, it flattens it out to just a stream. It does that by simply merging the individual streams together. Of course, sharding an incoming stream and then merging the sharded streams straight back would not be useful at all. That's equivalent to a no-op. So flat_map lets you map each incoming stream to some other stream. Hence there's a lambda that returns an inner_transformed observable, which is a transformation of every instance-specific observable (go).
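Here is a toy, standard-library-only sketch of what "map each item to a stream, then merge" means. The Observable alias, from_values, and this flat_map are my own hypothetical miniatures, not RxCpp. They model a stream as a function that pushes values into a callback, with no completion, errors, or threading. Because these toy sources emit synchronously, the merged output comes out one inner stream after another; real Rx sources can interleave.

```cpp
#include <functional>
#include <string>
#include <vector>

// Toy push-based stream: "subscribing" means passing an on_next callback.
template <typename T>
using Observable = std::function<void(std::function<void(T)>)>;

// A stream that emits the given values, in order, to each subscriber.
template <typename T>
Observable<T> from_values(std::vector<T> values) {
    return [values](std::function<void(T)> on_next) {
        for (const T& value : values) on_next(value);
    };
}

// flat_map: turn every outer item into an inner stream, and forward the
// items of all inner streams to one downstream callback.
template <typename T, typename U>
Observable<U> flat_map(Observable<T> outer,
                       std::function<Observable<U>(T)> to_inner) {
    return [outer, to_inner](std::function<void(U)> on_next) {
        outer([&](T item) { to_inner(item)(on_next); });
    };
}
```

For example, mapping each color in from_values<std::string>({"BLUE", "RED"}) to a two-item stream of "<color> circle" and "<color> triangle" yields a single flat stream of four strings, which is the same shape as the demo: one pipeline per key, one merged result.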

So what's the transformation? It's just a chain of operators whose names describe what they do. First, we drop the "keyed" aspect and get a regular key-less ShapeType observable. We'll use the key shortly.

The complete_on_dispose and error_on_no_alive_writers operators constitute the data-centric magic sauce. When an instance is disposed, the complete_on_dispose operator recognizes it and propagates an OnCompleted event through the instance-specific observable. Likewise, when an instance runs into some sort of error (e.g., no alive writers), the error_on_no_alive_writers operator recognizes it and propagates an OnError event through the instance-specific observable.

"Data-centric interpretation of the data and SampleInfo is wired into the Rx4DDS programming model."

The best part is that it is quite extensible. If having no alive writers does not mean an "error" in your application, you can either leave the operator out or write your own "interpretations" of SampleInfo as you please. The error_on_no_alive_writers operator is very simple and gives you an idea of what's possible.
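To show what such an "interpretation" boils down to, here is a small sketch in standard C++. The InstanceState enum, the Sample struct, and to_events are hypothetical names chosen for illustration; they are simplified stand-ins and not the DDS SampleInfo or the Rx4DDS operators. The sketch translates the per-sample state of one instance into a trace of Rx-style events and stops at the first terminal event.

```cpp
#include <string>
#include <vector>

// Hypothetical, simplified stand-in for the instance state in SampleInfo.
enum class InstanceState { Alive, Disposed, NoWriters };

struct Sample {
    int value;
    InstanceState state;
};

// Interpret the samples of ONE instance as Rx events.
// Alive     -> OnNext(value)
// Disposed  -> OnCompleted (what complete_on_dispose expresses)
// NoWriters -> OnError     (what error_on_no_alive_writers expresses)
// Nothing is emitted after a terminal event.
std::vector<std::string> to_events(const std::vector<Sample>& samples) {
    std::vector<std::string> events;
    for (const Sample& sample : samples) {
        switch (sample.state) {
        case InstanceState::Alive:
            events.push_back("OnNext(" + std::to_string(sample.value) + ")");
            break;
        case InstanceState::Disposed:
            events.push_back("OnCompleted");
            return events;
        case InstanceState::NoWriters:
            events.push_back("OnError");
            return events;
        }
    }
    return events;
}
```

An application that treats "no writers" as a normal shutdown could map NoWriters to "OnCompleted" instead; that one-line change is the kind of custom interpretation the text refers to.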

The rest of the data pipeline is quite straightforward. filter eliminates invalid samples and each valid sample is mapped to its data right after filter. At this point the source observable (go) is transformed to a pure observable<ShapeType>. What comes next is just business logic.

Each ShapeType object that represents the location of a square is transformed to a circle position using basic geometry (not shown). Each circle position is also a ShapeType. It is then published over DDS via the circle_writer. A similar transformation produces the triangles from the circle positions.
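The geometry is not shown in the demo code, so the following is only my guess at its general shape: a point on a circle around a center. Point, orbit_location, advance, and the default radius of 30 are all assumptions made for this sketch; the actual shape_location function works on ShapeType and may differ.

```cpp
#include <cmath>

// Hypothetical 2-D position; the demo uses ShapeType instead.
struct Point {
    double x;
    double y;
};

// Step an angle forward and wrap it into [0, 360), as the demo does with
// (circle_degree + 3) % 360. Assumes degree and step are non-negative.
int advance(int degree, int step) {
    return (degree + step) % 360;
}

// Position at angle `degree` on a circle of `radius` around `center`.
// The default radius is an arbitrary value picked for illustration.
Point orbit_location(Point center, int degree, double radius = 30.0) {
    const double pi = std::acos(-1.0);
    const double radians = degree * pi / 180.0;
    return Point{center.x + radius * std::cos(radians),
                 center.y + radius * std::sin(radians)};
}
```

Feeding each new square position through orbit_location with an angle that advances by 3 degrees per sample gives the orbiting circle; applying the same function to the circle position with a 9-degree step gives a triangle that orbits the circle.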

publish_over_dds is an interesting operator. It's one more ingredient in the data-centric magic sauce. Note that it accepts a ShapeType object created from the key. When OnCompleted or OnError is propagated, publish_over_dds disposes the instance selected by the key. For instance, when a blue square is disposed, the complete_on_dispose operator recognizes it and propagates an OnCompleted event. In response to OnCompleted, the publish_over_dds operators dispose both the blue circle and the blue triangle.

In summary,

"The finalization actions are baked into each data pipeline at the time of creation."

This is declarative programming: it completely hides the explicit control flow needed for cleanup. This is one of the main strengths of Rx4DDS.
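A stripped-down sketch of "finalization baked in at creation" might look like this in standard C++. FakeWriter and PublishStage are hypothetical classes invented for illustration (an in-memory set stands in for a DDS DataWriter); they are not the Rx4DDS implementation. The stage is bound to its key when it is built, so whoever later signals completion or an error never has to say what to clean up.

```cpp
#include <set>
#include <string>
#include <utility>

// Hypothetical in-memory stand-in for a DDS DataWriter.
struct FakeWriter {
    std::set<std::string> alive;  // keys of instances currently alive
    int writes = 0;

    void write(const std::string& key) {
        alive.insert(key);
        ++writes;
    }
    void dispose(const std::string& key) { alive.erase(key); }
};

// A pipeline stage bound to one key at construction time.
// Either terminal event disposes that key's instance, exactly once.
class PublishStage {
public:
    PublishStage(FakeWriter& writer, std::string key)
        : writer_(writer), key_(std::move(key)) {}

    void on_next() {
        if (!done_) writer_.write(key_);  // ignore data after termination
    }
    void on_completed() { finish(); }
    void on_error() { finish(); }

private:
    void finish() {
        if (done_) return;
        done_ = true;
        writer_.dispose(key_);
    }

    FakeWriter& writer_;
    std::string key_;
    bool done_ = false;
};
```

In the demo's terms, one such stage exists per color and per output topic, which is why disposing the blue square is enough to make the blue circle and blue triangle go away while the other colors keep running.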

Finally, the flat_map operator simply merges the transformed (inner) streams together. The result of the merge is ignored as there's no subsequent user-defined stage. The final subscribe call sets the whole machinery going by creating the only user-visible subscription that the user code has to manage.

The CppCon'15 presentation touches upon several concepts discussed here and more.

DDS and Rx Side-by-Side

I hope you are starting to see that Rx and DDS are a great match. More compatibilities reveal themselves as you start looking deeper. Here's how I think Rx and DDS are related architecturally.

| | DDS | Rx |
|---|---|---|
| Design Methodology | Data-Centric | Reactive, Compositional |
| Architecture | Distributed Publish-subscribe | In-memory Observable-Observer. Monadic |
| Anonymity/loose coupling | Publisher does not know about subscribers | Observable does not know about observers |
| Data Streaming | A Topic<T> is a stream of data samples of type T | An Observable<T> is a stream of objects of type T |
| Stream lifecycle | Instances (keys) have lifecycle (New->Alive->Disposed) | Observables have lifecycle OnNext*[OnCompleted\|OnError] |
| Data Publication | Publisher may publish without any subscriber | "Hot" Observables |
| Data Reception | Subscriber may read the same data over and over | "Cold" Observables |

Rx4DDS allows you to map many DDS concepts to the declarative Rx programming model. Some of my favorite mappings are listed below. This table is based on the C# implementation of Rx4DDS.

| DDS Concept | Rx Concept/Type/Operator |
|---|---|
| Topic of type T | An object that implements IObservable<T>, which internally creates a DataReader<T> |
| Communication status, Discovery event streams | IObservable<SampleLostStatus>, IObservable<SubscriptionBuiltinTopicData>, IObservable<PublicationBuiltinTopicData> |
| Topic of type T with key type=Key | IObservable<IGroupedObservable<Key, T>> |
| Detect a new instance | Notify Observers about a new IGroupedObservable<Key, T> with key==instance. Notification using IObserver<IGroupedObservable<Key, T>>.OnNext() |
| Dispose an instance | Notify Observers through IObserver<IGroupedObservable<Key, T>>.OnCompleted |
| Take an instance update of type T | Notify Observers about a new value of T using IObserver<T>.OnNext() |
| Read with history=N | IObservable<T>.Replay(N) |
| Query Conditions | IObservable<T>.Where(…) OR IObservable<T>.GroupBy(…) |
| SELECT in CFT expression | IObservable<T>.Select(...) |
| FROM in CFT expression | DDSObservable.FromTopic("Topic1"), DDSObservable.FromKeyedTopic("Topic2") |
| WHERE in CFT expression | IObservable<T>.Where(...) |
| ORDER BY in CFT expression | IObservable<T>.OrderBy(...) |
| MultiTopic (INNER JOIN) | IObservable<T>.Join().Where().Select() |
| Joins between DDS and non-DDS data | Join, CombineLatest, Zip |

Phew! This blog post is already much longer than expected. Yet, there's so much more to talk about. I would encourage you to look at the other demonstration videos available here.

If JavaScript is your thing, you might also want to find out how we used DDS, RxJS, and Node.js Connector to implement Log Analytics and Anomaly Detection in Remote DataCenters. Check out these slides and a research paper.

Finally, we welcome your comments on this blog, and we would be interested in discussing how this technology may fit your needs.