Aggregator Agent
The Aggregator Agent, also referred to as the Housekeeping Aggregator (or HKAggregator), is the OCS Agent responsible for recording all data published to the OCS Network’s Data Feeds. The Aggregator collects this data and writes it to disk in the .g3 file format.
Warning
Be sure to follow the instructions for Creating the OCS User during installation to ensure proper permissions for the Aggregator Agent to write data to disk.
usage: agent.py [-h] --data-dir DATA_DIR [--initial-state {idle,record}]
[--time-per-file TIME_PER_FILE]
Agent Options
- --data-dir
Base directory to store data. Subdirectories will be made here.
- --initial-state
Possible choices: idle, record
Initial state of the Aggregator. Can be either idle or record.
Default: “idle”
- --time-per-file
Time per file in seconds. Defaults to 1 hour.
Default: “3600”
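The options above can be mirrored with a small argparse parser. This is a minimal sketch reconstructed from the usage line, not the Agent's own parser; the function name make_parser and the help strings are assumptions made for illustration.

```python
import argparse


def make_parser():
    """Build a parser that mirrors the documented usage line (illustrative only)."""
    parser = argparse.ArgumentParser(prog="agent.py")
    # --data-dir is the only required option in the usage line.
    parser.add_argument("--data-dir", required=True,
                        help="Base directory to store data.")
    # --initial-state is restricted to the two documented choices.
    parser.add_argument("--initial-state", default="idle",
                        choices=["idle", "record"],
                        help="Whether to start recording on startup.")
    # --time-per-file defaults to one hour, expressed in seconds.
    parser.add_argument("--time-per-file", type=int, default=3600,
                        help="Time per file in seconds.")
    return parser


args = make_parser().parse_args(["--data-dir", "/data/hk"])
print(args.data_dir, args.initial_state, args.time_per_file)
```

Passing only --data-dir leaves the other two options at their documented defaults.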
Dependencies
The Aggregator Agent depends on both the spt3g_software and so3g packages.
Configuration File Examples
Below are configuration examples for the ocs config file and for running the Agent in a docker container.
OCS Site Config
The Aggregator Agent takes three site-config arguments. --initial-state can be either record or idle, and determines whether the aggregator starts recording as soon as it is initialized. --time-per-file specifies how long each file should be, in seconds, and --data-dir specifies the default data directory. Both of these can also be specified manually in params when the record process is started.
An example site-config entry is:
{'agent-class': 'AggregatorAgent',
 'instance-id': 'aggregator',
 'arguments': [['--initial-state', 'record'],
               ['--time-per-file', '3600'],
               ['--data-dir', '/data/hk']
               ]},
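The arguments entry is a list of [flag, value] pairs. As an illustration of how such pairs correspond to the command-line options listed earlier, the sketch below flattens them into an argv-style list. How OCS itself processes the list is not shown in this page; the helper flatten_arguments is hypothetical.

```python
from itertools import chain

# The example site-config entry from above.
entry = {
    'agent-class': 'AggregatorAgent',
    'instance-id': 'aggregator',
    'arguments': [['--initial-state', 'record'],
                  ['--time-per-file', '3600'],
                  ['--data-dir', '/data/hk']],
}


def flatten_arguments(arguments):
    """Flatten [[flag, value], ...] pairs into a flat argv-style list."""
    return list(chain.from_iterable(arguments))


argv = flatten_arguments(entry['arguments'])
print(argv)
```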
Note
/data/hk is used to avoid conflict with other collections of data. In general, it is recommended to use /data/timestreams to store detector timestreams, and /data/pysmurf to store archived pysmurf files.
Docker Compose
The docker image for the aggregator agent is simonsobs/ocs-aggregator-agent. Here is an example configuration:
ocs-aggregator:
  image: simonsobs/ocs:latest
  container_name: ocs-aggregator
  hostname: ocs-docker
  user: "9000"
  environment:
    - LOGLEVEL=info
    - INSTANCE_ID=aggregator
  volumes:
    - ${OCS_CONFIG_DIR}:/config
    - /path/to/host/data:/data
Description
The job of the HK aggregator is to take data published by “Providers” and write it to disk. The aggregator considers each OCS Feed with record=True to be a separate provider, so any data written by a single OCS Feed will be grouped together into G3Frames. See the OCS Feed page for info on how to register a feed so that it will be recorded by the aggregator.
Unregistered providers will automatically be added when they send data, and stale providers will be removed if no data is received in a specified time period.
To do this, the aggregator monitors all feeds in the namespace defined by the {address_root} prefix to find feeds that should be recorded. If the aggregator receives data from a feed registered with record=True, it will automatically add that feed as a Provider, and will start putting incoming data into frames every frame_length seconds, where frame_length is set by the Feed on registration. A Provider will be automatically marked as stale and unregistered if it goes fresh_time seconds without receiving any data from the feed, where fresh_time is again set by the Feed on registration.
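The two timers described above, frame_length and fresh_time, can be sketched as follows. This is a simplified stand-in, not the Provider class itself: the class name ProviderTimers, the on_data method, and the injectable clock argument are assumptions added so the behaviour can be exercised without waiting. The default values are taken from the Provider signature documented in the Supporting APIs section.

```python
import time


class ProviderTimers:
    """Hypothetical stand-in for the timing state of a single Provider."""

    def __init__(self, frame_length=300.0, fresh_time=180.0, clock=time.time):
        self.frame_length = frame_length  # seconds of data per frame
        self.fresh_time = fresh_time      # seconds without data before going stale
        self.clock = clock
        self.last_refresh = clock()
        self.frame_start_time = None      # unset until data arrives

    def on_data(self):
        """Record that data arrived: refresh, and start a frame if none is open."""
        self.last_refresh = self.clock()
        if self.frame_start_time is None:
            self.frame_start_time = self.last_refresh

    def stale(self):
        """True once fresh_time seconds have passed with no data."""
        return self.clock() - self.last_refresh > self.fresh_time

    def new_frame_time(self):
        """True once the open frame has covered more than frame_length seconds."""
        if self.frame_start_time is None:
            return False
        return self.clock() - self.frame_start_time > self.frame_length


# Drive the timers with a fake clock so the example runs instantly.
now = [0.0]
timers = ProviderTimers(clock=lambda: now[0])
timers.on_data()
now[0] = 200.0
print(timers.stale(), timers.new_frame_time())
```

With the defaults, a feed that has been silent for 200 seconds is already stale, although its open frame has not yet reached frame_length.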
The Aggregator Agent has a single main process, record, in which the aggregator continuously loops and writes any queued-up data to a G3Frame and to disk. The record process’s session data object contains information such as the path of the current G3 file and the status of active and stale providers.
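The queue-based handoff behind the record process can be illustrated with the standard library alone. In this minimal sketch a handler puts (data, feed) pairs onto a thread-safe queue and the loop drains it without blocking; the helper name drain and the contents of the feed dictionary are assumptions, not the Agent's actual structures.

```python
import queue


def drain(incoming_data):
    """Pull every queued (data, feed) pair without blocking."""
    items = []
    while True:
        try:
            items.append(incoming_data.get_nowait())
        except queue.Empty:
            # Nothing left to process in this iteration of the loop.
            return items


incoming = queue.Queue()
# The feed dictionary below is illustrative; only the address is taken
# from the examples in this document.
incoming.put((
    {'temps': {'block_name': 'temps',
               'timestamps': [1602089118.0],
               'data': {'channel_00': [0.1]}}},
    {'address': 'observatory.faker.feeds.false_temperatures'},
))

pairs = drain(incoming)
print(len(pairs), incoming.empty())
```

Draining with get_nowait keeps each loop iteration short, so the writer is never stuck waiting on a quiet feed.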
File Format and Usage
Data is stored using the spt3g_software and so3g packages. so3g provides the schema that operates on standard G3 Frames. Each file consists of a sequence of Frame objects, each containing its own data. Each frame is a free-form mapping from strings to data of a type derived from G3FrameObject, and behaves similarly to a Python dictionary. Notably, SPT3G files cannot directly store Python lists, tuples, or numpy arrays; these must be wrapped in appropriate G3FrameObject container classes.
Examples of useful G3FrameObjects:
Type | Description
G3VectorDouble | A vector of doubles. It acts like a numpy array of doubles.
G3Timestream | Acts like a G3VectorDouble with attached sample rate, start time, stop time, and units.
G3TimestreamMap | A map of strings to G3Timestreams.
G3TimesampleMap | A map of vectors containing co-sampled data, packaged with a vector of timestamps.
The so3g package provides functions for loading data from disk. See the so3g Documentation for details. If you simply need a quick look at the contents of a file you can use the spt3g utility spt3g-dump. For instance:
$ spt3g-dump 1589310638.g3
Frame (Housekeeping) [
"description" (spt3g.core.G3String) => "HK data"
"hkagg_type" (spt3g.core.G3Int) => 0
"hkagg_version" (spt3g.core.G3Int) => 1
"session_id" (spt3g.core.G3Int) => 426626618778213812
"start_time" (spt3g.core.G3Double) => 1.58931e+09
]
Frame (Housekeeping) [
"hkagg_type" (spt3g.core.G3Int) => 1
"hkagg_version" (spt3g.core.G3Int) => 1
"providers" (spt3g.core.G3VectorFrameObject) => [0x7fdfe6cb4760]
"session_id" (spt3g.core.G3Int) => 426626618778213812
"timestamp" (spt3g.core.G3Double) => 1.58931e+09
]
Frame (Housekeeping) [
"address" (spt3g.core.G3String) => "observatory.faker.feeds.false_temperatures"
"blocks" (spt3g.core.G3VectorFrameObject) => [0x7fdfe6d05760]
"hkagg_type" (spt3g.core.G3Int) => 2
"hkagg_version" (spt3g.core.G3Int) => 1
"prov_id" (spt3g.core.G3Int) => 0
"provider_session_id" (spt3g.core.G3String) => "1589308315.450184"
"session_id" (spt3g.core.G3Int) => 426626618778213812
"timestamp" (spt3g.core.G3Double) => 1.58931e+09
]
HK File Structure
The HK file is made up of three frame types: Session, Status, and Data, labeled with an hkagg_type value of 0, 1, and 2 respectively.
Session frames occur once at the start of every file and contain information about the current aggregation session, such as the session_id and start_time.
Status frames contain a list of all active providers. One will always follow the Session frame, and a new one will be written each time a provider is added to or removed from the list of active providers.
Data frames contain all data published by a single provider. The data is stored under the key blocks as a list of G3TimesampleMaps, where each timesample map corresponds to a group of co-sampled data, grouped by their block name. Each G3TimesampleMap contains a G3Vector for each field_name specified in the data and a vector of timestamps.
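The mapping between hkagg_type values and frame types can be sketched in plain Python. Here ordinary dictionaries stand in for G3Frames, which is an assumption made so the example runs without spt3g installed; the names HKFrameType and summarize are hypothetical and are not part of so3g.

```python
from enum import IntEnum


class HKFrameType(IntEnum):
    """hkagg_type values as described above."""
    SESSION = 0
    STATUS = 1
    DATA = 2


def summarize(frames):
    """Count how many frames of each type appear in a sequence."""
    counts = {frame_type.name: 0 for frame_type in HKFrameType}
    for frame in frames:
        counts[HKFrameType(frame['hkagg_type']).name] += 1
    return counts


# Dictionaries standing in for the three frames in the spt3g-dump output.
frames = [
    {'hkagg_type': 0, 'session_id': 426626618778213812},
    {'hkagg_type': 1, 'session_id': 426626618778213812, 'providers': []},
    {'hkagg_type': 2, 'session_id': 426626618778213812, 'prov_id': 0, 'blocks': []},
]

summary = summarize(frames)
print(summary)
```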
Agent API
- class ocs.agents.aggregator.agent.AggregatorAgent(agent, args)[source]
This class provides a WAMP wrapper for the data aggregator. The run function and the data handler are thread-safe, as long as multiple run functions are not started at the same time, which should be prevented through OCSAgent.
- Parameters:
agent (OCSAgent) – OCS Agent object
args (namespace) – args from the function’s argparser.
- time_per_file
Time (sec) before files should be rotated.
- Type:
int
- data_dir
Path to the base directory where data should be written.
- Type:
path
- aggregate
Specifies if the agent is currently aggregating data.
- Type:
bool
- incoming_data
Thread-safe queue where incoming (data, feed) pairs are stored before being passed to the Aggregator.
- Type:
queue.Queue
- loop_time
Time between iterations of the run loop.
- Type:
float
- record(test_mode=False)[source]
Process - This process will create an Aggregator instance, which will collect and write provider data to disk as long as this process is running.
- Parameters:
test_mode (bool, optional) – Run the record Process loop only once. This is meant only for testing. Default is False.
Notes
The most recent file and active providers will be returned in the session data:
>>> response.session['data']
{"current_file": "/data/16020/1602089117.g3",
 "providers": {
    "observatory.fake-data1.feeds.false_temperatures": {
        "last_refresh": 1602089118.8225083,
        "sessid": "1602088928.8294137",
        "stale": false,
        "last_block_received": "temps"},
    "observatory.LSSIM.feeds.temperatures": {
        "last_refresh": 1602089118.8223345,
        "sessid": "1602088932.335811",
        "stale": false,
        "last_block_received": "temps"}}}
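A client holding this session data could, for example, list the providers that have gone stale. The following is a minimal sketch that assumes the data has already been decoded into a Python dictionary; the helper stale_providers is hypothetical, and the second provider is marked stale here purely for illustration.

```python
def stale_providers(session_data):
    """Return the sorted addresses of providers flagged as stale."""
    providers = session_data.get('providers', {})
    return sorted(addr for addr, info in providers.items() if info.get('stale'))


# Session data shaped like the example above, decoded into Python values.
session_data = {
    'current_file': '/data/16020/1602089117.g3',
    'providers': {
        'observatory.fake-data1.feeds.false_temperatures': {
            'last_refresh': 1602089118.8225083,
            'sessid': '1602088928.8294137',
            'stale': False,
            'last_block_received': 'temps'},
        'observatory.LSSIM.feeds.temperatures': {
            'last_refresh': 1602089118.8223345,
            'sessid': '1602088932.335811',
            'stale': True,  # changed from the example to show a match
            'last_block_received': 'temps'},
    },
}

stale = stale_providers(session_data)
print(stale)
```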
Supporting APIs
- class ocs.agents.aggregator.drivers.Provider(address, sessid, prov_id, frame_length=300, fresh_time=180)[source]
Stores data for a single provider (OCS Feed). This class should only be accessed via a single thread.
- Parameters:
address (string) – Full address of the provider
sessid (string) – session_id of the provider
prov_id (int) – id assigned to the provider by the HKSessionHelper
frame_length (float, optional) – Time (in seconds) before data should be written into a frame. Defaults to 5 min.
fresh_time (float, optional) – Time (in seconds) before provider should be considered stale. Defaults to 3 min.
- blocks
All blocks that are written by provider.
- Type:
dict
- frame_start_time
Start time of current frame
- Type:
float
- fresh_time
Time (in seconds) that the provider can go without data before being labeled stale and scheduled to be removed.
- Type:
float
- last_refresh
Time when the provider was last refreshed (either through data or agent heartbeat).
- Type:
time
- last_block_received
String of the last block_name received.
- Type:
str
- log
txaio logger
- Type:
txaio.Logger
- refresh()[source]
Refresh provider
- stale()[source]
Returns true if provider is stale and should be removed
- new_frame_time()[source]
Returns true if it’s time for a new frame to be written
- empty()[source]
Returns true if all blocks are empty
- save_to_block(data)[source]
Saves a list of data points into blocks. A block will be created for any new block_name.
Examples
The format of data is shown in the following example:
>>> data = {'test': {'block_name': 'test',
...                  'timestamps': [time.time()],
...                  'data': {'key1': [1], 'key2': [2]},
...                  }
...         }
>>> prov.save_to_block(data)
Note that the block name shows up twice: once as the dict key in the outer data dictionary, and again under the ‘block_name’ value. These must match; in this instance both are the word ‘test’.
- Parameters:
data (dict) – data dictionary from incoming data queue
- clear()[source]
Clears all blocks and resets the frame_start_time
- to_frame(hksess=None, clear=False)[source]
Returns a G3Frame based on the provider’s blocks.
- Parameters:
hksess (optional) – If provided, the frame will be based off of hksession’s data frame. If not provided, the data will be put into a clean frame.
clear (bool) – Clears provider data if True.
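The data layout expected by save_to_block can be checked before it is queued. This sketch enforces the rule stated in that method's example, that each outer key must match its block_name. It additionally assumes that every field must carry one value per timestamp, which follows from the blocks being co-sampled but is not stated explicitly for this method. The function validate_blocks is hypothetical.

```python
import time


def validate_blocks(data):
    """Raise ValueError if data does not follow the save_to_block layout."""
    for key, block in data.items():
        # The outer dict key and the inner 'block_name' must agree.
        if block['block_name'] != key:
            raise ValueError(
                f"key {key!r} does not match block_name {block['block_name']!r}")
        # Assumption: co-sampled fields share the block's timestamps.
        n_samples = len(block['timestamps'])
        for field, values in block['data'].items():
            if len(values) != n_samples:
                raise ValueError(
                    f"field {field!r} has {len(values)} values "
                    f"for {n_samples} timestamps")


data = {'test': {'block_name': 'test',
                 'timestamps': [time.time()],
                 'data': {'key1': [1], 'key2': [2]}}}
validate_blocks(data)  # passes silently for well-formed data
```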
- class ocs.agents.aggregator.drivers.G3FileRotator((object)arg1)[source]
G3 module which handles file rotation. After time_per_file has elapsed, the rotator will end that file and create a new file with the filename function. It will write the last_session and last_status frame to any new file if they exist.
This class should only be accessed via a single thread.
- Parameters:
time_per_file (float) – time (seconds) before a new file should be written
filename (callable) – function that generates new filenames.
- filename
Function to call to create new filename on rotation
- Type:
function
- file_start_time
Start time for current file
- Type:
int
- writer
G3Writer object for current file. None if no file is open.
- Type:
core.G3Writer
- last_session
Last session frame written to disk. This is stored and written as the first frame on file rotation.
- Type:
core.G3Frame
- last_status
Last status frame written to disk. Stored and written as the second frame on file rotation.
- Type:
core.G3Frame
- current_file
Path to the current file being written.
- Type:
str, optional
- flush()[source]
Flushes current g3 file to disk
- Process(frames)[source]
Writes frames to the current file. If no file has been started, or if time_per_file has elapsed, the current file is closed and a new file is created by the filename function passed to the constructor.
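The rotation rule can be sketched without spt3g by collecting frames in memory instead of writing them with a G3Writer. Everything here is a simplified assumption: the class RotatorSketch, the helper make_filename, the injectable clock, and the path layout, which is inferred from the example path /data/16020/1602089117.g3 rather than documented. The sketch also omits the re-writing of the last session and status frames into each new file.

```python
import time


def make_filename(data_dir, clock=time.time):
    """Return a callable giving <data_dir>/<first 5 digits>/<timestamp>.g3."""
    def filename():
        stamp = str(int(clock()))
        return f"{data_dir}/{stamp[:5]}/{stamp}.g3"
    return filename


class RotatorSketch:
    """In-memory stand-in for a time-based file rotator."""

    def __init__(self, time_per_file, filename, clock=time.time):
        self.time_per_file = time_per_file
        self.filename = filename
        self.clock = clock
        self.current_file = None
        self.file_start_time = None
        self.files = {}  # path -> list of frames, in place of files on disk

    def process(self, frame):
        now = self.clock()
        expired = (self.file_start_time is not None
                   and now - self.file_start_time >= self.time_per_file)
        if self.current_file is None or expired:
            # Start a new "file" named by the filename callable.
            self.current_file = self.filename()
            self.file_start_time = now
            self.files[self.current_file] = []
        self.files[self.current_file].append(frame)


# A fake clock makes the rotation visible without waiting an hour.
now = [1602089117.0]
clock = lambda: now[0]
rotator = RotatorSketch(3600, make_filename('/data/hk', clock), clock)
rotator.process({'hkagg_type': 2})
now[0] += 10
rotator.process({'hkagg_type': 2})
now[0] += 3600
rotator.process({'hkagg_type': 2})
print(sorted(rotator.files))
```

Passing the filename function in as a callable keeps the naming policy separate from the rotation timing, which matches the constructor parameters described above.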
- class ocs.agents.aggregator.drivers.Aggregator(incoming_data, time_per_file, data_dir, session=None)[source]
Data aggregator. This manages a collection of providers, and contains methods to write them to disk.
This class should only be accessed by a single thread. Data can be passed to it by appending it to the referenced incoming_data queue.
- Parameters:
incoming_data (queue.Queue) – A thread-safe queue of (data, feed) pairs.
time_per_file (float) – Time (sec) before a new file should be written to disk.
data_dir (path) – Base directory for new files to be written.
session (OpSession, optional) – Session object of current agent process. If not specified, session data will not be written.
- log
txaio logger
- Type:
txaio.Logger
- hksess
HKSession helper that assigns provider id’s to providers, and constructs so3g frames.
- Type:
so3g.HKSessionHelper
- writer
Module to use to write frames to disk.
- Type:
G3Module
- providers
dictionary of active providers, indexed by the hksess’s assigned provider id.
- Type:
Dict[Provider]
- pids
Dictionary of provider id’s assigned by the hksession. Indexed by (prov address, session_id).
- Type:
Dict[Int]
- write_status
If true, a status frame will be written next time providers are written to disk. This is set to True whenever a provider is added or removed.
- Type:
bool
- process_incoming_data()[source]
Takes all data from the incoming_data queue, and puts them into provider blocks.
- add_provider(prov_address, prov_sessid, **prov_kwargs)[source]
Registers a new provider and writes a status frame.
- Parameters:
prov_address (str) – full address of provider
prov_sessid (str) – session id of provider
- Optional Arguments:
Additional kwargs are passed directly to the Provider constructor, so defaults are set there.
- remove_provider(prov)[source]
Writes remaining provider data to frame and removes provider.
- Parameters:
prov (Provider) – provider object that should be removed.
- remove_stale_providers()[source]
Loops through all providers and checks if they’ve gone stale. If they have, writes their remaining data to disk (they shouldn’t have any) and deletes them.
- write_to_disk(clear=True, write_all=False)[source]
Loop through all providers, and write their data to the frame_queue if they have surpassed their frame_time, or if write_all is True.
- Parameters:
clear (bool) – If True, provider data is cleared after write
write_all (bool) – If true all providers are written to disk regardless of whether frame_time has passed.
- run()[source]
Main run iterator for the aggregator. This processes all incoming data, removes stale providers, and writes active providers to disk.
- close()[source]
Flushes all remaining providers and closes file.
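The bookkeeping behind providers, pids, and write_status can be sketched as follows. In the Aggregator the provider ids are assigned by the so3g HKSessionHelper; here a simple counter stands in for it, and the class ProviderRegistry along with the stored provider dictionaries are assumptions made for illustration.

```python
class ProviderRegistry:
    """Hypothetical stand-in for the Aggregator's provider bookkeeping."""

    def __init__(self):
        self.pids = {}          # (address, sessid) -> provider id
        self.providers = {}     # provider id -> provider info
        self.write_status = False
        self._next_id = 0       # counter standing in for the session helper

    def add_provider(self, address, sessid):
        """Register a provider, or return its id if it is already known."""
        key = (address, sessid)
        if key in self.pids:
            return self.pids[key]
        pid = self._next_id
        self._next_id += 1
        self.pids[key] = pid
        self.providers[pid] = {'address': address, 'sessid': sessid}
        # A status frame is due whenever the provider list changes.
        self.write_status = True
        return pid

    def remove_provider(self, pid):
        """Drop a provider and flag that a status frame is due."""
        info = self.providers.pop(pid)
        del self.pids[(info['address'], info['sessid'])]
        self.write_status = True


registry = ProviderRegistry()
pid = registry.add_provider(
    'observatory.faker.feeds.false_temperatures', '1589308315.450184')
print(pid, registry.write_status)
```

Indexing pids by the (address, session_id) pair means a feed that restarts with a new session id is treated as a new provider in this sketch.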