API

This page contains the auto-generated documentation for the ocs package.

agents

The agents/ directory contains the OCS Agents, as well as any supporting code.

agents.aggregator

class ocs.agents.aggregator.agent.AggregatorAgent(agent, args)[source]

Bases: object

This class provide a WAMP wrapper for the data aggregator. The run function and the data handler are thread-safe, as long as multiple run functions are not started at the same time, which should be prevented through OCSAgent.

Parameters:
  • agent (OCSAgent) – OCS Agent object

  • args (namespace) – args from the function’s argparser.

time_per_file

Time (sec) before files should be rotated.

Type:

int

data_dir

Path to the base directory where data should be written.

Type:

path

aggregate

Specifies if the agent is currently aggregating data.

Type:

bool

incoming_data

Thread-safe queue where incoming (data, feed) pairs are stored before being passed to the Aggregator.

Type:

queue.Queue

loop_time

Time between iterations of the run loop.

Type:

float

record(test_mode=False)[source]

Process - This process will create an Aggregator instance, which will collect and write provider data to disk as long as this process is running.

Parameters:

test_mode (bool, optional) – Run the record Process loop only once. This is meant only for testing. Default is False.

Notes

The most recent file and active providers will be returned in the session data:

>>> response.session['data']
{"current_file": "/data/16020/1602089117.g3",
 "providers": {
    "observatory.fake-data1.feeds.false_temperatures": {
        "last_refresh": 1602089118.8225083,
        "sessid": "1602088928.8294137",
        "stale": false,
        "last_block_received": "temps"},
    "observatory.LSSIM.feeds.temperatures": {
         "last_refresh": 1602089118.8223345,
         "sessid": "1602088932.335811",
         "stale": false,
         "last_block_received": "temps"}}}
ocs.agents.aggregator.agent.make_parser(parser=None)[source]
ocs.agents.aggregator.agent.main(args=None)[source]
ocs.agents.aggregator.drivers.g3_cast(data, time=False)[source]
Casts a generic datatype into a corresponding G3 type. With:

int -> G3Int str -> G3String float -> G3Double bool -> G3Bool

and lists of type X will go to G3VectorX. If time is set to True, will convert to G3Time or G3VectorTime with the assumption that data consists of unix timestamps.

Parameters:
  • data (int, str, float, or list) – Generic data to be converted to a corresponding G3Type.

  • time (bool, optional) – If True, will assume data contains unix timestamps and try to cast to G3Time or G3VectorTime.

Returns:

Corresponding G3 datatype.

Return type:

g3_data

ocs.agents.aggregator.drivers.generate_id(hksess)[source]

Generates a unique session id based on the start_time, process_id, and hksess description.

Parameters:

hksess (so3g.HKSessionHelper)

ocs.agents.aggregator.drivers.make_filename(base_dir, make_subdirs=True)[source]

Creates a new filename based on the time and base_dir. If make_subdirs is True, all subdirectories will be automatically created. I don’t think there’s any reason that this shouldn’t be true…

Parameters:
  • base_dir (path) – Base path where data should be written.

  • make_subdirs (bool) – True if func should automatically create non-existing subdirs.

class ocs.agents.aggregator.drivers.Provider(address, sessid, prov_id, frame_length=300, fresh_time=180)[source]

Bases: object

Stores data for a single provider (OCS Feed). This class should only be accessed via a single thread.

Parameters:
  • addresss (string) – Full address of the provider

  • sessid (string) – session_id of the provider

  • prov_id (bool) – id assigned to the provider by the HKSessionHelper

  • frame_length (float, optional) – Time (in seconds) before data should be written into a frame. Defaults to 5 min.

  • fresh_time (float, optional) – Time (in seconds) before provider should be considered stale. Defaults to 3 min.

blocks

All blocks that are written by provider.

Type:

dict

frame_start_time

Start time of current frame

Type:

float

fresh_time

time (in seconds) that the provider can go without data before being labeled stale, and scheduled to be removed

Type:

float

last_refresh

Time when the provider was last refreshed (either through data or agent heartbeat).

Type:

time

last_block_received

String of the last block_name received.

Type:

str

log

txaio logger

Type:

txaio.Logger

encoded()[source]
refresh()[source]

Refresh provider

stale()[source]

Returns true if provider is stale and should be removed

new_frame_time()[source]

Returns true if its time for a new frame to be written

empty()[source]

Returns true if all blocks are empty

save_to_block(data)[source]

Saves a list of data points into blocks. A block will be created for any new block_name.

Examples

The format of data is shown in the following example:

>>> data = {'test': {'block_name': 'test',
                 'timestamps': [time.time()],
                 'data': {'key1': [1],
                          'key2': [2]},
                 }
           }
>>> prov.save_to_block(data)

Note the block name shows up twice, once as the dict key in the outer data dictionary, and again under the ‘block_name’ value. These must match – in this instance both the word ‘test’.

Parameters:

data (dict) – data dictionary from incoming data queue

clear()[source]

Clears all blocks and resets the frame_start_time

to_frame(hksess=None, clear=False)[source]

Returns a G3Frame based on the provider’s blocks.

Parameters:
  • hksess (optional) – If provided, the frame will be based off of hksession’s data frame. If the data will be put into a clean frame.

  • clear (bool) – Clears provider data if True.

class ocs.agents.aggregator.drivers.G3FileRotator((object)arg1)[source]

Bases: G3Module

G3 module which handles file rotation. After time_per_file has elapsed, the rotator will end that file and create a new file with the filename function. It will write the last_session and last_status frame to any new file if they exist.

This class should only be accessed via a single thread.

Parameters:
  • time_per_file (float) – time (seconds) before a new file should be written

  • filename (callable) – function that generates new filenames.

filename

Function to call to create new filename on rotation

Type:

function

file_start_time

Start time for current file

Type:

int

writer

G3Writer object for current file. None if no file is open.

Type:

core.G3Writer

last_session

Last session frame written to disk. This is stored and written as the first frame on file rotation.

Type:

core.G3Frame

last_status

Last status frame written to disk. Stored and written as the second frame on file rotation.

Type:

core.G3Frame

current_file

Path to the current file being written.

Type:

str, optional

close_file()[source]
flush()[source]

Flushes current g3 file to disk

Process(frames)[source]

Writes frame to current file. If file has not been started or time_per_file has elapsed, file is closed and a new file is created by filename function passed to constructor

class ocs.agents.aggregator.drivers.Aggregator(incoming_data, time_per_file, data_dir, session=None)[source]

Bases: object

Data aggregator. This manages a collection of providers, and contains methods to write them to disk.

This class should only be accessed by a single thread. Data can be passed to it by appending it to the referenced incoming_data queue.

Parameters:
  • incoming_data (queue.Queue) – A thread-safe queue of (data, feed) pairs.

  • time_per_file (float) – Time (sec) before a new file should be written to disk.

  • data_dir (path) – Base directory for new files to be written.

  • session (OpSession, optional) – Session object of current agent process. If not specified, session data will not be written.

log

txaio logger

Type:

txaio.Logger

hksess

HKSession helper that assigns provider id’s to providers, and constructs so3g frames.

Type:

so3g.HKSessionHelper

writer

Module to use to write frames to disk.

Type:

G3Module

providers

dictionary of active providers, indexed by the hksess’s assigned provider id.

Type:

Dict[Provider]

pids

Dictionary of provider id’s assigned by the hksession. Indexed by (prov address, session_id).

Type:

Dict[Int]

write_status

If true, a status frame will be written next time providers are written to disk. This is set to True whenever a provider is added or removed.

Type:

bool

process_incoming_data()[source]

Takes all data from the incoming_data queue, and puts them into provider blocks.

add_provider(prov_address, prov_sessid, **prov_kwargs)[source]

Registers a new provider and writes a status frame.

Parameters:
  • prov_address (str) – full address of provider

  • prov_sessid (str) – session id of provider

Optional Arguments:

Additional kwargs are passed directly to the Provider constructor, so defaults are set there.

remove_provider(prov)[source]

Writes remaining provider data to frame and removes provider.

Parameters:

prov (Provider) – provider object that should be removed.

remove_stale_providers()[source]

Loops through all providers and check if they’ve gone stale. If they have, write their remaining data to disk (they shouldn’t have any) and delete them.

write_to_disk(clear=True, write_all=False)[source]

Loop through all providers, and write their data to the frame_queue if they have surpassed their frame_time, or if write_all is True.

Parameters:
  • clear (bool) – If True, provider data is cleared after write

  • write_all (bool) – If true all providers are written to disk regardless of whether frame_time has passed.

run()[source]

Main run iterator for the aggregator. This processes all incoming data, removes stale providers, and writes active providers to disk.

close()[source]

Flushes all remaining providers and closes file.

agents.barebones

class ocs.agents.barebones.agent.BarebonesAgent(agent)[source]

Bases: object

Barebone Agent demonstrating writing an Agent from scratch.

This Agent is meant to be an example for Agent development, and provides a clean starting point when developing a new Agent.

Parameters:

agent (OCSAgent) – OCSAgent object from ocs.ocs_agent.init_site_agent().

agent

OCSAgent object from ocs.ocs_agent.init_site_agent().

Type:

OCSAgent

log

Logger object used to log events within the Agent.

Type:

txaio.tx.Logger

lock

TimeoutLock object used to prevent simultaneous commands being sent to hardware.

Type:

TimeoutLock

_count

Internal tracking of whether the Agent should be counting or not. This is used to exit the Process loop by changing it to False via the count.stop() command. Your Agent won’t use this exact attribute, but might have a similar one.

Type:

bool

count(test_mode=False)[source]

Process - Count up from 0.

The count will restart if the process is stopped and restarted.

Notes

The most recent value is stored in the session data object in the format:

>>> response.session['data']
{"value": 0,
 "timestamp":1600448753.9288929}
print(text='hello world')[source]

Task - Print some text passed to a Task.

Parameters:

text (str) – Text to print out. Defaults to ‘hello world’.

Notes

The session data will be updated with the text:

>>> response.session['data']
{'text': 'hello world',
 'last_updated': 1660249321.8729222}
ocs.agents.barebones.agent.add_agent_args(parser_in=None)[source]
ocs.agents.barebones.agent.main(args=None)[source]

agents.fake_data

class ocs.agents.fake_data.agent.FakeDataAgent(agent, num_channels=2, sample_rate=10.0, frame_length=60)[source]

Bases: object

try_set_job(job_name)[source]
set_job_done()[source]
acq(test_mode=False, degradation_period=None)[source]

Process - Acquire data and write to the feed.

Parameters:
  • test_mode (bool, optional) – Run the acq Process loop only once. This is meant only for testing. Default is False.

  • degradation_period (float, optional) – If set, then alternately mark self as degraded / not degraded with this period (in seconds).

Notes

The most recent fake values are stored in the session data object in the format:

>>> response.session['data']
{"fields":
    {"channel_00": 0.10250430068515494,
     "channel_01": 0.08550903376216404,
     "channel_02": 0.10481891991693446,
     "channel_03": 0.10793263271024509},
 "timestamp":1600448753.9288929}

The channels kept in fields are the ‘faked’ data, in a similar structure to the Lakeshore agents. ‘timestamp’ is the last time these values were updated.

count_seconds(session, params)[source]
set_heartbeat(heartbeat=True)[source]

Task - Set the state of the agent heartbeat.

Parameters:

heartbeat (bool, optional) – True for on (the default), False for off

delay_task(delay=5, succeed=True)[source]

Task (abortable) - Sleep (delay) for the requested number of seconds.

This can run simultaneously with the acq Process. This Task should run in the reactor thread.

Parameters:
  • delay (float, optional) – Time to wait before returning, in seconds. Defaults to 5.

  • succeed (bool, optional) – Whether to return success or not. Defaults to True.

Notes

The session data will be updated with the requested delay as well as the time elapsed so far, for example:

>>> response.session['data']
{'requested_delay': 5.,
 'delay_so_far': 1.2}
ocs.agents.fake_data.agent.add_agent_args(parser_in=None)[source]
ocs.agents.fake_data.agent.main(args=None)[source]

agents.host_manager

class ocs.agents.host_manager.agent.HostManager(agent, docker_composes=[], docker_service_prefix='ocs-')[source]

Bases: object

This Agent is used to start and stop OCS-relevant services on a particular host. If the HostManager is launched automatically when a system boots, it can then be used to start up the rest of OCS on that host (either automatically or on request).

manager(requests=[], reload_config=True)[source]

Process - The “manager” Process maintains a list of child Agents for which it is responsible. In response to requests from a client, the Process will launch or terminate child Agents.

Parameters:
  • requests (list) – List of agent instance target state requests; e.g. [(‘instance1’, ‘down’)]. See description in update() Task.

  • reload_config (bool) – When starting up, discard any cached database of tracked agents and rescan the Site Config File. This is mostly for debugging.

Notes

If an Agent process exits unexpectedly, it will be relaunched within a few seconds.

When this Process is started (or restarted), the list of tracked agents and their status is completely reset, and the Site Config File is read in.

Once this process is running, the target states for managed Agents can be manipulated through the update() task.

Note that when a stop is requested on this Process, all managed Agents will be moved to the “down” state and an attempt will be made to terminate them before the Process exits.

The session.data is a dict, and entry ‘child_states’ contains a list with the managed Agent statuses. For example:

{'child_states': [
  {'next_action': 'up',
   'target_state': 'up',
   'stability': 1.0,
   'agent_class': 'Lakeshore372Agent',
   'instance_id': 'thermo1'},
  {'next_action': 'down',
   'target_state': 'down',
   'stability': 1.0,
   'agent_class': 'ACUAgent',
   'instance_id': 'acu-1'},
  {'next_action': 'up',
   'target_state': 'up',
   'stability': 1.0,
   'agent_class': 'FakeDataAgent[d]',
   'instance_id': 'faker6'},
  ],
}

If you are looking for the “current state”, it’s called “next_action” here.

The agent_class may include a suffix [d] or [d?], indicating that the agent is configured to run within a docker container. (The question mark indicates that the HostManager cannot actually identify the docker-compose service associated with the agent description in the SCF.)

update(requests=[], reload_config=False)[source]

Task - Update the target state for any subset of the managed agent instances. Optionally, trigger a full reload of the Site Config File first.

Parameters:
  • requests (list) – Default is []. Each entry must be a tuple of the form (instance_id, target_state). The instance_id must be a string that matches an item in the current list of tracked agent instances, or be the string ‘all’, which will match all items being tracked. The target_state must be ‘up’ or ‘down’.

  • reload_config (bool) – Default is False. If True, the site config file and docker-compose files are reparsed in order to (re-)populate the database of child Agent instances.

Examples

update(requests=[('thermo1', 'down')])
update(requests=[('all', 'up')])
update(reload_config=True)

Notes

Starting and stopping agent instances is handled by the manager() Process; if that Process is not running then no action is taken by this Task and it will exit with an error.

The entries in the requests list are processed in order. For example, if the requests were [(‘all’, ‘up’), (‘data1’, ‘down’)]. This would result in setting all known children to have target_state “up”, except for “data1” which would be given target state of “down”.

If reload_config is True, the Site Config File will be reloaded (as described in _reload_config()) before any of the requests are processed.

Managed docker-compose.yaml files are reparsed, continously, by the manager process – no specific action is taken with those in this Task. Note that adding/changing the list of docker-compose.yaml files requires restarting the agent.

die(session, params)[source]
ocs.agents.host_manager.agent.make_parser(parser=None)[source]
ocs.agents.host_manager.agent.main(args=None)[source]
class ocs.agents.host_manager.drivers.ManagedInstance[source]

Bases: dict

Track properties of a managed Agent-instance. This is just a dict with a schema docstring and an “init” function to set defaults.

Properties that must be set explicitly by user:

  • ‘management’ (str): Either ‘host’, ‘docker’, or ‘retired’.

  • ‘agent_class’ (str): The agent class name, which may include a suffix ([d] or [d?]) if the agent is managed through Docker. For instances corresponding to docker services that do not have a corresponding SCF entry, the value here will be ‘[docker]’.

  • ‘instance_id’ (str): The agent instance-id, or the docker service name if the instance is an unmatched docker service.

  • ‘full_name’ (str): agent_class:instance_id

Properties that are given a default value by init function:

  • ‘operable’ (bool): indicates whether the instance can be manipulated (whether calls to up/down should be expected to work).

  • ‘agent_script’ (str): Path to the launcher script (if host system managed). If docker-managed, this is the service name.

  • ‘prot’: The twisted ProcessProtocol object (if host system managed), or the DockerContainerHelper (if a docker container).

  • ‘target_state’ (state): The state we’re trying to achieve (up or down).

  • ‘next_action’ (state): The thing HostManager needs to do next; this will sometimes indicate the “current state” (up or down), but sometimes it will carry a transitional state, such as “wait_start”.

  • ‘at’ (float): a unix timestamp for transitional states (e.g. used to set how long to wait for something).

  • ‘fail_times’ (list of floats): unix timestamps when the instance process has stopped unexpectedly (used to identify “unstable” agents).

classmethod init(**kwargs)[source]
ocs.agents.host_manager.drivers.resolve_child_state(db)[source]
Parameters:

db (ManagedInstance) – the instance state information. This will be modified in place.

Returns:

  • ‘messages’ (list of str): messages for the session.

  • ’launch’ (bool): whether to launch a new instance.

  • ’terminate’ (bool): whether to terminate the instance.

  • ’sleep’ (float): maximum delay before checking back, or None if this machine doesn’t care.

Return type:

Dict with important actions for caller to take. Content is

ocs.agents.host_manager.drivers.stability_factor(times, window=120)[source]

Given an increasing list of failure times, quantify the stability of the activity.

A single failure, 10 seconds in the past, has a stability factor of 0.5; if there were additional failures before that, the stability factor will be lower.

Returns a culled list of stop times and a stability factor (0 - 1).

class ocs.agents.host_manager.drivers.AgentProcessHelper(instance_id, cmd)[source]

Bases: ProcessProtocol

up()[source]
down()[source]
connectionMade()[source]

Called when a connection is made.

This may be considered the initializer of the protocol, because it is called when the connection is completed. For clients, this is called once the connection to the server has been established; for servers, this is called after an accept() call stops blocking and a socket has been received. If you need to send any greeting or initial message, do it here.

inConnectionLost()[source]

This will be called when stdin is closed.

processExited(status)[source]

This will be called when the subprocess exits.

@type reason: L{twisted.python.failure.Failure}

outReceived(data)[source]

Some data was received from stdout.

errReceived(data)[source]

Some data was received from stderr.

class ocs.agents.host_manager.drivers.DockerContainerHelper(service, docker_bin=None)[source]

Bases: object

Class for managing the docker container associated with some service. Provides some of the same interface as AgentProcessHelper. Pass in a service description dict (such as the ones returned by parse_docker_state).

update(service)[source]

Update self.status based on service info (in format returned by parse_docker_state).

up()[source]
down()[source]
ocs.agents.host_manager.drivers.parse_docker_state(docker_compose_file)[source]

Analyze a docker compose.yaml file to get a list of services. Using docker compose ps and docker inspect, determine whether each service is running or not.

Returns:

A dict where the key is the service name and each value is a dict with the following entries:

  • ’compose_file’: the path to the docker compose file

  • ’service’: service name

  • ’container_found’: bool, indicates whether a container for this service was found (whether or not it was running).

  • ’running’: bool, indicating that a container for this service is currently in state “Running”.

  • ’exit_code’: int, which is either extracted from the docker inspect output or is set to 127. (This should never be None.)

agents.influxdb_publisher

class ocs.agents.influxdb_publisher.agent.InfluxDBAgent(agent, args)[source]

Bases: object

This class provide a WAMP wrapper for the data publisher. The run function and the data handler are thread-safe, as long as multiple run functions are not started at the same time, which should be prevented through OCSAgent.

Parameters:
  • agent (OCSAgent) – OCS Agent object

  • args (namespace) – args from the function’s argparser.

data_dir

Path to the base directory where data should be written.

Type:

path

aggregate

Specifies if the agent is currently aggregating data.

Type:

bool

incoming_data

Thread-safe queue where incoming (data, feed) pairs are stored before being passed to the Publisher.

Type:

queue.Queue

loop_time

Time between iterations of the run loop.

Type:

float

record()[source]

Process - This process will create an Publisher instance, which will collect and write provider data to disk as long as this process is running.

Parameters:

test_mode (bool, optional) – Run the record Process loop only once. This is meant only for testing. Default is False.

ocs.agents.influxdb_publisher.agent.make_parser(parser=None)[source]
ocs.agents.influxdb_publisher.agent.main(args=None)[source]
ocs.agents.influxdb_publisher.drivers.timestamp2influxtime(time, protocol)[source]

Convert timestamp for influx, always in UTC.

Parameters:
  • time – ctime timestamp

  • protocol – ‘json’ or line’

class ocs.agents.influxdb_publisher.drivers.Publisher(host, database, incoming_data, port=8086, protocol='line', gzip=False, operate_callback=None)[source]

Bases: object

Data publisher. This manages data to be published to the InfluxDB.

This class should only be accessed by a single thread. Data can be passed to it by appending it to the referenced incoming_data queue.

Parameters:
  • incoming_data (queue.Queue) – A thread-safe queue of (data, feed) pairs.

  • host (str) – host for InfluxDB instance.

  • database (str) – database name within InfluxDB to publish to

  • port (int, optional) – port for InfluxDB instance, defaults to 8086.

  • protocol (str, optional) – Protocol for writing data. Either ‘line’ or ‘json’.

  • gzip (bool, optional) – compress influxdb requsts with gzip

  • operate_callback (callable, optional) – Function to call to see if failed connections should be retried (to prevent a thread from locking).

host

host for InfluxDB instance.

Type:

str

port

port for InfluxDB instance, defaults to 8086.

Type:

int, optional

db

database name within InfluxDB to publish to (from database arg)

Type:

str

incoming_data

data to be published

client

InfluxDB client connection

process_incoming_data()[source]

Takes all data from the incoming_data queue, and writes them to the InfluxDB.

static format_data(data, feed, protocol)[source]

Format the data from an OCS feed into a dict for pushing to InfluxDB.

The scheme here is as follows:
  • agent_address is the “measurement” (conceptually like an SQL table)

  • feed names are an indexed “tag” on the data structure (effectively a table column)

  • keys within an OCS block’s ‘data’ dictionary are the field names (effectively a table column)

Parameters:
  • data (dict) – data from the OCS Feed subscription

  • feed (dict) – feed from the OCS Feed subscription, contains feed information used to structure our influxdb query

  • protocol (str) – Protocol for writing data. Either ‘line’ or ‘json’.

Returns:

Data ready to publish to influxdb, in the specified protocol.

Return type:

list

run()[source]

Main run iterator for the publisher. This processes all incoming data, removes stale providers, and writes active providers to disk.

close()[source]

Flushes all remaining data and closes InfluxDB connection.

agents.registry

class ocs.agents.registry.agent.RegisteredAgent(feed)[source]

Bases: object

Contains data about registered agents.

Parameters:

feed (dict) – Encoded ocs.ocs_feed.Feed.

expired

True if agent has not been updated in Registry.agent_timeout seconds.

Type:

bool

time_expired

ctime at which the agent expired. This will be None if the agent is not expired.

Type:

float, optional

last_updated

ctime at which the agent was last updated

Type:

float

op_codes

Dictionary of operation codes for each of the agent’s operations. For details on what the operation codes mean, see docs from the ocs_agent module

Type:

dict

refresh(op_codes=None)[source]
expire()[source]
encoded()[source]
class ocs.agents.registry.agent.Registry(agent, args)[source]

Bases: object

The Registry agent is in charge of keeping track of which agents are currently running. It has a single process “main” that loops and keeps track of when agents expire. This agent subscribes to all heartbeat feeds, so no additional function calls are required to register an agent.

A list of agent statuses is maintained in the “main” process’s session.data object.

Parameters:

agent (OCSAgent) – the ocs agent object

registered_agents

A defaultdict of RegisteredAgent objects, which contain whether the agent has expired, the time_expired, and the last_updated time.

Type:

defaultdict

agent_timeout

The time an agent has between heartbeats before being marked as expired.

Type:

float

main(test_mode=False)[source]

Process - Main run process for the Registry agent. This will loop and keep track of which agents have expired. It will keep track of current active agents in the session.data variable so it can be seen by clients.

Parameters:

test_mode (bool, optional) – Run the main Process loop only once. This is meant only for testing. Default is False.

Notes

The session data object for this process will be a dictionary containing the encoded RegisteredAgent objects for each agent observed during the lifetime of the Registry. For instance, this might look like:

>>> response.session['data']
{
  "observatory.aggregator": {
    "expired": False,
    "time_expired": None,
    "last_updated": 1669925713.4082503,
    "op_codes": {
      "record": 3
    },
    "agent_class": "AggregatorAgent",
    "agent_address": "observatory.aggregator"
  },
  "observatory.fake-hk-agent-01": {
    "expired": False,
    "time_expired": None,
    "last_updated": 1669925945.7575383,
    "op_codes": {
      "acq": 3,
      "set_heartbeat": 1,
      "delay_task": 1
    },
    "agent_class": "FakeDataAgent",
    "agent_address": "observatory.fake-hk-agent-01"
  }
}
ocs.agents.registry.agent.make_parser(parser=None)[source]
ocs.agents.registry.agent.main(args=None)[source]

ocs.agent_cli

ocs.agent_cli.build_agent_list()[source]

Builds a list of all Agents available across all ocs plugins installed on the system.

Note

Currently if two plugins provide the same Agent the one loaded first is used. This should be improved somehow if we expect overlapping Agents to be provided by plugins.

Examples

An example agent list:

>>> build_agent_list()
{'RegistryAgent': {'module': 'ocs.agents.registry.agent', 'entry_point': 'main'},
'AggregatorAgent': {'module': 'ocs.agents.aggregator.agent', 'entry_point': 'main'},
'HostManager': {'module': 'ocs.agents.host_manager.agent', 'entry_point': 'main'},
'FakeDataAgent': {'module': 'ocs.agents.fake_data.agent', 'entry_point': 'main'},
'InfluxDBAgent': {'module': 'ocs.agents.influxdb_publisher.agent', 'entry_point': 'main'},
'BarebonesAgent': {'module': 'ocs.agents.barebones.agent', 'entry_point': 'main'}}
Returns:

Dictionary of available agents, with agent names as the keys, and dicts containing the module and entry_point as values.

Return type:

dict

ocs.base

class ocs.base.ResponseCode(value)[source]

Bases: Enum

Enumeration of response codes from the Operation API (start, stop, wait, …).

These response codes indicate only whether the API call was successful, in that the request was propagated all the way to the Agent’s Operation code. They are not used to represent success or failure of the Operation itself.

OK = 0

OK indicates the request was successful.

ERROR = -1

ERROR indicates that the request could not be propagated fully. This may occur, for example, if an invalid Operation name is passed, if a request is made that conflicts with an Operation’s current state (e.g. .start() is called on an already-running Operation), if an API call is made to a Operation of an incompatible type (e.g. .stop() on a Task), or due to API syntax error (e.g. misspelled keyword argument to .wait()).

TIMEOUT = 1

TIMEOUT is returned in the case that a Client issued a blocking call with timeout (.wait()), and the timeout expired before the Operation completed.

class ocs.base.OpCode(value)[source]

Bases: Enum

Enumeration of OpSession “op_code” values.

The op_code corresponds to the session.status, with the following extensions:

  • If the session.status == “done” then the op_code will be assigned a value of either SUCCEEDED or FAILED based on session.success.

  • If the session.status == “running”, and session.degraded is True, then the op_code will be DEGRADED rather than RUNNING.

NONE = 1

NONE is used to represent an uninitialized OpSession, and does not correspond to some attempt to run the Operation.

STARTING = 2

STARTING indicates that start() has been successfully called, but the Operation has not yet marked itself as successfully launched. If this state is reached, then at the very least the start request was not rejected because it was already running.

RUNNING = 3

RUNNING indicates that the Operation has performed its basic initialization and parameter checking and is performing its task. Operation codes need to explicitly mark themselves as running by calling session.set_state(‘running’).

STOPPING = 4

STOPPING indicates that the Agent has received a stop or abort request for this Operation and will try to wrap things up ASAP.

SUCCEEDED = 5

SUCCEEDED indicates that the Operation has terminated and has indicated the Operation was successful. This includes the case of a Process that was asked to stop and has shut down cleanly.

FAILED = 6

FAILED indicates that the Operation has terminated with some kind of error.

EXPIRED = 7

EXPIRED may used to mark session information as invalid in cases where the state cannot be determined.

DEGRADED = 8

DEGRADED indicates that an operation meets the requirements for state RUNNING, but is self-reporting as being in a problematic state where it is unable to perform its primary functions (for example, if a Process to operate some hardware is trying to re-establish connection to that hardware).

ocs.client_http

exception ocs.client_http.ControlClientError[source]

Bases: RuntimeError

class ocs.client_http.ControlClient(agent_addr, **kwargs)[source]

Bases: object

start(*args, **kwargs)[source]
stop(*args, **kwargs)[source]
call(procedure, *args, **kwargs)[source]
get_api(simple=False)[source]

Query the API and other info from the Agent; this includes lists of Processes, Tasks, and Feeds, docstrings, operation session structures, and info about the Agent instance (class, PID, host).

Parameters:

simple (bool) – If True, then return just the lists of the op and feed names without accompanying detail.

Returns:

A dict, see ocs.ocs_agent.OCSAgent._management_handler() for detail.

get_tasks()[source]

Query the list of Tasks from the Agent management interface.

Returns a list of items of the form (task_name, info_dict).

get_processes()[source]

Query the list of Processes from the Agent management interface.

Returns a list of items of the form (process_name, info_dict).

get_feeds()[source]

Query the list of Feeds from the Agent management interface.

Returns a list of items of the form (feed_name, info_dict).

request(action, op_name, params={}, **kw)[source]

Issue a request on an Agent’s .ops interface.

Parameters:
  • action (string) – The action name (start, status, etc).

  • params (dict) – Parameters to pass to the action.

Returns:

Tuple (status, message, session).

ocs.client_t

ocs.client_t.run_control_script2(*args, **kwargs)[source]

Deprecated since version v0.6.1: Renamed to run_control_script

ocs.client_t.run_control_script(function, parser=None, *args, **kwargs)[source]

Run a function in a ControlClientSession, within a site_config paradigm, assuming that additional configuration will be passed through command line arguments.

Parameters:
  • function – The function to call.

  • parser – argparse.ArgumentParser, with control script options pre-loaded. If None, this will be created internally in the usual way.

The command line is parsed and site configuration information is loaded. The WAMP server information is used to configure the Control Client’s connection before launching the function. The function is invoked as:

function(app, parser_args, *args, **kwargs)

where app is the ControlClientSession and parser_args is the argparse.Namespace object including processing done by ocs.site_config. Note that the observatory root_address is contained in parser_args.root_address. Any additional arguments defined in the parser will also be present.

This function can be invoked with parser=None, or else with a parser that has been initialized for control client purposes, like this:

from ocs import client_t, site_config
parser = site_control.add_arguments()  # initialized ArgParser
parser.add_option('--target')          # Options for this client
client_t.run_control_script2(my_script, parser=parser)

In the my_script function, use parser_args.target to get the target.

class ocs.client_t.ControlClientSession(config, script, script_args, script_kwargs)[source]

Bases: ApplicationSession

Implements autobahn.wamp.interfaces.ISession()

onConnect()[source]

Implements autobahn.wamp.interfaces.ISession.onConnect()

onChallenge(challenge)[source]

Implements autobahn.wamp.interfaces.ISession.onChallenge()

onLeave(details)[source]

Implements autobahn.wamp.interfaces.ISession.onLeave()

onDisconnect()[source]

Implements autobahn.wamp.interfaces.ISession.onDisconnect()

onJoin(details)[source]

Implements autobahn.wamp.interfaces.ISession.onJoin()

class ocs.client_t.OperationClient(app, root_address, op_name)[source]

Bases: object

The endpoint client is associated with a single operation rather than with an operation server.

request(action, params=None, timeout=None)[source]
status(params=None)[source]
start(params=None)[source]
wait(params=None, timeout=None)[source]
class ocs.client_t.TaskClient(app, root_address, op_name)[source]

Bases: OperationClient

abort(params=None)[source]
class ocs.client_t.ProcessClient(app, root_address, op_name)[source]

Bases: OperationClient

stop(params=None)[source]

ocs.matched_client

ocs.matched_client.MatchedClient(instance_id, **kwargs)[source]

Deprecated since version v0.9.0: Renamed to OCSClient

ocs.ocs_agent

ocs.ocs_agent.init_site_agent(args, address=None)[source]

Create ApplicationSession and ApplicationRunner instances, set up to communicate on the chosen WAMP realm.

Parameters:

args (argparse.Namespace) – The arguments, as processed by ocs.site_config.

Returns: (agent, runner).

ocs.ocs_agent.log_formatter(event)[source]
class ocs.ocs_agent.OCSAgent(config, site_args, address=None, class_name=None)[source]

Bases: ApplicationSession

OCSAgent is used to connect blocking device control code to the OCS. OCSAgent is an ApplicationSession and its methods are all run in the twisted main Reactor thread.

To make use of OCSAgent, the user instantiates it (perhaps through init_ocs_agent) with a particular agent_address. Then the user registers task and process functions by calling register_task() and register_process(). These are (blocking) functions that will be called, in their own twisted thread, when the “start” method is request.

The OCSAgent automatically registers handlers in WAMP, namely:

{agent_address} - the management_handler function, which

responds to queries about what tasks and processes are exposed by this agent.

{agent_address}.ops - the device_handler function, which accepts

Operation commands (start, status, etc.) for all Tasks and Processes.

The OCSAgent also makes use of pubsub channels:

{agent_address}.feed - a channel to which any session status

updates are published (written by the Agent; subscribed by any interested Control Tools).

Implements autobahn.wamp.interfaces.ISession()

_stop_all_running_sessions()[source]

Stops all currently running sessions.

onConnect()[source]

Implements autobahn.wamp.interfaces.ISession.onConnect()

onChallenge(challenge)[source]

Implements autobahn.wamp.interfaces.ISession.onChallenge()

_store_subscription(subscription, *args, **kwargs)[source]
_unsub_error(*args, **kwargs)[source]
_unsubscribe_all()[source]
onJoin(details)[source]

Implements autobahn.wamp.interfaces.ISession.onJoin()

onLeave(details)[source]

Implements autobahn.wamp.interfaces.ISession.onLeave()

_shutdown()[source]
onDisconnect()[source]

Implements autobahn.wamp.interfaces.ISession.onDisconnect()

encoded()[source]

Returns a dict describing this Agent. Includes ‘agent_address’, and lists of ‘feeds’, ‘tasks’, and ‘processes’.

_ops_handler(action, op_name, params=None, timeout=None)[source]
_gather_sessions(parent)[source]

Gather the session data for self.tasks or self.sessions, for return through the management_handler.

Parameters:

parent – either self.tasks or self.processes.

Returns:

A list of Operation description tuples, one per registered Task or Process. Each tuple consists of elements (name, session, op_info):

  • name: The name of the operation.

  • session: dict with OpSession.encode(() info for the active or most recent session. If no such session exists the result will have member ‘status’ set to ‘no_history’.

  • op_info: information registered about the operation, such as op_type, docstring and blocking.

_management_handler(q, **kwargs)[source]

Get a description of this Agent’s API. This is for adaptive clients (such as MatchedClient) to construct their interfaces.

Parameters:

q (string) – One of ‘get_api’, ‘get_tasks’, ‘get_processes’, ‘get_feeds’, ‘get_agent_class’.

Returns:

api_description – If the argument is ‘get_api’, then a dict with the following entries is returned:

  • ’agent_class’: The class name of this agent.

  • ’instance_hostname’: The host name where the Agent is running, as returned by socket.gethostname().

  • ’instance_pid’: The PID of the Agent interpreter session, as returned by os.getpid().

  • ’feeds’: The list of encoded feed information, tuples (feed_name, feed_info).

  • ’processes’: The list of Process api description info, as returned by _gather_sessions().

  • ’tasks’: The list of Task api description info, as returned by _gather_sessions().

Passing get_X will, for some values of X, return only that subset of the full API; treat that as deprecated.

Return type:

dict

register_task(name, func, aborter=None, blocking=True, aborter_blocking=None, startup=False)[source]

Register a Task for this agent.

Parameters:
  • name (string) – The name of the Task.

  • func (callable) – The function that will be called to handle the “start” operation of the Task.

  • aborter (callable) – The function that will be called to handle the “abort” operation of the Task (optional).

  • blocking (bool) – Indicates that func should be launched in a worker thread, rather than running in the main reactor thread.

  • aborter_blocking (bool or None) – Indicates that aborter should be run in a worker thread, rather than running in the main reactor thread. Defaults to value of blocking.

  • startup (bool or dict) – Controls if and how the Operation is launched when the Agent successfully starts up and connects to the WAMP realm. If False, the Operation does not auto-start. Otherwise, the Operation is launched on startup. If the startup argument is a dictionary, this is passed to the Operation’s start function.

Notes

The functions func and aborter will be called with arguments (session, params) where session is the active OpSession and params is passed from the client.

(Passing params to the aborter might not be supported in the client library so don’t count on that being useful.)

register_process(name, start_func, stop_func, blocking=True, stopper_blocking=None, startup=False)[source]

Register a Process for this agent.

Parameters:
  • name (string) – The name of the Process.

  • start_func (callable) – The function that will be called to handle the “start” operation of the Process.

  • stop_func (callable) – The function that will be called to handle the “stop” operation of the Process.

  • blocking (bool) – Indicates that start_func should be launched in a worker thread, rather than running in the reactor.

  • stopper_blocking (bool or None) – Indicates that stop_func should be launched in a worker thread, rather than running in the reactor. Defaults to the value of blocking.

  • startup (bool or dict) – Controls if and how the Operation is launched when the Agent successfully starts up and connects to the WAMP realm. If False, the Operation does not auto-start. Otherwise, the Operation is launched on startup. If the startup argument is a dictionary, this is passed to the Operation’s start function.

Notes

The functions start_func and stop_func will be called with arguments (session, params) where session is the active OpSession and params is passed from the client.

(Passing params to the stop_func might not be supported in the client library so don’t count on that being useful.)

call_op(agent_address, op_name, action, params=None, timeout=None)[source]

Calls ocs_agent operation.

Parameters:
  • agent_address (string) – Address of the agent who registered operation

  • op_name (string) – Name of the operation

  • action (string) – Action of operation. start, stop , wait, etc.

  • params (dict) – Params passed to operation

  • timeout (float) – timeout for operation

register_feed(feed_name, **kwargs)[source]

Initializes a new feed with name feed_name.

Parameters:
  • feed_name (string) – name of the feed

  • record (bool, optional) – Determines if feed should be aggregated. At the moment, each agent can have at most one aggregated feed. Defaults to False

  • agg_params (dict, optional) – Parameters used by the aggregator and influx publisher. See the ocs.ocs_feed.Feed docstring for the full list of aggregator params.

  • buffer_time (int, optional) – Specifies time that messages should be buffered in seconds. If 0, message will be published immediately. Defaults to 0.

  • max_messages (int, optional) – Max number of messages stored. Defaults to 20.

Returns:

The Feed object (which is also cached in self.feeds).

publish_to_feed(feed_name, message, from_reactor=None)[source]

Publish data to named feed.

Parameters:
  • feed_name (str) – should match the name of a registered feed.

  • message (serializable) – data to publish. Acceptable format depends on feed configuration; see Feed.publish_message.

  • from_reactor (bool or None) – This is deprecated; the code will check whether you’re in a thread or not.

Notes

If an unknown feed_name is passed in, an error is printed to the log and that’s all.

If you are running a “blocking” operation, in a thread, then it is best if the message is not a persistent data structure from your thread (especially something you might modify soon after this call). The code will take a copy of your structure and pass that to the reactor thread, but the copy may not be deep enough!

subscribe(handler, topic, options=None, force_subscribe=False)[source]

Subscribes to a topic for receiving events. Identical to ApplicationSession subscribe, but by default prevents re-subscription to the same topic multiple times unless force_subscribe=True.

For full documentation see: https://autobahn.readthedocs.io/en/latest/reference/autobahn.wamp.html#autobahn.wamp.interfaces.ISession.subscribe

Parameters:
subscribe_to_feed(agent_addr, feed_name, handler, options=None, force_subscribe=False)[source]

Constructs topic feed from agent address and feedname, and subscribes to it.

Parameters:
subscribe_on_start(handler, topic, options=None, force_subscribe=None)[source]

Schedules a topic to be subscribed to OnJoin. See OCSAgent.subscribe’s docstring.

_handle_task_return_val(*args, **kw)[source]
_handle_task_error(*args, **kw)[source]
start(op_name, params=None)[source]

Launch an operation. Note that successful return of this function does not mean that the operation is running; it only means that the system has requested the operation to run.

Returns tuple (status, message, session).

Possible values for status:

ocs.ERROR: the specified op_name is not known, or the op is

already running (has an active session).

ocs.OK: the Operation start routine has been launched.

wait(op_name, timeout=None)[source]

Wait for the specified Operation to become idle, or for timeout seconds to elapse. If timeout==None, the timeout is disabled and the function will not return until the Operation terminates. If timeout<=0, then the function will return immediately.

Returns (status, message, session).

Possible values for status:

ocs.TIMEOUT: the timeout expired before the Operation became

idle.

ocs.ERROR: the specified op_name is not known.

ocs.OK: the Operation has become idle.

_stop_helper(stop_type, op_name, params)[source]

Common stopper/aborter code for Process stop and Task abort.

Parameters:
  • stop_type (str) – either ‘stop’ or ‘abort’.

  • op_name (str) – the op_name.

  • params (dict or None) – Params to be passed to stopper function.

stop(op_name, params=None)[source]

Initiate a Process stop routine.

Returns (status, message, session).

Possible values for status:

ocs.ERROR: the specified op_name is not known, or refers to

a Task. Also returned if Process is known but not running.

ocs.OK: the Process stop routine has been launched.

abort(op_name, params=None)[source]

Initiate a Task abort routine.

Returns (status, message, session).

Possible values for status:

ocs.ERROR: the specified op_name is not known, or refers to

a Process. Also returned if Task is known but not running.

ocs.OK: the Process stop routine has been launched.

status(op_name, params=None)[source]

Get an Operation’s session data.

Returns (status, message, session). When there is no session data available, an empty dictionary is returned instead.

Possible values for status:

ocs.ERROR: the specified op_name is not known.

ocs.OK: the op_name was recognized.

class ocs.ocs_agent.AgentOp[source]

Bases: object

launch_deferred(session, params)[source]

Launch the operation using the launcher function, either in a worker thread (self.blocking) or in the reactor (not self.blocking). Return a Deferred. Prior to executing the operation code, set session state to “running”.

class ocs.ocs_agent.AgentTask(launcher, blocking=None, aborter=None, aborter_blocking=None)[source]

Bases: AgentOp

encoded()[source]

Dict of static info for API self-description.

class ocs.ocs_agent.AgentProcess(launcher, stopper, blocking=None, stopper_blocking=None)[source]

Bases: AgentOp

encoded()[source]

Dict of static info for API self-description.

ocs.ocs_agent.SESSION_STATUS_CODES = [None, 'starting', 'running', 'stopping', 'done']

These are the valid values for session.status. Use like this:

  • None: uninitialized.

  • starting: the Operation code has been launched and is performing basic quick checks in anticipation of moving to the (longer term) “running” state.

  • running: the Operation code has performed basic quick checks and has started to do the requested thing.

  • stopping: the Operation code has acknowledged receipt of a “stop” or “abort” request.

  • done: the Operation has exited, either succesfully or not.

class ocs.ocs_agent.OpSession(session_id, op_name, status='starting', app=None, purge_policy=None)[source]

Bases: object

When a caller requests that an Operation (Process or Task) is started, an OpSession object is created and is associated with that run of the Operation. The Operation codes are given access to the OpSession object, and may update the status and post messages to the message buffer. This is the preferred means for communicating Operation status to the caller.

In the OCSAgent model, Operations may run in the main, “reactor” thread or in a worker “pool” thread. Services provided by OpSession must support both these contexts (see, for example, add_message).

Control Clients are given a copy of the latest session information in each response from the Operation API. The format of that information is described in .encoded().

The message buffer is purged periodically.

purge_log()[source]
encoded()[source]

Encode the session data in a dict. This is the data structure that is returned to Control Clients using the Operation API, as the “session” information. Note the returned object is a dict with entries described below.

Returns:

  • session_id (int) – A unique identifier for a single session (a single “run” of the Operation). When an Operation is initiated, a new session object is created and can be distinguished from other times the Operation has been run using this id.

  • op_name (str) – The OCS Operation name.

  • op_code (int) – The OpCode, which combines information from status, success, and degraded; see ocs.base.OpCode.

  • status (str) – The Operation run status (e.g. ‘starting’, ‘done’, …). See ocs.ocs_agent.SESSION_STATUS_CODES.

  • degraded (bool) – A boolean flag (defaults to False) that an operation may set to indicate that it is not achieving its primary function (e.g. if it cannot establish connection to hardware).

  • success (bool or None) – If the Operation Session has completed (status == ‘done’), this indicates that the Operation was deemed successful. Prior to the completion of the operation, the value is None. The value could be False if the Operation reported failure, or if it crashed and failure was marked by the encapsulating OCS code.

  • start_time (float) – The time the Operation Session started, as a unix timestamp.

  • end_time (float or None) – The time the Operation Session ended, as a unix timestamp. While the Session is still on-going, this is None.

  • data (dict) – This is an area for the Operation code to store custom information for Control Clients to consume. See notes below. This structure will be tested for strict JSON compliance, and certain translations performed (such as converting NaN to None/null).

  • messages (list) – A buffer of messages posted by the Operation. Each element of the list is a tuple, (timestamp, message) where timestamp is a unix timestamp and message is a string.

Notes

The data field may be used by Agent code to provide data that might be of interest to a user (whether human or automated), such as the most recent readings from a device, structured information about the configuration and progress of the Operation, or diagnostics.

Please see developer documentation (session.data) for advice on structuring your Agent session data.

property op_code

Returns the OpCode for the given session. This is what will be published to the registry’s operation_status feed.

set_status(status, timestamp=None)[source]

Update the OpSession status and possibly post a message about it.

Parameters:
  • status (string) – New value for status (see below).

  • timestamp (float) – timestamp for the operation.

The possible values for status are:

‘starting’

This status object has just been created, and the Operation launch code has yet to run.

‘running’

The Operation is running.

‘stopping’

The Operation is running, but a stop or abort has been requested.

‘done’

The Operation is has terminated. (Failure / success must be determined separately.)

The only valid transitions are forward in the sequence [starting, running, stopping, done]; i.e. it is forbidden for the status of an OpSession to move from stopping to running.

If this function is called from a worker thread, it will be scheduled to run in the reactor, and will block until that is complete.

add_message(message, timestamp=None)[source]

Add a log message to the OpSession messages buffer.

Parameters:
  • message (string) – Message to append.

  • timestamp (float) – timestamp to tag the message. The default, which is None, will cause the timestamp to be computed here and should be used in most cases.

exception ocs.ocs_agent.ParamError(msg)[source]

Bases: Exception

class ocs.ocs_agent.ParamHandler(params)[source]

Bases: object

Helper for Agent Operation codes to extract params. Supports type checking, has casting, and will raise errors that are automatically added to the session log and propagated to the caller in a useful way.

There are two ways to use this. The first and recommended way is to use the @param decorator. Example:

from ocs import ocs_agent

class MyAgent:
    ...

    @ocs_agent.param('voltage', type=float)
    @ocs_agent.param('delay_time', default=1., type=float)
    @ocs_agent.param('other_action', default=None, cast=str)
    def my_task(self, session, params):
        # (Type checking and default substitution have been done already)
        voltage = params['voltage']
        delay_time = params['delay_time']
        other_action = params['other_action']
        ...

When you use the @param decorator, the OCS code can check the parameters immediately when they are received from the client, and immediately return an error message to the client’s start request (without even calling the Op start function):

OCSReply: ERROR : Param 'delay'=two_seconds is not of required type (<class 'float'>)
   (no session -- op has never run)

A second possibility is to instantiate a ParamHandler at the start of your Op start function, and use it to extract parameters. Example:

from ocs import ocs_agent

class MyAgent:
    ...
    def my_task(self, session, params):
        params = ocs_agent.ParamHandler(params)
        # Mandatory, and cannot be None.
        voltage = params.get('voltage', type=float)
        # Optional, defaults to 1.
        delay_time = params.get('delay_time', default=1., type=float)
        # Optional, interpret as string, but defaults to None.
        other_action = params.get('other_action', default=None, cast=str)
        ...

In this case, errors will not be immediatley returned to the user, but the Operation will quickly fail, and the error message will show up in the message log:

OCSReply: OK : Operation "my_task" is currently not running (FAILED).
  my_task[session=1]; status=done with ERROR 0.115665 s ago, took 0.000864 s
  messages (4 of 4):
    1629464204.780 Status is now "starting".
    1629464204.780 Status is now "running".
    1629464204.781 ERROR: Param 'delay'=two_seconds is not of required type (<class 'float'>)
    1629464204.781 Status is now "done".
  other keys in .session: op_code, data
get(key, default=ParamError(''), check=None, cast=None, type=None, choices=None, treat_none_as_missing=True)[source]

Retrieve a value from the wrapped params dict, with optional type checking, casting, and validity checks. If a parameter is found to be missing, or its value not valid, then a ParamError is raised.

In Agent Op implementations, the ParamError will be caught by the API wrapper and automatically propagated to the caller. The Operation session will be marked as “done”, with success=False.

This works best if the implementation validates all parameters before beginning any Operation activities!

Parameters:
  • key (str) – The name of the parameter to extract.

  • default (any) – The value to use if the value is not set. If this isn’t explicitly set, then a missing key causes an error to be raised (see also the treat_none_as_missing arg).

  • check (callable) – A function that will validate the argument; if the function returns False then a ParamError will be raised.

  • cast (callable) – A function to run on the value to convert it. For example cast=str.lower would help convert user argument “Voltage” to value “voltage”.

  • type (type) – Class to which the result will be compared, unless it is None. Note that if you pass type=float, int values will automatically be cast to float and accepted as valid.

  • choices (list) – Acceptable values for the parameter. This is checked after casting.

  • treat_none_as_missing (bool) – Determines whether a value of None for a parameter should be treated in the same way as if the parameter were not set at all. See notes.

Return type:

The fully processed value.

Notes

The default behavior is to treat {'param': None} as the same as {}; i.e. passing None as the value for a parameter is the same as leaving the parameter unset. In both of these cases, unless a default=... is specified, a ParamError will be raised. Note this doesn’t preclude you from setting default=None, which would effectively convert {} to {'param': None}. If you really need to block {} while allowing {'param': None} to be taken at face value, then set treat_none_as_missing=False.

The cast function, if specified, is applied before the type, choices, and check arguments are processed. If the value (or the substituted default value) is None, then any specified cast and checks will not be performed.

batch(instructions, check_for_strays=True)[source]

Supports the @params decorator … see code.

check_for_strays(ignore=[])[source]

Raise a ParamError if there were arguments passed in that have not yet been extracted with .get(). Keys passed in ignore (list) will be ignored regardless.

ocs.ocs_agent.param(key, **kwargs)[source]

Decorator for Agent operation functions to assist with checking params prior to actually trying to execute the code. Example:

class MyAgent:
  ...
  @param('voltage', type=float)
  @param('delay', default=0., type=float)
  @inlineCallbacks
  def set_voltage(self, session, params):
    ...

Note the @param decorators should be all together, and outermost (listed first). This is because the current implementation caches data in the decorated function (or generator) directly, and additional decorators will conceal that.

See ocs.ocs_agent.ParamHandler for more details. Note the signature for @param is the same as for ParamHandler.get().

ocs.ocs_client

ocs.ocs_client._get_op(op_type, name, encoded, client)[source]

Factory for generating matched operations. This will make sure op.start’s docstring is the docstring of the operation.

Parameters:
class ocs.ocs_client.OCSClient(instance_id, **kwargs)[source]

Bases: object

The simple OCS Client, facilitating task/process calls.

OCSClient makes an Agent’s tasks/processes available as class attributes, making it easy to setup a client instance and call the associated Agent’s tasks and processes.

Example

This example sets up an OCSClient object and calls a FakeDataAgent’s Task (delay_task) and process (acq):

>>> client = OCSClient('fake-data-1')
>>> client.delay_task(delay=5)
>>> client.acq.start()
instance_id

instance-id for agent to run

Type:

str

Parameters:
  • instance_id (str) – Instance id for agent to run

  • args (list or args object, optional) – Takes in the parser arguments for the client. If None, pass an empty list. If list, reads in list elements as arguments. Defaults to None.

Note

For additional **kwargs see site_config.get_control_client.

class ocs.ocs_client.OCSReply(status, msg, session)[source]

Bases: _OCSReply

Create new instance of _OCSReply(status, msg, session)

ocs.ocs_feed

class ocs.ocs_feed.Block(name, keys)[source]

Bases: object

Structure of block for a so3g IrregBlockDouble.

empty()[source]

Returns true if block is empty

clear()[source]

Empties block’s buffers

append(d)[source]

Adds a single data point to the block

extend(block)[source]

Extends the data block by an encoded block

encoded()[source]
class ocs.ocs_feed.Feed(agent, feed_name, record=False, agg_params={}, buffer_time=0, max_messages=0)[source]

Bases: object

Manages publishing to a specific feed and storing of messages.

Parameters:
  • agent (OCSAgent) – agent that is registering the feed

  • feed_name (string) – name of the feed

  • record (bool, optional) – Determines if feed should be aggregated. At the moment, each agent can have at most one aggregated feed. Defaults to False

  • agg_params (dict, optional) –

    Parameters used by the aggregator and influx publisher.

    Params:
    frame_length (float):

    Deterimes the amount of time each G3Frame should be (in seconds).

    fresh_time (float):

    Time until feed is considered stale by aggregator.

    exclude_aggregator (bool):

    If True, the HK Aggregator will not record feed to g3.

    exclude_influx (bool):

    If True, the InfluxPublisher will not write the feed to Influx.

  • buffer_time (int, optional) – Specifies time that messages should be buffered in seconds. If 0, message will be published immediately. Defaults to 0.

  • max_messages (int, optional) – Max number of messages stored. Defaults to 20.

encoded()[source]
flush_buffer()[source]

Publishes all messages in buffer and empties it.

publish_message(message, timestamp=None)[source]

Publishes message to feed. If this is an aggregatable feed (record=True), then it may be buffered. Otherwise it is dispatched immediately.

Parameters:
  • message – Data to be published (see notes about acceptable formats).

  • timestamp (float) – timestamp given to the message. Defaults to time.time()

If this feed is not intended to provide structured data for aggregation, then the format of the message is unrestricted as long as it is WAMP-serializable.

For aggregated feeds, the message should be a dict with one of the following formats:

  1. A single sample for several co-sampled channels. The structure is:

    message = {
        'block_name': Key given to the block in blocking param
        'timestamp': timestamp of data
        'data': {
             key1: datapoint1
             key2: datapoint2
         }
    }
    

    Samples recorded in this way may be buffered, if self.sample_time > 0.

  2. Multiple or more samples for several co-sampled channels. The structure is:

    message = {
        'block_name': Key given to the block in blocking param
        'timestamps': [timestamp, timestamp...]
        'data': {
             key1: [datapoint, datapoint...]
             key2: [datapoint, datapoint...]
         }
    }
    

    Note that the code distinguishes between these cases based on the presence of the key ‘timestamps’ rather than ‘timestamp’. These data can be buffered, too, if self.sample_time > 0.

static verify_message_data_type(value)[source]

Aggregated Feeds can only store certain types of data. Here we check that the type of all data contained in a message’s ‘data’ dictionary are supported types.

Parameters:

value (list, float, int, bool) – ‘data’ dictionary value published (see Feed.publish_message for details).

static verify_data_field_string(field)[source]

There are strict rules for the characters allowed in field names. This function verifies the names in a message are valid.

A valid name:

  • contains only letters (a-z, A-Z; case sensitive), decimal digits (0-9), and the underscore (_).

  • begins with a letter, or with any number of underscores followed by a letter.

  • is at least one, but no more than 255, character(s) long.

Parameters:

field (str) – Field name string to verify.

Returns:

True if field name is valid.

Return type:

bool

Raises:

ValueError – If field name is invalid.

static enforce_field_name_rules(field_name)[source]

Enforce naming rules for field names.

A valid name:

  • contains only letters (a-z, A-Z; case sensitive), decimal digits (0-9), and the underscore (_).

  • begins with a letter, or with any number of underscores followed by a letter.

  • is at least one, but no more than 255, character(s) long.

Parameters:

field_name (str) – Field name string to check and modify if needed.

Returns:

New field name, meeting all above rules. Note this isn’t

guarenteed to not collide with other field names passed through this method, and that should be checked.

Return type:

str

ocs.ocs_twisted

class ocs.ocs_twisted.TimeoutLock(default_timeout=0)[source]

Bases: object

Locking mechanism to be used by OCS Agents.

Parameters:

default_timeout (float, optional) – Sets the default timeout value for acquire calls. Defaults to 0.

acquire(timeout=None, job=None)[source]

Acquires main lock.

Parameters:
  • timeout (float, optional) –

    Sets the timeout for lock acquisition.

    If set to 0 the acquire calls will be non-blocking and will immediately return the result.

    If set to any value greater than 0, this will block until the lock is acquired or the timeout (in seconds) has been reached.

    If set to -1 this call will block indefinitely until the lock has been acquired.

    If not set (default), it will use the TimeoutLock’s default_value (which itself defaults to 0).

  • job (string, optional) – Job name to be associated with current lock acquisition. The current job is stored in self.job so that it can be inspected by other threads.

Returns:

Whether or not lock acquisition was successful.

Return type:

result (bool)

release()[source]

Releases an acquired lock.

release_and_acquire(timeout=None)[source]

Releases and immediately reacquires a lock. Because this uses a two-lock system, it is guaranteed that at least one blocking acquire call will be able to take the active lock before this function is able to re-acquire it. However no other ordering is guaranteed.

acquire_timeout(timeout=None, job='unnamed')[source]

Context manager to acquire and hold a lock.

Parameters:
  • timeout (float, optional) – Sets the timeout for lock acquisition. See the acquire method documentation for details.

  • job (string, optional) – Job name to be associated with current lock acquisition.

Returns:

Whether or not lock acquisition was successful.

Return type:

result (bool)

The following example will attempt to acquire the lock with a timeout of three seconds:

lock = TimeoutLock()

with lock.acquire_timeout(timeout=3.0, job='acq') as acquired:
    if not acquired:
        print(f"Lock could not be acquired because it is held by {lock.job}")
        return False

    print("Lock acquired!")
ocs.ocs_twisted.in_reactor_context()[source]

Determine whether the current threading context is the twisted main (reactor) thread, or a worker pool thread. Returns True if it’s the main thread. Will raise RuntimeError if the thread name is confusing.

class ocs.ocs_twisted.Pacemaker(sample_freq, quantize=False)[source]

Bases: object

The Pacemaker is a class to help Agents maintain a regular sampling rate in their processes. The Pacemaker class will correct for the time spent in the body of the process loop in it’s sleep function. Additionally, if run with the quantize options, the pacemaker will attempt to snap samples to a temporal grid (starting on the second) so that different agents can remain relatively synchronized.

Parameters:
  • sample_freq (float) – The sampling frequency for the pacemaker to enforce. This can be a float, however in order to use the quantize option it must be a whole number.

  • quantize (bool) – If True, the pacemaker will snap to a grid starting on the second. For instance, if sample_freq is 4 and quantize is set to True, the pacemaker will make it so samples will land close to int(second) + (0, 0.25, 0.5, 0.75).

Here is an example of how the Pacemaker can be used keep a 3 Hz quantized sample rate:

pm = Pacemaker(3, quantize=True)
take_data = True:
while take_data:
    pm.sleep()
    print("Acquiring thermometry data...")
    time.sleep(np.random.uniform(0, .3))
sleep()[source]

Sleeps until the next calculated sampling time.

dsleep()[source]

Sleeps in a non-blocking way by returning the deferred created by twisted’s sleep method.

ocs.site_config

class ocs.site_config.SiteConfig[source]

Bases: object

classmethod from_dict(data)[source]
Parameters:

data – The configuration dictionary.

The configuration dictionary should have the following elements:

hub (required)

Describes what WAMP server and realm Agents and Clients should use.

hosts (required)

A dictionary of HostConfig descriptions. The keys in this dictionary can be real host names on the network, pseudo-host names, or the special value “localhost”.

A HostConfig marked for “localhost” will match any host that does not have an exact match in the hosts dictionary. This should normally be used only in single-host test systems or examples.

Client programs will normally (i.e., by default) try to load the HostConfig associated with the system hostname (that which is returned by socket.gethostname()). But this can be overridden easily, for example by using the --site-host command line argument. It is thus quite reasonable to use the hosts dictionary to hold a set of useful configurations indexed by a user-specified string (a pseudo-host).

classmethod from_yaml(filename)[source]
class ocs.site_config.HostConfig(name=None)[source]

Bases: object

classmethod from_dict(data, parent=None, name=None)[source]
Parameters:
  • data – The configuration dictionary.

  • parent – the SiteConfig from which this data was extracted (this is stored as self.parent, but not used).

The configuration dictionary should have the following elements:

agent-instances (required)

A list of AgentConfig descriptions.

agent-paths (optional)

A list of additional paths where OCS is permitted to search for Agent plugin modules.

crossbar (optional)

Settings to assist with starting / stopping / monitoring a crossbar server running on this host. There is a single crossbar server for an OCS network and thus this entry should be defined for at most one of the hosts in the site config file. Note that setting this to None (or null) will disable host crossbar control, while setting it to an empty dictionary, {}, will enable local host crossbar control with default settings.

log-dir (optional)

Path at which to write log files. Relative paths will be interpreted relative to the “working directory”; see –working-dir command line option.

class ocs.site_config.CrossbarConfig[source]

Bases: object

classmethod from_dict(data, parent=None)[source]
Parameters:
  • data – The configuration dictionary, or None.

  • parent – the HostConfig from which this data was extracted (this is stored as self.parent, but not used).

The configuration dictionary should have the following elements:

config-dir (optional): Location of crossbar config.json;

this gets passed to --cbdir, if specified..

bin (optional): The path to the crossbar executable.

This defaults to shutil.which(‘crossbar’).

If data is None, returns None. Otherwise returns a CrossbarConfig object.

get_cmd(cmd)[source]
summary()[source]
class ocs.site_config.HubConfig[source]

Bases: object

classmethod from_dict(data, parent=None)[source]
Parameters:
  • data – The configuration dictionary.

  • parent – the SiteConfig from which this data was extracted (this is stored as self.parent, but not used).

The configuration dictionary should have the following elements:

wamp_server (required): URL to the WAMP router’s websocket

access point for ocs. E.g., ws://host-2:8001/ws. WAMP routers can have multiple access points, with different protocols, security layers, and permissions. (Command line override: --site-hub.)

wamp_http (optional): URL to the WAMP router’s http bridge

interface. This is the best interface for simple clients to use. E.g., http://host-2:8001/call.

wamp_realm (required): The WAMP realm to use. WAMP

clients operating in a particular realm are isolated from clients connected to other realms. Example and test code will often use debug_realm here. (Command line override: --site-realm.)

address_root (required): The base address to be used by

all OCS Agents. This is normally something simple like observatory or detlab. (Command line override: --address-root.)

summary()[source]
class ocs.site_config.InstanceConfig[source]

Bases: object

classmethod from_dict(data, parent=None)[source]
Parameters:
  • data – The configuration dictionary.

  • parent – the HostConfig from which this data was extracted (this is stored as self.parent, but not used).

The configuration dictionary should have the following elements:

instance-id (str, required)

This string is used to set the Agent instance’s base address. This may also be matched against the instance-id provided by the Agent instance, as a way of finding the right InstanceConfig.

agent-class (str, optional)

Name of the Agent class. This may be matched against the agent_class name provided by the Agent instance, as a way of finding the right InstanceConfig.

arguments (list, optional):

A list of arguments that should be passed back to the agent. Historically the arguments have been grouped into into key value pairs, e.g. [[’–key1’, ‘value’], [’–key2’, ‘value’]] but these days whatever you passed in gets flattened to a single list (i.e. that is equivalent to [’–key1’, ‘value’, ‘–key2’, ‘value’].

manage (str, optional):

A string describing how a HostManager should manage this agent. See notes.

Notes

The manage value is only relevant if a HostManager is configured to operate on the host. In that case, the HostManager’s treatment of the agent instance depends on the value of manage:

  • “ignore”: HostManager will not attempt to manage the agent instance.

  • “host/up”: HostManager will manage the agent instance, launching it on the host system. On startup, the instance will be set to target_state “up” (i.e. the HostManager will try to start it).

  • “host/down”: like host/up, but HostManager will not start up the agent instance until explicitly requested to do.

  • “docker/up”: HostManager will manage the agent instance through Docker. On Startup, the instance will be set to target_state “up”.

  • “docker/down”: Like docker/up, but the instance will be forced to target_state “down” on startup.

In earlier versions of OCS, the acceptable values were “yes”, “no”, and “docker”. Those were equivalent to current values of “host/down”, “ignore”, and “docker/down”.

Those values are still accepted, but note that “yes” and “docker” are now equivalent to “host/up” and “docker/up”.

The following abbreviated values are also accepted:

  • “host”: same as “host/up”

  • “up”: same as “host/up”

  • “down”: same as “host/down”

ocs.site_config.summarize_dict(d)[source]
class ocs.site_config.ArgContainer(args)[source]

Bases: object

A container to store a list of args as a dictionary, with the argument names (beginning with a hyphen) as keys, and list of arguments as values. Any arguments passed before an argument key is put under the ‘__positional__’ key, even though positional arguments aren’t really supported by ocs agents or the site-config….

Parameters:

args (list) – Argument list (each item should be a single word)

arg_dict

Dictionary of arguments, indexed by argument keyword.

Type:

dict

update(arg_container2)[source]

Updates the arg_dict with the arg_dict from another ArgContainer

Parameters:

arg_container2 (ArgContainer) – The other ArgContainer with which you want to update the arg_dict.

to_list()[source]

Returns the argument list representation of this container.

ocs.site_config.add_arguments(parser=None)[source]

Add OCS site_config options to an ArgumentParser.

Parameters:

parser – an ArgumentParser. If this is None, a new parser is created.

Returns:

The ArgumentParser that was passed in, or the new one.

Arguments include the --site-* family. See code or online documentation for details.

ocs.site_config.get_config(args, agent_class=None)[source]
Parameters:
  • args – The argument object returned by ArgumentParser.parse_args(), or equivalent. It is assumed that all properties defined by “add_arguments” are present in this object.

  • agent_class – Class name passed in to match against the list of device classes in each host’s list.

Special values accepted for agent_class: - ‘control’: do not insist on matching host or device. - ‘host’: do not insist on matching device (but do match host).

Returns:

The tuple (site_config, host_config, device_config).

ocs.site_config.add_site_attributes(args, site, host=None)[source]

Adds site and host attributes to namespace if they do not exist.

Parameters:
  • args – namespace to add attributes to.

  • site – Site config object.

  • host – Host config object.

ocs.site_config.reparse_args(args, agent_class=None)[source]

THIS FUNCTION IS NOW DEPRECATED… Use the parse_args function instead to parse command line and site-config args simultaneously.

Process the site-config arguments, and modify them in place according to the agent-instance’s computed instance-id.

Parameters:
  • args – The argument object returned by ArgumentParser.parse_args(), or equivalent.

  • agent_class – Class name passed in to match against the list of device classes in each host’s list.

Special values accepted for agent_class: - ‘control’: do not insist on matching host or device.

Deprecated since version v0.6.0: Use site_config.parse_args instead

ocs.site_config.get_control_client(instance_id, site=None, args=None, start=True, client_type='http')[source]

Instantiate and return a client_http.ControlClient, targeting the specified instance_id.

Parameters:
  • site (SiteConfig) – All configuration will be taken from this object, if it is not None.

  • args – Arguments from which to derive the site configuration. If this is None, then the arguments from the command line are parsed through the usual site_config system. If this is a list of strings, then these arguments will be parsed instead of sys.argv[1:]. Note that to use the default configuration (without looking at sys.argv), pass args=[]. It is also permitted to pass a pre-parsed argparse.Namespace object (or similar).

  • start (bool) – Determines whether to call .start() on the client before returning it.

  • client_type (str) – Select the client type, currently only ‘http’. wamp_http address must be known. Note that ‘wampy’ used to be a supported type, but was dropped in OCS v0.8.0.

Returns a ControlClient.

ocs.site_config.register_agent_class(class_name, filename)[source]

Register an Agent script in the site_config registry.

Parameters:
  • class_name (str) – The Agent class name, e.g. “HostManager”.

  • filename (str) – The full path to the script that launches an instance of this Agent class.

ocs.site_config.scan_for_agents(do_registration=True)[source]

Identify and import ocs Agent plugin scripts. This will find all modules in the current module search path (sys.path) that begin with the name ‘ocs_plugin_’.

Parameters:

do_registration (bool) – If True, the modules are imported, which likely causes them to call register_agent_class on each agent they represent.

Returns:

The list of discovered module names.

ocs.site_config.parse_args(agent_class=None, parser=None, args=None)[source]

Function to parse site-config and agent arguments. This function takes site, host, and instance arguments into account by making sure the instance arguments get passed through the arg_parse parser. This helps make sure units and options are consistent with those defined by the argparse argument, even when the arguments come from the site-config file and not the command line.

Parameters:
  • agent_class (str, optional) – Name of the Agent class. This may be matched against the agent_class name provided by the Agent instance, as a way of finding the right InstanceConfig.

  • parser (argparse.ArgumentParser, optional) – Argument parser containing agent-specific arguments. If None, an empty parser will be created.

  • args (list of str) – Arguments to parse; defaults to sys.argv[1:].

Returns:

An argparse.Namespace, as you would get from parser.parse_args().