Data Feeds

WAMP applications pass data between components via a publish/subscribe interface. A component can publish data to a unique address, and any other component can register a callback to monitor that address. Whenever data is published, all subscribed components receive it via their registered callback functions.

The OCS Feed is a layer on top of this basic pub/sub functionality that agents can use to pass data to other OCS agents or clients. The OCSAgent class contains several methods that make it easy to register new feeds, publish data to registered feeds, and subscribe to other agents' feeds. The feed system adds some structure to the base pub/sub layer by adding features such as data caching, data and feed-name verification, and customizable aggregation behavior.

Registering Feeds

A custom Agent class can register a feed using its OCSAgent instance. For example, if the OCSAgent instance is stored in the self.agent variable, a basic feed can be registered by calling:

self.agent.register_feed(feed_name)

The register_feed function takes a few other keyword arguments to customize its behavior (see the OCSAgent API for more details). buffer_time sets how long the feed should buffer messages before sending them over crossbar, and max_messages sets how many messages are cached.
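For example, a registration that tunes these options might look like the following sketch (the feed name here is purely illustrative):

self.agent.register_feed('positions',
                         buffer_time=1,     # seconds to buffer before publishing over crossbar
                         max_messages=20)   # number of messages kept in the feed's cache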

Feed Name Rules

Data on this feed will be published to the URI <agent_uri>.feeds.<feed_name>, so feed names must follow standard URI formatting rules, meaning they may only contain lowercase letters, numbers, and underscores. Because the agent_uri is unique for each agent, feed names are not required to be globally unique.

For instance, the Lakeshore372 agent may register a feed called temperatures. A Lakeshore372 instance with instance-id LSASIM would then publish data to the URI observatory.LSASIM.feeds.temperatures.

Aggregator Parameters

If you’d like your feed to be recorded by the HK aggregator, you must register it with the keyword argument record=True, and you can customize the aggregator's behavior with the agg_params option. These parameters are passed to the Provider constructor, which unpacks them and sets defaults. The following options can be specified here:

Aggregator params

frame_length (float)
    Aggregation time (seconds) before a frame is written to disk.

fresh_time (float)
    Time (seconds) before the feed is considered “stale” and is removed from the HK status frame.

exclude_aggregator (bool)
    If True, the HK Aggregator will not record this feed to G3.

exclude_influx (bool)
    If True, the InfluxPublisher will not publish this feed to InfluxDB.
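As a sketch of how these parameters might be passed (the feed name and values here are placeholders), a registration that records a feed to G3 but keeps it out of InfluxDB could look like:

agg_params = {
    'frame_length': 60,       # seconds of data per G3 frame
    'fresh_time': 600,        # seconds before the feed is marked stale
    'exclude_influx': True,   # do not forward this feed to InfluxDB
}
self.agent.register_feed('diagnostics', record=True, agg_params=agg_params)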

Publishing to a Feed

You can publish data to a feed by calling:

agent.publish_to_feed(feed_name, message)

For standard feeds, message can be any JSON-serializable object (i.e. strings, ints, floats, bools, or dicts containing these). Callbacks will receive the tuple (message, feed_data), where feed_data is a dict encoding most OCS.Feed attributes.
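As a minimal sketch from within an agent class (the feed name and message contents are illustrative, and the feed is assumed to have been registered earlier), publishing to a standard feed might look like:

message = {'state': 'idle', 'uptime': 1234.5, 'ok': True}
self.agent.publish_to_feed('status', message)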

Recorded Feed Registration

To make sure that a feed is picked up by the aggregator, it must be registered with the option record=True. It must also be registered with a frame_length in agg_params, which tells the aggregator how long each frame should be, in seconds. An example can be seen in LS372_agent.py:

agg_params = {
    'frame_length': 10*60 #[sec]
}
self.agent.register_feed('temperatures',
                         record=True,
                         agg_params=agg_params,
                         buffer_time=1)

Recorded Feed Message Format

Recorded feeds require data to have a specific structure so that the aggregator can encode the data into G3 objects. Each published message should contain a block of data, i.e. a group of co-sampled data points. Messages should have this structure:

message = {
    'block_name': <Key to identify group of co-sampled data>,
    'timestamp': <ctime of data>,
    'data': {
        'field_name_1': <datapoint1>,
        'field_name_2': <datapoint2>
    }
}

or, if data is buffered on the agent side, multiple co-sampled data points can be passed at once as:

message = {
    'block_name': <Key to identify group of co-sampled data>,
    'timestamps': [ctime1, ctime2, ... ],
    'data': {
        'field_name_1': [data1_1, data1_2, ...],
        'field_name_2': [data2_1, data2_2, ...]
    }
}

Note the pluralized timestamps key.
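For concreteness, a buffered publish following this structure might look like the sketch below (the feed, block, and field names are illustrative):

import time

now = time.time()
message = {
    'block_name': 'sens',                # internal label for this co-sampled block
    'timestamps': [now - 1.0, now],      # one ctime per sample
    'data': {
        'sensor_voltage': [1.21, 1.22],  # one value per timestamp
        'sensor_current': [0.40, 0.41]
    }
}
self.agent.publish_to_feed('sensors', message)  # 'sensors' assumed registered with record=True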

Data published with a consistent block_name will be written to disk as a single G3TimesampleMap object, which stores co-sampled data as a map containing multiple G3Vector objects along with a vector of timestamps. The field names in the data block will be the keys of the G3TimesampleMap and the names that show up in Grafana, so it is important that they are descriptive and unique within each Feed. In the example above, the keys of the G3TimesampleMap will be field_name_1 and field_name_2. The block_name is only used internally and will not be written to disk, so it is only important that the block_name is unique to this co-sampled block.

Each set of data that a feed publishes that is not co-sampled should be published under a different block_name. For instance, in the LS372 agent, data coming from separate channels is not co-sampled. The LS372 temperature messages should then look like:

message = {
    'block_name': 'channel_01',
    'timestamp': <ctime>,
    'data': {
        'channel_01_T': <channel 1 temperature reading>,
        'channel_01_R': <channel 1 resistance reading>
     }
}

The LS372 G3Frames will then contain a G3TimesampleMap for each channel, containing the temperature and resistance readings along with their timestamps.

Field Name Requirements

Field names must:

  • Contain only letters, numbers, and underscores.

  • Begin with a letter or any number of underscores followed by a letter.

  • Be no longer than 255 characters.

Attempting to publish an invalid field name should cause the agent to raise an error. However, if invalid field names somehow make it to the aggregator, the aggregator will attempt to correct them before writing to disk.
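These rules roughly correspond to a check like the following sketch (for illustration only; this is not the validator OCS actually uses):

import re

def is_valid_field_name(name):
    """Return True if name follows the field name rules listed above."""
    return len(name) <= 255 and re.match(r'^_*[A-Za-z][A-Za-z0-9_]*$', name) is not None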

Subscribing to a Feed

Occasionally you might want your agent or client to receive data directly from another agent. For instance, the aggregator agent subscribes to all agent feeds so it can write their data to HK files, and the pysmurf-controller subscribes to the pysmurf-monitor feed so that it can put pysmurf data directly into the session-data object.

There are a few different ways for your agent to subscribe to an OCS Feed. Once the twisted reactor has started, both the subscribe_to_feed and subscribe functions can be used. The subscribe_to_feed method takes the agent_address, feed_name, and a callback function. By default, this method protects an agent from subscribing to the same topic multiple times.

The subscribe function provides more direct access to the crossbar subscription method. It takes the full topic URI along with an optional options dict, which can specify more detailed subscription options such as pattern-matching behavior. For instance, the following line will subscribe to all OCS feeds in the observatory namespace:

agent.subscribe(callback, 'observatory..feeds.', options={'match': 'wildcard'})
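Similarly, a single-feed subscription with subscribe_to_feed might look like the following sketch (the callback is hypothetical, and the address and feed name reuse the LSASIM example from earlier):

def _on_temperatures(_data):
    # Callbacks receive (message, feed_data), as described above.
    message, feed = _data
    print(feed['agent_address'], message)

agent.subscribe_to_feed('observatory.LSASIM', 'temperatures', _on_temperatures)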

Before the reactor has started, the subscribe_on_start function can be used to queue up a subscribe call to run as soon as the reactor starts.

Subscribing with a Client

It is also possible for client objects to subscribe to feeds…

Examples

Here is an example showing how the registry agent subscribes its heartbeat registration callback:

class RegistryAgent:
    def __init__(self, agent):
        self.agent = agent
        # self.registered_agents maps agent addresses to tracker objects
        # with a refresh() method (initialization omitted in this excerpt).
        self.agent.subscribe_on_start(
            self._register_heartbeat, 'observatory..feeds.heartbeat',
            options={'match': 'wildcard'}
        )

    def _register_heartbeat(self, _data):
        # Callbacks receive (message, feed_data); feed_data includes the
        # publishing agent's address.
        msg, feed = _data
        self.registered_agents[feed['agent_address']].refresh()