Aggregator Agent

The Aggregator Agent, also referred to as the Housekeeping Aggregator (or HKAggregator), is the OCS Agent responsible for recording all data published to the OCS Network’s Data Feeds. The Aggregator collects this data and writes it to disk in the .g3 file format.

Warning

Be sure to follow the instructions for Creating the OCS User during installation to ensure proper permissions for the Aggregator Agent to write data to disk.

usage: agent.py [-h] --data-dir DATA_DIR [--initial-state {idle,record}]
                [--time-per-file TIME_PER_FILE]

Agent Options

--data-dir

Base directory to store data. Subdirectories will be made here.

--initial-state

Possible choices: idle, record

Initial state of the Agent. Can be either idle or record

Default: “idle”

--time-per-file

Time per file in seconds. Defaults to 1 hr

Default: “3600”

Dependencies

The Aggregator Agent depends on both the spt3g_software and so3g packages.

Configuration File Examples

Below are configuration examples for the ocs config file and for running the Agent in a docker container.

OCS Site Config

The aggregator agent takes three site-config arguments. --initial-state can be either record or idle, and determines whether the aggregator starts recording as soon as it is initialized. --time-per-file specifies how long each file should be in seconds, and --data-dir specifies the default data directory. The latter two can also be specified manually in params when the record process is started. An example site-config entry is:

{'agent-class': 'AggregatorAgent',
   'instance-id': 'aggregator',
   'arguments': [['--initial-state', 'record'],
                 ['--time-per-file', '3600'],
                 ['--data-dir', '/data/hk']
   ]},

Note

/data/hk is used to avoid conflict with other collections of data. In general, it is recommended to use /data/timestreams to store detector timestreams, and /data/pysmurf to store archived pysmurf files.

Docker Compose

The Docker image for the Aggregator Agent is simonsobs/ocs. Here is an example configuration:

ocs-aggregator:
    image: simonsobs/ocs:latest
    container_name: ocs-aggregator
    hostname: ocs-docker
    user: "9000"
    environment:
      - LOGLEVEL=info
      - INSTANCE_ID=aggregator
    volumes:
      - ${OCS_CONFIG_DIR}:/config
      - /path/to/host/data:/data

Description

The job of the HK aggregator is to take data published by “Providers” and write it to disk. The aggregator considers each OCS Feed with record=True to be a separate provider, and so any data written by a single OCS Feed will be grouped together into G3Frames. See the OCS Feed page for info on how to register a feed so that it will be recorded by the aggregator.

Unregistered providers will automatically be added when they send data, and stale providers will be removed if no data is received in a specified time period.

To do this, the aggregator monitors all feeds in the namespace defined by the {address_root} prefix to find feeds that should be recorded. If the aggregator receives data from a feed registered with record=True, it will automatically add that feed as a Provider, and will start putting incoming data into frames every frame_length seconds, where frame_length is set by the Feed on registration. A provider will be automatically marked as stale and unregistered if it goes fresh_time seconds without receiving any data from the feed, where fresh_time is likewise set by the feed on registration.
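
Per the paragraph above, frame_length and fresh_time are chosen by the Feed on registration. A minimal sketch of such aggregation parameters follows; the register_feed call shown in the comment is an assumed shape of the OCS Agent API, not taken from this page:

```python
# Hedged sketch of aggregation parameters a Feed might set on registration.
agg_params = {
    'frame_length': 60,   # seconds of data collected into each G3Frame
    'fresh_time': 180,    # seconds without data before the provider is stale
}
# Inside an OCS Agent, these would typically accompany feed registration, e.g.:
#   self.agent.register_feed('temperatures', record=True, agg_params=agg_params)
```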

The Aggregator Agent has a single main process record in which the aggregator will continuously loop and write any queued up data to a G3Frame and to disk. The record process's session data object contains information such as the path of the current G3 file, and the status of active and stale providers.

File Format and Usage

Data is stored using the spt3g_software and so3g packages. so3g provides the schema that operates on standard G3 Frames. Each file consists of a sequence of Frame objects, each containing its own data. Each frame is a free-form mapping from strings to data of a type derived from G3FrameObject, and behaves much like a python dictionary. Notably, SPT3G files cannot directly store python lists, tuples, or numpy arrays; these must be wrapped in the appropriate G3FrameObject container classes.

Examples of useful G3FrameObjects:

G3Objects

G3VectorDouble

A vector of doubles. It acts like a numpy array of doubles.

G3Timestream

Acts like a G3VectorDouble with an attached sample rate, start time, stop time, and units.

G3TimestreamMap

A map of strings to G3Timestreams.

G3TimesampleMap

A map of vectors containing co-sampled data, packaged with a vector of timestamps.

The so3g package provides functions for loading data from disk. See the so3g Documentation for details. If you simply need a quick look at the contents of a file you can use the spt3g utility spt3g-dump. For instance:

$ spt3g-dump 1589310638.g3
Frame (Housekeeping) [
"description" (spt3g.core.G3String) => "HK data"
"hkagg_type" (spt3g.core.G3Int) => 0
"hkagg_version" (spt3g.core.G3Int) => 1
"session_id" (spt3g.core.G3Int) => 426626618778213812
"start_time" (spt3g.core.G3Double) => 1.58931e+09
]
Frame (Housekeeping) [
"hkagg_type" (spt3g.core.G3Int) => 1
"hkagg_version" (spt3g.core.G3Int) => 1
"providers" (spt3g.core.G3VectorFrameObject) => [0x7fdfe6cb4760]
"session_id" (spt3g.core.G3Int) => 426626618778213812
"timestamp" (spt3g.core.G3Double) => 1.58931e+09
]
Frame (Housekeeping) [
"address" (spt3g.core.G3String) => "observatory.faker.feeds.false_temperatures"
"blocks" (spt3g.core.G3VectorFrameObject) => [0x7fdfe6d05760]
"hkagg_type" (spt3g.core.G3Int) => 2
"hkagg_version" (spt3g.core.G3Int) => 1
"prov_id" (spt3g.core.G3Int) => 0
"provider_session_id" (spt3g.core.G3String) => "1589308315.450184"
"session_id" (spt3g.core.G3Int) => 426626618778213812
"timestamp" (spt3g.core.G3Double) => 1.58931e+09
]

HK File Structure

The HK file is made up of three frame types: Session, Status, and Data, labeled with an hkagg_type value of 0, 1, and 2 respectively. Session frames occur once at the start of every file and contain information about the current aggregation session, such as the session_id and start_time.

Status frames contain a list of all active providers. One will always follow the Session frame, and a new one will be written each time a provider is added to or removed from the list of active providers.

Data frames contain all data published by a single provider. The data is stored under the key blocks as a list of G3TimesampleMaps, where each timesample map corresponds to a group of co-sampled data, grouped by their block name. Each G3TimesampleMap contains a G3Vector for each field_name specified in the data and a vector of timestamps.
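
The organization of a Data frame's blocks entry can be illustrated in plain Python. This is an assumption-laden sketch: real files store so3g G3TimesampleMap objects rather than dicts, and the field names here (Channel_1, Channel_2) are invented:

```python
import time

# Each entry in 'blocks' is one co-sampled group: a vector of timestamps
# plus one equal-length vector per field_name.
now = time.time()
blocks = [
    {
        'timestamps': [now, now + 1.0],   # shared sample times for the block
        'Channel_1': [250.1, 250.2],      # one vector per field_name
        'Channel_2': [251.0, 251.1],
    },
]
```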

Agent API

class ocs.agents.aggregator.agent.AggregatorAgent(agent, args)[source]

This class provides a WAMP wrapper for the data aggregator. The run function and the data handler are thread-safe, as long as multiple run functions are not started at the same time, which should be prevented through OCSAgent.

Parameters:
  • agent (OCSAgent) – OCS Agent object

  • args (namespace) – args from the Agent's argparser.

time_per_file

Time (sec) before files should be rotated.

Type:

int

data_dir

Path to the base directory where data should be written.

Type:

path

aggregate

Specifies if the agent is currently aggregating data.

Type:

bool

incoming_data

Thread-safe queue where incoming (data, feed) pairs are stored before being passed to the Aggregator.

Type:

queue.Queue
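
The hand-off through this queue decouples the WAMP data handler from the aggregator loop. A minimal sketch, with an illustrative (data, feed) payload:

```python
import queue

# The data handler enqueues (data, feed) pairs; the record loop drains them.
incoming_data = queue.Queue()
incoming_data.put(({'temps': {'block_name': 'temps'}},
                   'observatory.faker.feeds.false_temperatures'))

drained = []
while not incoming_data.empty():
    drained.append(incoming_data.get())
```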

loop_time

Time between iterations of the run loop.

Type:

float

record(test_mode=False)[source]

Process - This process will create an Aggregator instance, which will collect and write provider data to disk as long as this process is running.

Parameters:

test_mode (bool, optional) – Run the record Process loop only once. This is meant only for testing. Default is False.

Notes

The most recent file and active providers will be returned in the session data:

>>> response.session['data']
{"current_file": "/data/16020/1602089117.g3",
 "providers": {
    "observatory.fake-data1.feeds.false_temperatures": {
        "last_refresh": 1602089118.8225083,
        "sessid": "1602088928.8294137",
        "stale": false,
        "last_block_received": "temps"},
    "observatory.LSSIM.feeds.temperatures": {
         "last_refresh": 1602089118.8223345,
         "sessid": "1602088932.335811",
         "stale": false,
         "last_block_received": "temps"}}}

Supporting APIs

class ocs.agents.aggregator.drivers.Provider(address, sessid, prov_id, frame_length=300, fresh_time=180)[source]

Stores data for a single provider (OCS Feed). This class should only be accessed via a single thread.

Parameters:
  • address (string) – Full address of the provider

  • sessid (string) – session_id of the provider

  • prov_id (int) – id assigned to the provider by the HKSessionHelper

  • frame_length (float, optional) – Time (in seconds) before data should be written into a frame. Defaults to 5 min.

  • fresh_time (float, optional) – Time (in seconds) before provider should be considered stale. Defaults to 3 min.
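
The staleness check these parameters govern can be sketched in plain Python. This mimics, but is not, the actual Provider implementation in ocs.agents.aggregator.drivers; the default matches the 3 min fresh_time above:

```python
import time

def is_stale(last_refresh, fresh_time=180.0, now=None):
    """Return True if no data has arrived within fresh_time seconds."""
    now = time.time() if now is None else now
    return (now - last_refresh) > fresh_time
```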

blocks

All blocks that are written by provider.

Type:

dict

frame_start_time

Start time of current frame

Type:

float

fresh_time

Time (in seconds) that the provider can go without data before being labeled stale and scheduled for removal.

Type:

float

last_refresh

Time when the provider was last refreshed (either through data or agent heartbeat).

Type:

time

last_block_received

String of the last block_name received.

Type:

str

log

txaio logger

Type:

txaio.Logger

refresh()[source]

Refresh provider

stale()[source]

Returns true if provider is stale and should be removed

new_frame_time()[source]

Returns true if it's time for a new frame to be written

empty()[source]

Returns true if all blocks are empty

save_to_block(data)[source]

Saves a list of data points into blocks. A block will be created for any new block_name.

Examples

The format of data is shown in the following example:

>>> data = {'test': {'block_name': 'test',
                 'timestamps': [time.time()],
                 'data': {'key1': [1],
                          'key2': [2]},
                 }
           }
>>> prov.save_to_block(data)

Note the block name shows up twice, once as the dict key in the outer data dictionary, and again under the ‘block_name’ value. These must match – in this instance both the word ‘test’.

Parameters:

data (dict) – data dictionary from incoming data queue
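
The block_name invariant noted above can be checked with a small helper. This function is hypothetical (not part of OCS) and simply enforces that the outer key equals each entry's 'block_name':

```python
import time

def check_block_names(data):
    """Raise ValueError if any outer key disagrees with its 'block_name'."""
    for key, block in data.items():
        if block.get('block_name') != key:
            raise ValueError(f"block_name mismatch for key {key!r}")

# The example data format from above passes the check.
data = {'test': {'block_name': 'test',
                 'timestamps': [time.time()],
                 'data': {'key1': [1], 'key2': [2]}}}
check_block_names(data)  # no error: names match
```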

clear()[source]

Clears all blocks and resets the frame_start_time

to_frame(hksess=None, clear=False)[source]

Returns a G3Frame based on the provider’s blocks.

Parameters:
  • hksess (optional) – If provided, the frame will be based off of hksess's data frame. If not, the data will be put into a clean frame.

  • clear (bool) – Clears provider data if True.

class ocs.agents.aggregator.drivers.G3FileRotator((object)arg1)[source]

G3 module which handles file rotation. After time_per_file has elapsed, the rotator will end the current file and create a new one named by the filename function. It will write the last_session and last_status frames to any new file if they exist.

This class should only be accessed via a single thread.

Parameters:
  • time_per_file (float) – time (seconds) before a new file should be written

  • filename (callable) – function that generates new filenames.

filename

Function to call to create new filename on rotation

Type:

function

file_start_time

Start time for current file

Type:

int

writer

G3Writer object for current file. None if no file is open.

Type:

core.G3Writer

last_session

Last session frame written to disk. This is stored and written as the first frame on file rotation.

Type:

core.G3Frame

last_status

Last status frame written to disk. Stored and written as the second frame on file rotation.

Type:

core.G3Frame

current_file

Path to the current file being written.

Type:

str, optional

flush()[source]

Flushes current g3 file to disk

Process(frames)[source]

Writes frames to the current file. If a file has not been started or time_per_file has elapsed, the file is closed and a new one is created by the filename function passed to the constructor.
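
The rotation decision can be sketched as a plain-Python predicate. This is an illustration of the logic described above, not the actual G3FileRotator code; the default of 3600 s matches --time-per-file:

```python
import time

def needs_rotation(file_start_time, time_per_file, now=None):
    """True if no file is open or time_per_file seconds have elapsed."""
    now = time.time() if now is None else now
    return file_start_time is None or (now - file_start_time) >= time_per_file
```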

class ocs.agents.aggregator.drivers.Aggregator(incoming_data, time_per_file, data_dir, session=None)[source]

Data aggregator. This manages a collection of providers, and contains methods to write them to disk.

This class should only be accessed by a single thread. Data can be passed to it by appending it to the referenced incoming_data queue.

Parameters:
  • incoming_data (queue.Queue) – A thread-safe queue of (data, feed) pairs.

  • time_per_file (float) – Time (sec) before a new file should be written to disk.

  • data_dir (path) – Base directory for new files to be written.

  • session (OpSession, optional) – Session object of current agent process. If not specified, session data will not be written.

log

txaio logger

Type:

txaio.Logger

hksess

HKSession helper that assigns provider ids to providers, and constructs so3g frames.

Type:

so3g.HKSessionHelper

writer

Module to use to write frames to disk.

Type:

G3Module

providers

Dictionary of active providers, indexed by the hksess's assigned provider id.

Type:

Dict[Provider]

pids

Dictionary of provider id’s assigned by the hksession. Indexed by (prov address, session_id).

Type:

Dict[Int]

write_status

If true, a status frame will be written next time providers are written to disk. This is set to True whenever a provider is added or removed.

Type:

bool

process_incoming_data()[source]

Takes all data from the incoming_data queue and puts it into provider blocks.

add_provider(prov_address, prov_sessid, **prov_kwargs)[source]

Registers a new provider and writes a status frame.

Parameters:
  • prov_address (str) – full address of provider

  • prov_sessid (str) – session id of provider

Optional Arguments:

Additional kwargs are passed directly to the Provider constructor, so defaults are set there.

remove_provider(prov)[source]

Writes remaining provider data to frame and removes provider.

Parameters:

prov (Provider) – provider object that should be removed.

remove_stale_providers()[source]

Loops through all providers and checks whether they've gone stale. If they have, writes their remaining data to disk (they shouldn't have any) and deletes them.

write_to_disk(clear=True, write_all=False)[source]

Loop through all providers, and write their data to the frame_queue if they have surpassed their frame_time, or if write_all is True.

Parameters:
  • clear (bool) – If True, provider data is cleared after write

  • write_all (bool) – If True, all providers are written to disk regardless of whether frame_time has passed.

run()[source]

Main run iterator for the aggregator. This processes all incoming data, removes stale providers, and writes active providers to disk.

close()[source]

Flushes all remaining providers and closes file.