Data-Centric Stream Processing in the Fog

It has been almost a year and a half since my last post on reactive stream processing with RTI Connext DDS. A lot has happened since then, and I have a lot of interesting updates to share with you.

For starters, here’s a quick reason why reactive programming, and specifically stream processing, makes a lot of sense in Industrial Internet Systems. As you probably know, the Industrial Internet of Things (IIoT) has really taken off, and billions of new devices will be connected to the Internet in this decade. Many of them will belong to national critical infrastructure (e.g., smart power grids, smart roads, smart hospitals, smart cities). Most of the data generated by these devices cannot and will not be sent to the cloud because of bandwidth limits, latency, and the exorbitant cost of doing so. Therefore, much of the data must be correlated, merged, filtered, and analyzed at the edge so that only summary and exceptional data is sent to the cloud. This is Edge Computing (a.k.a. Fog).

Rx4DDS is an elegant solution to this problem that is productive, composable, concurrency-friendly, and scales well. Rx4DDS is a collection of Reactive Extensions (Rx) adapters for RTI Connext DDS. Rx is a collection of open-source libraries for functional-style composable asynchronous data processing. It is available in a number of flavors including RxJava, RxJS, RxCpp, Rx.Net, RxScala, RxPy and many more. The official site of all things Rx is ReactiveX.io.

The original Rx4DDS work started in C#. Since then, Rx4DDS has been implemented in C++ and JavaScript (Node.js). Recently, I presented Rx4DDS and 3 demos at CppCon’15. The session recording, all demo videos, and the slides are now available. And yeah, there’s a research paper too.

Connext DDS Modern C++ API in Rx4DDS

The Rx4DDS C++ adapter uses the modern C++ API for DDS (DDS-C++PSM) at its core. DDS-C++PSM has its own reactive flavor. For instance, a DDS ReadCondition accepts a lambda to process data. The user’s thread of control automatically invokes the lambda provided earlier. As a result, the dispatch logic that programmers have to write is greatly simplified (often just one line). See an example here.

Rx4DDS takes DDS reactive programming to a whole new level. The best way to learn what it can do for you is watching a demonstration and understanding the corresponding code. 

So, here is one of the demos that shows transformation of keyed data streams using GroupBy and Map.

The demo includes multiple parallel stateful data pipelines. The transformation pipeline is replicated for each key (i.e., shape color). The resulting topics (“Circle” and “Triangle”) follow the lifecycle of the original instance in the “Square” topic. I.e., when a new square arrives, a new circle and triangle orbit around it. Similarly, when a square is disposed, circle and triangle are disposed with it.

Let’s look at some code.

auto circle_writer = ...
auto triangle_writer = ... 
int circle_degree = 0;
int tri_degree = 0;

using KeyedShapeObservable = 
  rx::grouped_observable<string, LoanedSample<ShapeType>>;

rx4dds::TopicSubscription<ShapeType> 
  topic_sub(participant, "Square", waitset, worker);

auto grouped_stream =
  topic_sub.create_observable() 
           >> rx4dds::group_by_instance ([](ShapeType & shape) { 
                return shape.color(); 
              });

auto subscription = 
grouped_stream
  .flat_map([circle_writer, triangle_writer]
            (KeyedShapeObservable go) 
  {
    rx::observable<ShapeType> inner_transformed =
      go >> rx4dds::to_unkeyed()
         >> rx4dds::complete_on_dispose()
         >> rx4dds::error_on_no_alive_writers()
         >> filter([](LoanedSample<ShapeType> s) {
                     return s.info().valid();
            }) 
         >> map([](LoanedSample<ShapeType> valid) {
                  return valid.data();
            }) 
         >> map([circle_degree](ShapeType & square) mutable {
                  circle_degree = (circle_degree + 3) % 360;
                  return shape_location(square, circle_degree);
            }) 
         >> rx4dds::publish_over_dds(circle_writer, ShapeType(go.key()))
         >> map([tri_degree](ShapeType & circle) mutable {
                  tri_degree = (tri_degree + 9) % 360;
                  return shape_location(circle, tri_degree);
            }) 
         >> rx4dds::publish_over_dds(triangle_writer, ShapeType(go.key()));
      
    return inner_transformed;

  }).subscribe();

Diving Deep

There’s a lot going on in here. But the first thing you might have noticed is that there’s not a single loop. You don’t need them here. Rx4DDS lets you write more declarative code. Think Linux command-line pipes and I/O redirection.

The TopicSubscription object is the entry point. As you might have suspected, it hides the underlying DDS DataReader. The create_observable function creates an observable from the TopicSubscription. This observable is a stream of ShapeType objects.  Note that whether TopicSubscription uses a waitset or a listener to retrieve DDS data is up to the user. Just use the right constructor. In either case, create_observable returns an observable. The rest of the program structure is identical in either case. This is important.

“Rx4DDS decouples data retrieval from consumption.”

The group_by_instance is an Rx4DDS “operator” that shards the incoming ShapeType objects across “inner” keyed observables (KeyedShapeObservable), each bound to a specific key. It uses the view state and instance handle information in SampleInfo for sharding. It also accepts a lambda to retrieve the key for each sample.

The grouped_stream object is a stream-of-streams, a natural mapping for keyed topics. After all, a keyed topic contains many instances, and each instance has its own lifecycle: Create -> Update -> Dispose. This instance lifecycle maps naturally to observables, which have the same lifecycle expressed as OnNext*[OnError|OnCompleted]. That is, OnNext may be called zero or more times, followed by at most one of OnError or OnCompleted if and when the observable ends.

We call flat_map on the grouped_stream. The flat_map operator has many uses. First, when there’s a stream-of-streams, it flattens it out to just a stream. It does that by simply merging the individual streams together. Of course, sharding an incoming stream and merging the sharded streams straight back wouldn’t be useful at all. That’s equivalent to a no-op. So flat_map lets you map each incoming stream to some other stream. Hence there’s a lambda that returns an inner_transformed observable, which is a transformation of every instance-specific observable (go).
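To make the shard-then-merge idea concrete, here is a minimal sketch in plain C++ with no Rx or DDS dependency. It is not how Rx4DDS or RxCpp implement group_by_instance or flat_map; the Sample struct, group_and_flat_map, and make_counting_pipeline are hypothetical names invented for illustration. The explicit loop is there only to expose the bookkeeping the operators do for you: one pipeline per key, created when the key is first seen, with all outputs merged back into a single stream.

```cpp
#include <functional>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-in for a keyed DDS sample (not the real ShapeType).
struct Sample {
    std::string color;  // plays the role of the instance key
    int x;
};

// A per-key pipeline: a (possibly stateful) function from input to output.
using Pipeline = std::function<Sample(const Sample&)>;

// Shards `input` by key, creates one pipeline per key on first sight via
// `make_pipeline` (the analogue of the lambda handed to flat_map), and
// merges the per-key outputs into one stream in arrival order.
std::vector<Sample> group_and_flat_map(
    const std::vector<Sample>& input,
    const std::function<Pipeline(const std::string&)>& make_pipeline)
{
    std::map<std::string, Pipeline> pipelines;  // one entry per key seen so far
    std::vector<Sample> merged;
    for (const Sample& s : input) {
        auto it = pipelines.find(s.color);
        if (it == pipelines.end()) {
            // First sample for this key: build its dedicated pipeline.
            it = pipelines.emplace(s.color, make_pipeline(s.color)).first;
        }
        merged.push_back(it->second(s));
    }
    return merged;
}

// Example factory: every key gets its own counter, so state is never shared
// across keys (similar in spirit to the per-pipeline degree counters).
Pipeline make_counting_pipeline(const std::string& /*key*/) {
    int seen = 0;
    return [seen](const Sample& s) mutable {
        ++seen;
        return Sample{s.color, s.x + seen};
    };
}
```

Calling group_and_flat_map(samples, make_counting_pipeline) on a mix of BLUE and RED samples advances the BLUE counter and the RED counter independently, which is the property that makes the per-key pipelines in the demo stateful yet isolated.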

So what’s the transformation? It’s just concatenation of operators that simply describe what they do. First, we drop the “keyed” aspect and get a regular key-less ShapeType observable. We’ll use the key shortly.

The complete_on_dispose and error_on_no_alive_writers operators constitute the data-centric magic sauce. When an instance is disposed, the complete_on_dispose operator recognizes it and propagates an OnCompleted event through the instance-specific observable. Likewise, when an instance runs into some sort of error (e.g., no alive writers), the error_on_no_alive_writers operator recognizes it and propagates an OnError event through the instance-specific observable.

“Data-centric interpretation of the data and SampleInfo is wired into the Rx4DDS programming model.”

The best part is that it is quite extensible. If no alive writers does not mean an “error” in your application, you can either leave the operator out or write your own “interpretations” of SampleInfo as you please. The error_on_no_alive_writers operator is very simple and gives you an idea of what’s possible.
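As a rough illustration of what such an “interpretation” amounts to, the sketch below maps a simplified instance state onto the OnNext/OnCompleted/OnError events of an observer. It does not use the real Rx4DDS operators or the DDS SampleInfo type; InstanceState, Update, Observer, drive_instance, and record_events are all hypothetical names, and the policy (Disposed means completed, NoWriters means error) is just one possible choice.

```cpp
#include <functional>
#include <string>
#include <vector>

// Simplified stand-in for the instance state carried in DDS SampleInfo.
enum class InstanceState { Alive, Disposed, NoWriters };

struct Update {
    InstanceState state;
    int value;  // payload; only meaningful when state == Alive
};

struct Observer {
    std::function<void(int)> on_next;
    std::function<void()> on_completed;
    std::function<void(const std::string&)> on_error;
};

// Delivers updates for one instance while enforcing the grammar
// OnNext* [OnCompleted | OnError]: after a terminal event nothing else
// is delivered.
void drive_instance(const std::vector<Update>& updates, const Observer& obs) {
    for (const Update& u : updates) {
        switch (u.state) {
        case InstanceState::Alive:
            obs.on_next(u.value);
            break;
        case InstanceState::Disposed:
            obs.on_completed();  // one possible "complete on dispose" policy
            return;
        case InstanceState::NoWriters:
            obs.on_error("no alive writers");  // swap this policy as needed
            return;
        }
    }
}

// Helper that records the events an observer would see, as strings.
std::vector<std::string> record_events(const std::vector<Update>& updates) {
    std::vector<std::string> log;
    Observer obs{
        [&log](int v) { log.push_back("next:" + std::to_string(v)); },
        [&log]() { log.push_back("completed"); },
        [&log](const std::string& why) { log.push_back("error:" + why); }};
    drive_instance(updates, obs);
    return log;
}
```

If losing all writers should not count as an error in your application, the NoWriters case could instead call on_completed or simply keep waiting; the point is that the policy lives in one small, replaceable place.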

The rest of the data pipeline is quite straightforward. filter eliminates invalid samples and each valid sample is mapped to its data right after filter. At this point the source observable (go) is transformed to a pure observable<ShapeType>. What comes next is just business logic.

Each ShapeType object that holds a square’s location is transformed to a circle position using basic geometry (not shown). Each circle position is also a ShapeType. It is then published over DDS via the circle_writer. A similar transformation occurs for triangles.
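The geometry is not shown in the demo code, so the following is only a guess at its general shape: place a satellite on a circle of fixed radius around a center and advance the angle a few degrees per sample. The Point struct, orbit_location, next_degree, and the default radius of 50 are assumptions made for this sketch; the demo’s actual shape_location works on ShapeType and may differ.

```cpp
#include <cmath>

// Hypothetical 2-D position; the demo uses ShapeType instead.
struct Point {
    int x;
    int y;
};

// Advances an angle by `step` degrees, wrapping at 360, as the demo's
// lambdas do with (degree + 3) % 360 and (degree + 9) % 360.
int next_degree(int degree, int step) {
    return (degree + step) % 360;
}

// Returns the point at `degrees` around `center`, `radius` units away.
// The radius default is an arbitrary illustrative value.
Point orbit_location(Point center, int degrees, int radius = 50) {
    const double pi = std::acos(-1.0);
    const double radians = degrees * pi / 180.0;
    return Point{
        center.x + static_cast<int>(std::lround(radius * std::cos(radians))),
        center.y + static_cast<int>(std::lround(radius * std::sin(radians)))};
}
```

Chaining the function, for example orbit_location(orbit_location(square, a), b), gives a second body orbiting the first, which is the same composition the pipeline uses when it derives the triangle from the circle.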

The publish_over_dds operator is an interesting one. It’s one more ingredient in the data-centric magic sauce. Note that it accepts a ShapeType object created from the key. When OnCompleted or OnError is propagated, publish_over_dds disposes the instance selected by the key. For instance, when a blue square is disposed, the complete_on_dispose operator recognizes it and propagates an OnCompleted event. In response to OnCompleted, the publish_over_dds operators dispose both the blue circle and the blue triangle.

In summary,

“The finalization actions are baked into each data pipeline at the time of creation.”

This is declarative programming as it completely hides explicit control flow necessary to do cleanup. This is one of the main strengths of Rx4DDS.
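One way to picture “finalization baked in at creation” is a publishing stage that is handed its key when it is constructed and therefore already knows what to dispose when the stream ends. The sketch below is a plain C++ model and not the Rx4DDS publish_over_dds implementation; FakeWriter and PublishStage are hypothetical names, and the writer only records what it would have sent.

```cpp
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a DDS DataWriter; it only logs its calls.
struct FakeWriter {
    std::vector<std::string> log;
    void write(const std::string& key, int value) {
        log.push_back("write " + key + " " + std::to_string(value));
    }
    void dispose(const std::string& key) {
        log.push_back("dispose " + key);
    }
};

// A publishing stage bound to one key at construction time. The cleanup
// action (dispose) is fixed here, so callers never write it explicitly.
class PublishStage {
public:
    PublishStage(FakeWriter& writer, std::string key)
        : writer_(writer), key_(std::move(key)) {}

    void on_next(int value) {
        if (!done_) writer_.write(key_, value);
    }
    void on_completed() { finish(); }
    void on_error(const std::string& /*reason*/) { finish(); }

private:
    // Runs at most once, whichever terminal event arrives first.
    void finish() {
        if (done_) return;
        done_ = true;
        writer_.dispose(key_);
    }

    FakeWriter& writer_;
    std::string key_;
    bool done_ = false;
};
```

A stage created for the key "BLUE" writes blue samples while the stream is live and emits exactly one dispose for "BLUE" on either completion or error, with no cleanup code at the call site.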

Finally, the flat_map operator simply merges the transformed (inner) streams together. The result of merge is ignored as there’s no subsequent user-defined stage. The final subscribe function call sets the whole machinery going by creating the only user-visible subscription that the user-code has to manage.

The CppCon’15 presentation touches upon several concepts discussed here and more.

DDS and Rx Side-by-Side

I hope you are starting to see that Rx and DDS are a great match. More compatibilities reveal themselves as you start looking deeper. Here’s how I think Rx and DDS are related architecturally.

Design Methodology
    DDS: Data-Centric
    Rx: Reactive, Compositional
Architecture
    DDS: Distributed Publish-subscribe
    Rx: In-memory Observable-Observer. Monadic
Anonymity/loose coupling
    DDS: Publisher does not know about subscribers
    Rx: Observable does not know about observers
Data Streaming
    DDS: A Topic<T> is a stream of data samples of type T
    Rx: An Observable<T> is a stream of objects of type T
Stream lifecycle
    DDS: Instances (keys) have lifecycle (New->Alive->Disposed)
    Rx: Observables have lifecycle OnNext*[OnCompleted|OnError]
Data Publication
    DDS: Publisher may publish without any subscriber
    Rx: “Hot” Observables
Data Reception
    DDS: Subscriber may read the same data over and over
    Rx: “Cold” Observables

Rx4DDS allows you to map many DDS concepts to the declarative Rx programming model. Some of my favorite mappings are listed below. This table is based on the C# implementation of Rx4DDS.

 

DDS Concept -> Rx Concept/Type/Operator

Topic of type T
    An object that implements IObservable<T>, which internally creates a DataReader<T>
Communication status, Discovery event streams
    IObservable<SampleLostStatus>
    IObservable<SubscriptionBuiltinTopicData>
    IObservable<PublicationBuiltinTopicData>
Topic of type T with key type=Key
    IObservable<IGroupedObservable<Key, T>>
Detect a new instance
    Notify Observers about a new IGroupedObservable<Key, T> with key==instance. Notification using IObserver<IGroupedObservable<Key, T>>.OnNext()
Dispose an instance
    Notify Observers through IObserver<IGroupedObservable<Key, T>>.OnCompleted
Take an instance update of type T
    Notify Observers about a new value of T using IObserver<T>.OnNext()
Read with history=N
    IObservable<T>.Replay(N)
Query Conditions
    IObservable<T>.Where(…) OR IObservable<T>.GroupBy(…)
SELECT in CFT expression
    IObservable<T>.Select(…)
FROM in CFT expression
    DDSObservable.FromTopic(“Topic1”)
    DDSObservable.FromKeyedTopic(“Topic2”)
WHERE in CFT expression
    IObservable<T>.Where(…)
ORDER BY in CFT expression
    IObservable<T>.OrderBy(…)
MultiTopic (INNER JOIN)
    IObservable<T>.Join().Where().Select()
Joins between DDS and non-DDS data
    Join, CombineLatest, Zip

Phew! This blog post is already quite a bit longer than expected. Yet, there’s so much more to talk about. I would encourage you to look at the other demonstration videos available here.

If JavaScript is your thing, you might also want to find out how we used DDS, RxJS, and Node.js Connector to implement Log Analytics and Anomaly Detection in Remote DataCenters. Check out these slides and a research paper.

Finally, we welcome your comments on this blog & we would be interested in discussing how this technology may fit your needs.

TCP scalability improvements

I’m excited to talk about new stuff we have added to our latest release, RTI Connext 5.2.0. In particular, I’ll talk about scalability improvements we added to the RTI TCP Transport Plugin.

Full Windows IOCP Support

RTI Connext can be used for a wide range of scenarios: from relatively small deployments with dozens of DomainParticipants communicating over a LAN, to WAN scenarios involving hundreds or even thousands of DomainParticipants. Our TCP transport provides a way to communicate over the WAN that is NAT-friendly and easy to configure. In WAN scenarios, the transport can be used in asymmetric mode. In this case, there is one DomainParticipant acting as a TCP server that receives connections from several other DomainParticipants acting as TCP clients. The server DomainParticipant does not need to know the addresses of the client DomainParticipants.

Something to consider when building a TCP server that handles hundreds or thousands of clients is socket management strategy, or socket monitoring strategy. This refers to how the system reacts to socket-generated I/O events.

While the strategy used by the RTI Connext 5.0.0 TCP Transport Plugin (select() monitoring) had good scalability for servers running on Linux systems (up to 1,500 clients), we observed during experiments that scalability on Windows systems dropped to approximately 600 clients.

To solve this issue, RTI Connext 5.1.0 added partial support for the MS Windows IOCP (I/O Completion Ports) socket monitoring strategy to the TCP Transport Plugin.

MS Windows IOCP provides an efficient threading model for processing multiple asynchronous I/O requests on a multiprocessor system. During our tests on Windows systems running the RTI TCP Transport Plugin, we observed scalability increase from roughly 600 clients to 1,500. However, RTI Connext 5.1.0 did not include Windows IOCP support for TLS over TCP.

I’m glad to announce that the latest release, RTI Connext 5.2.0, includes full support for the Windows IOCP socket monitoring strategy.

You can start using this feature by adding the following XML snippet to the TCP Transport Plugin configuration:

<participant_qos>
  <property>
    <value>
      <element>
        <name>{transport_prefix}.socket_monitoring_kind</name>
        <value>WINDOWS_IOCP</value>
      </element>
    </value>
  </property>
</participant_qos>
Here, {transport_prefix} is the name you assigned to the RTI TCP Transport Plugin (for example, dds.transport.TCPv4.tcp1).

External Load Balancer Support

Windows IOCP support is not the only new scalability improvement included in RTI Connext 5.2.0. Now I’ll talk about another addition that provides support for even more DomainParticipants acting as TCP clients.

When requiring high scalability using TCP, the TCP server becomes the bottleneck, and we need to take advantage of external load balancers. A load balancer is a software or hardware device that increases the capacity and/or reliability of a system by transparently distributing the workload among multiple servers. In particular, a TCP load balancer distributes connections from TCP clients among multiple TCP servers – in our scenario, DomainParticipants.

The RTI Connext TCP Transport Plugin creates multiple connections when two DDS DomainParticipants communicate. These connections create a common state, a session, between the two DomainParticipants. In previous releases, the RTI TCP Transport Plugin did not work well with external load balancers, because a load balancer could distribute the connections of a single session among multiple DomainParticipants, thus preventing communication.

In order to support external load balancers, RTI Connext 5.2.0 includes a new session-id negotiation feature. When session-id negotiation is enabled (by using the negotiate_session_id property), the TCP Transport Plugin performs an additional message exchange that allows load balancers to assign all the connections belonging to the same session to the same DomainParticipant.

In order to enable the session-id negotiation, you can use the following XML snippet:

<participant_qos>
  <property>
    <value>
      <element>
        <name>{transport_prefix}.negotiate_session_id</name>
        <value>1</value>
      </element>
    </value>
  </property>
</participant_qos>
Here, {transport_prefix} is the name you assigned to the RTI TCP Transport Plugin (for example, dds.transport.TCPv4.tcp1).

This feature has been tested on a system using an F5 BIG-IP load balancer. However, it will work with other load balancers as long as:

  • They are able to modify the TCP data stream to include a unique identification of the node serving the first connection on a session.
  • They are able to assign connections to a server depending on the content of the TCP data stream.
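The two requirements above boil down to session affinity: the first connection of a session may land on any server, and every later connection that carries the same identification must land on that same server. The sketch below models only this routing decision. It is a simplification with hypothetical names (SessionAwareBalancer, route), it keys directly on a session id string rather than on a node identification injected into the TCP stream, and it does not reflect how F5 BIG-IP or the TCP Transport Plugin work internally.

```cpp
#include <cstddef>
#include <stdexcept>
#include <string>
#include <unordered_map>

// Toy model of a content-aware load balancer with session affinity.
class SessionAwareBalancer {
public:
    explicit SessionAwareBalancer(std::size_t server_count)
        : server_count_(server_count) {
        if (server_count_ == 0) {
            throw std::invalid_argument("need at least one server");
        }
    }

    // Returns the index of the server that should receive a connection
    // whose data stream identifies it as part of `session_id`.
    std::size_t route(const std::string& session_id) {
        auto it = session_to_server_.find(session_id);
        if (it != session_to_server_.end()) {
            return it->second;  // later connection: stick to the same server
        }
        // First connection of this session: pick a server round-robin
        // and remember the choice.
        const std::size_t server = next_server_;
        next_server_ = (next_server_ + 1) % server_count_;
        session_to_server_.emplace(session_id, server);
        return server;
    }

private:
    std::size_t server_count_;
    std::size_t next_server_ = 0;
    std::unordered_map<std::string, std::size_t> session_to_server_;
};
```

With two servers, routing "session-A", then "session-B", then "session-A" again sends both session-A connections to the first server and session-B to the second, which is exactly the property that keeps a multi-connection session intact.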

I hope you find these new features useful and helpful in deploying highly scalable RTI Connext-based systems. To try it for yourself, download our free 30-day trial today!

We love our new Launcher

For the new RTI Connext DDS 5.2 release, we have re-implemented the RTI Launcher application. We love it! We love the new native OS look and feel, we love the new functionality, and we’re confident you’ll love it, too.

[Screenshots: Launcher on Windows, Linux, and macOS]

Of course, the basic idea is the same: providing you with an easy way to quickly access all the RTI products. The UI design hasn’t changed much, so the transition should be easy for existing users. As usual, there are three main panels with our tools, infrastructure services, and utilities. Every tool has a context menu, which shows related docs and information:

[Screenshot: Launcher context menu]

You can also open a window to show the command-line help for the service/utility – and what’s more, you can now access the history of commands that you launched in the current session for that specific product:

[Screenshot: Launcher command history]

This may come in handy at some point, because it reminds you of the exact parameters you used for the call. You can also copy the contents for later reference.

Apart from the usual tabs for our tools, services and utilities, we’ve incorporated a new tab for third-party products, as part of our pilot project for an RTI marketplace:

[Screenshots: Launcher third-party tab and third-party download]

This tab contains products and tools from our partners. These products are usually not installed by default, but worry not! You can easily download and install them by clicking on the green download arrow (or if you right-click to show the context menu, you’ll find a menu item to do that as well).

Another cool thing about our tabs is that there is an optional new tab (you just have to enable it) where you can put anything you want to keep handy. I have to confess, I use (and love) Eclipse for development, so why not add it to my Launcher?

[Screenshot: Eclipse added to Launcher]

I can even add tool-tips and context menu entries!

[Screenshots: Eclipse tooltip and context menu in Launcher]

Good stuff. Now I can launch Eclipse from Launcher. Launcher has never been this “launchy”… And all configured via XML!

We’ve also added a Help tab. This tab contains links for information and documentation about RTI and Connext DDS. Some links are web links (you’ll need an Internet connection to access them) and some are links to local PDF documentation.

[Screenshot: Launcher Help tab]

Lastly, I wanted to mention the Installation tab and the RTI Package Installer. There you can select a license file to use for your Connext installation. You can browse the license contents visually. Launcher will notify you when your license is invalid or expired. You can also see which Connext products and components you have installed.

[Screenshot: Launcher Installation tab]

And speaking of installing new (and awesome!) RTI stuff, Launcher has an interface for the new RTI Package Installer application. So if I want to install the new RTI Queuing Service I would just open the RTI Package Installer dialog and click the “+” button next to the package file for that product. (To open the RTI Package Installer, click the icon on the Installation tab or the Utilities tab.)

[Screenshot: Launcher Package Installer dialog]

Then just click install and Launcher will call the Package Installer tool for you with all the files specified in the dialog. The new component will then be installed. Some components are shown by Launcher in panels and they will be ready to use after the installation.

[Screenshot: Queuing Service installed in Launcher]

We think the new Launcher is a great addition to Connext 5.2! Go try it! We’re sure you’ll love it as much as we do!