Publish and Subscribe Tutorial

In this tutorial, we are going to walk through how to publish a command to the PolySync bus and subscribe to a particular message type.

1. Become a participant on the bus

In order to begin the publish/subscribe communication process, a PolySync node must first become a valid participant on the bus .

You can do this by calling:

 psync_init()

Once this has occurred, it becomes discoverable by all other nodes in the PolySync runtime, allowing it to publish and subscribe any number of message types.

Nearly all LiDAR sensors provide intensity and x/y/z positions for each point in the cloud. PolySync defined a high-level message ps_lidar_points_msg to hold those common fields, and provided other fields such as a start and end timestamp or the scanning LiDAR sensors that also provide that data to the Dynamic Driver interface.

The Dynamic Driver interface is written to parse the LiDAR sensor data from the native bus─the Ethernet socket, CAN channel, or serial connection─to process and abstract the data into the ps_lidar_points_msg and publish that message to the PolySync bus.

Any number of nodes may subscribe to the high-level message type to receive a copy of each instance that’s published to the bus.

When multiple Dynamic Driver nodes are publishing the same message type, subscribing nodes will filter for specific sources upon receiving the message.

2. Locate the C++ Code

You can find the C++ Publish Subscribe code as part of our public C++ examples repo here.

PolySync-Core-CPP-Examples/PublishSubscribe
├── Publish
│   ├── CMakeLists.txt
│   └── Publish.cpp
└── Subscribe
    ├── CMakeLists.txt
    └── Subscribe.cpp

2.1 Publish.cpp

#include <PolySyncNode.hpp>
#include <PolySyncDataModel.hpp>

using namespace std;

/**
 * @brief PublisherSubscriberNode class
 *
 *
 */
class PublisherSubscriberNode : public polysync::Node
{

private:
    const string node_name = "polysync-publish-cpp";
    const string platform_motion_msg_name = "ps_platform_motion_msg";

    ps_msg_type _messageType;

public:

    PublisherSubscriberNode()
    {
        setNodeType( PSYNC_NODE_TYPE_API_USER );
        setDomainID( PSYNC_DEFAULT_DOMAIN );
        setSDFID( PSYNC_SDF_ID_INVALID );
        setFlags( PSYNC_INIT_FLAG_STDOUT_LOGGING );
        setNodeName( node_name );
    }

    ~PublisherSubscriberNode()
    {

    }

    void initStateEvent() override
    {
        _messageType = getMessageTypeByName( platform_motion_msg_name );
    }

    void releaseStateEvent() override
    {
        // do nothing, sleep for 10 milliseconds
        polysync::sleepMicro( 10000 );
    }

    void errorStateEvent() override
    {
        // do nothing, sleep for 10 milliseconds
        polysync::sleepMicro( 10000 );
    }

    void fatalStateEvent() override
    {
        // do nothing, sleep for 10 milliseconds
        polysync::sleepMicro( 10000 );
    }

    void warnStateEvent() override
    {
        // do nothing, sleep for 10 milliseconds
        polysync::sleepMicro( 10000 );
    }

    void okStateEvent() override
    {
        // Create a message
        polysync::datamodel::PlatformMotionMessage message( *this );

        // Set the message timestamp (UTC), representing when the data was
        // received or when it originated
        message.setTimestamp( polysync::getTimestamp() );

        message.setLatitude( 45.515289 );

        message.setLongitude( -122.654355 );

        // Set the message header timestamp, representing when the message
        // was published to the PolySync bus
        message.setHeaderTimestamp( polysync::getTimestamp() );

        // Publish the message to the PolySync bus
        message.publish();

        // The ok state is called periodically by the system, sleep to reduce
        // the number of messages sent
        polysync::sleepMicro( 1000000 );
    }
};
int main()
{
    // Create an instance of the PublisherSubscriberNode and connect it to
    // PolySync
    PublisherSubscriberNode publisherNode;

    // When the node has been created, it will cause an initStateEvent to be
    // sent and then proceed into the okState.  connectToPolySync does not
    // return, use Ctrl-C to exit
    publisherNode.connectPolySync();

    return 0;
}

2.2 Publish.cpp explained

Please be aware that both of these headers must be included in C++ applications. The Node API PolySyncNode.hpp creates a node and defines the states within the node state machine. It also allows the node to publish and subscribe to the bus.

The PolySyncDataModel.hpp defines all messages, types, and constants in the PolySync C++ Data Model.

#include <PolySyncNode.hpp>
#include <PolySyncDataModel.hpp>

The following is an inclusive list of the node states that are defined as part of the PolySync node state machine:

Event function Execution criteria
setConfigurationEvent() Called once at the beginning of the program before any PolySync related tasks, and where command-line arguments should be handled
initStateEvent() Called once after the node transitions into the INIT state
okStateEvent() Called continuously while in the OK state
warnStateEvent() Called continuously while in the WARN state
errorStateEvent() Called continuously while in the ERROR state
fatalStateEvent() Called once after the node transitions into the FATAL state before the node terminates
releaseStateEvent() Called once on exit, triggered by CTRL+C
messageEvent() Fires once for each time that a subscribed message is received, access the message data payload

Unless we override the node states, each state does minimal to no operation.

You can then move on to subclass the C++ node object to create a PolySync node.

class PublisherSubscriberNode : public polysync::Node
{ ... }

It is important to remember to set the node name during object construction.

Since the node will be publishing a message to the PolySync bus, it needs to know the string representation for the message type. We can make this known to the node by calling the _message private member. This will hold the integer representation of the message type at runtime, which allows the node to identify and subscribe to the message type.

private:
    const string node_name = "polysync-publish-cpp";
    const string platform_motion_msg_name = "ps_platform_motion_msg";

    ps_msg_type _messageType;

The constructor is then called.

PublisherSubscriberNode()
    {
        setNodeType( PSYNC_NODE_TYPE_API_USER );
        setDomainID( PSYNC_DEFAULT_DOMAIN );
        setSDFID( PSYNC_SDF_ID_INVALID );
        setFlags( NODE_FLAGS_VALUE | PSYNC_INIT_FLAG_STDOUT_LOGGING );
        setNodeName( node_name );
    }

Now, we move on to call the OK state event. It is important to note that this is called once per execution cycle in the node state machine, while the node is in the OK state.

This OK state event will create an instance of the platform motion message, fill it with some sample data, and publish it to the global PolySync bus each time it is called.

You can control the node’s execution speed by allowing it to sleep more or less.

void okStateEvent() override
    {
        ...

        polysync::sleepMicro( 1000000 );
    }

Using the predefined messages in the datamodel, the node creates an instance of the ps_platform_motion_msg using the C++ representation for the message type.

Now, we will call the message timestamp. Each message has some form of a message timestamp, which is used to represent the UTC time signifying when the data was received from a hardware device, PolySync node, or software application.

        polysync::datamodel::PlatformMotionMessage message( *this );
        message.setTimestamp( polysync::getTimestamp() );

At this point, we can allow the node to update the latitude and longitude values for this instance of the platform motion message.

Check the C++ Data Model to see all data fields that exist and can be set in this message type.

        message.setLatitude( 45.515289 );

        message.setLongitude( -122.654355 );

We will now move on to setting the message header timestamp. This represents the UTC time when the message was published to the PolySync bus. The publishing node is always responsible for setting this directly before publishing the message to the PolySync bus.

        message.setHeaderTimestamp( polysync::getTimestamp() );

        message.publish();

Now we will connect the node to PolySync. The main entry point creates an instance of the custom publisher/subscriber node and immediately connects to PolySync, placing the node in the setConfigurationEvent and then the initStateEvent.

All non-PolySync related initialization should happen in the nodes setConfigurationEvent.

The node will exit the state machine execution loop once it receives the CTRL-C signal interrupt.

int main()
{
    PublisherSubscriberNode publisherNode;

    publisherNode.connectPolySync();

    return 0;
}

2.3 Subscribe.cpp

#include <iostream>
#include <PolySyncNode.hpp>
#include <PolySyncDataModel.hpp>

using namespace std;

/**
 * @brief Node flags to be OR'd with driver/interface flags.
 */
#ifndef NODE_FLAGS_VALUE
#define NODE_FLAGS_VALUE (0)
#endif
class PublisherSubscriberNode : public polysync::Node
{
private:
    const string node_name = "polysync-subscribe-cpp";
    const string platform_motion_msg_name = "ps_platform_motion_msg";

    ps_msg_type _messageType;

public:

    PublisherSubscriberNode()
    {
        setNodeType( PSYNC_NODE_TYPE_API_USER );
        setDomainID( PSYNC_DEFAULT_DOMAIN );
        setSDFID( PSYNC_SDF_ID_INVALID );
        setFlags( NODE_FLAGS_VALUE | PSYNC_INIT_FLAG_STDOUT_LOGGING );
        setNodeName( node_name );
    }

    ~PublisherSubscriberNode()
    {

    }

    void initStateEvent() override
    {
        _messageType = getMessageTypeByName( platform_motion_msg_name );

        // Register as a listener for the platform motion message
        registerListener( _messageType );
    }

    void releaseStateEvent() override
    {
        // do nothing, sleep for 10 milliseconds
        polysync::sleepMicro( 10000 );
    }

    void errorStateEvent() override
    {
        // do nothing, sleep for 10 milliseconds
        polysync::sleepMicro( 10000 );
    }

    void fatalStateEvent() override
    {
        // do nothing, sleep for 10 milliseconds
        polysync::sleepMicro( 10000 );
    }

    void warnStateEvent() override
    {
        // do nothing, sleep for 10 milliseconds
        polysync::sleepMicro( 10000 );
    }

    void okStateEvent() override
    {
        // The ok state is called periodically by the system
        polysync::sleepMicro( 10000 );
    }

    void messageEvent( std::shared_ptr< polysync::Message > message ) override
    {
        using namespace polysync::datamodel;
        if( std::shared_ptr < polysync::datamodel::PlatformMotionMessage > platformMotionMsg =
                getSubclass< PlatformMotionMessage >( message ) )
        {  
            //platformMotionMsg->print();

            // or...

            std::cout << "Latitude: " << platformMotionMsg->getLatitude() << std::endl;

            std::cout << "Longitude: " << platformMotionMsg->getLongitude() << std::endl;

        }
    }

};

int main()
{
    // Create an instance of the PublisherSubscriberNode and connect it to
    // PolySync.
    PublisherSubscriberNode subscriberNode;
    // When the node has been created, it will cause an initStateEvent to be
    // sent and then proceed into the okState.  connectToPolySync does not
    // return, use Ctrl-C to exit.
    subscriberNode.connectPolySync();

    return 0;
}

2.4 Subscribe.cpp explained

The subscriber is largely similar to the publisher, and repeated variables and identical functions will be skipped here.

The message subscriber nodes should always query the runtime for the message type integer values, based on the string representation of the message type.

Once the message type is known, the node can register a listener. For every instance this message type is seen on the PolySync bus, this node’s message event will be executed once.

    void initStateEvent() override
    {
        _messageType = getMessageTypeByName( platform_motion_msg_name );

        registerListener( _messageType );
    }

The message event executes asynchronously in its own thread.

We will be using the defined C++ class template. This acts to promote the incoming base message to the appropriate type─the string representation of the message.

    void messageEvent( std::shared_ptr< polysync::Message > message ) override
    {
        using namespace polysync::datamodel;
        if( std::shared_ptr < polysync::datamodel::PlatformMotionMessage > platformMotionMsg =
                getSubclass< PlatformMotionMessage >( message ) )
        {  
            ...
        }
    }

Each message in the C++ API has a built-in print() method for quick and easy debugging. It will print each message field to std::out, or a provided ostream. Alternatively─and more commonly─you can access the data using the get*() functions.

It’s best to do as little processing in the nodes message event as possible. If the node needs to perform any intense processing it should be moved to another thread by queuing the incoming messages.

            //platformMotionMsg->print();

            // or...

            std::cout << "Latitude: " << platformMotionMsg->getLatitude() << std::endl;

            std::cout << "Longitude: " << platformMotionMsg->getLongitude() << std::endl;

3. Running the nodes

After building the two nodes using cmake and make, you can run them using two terminal windows.

3.1 Building the nodes

If you want to see the build dependencies of a PolySync node, you can check out the CMakeLists.txt file, and the included PSYNC_HOME/BuildResources.cmake file.

$ cd PolySync-Core-CPP-Examples/PublishSubscribe/Publish
$ mkdir build && cd build
$ cmake ..
$ make

Next, you’re going to build the subscriber node in a second terminal window.

$ cd PolySync-Core-CPP-Examples/PublishSubscribe/Subscribe
$ mkdir build && cd build
$ cmake ..
$ make

4. Running the publisher node

It is time to start the publisher node. This will begin publishing the platform motion message to the global PolySync bus, with latitude and longitude set to our static values.

$ ./polysync-publisher-cpp
2016-09-17 14:35:33.09s DEBUG  [polysync-publish-cpp-] - build version 2.0.6-1468018065
2016-09-17 14:35:34.09s DEBUG  [polysync-publish-cpp-281475340021795] - created participant - GUID: 0x0001000015A7B023 d(281475340021795)
2016-09-17 14:35:34.09s DEBUG  [polysync-publish-cpp-281475340021795] - message types visible to this node: 46
2016-09-17 14:35:34.09s DEBUG  [polysync-publish-cpp-281475340021795] - transition to state: INIT - timestamp: 1474148134509200
2016-09-17 14:35:34.09s DEBUG  [polysync-publish-cpp-281475340021795] - transition to state: OK - timestamp: 1474148134519985

5. Running the subscriber node

Now you will run the subscriber node. This connects to PolySync and will begin receiving data as soon as it enters the “OK” state.

$ ./polysync-subscriber-cpp
2016-09-17 14:35:36.09s DEBUG  [polysync-subscribe-cpp-] - build version 2.0.6-1468018065
2016-09-17 14:35:37.09s DEBUG  [polysync-subscribe-cpp-281476857950370] - created participant - GUID: 0x00010000702170A2 d(281476857950370)
2016-09-17 14:35:37.09s DEBUG  [polysync-subscribe-cpp-281476857950370] - message types visible to this node: 46
2016-09-17 14:35:37.09s DEBUG  [polysync-subscribe-cpp-281476857950370] - transition to state: INIT - timestamp: 1474148137993624
2016-09-17 14:35:37.09s DEBUG  [polysync-subscribe-cpp-281476857950370] - transition to state: OK - timestamp: 1474148137996558
Latitude: 45.5153
Longitude: -122.654
Latitude: 45.5153
Longitude: -122.654

6. Visualize the data

The latitude and longitude data can be visualized in Studio’s Trace plugin, by selecting the ps_platform_motion_msg.

To run Studio, enter the following:

$ polysync-core-studio

Conclusion

Congratulations! As you’ve worked through this tutorial, you’ve been introduced to the core concepts of PolySync communication through a message publisher/subscriber model, the basis for creating C++ nodes to access data from the global PolySync bus, and building the node against the PolySync APIs.