API
This page contains the auto-generated documentation for the ocs package.
agents
The agents/ directory contains the OCS Agents, as well as any supporting
code.
agents.access_director
- class ocs.agents.access_director.agent.AccessDirector(agent, config_file)[source]
Bases:
objectAgent for distributing Access Control information to all Agents in a system.
- manager()[source]
Process - Update the main access control feed with new access information. This occurs in response to agent queries, or if new grants require updates to access.
The
session.dataencodes information about any active exclusive grants. The entry forgrantscontains a list of :class:<AccessGrant> entries. It looks like this:'grants': [ { "name": "fake-subsystem", "expire_at": 1771356419.8087718, "grantee": "test-grant.py", "rules": [ { "hashed_pass": { "hash": "md5", "value": "03ba0072e0d32376" }, "cred_level": 2, "scope_spec": { "default": false, "agent_class": "FakeDataAgent", "instance_id": null }, "lockout_id": "fake-subsystem", "lockout_owner": "test-grant.py", "lockout_levels": [ 2, 3 ] }, { "hashed_pass": { "hash": "md5", "value": "03ba0072e0d32376" }, "cred_level": 1, "scope_spec": { "default": true, "agent_class": null, "instance_id": null }, "lockout_id": "fake-subsystem", "lockout_owner": "test-grant.py", "lockout_levels": [ 3 ] } ] } ]
- reload_config()[source]
Task - Reload access config file.
- agent_poll(instance_id, agent_class)[source]
Special access point. This is used for agents to request an announcement of their password rules on the control feed. The instance_id and agent_class arguments must both be specified (and not None). Returns True if arguments are sufficiently valid and request was slated; otherwise False.
- request_exclusive(grant_name=None, password=None, action=None, expire_at=None, grantee=None, strict=None)[source]
Special access point. Request, renew, or release an exclusive access grant.
- Parameters:
grant_name (str) – Name of the grant, to match an entry in the “grant-blocks” section of the config file.
password (str) – The password, to be checked against the password specified in the grant block of the config.
action (str) – One of “acquire”, “renew” or “release”.
expire_at (float) – Unix timestamp for the desired expiry time of the grant.
grantee (str) – A string representing the client that has requested the lock. When passed with “acquire”, it is stored for distribution to clients so they can explain who has locked them out.
strict (bool) – If True, reject all requests except acquire when a grant is inactive, renew when a grant is active, and release when a grant is active.
- Returns:
A dict with useful info.
On error, the dict has only an entry “error” with an error message in it.
On success, the returned dict has at least the items ‘grant_name’ (which matches the requested grant_name) and ‘message’; the ‘message’ is just “grant acquired” / “grant renewed” / “grant released”. It also has the ‘step’ at which this event took effect in the director.
Additionally, if the ‘action’ is ‘acquire’ or ‘renew’ then the dict will include an entry ‘expire_at’ with the unix timestamp that the grant will be cancelled automatically. This timestamp may be earlier (but not later) than the time requested with the expire_at parameter.
If the action is ‘acquire’, then the dict also has an entry ‘password’, containing the password client should use to access the exclusive access targets for the duration of the access grant.
- class ocs.agents.access_director.agent.AccessGrant(name, rules, expire_at, grantee)[source]
Bases:
object- renew(expire_at)[source]
- encode()[source]
- ocs.agents.access_director.agent.make_parser(parser=None)[source]
- ocs.agents.access_director.agent.main(args=None)[source]
agents.aggregator
- class ocs.agents.aggregator.agent.AggregatorAgent(agent, args)[source]
Bases:
objectThis class provide a WAMP wrapper for the data aggregator. The run function and the data handler are thread-safe, as long as multiple run functions are not started at the same time, which should be prevented through OCSAgent.
- Parameters:
agent (OCSAgent) – OCS Agent object
args (namespace) – args from the function’s argparser.
- time_per_file
Time (sec) before files should be rotated.
- Type:
int
- data_dir
Path to the base directory where data should be written.
- Type:
path
- aggregate
Specifies if the agent is currently aggregating data.
- Type:
bool
- incoming_data
Thread-safe queue where incoming (data, feed) pairs are stored before being passed to the Aggregator.
- Type:
queue.Queue
- loop_time
Time between iterations of the run loop.
- Type:
float
- record(test_mode=False)[source]
Process - This process will create an Aggregator instance, which will collect and write provider data to disk as long as this process is running.
- Parameters:
test_mode (bool, optional) – Run the record Process loop only once. This is meant only for testing. Default is False.
Notes
The most recent file and active providers will be returned in the session data:
>>> response.session['data'] {"current_file": "/data/16020/1602089117.g3", "providers": { "observatory.fake-data1.feeds.false_temperatures": { "last_refresh": 1602089118.8225083, "sessid": "1602088928.8294137", "stale": false, "last_block_received": "temps"}, "observatory.LSSIM.feeds.temperatures": { "last_refresh": 1602089118.8223345, "sessid": "1602088932.335811", "stale": false, "last_block_received": "temps"}}}
- ocs.agents.aggregator.agent.make_parser(parser=None)[source]
- ocs.agents.aggregator.agent.main(args=None)[source]
- ocs.agents.aggregator.drivers.g3_cast(data, time=False)[source]
- Casts a generic datatype into a corresponding G3 type. With:
int -> G3Int str -> G3String float -> G3Double bool -> G3Bool
and lists of type X will go to G3VectorX. If
timeis set to True, will convert to G3Time or G3VectorTime with the assumption thatdataconsists of unix timestamps.- Parameters:
data (int, str, float, or list) – Generic data to be converted to a corresponding G3Type.
time (bool, optional) – If True, will assume data contains unix timestamps and try to cast to G3Time or G3VectorTime.
- Returns:
Corresponding G3 datatype.
- Return type:
g3_data
- ocs.agents.aggregator.drivers.generate_id(hksess)[source]
Generates a unique session id based on the start_time, process_id, and hksess description.
- Parameters:
hksess (so3g.HKSessionHelper)
- ocs.agents.aggregator.drivers.make_filename(base_dir, make_subdirs=True)[source]
Creates a new filename based on the time and base_dir. If make_subdirs is True, all subdirectories will be automatically created. I don’t think there’s any reason that this shouldn’t be true…
- Parameters:
base_dir (path) – Base path where data should be written.
make_subdirs (bool) – True if func should automatically create non-existing subdirs.
- class ocs.agents.aggregator.drivers.Provider(address, sessid, prov_id, frame_length=300, fresh_time=180)[source]
Bases:
objectStores data for a single provider (OCS Feed). This class should only be accessed via a single thread.
- Parameters:
addresss (string) – Full address of the provider
sessid (string) – session_id of the provider
prov_id (bool) – id assigned to the provider by the HKSessionHelper
frame_length (float, optional) – Time (in seconds) before data should be written into a frame. Defaults to 5 min.
fresh_time (float, optional) – Time (in seconds) before provider should be considered stale. Defaults to 3 min.
- blocks
All blocks that are written by provider.
- Type:
dict
- frame_start_time
Start time of current frame
- Type:
float
- fresh_time
time (in seconds) that the provider can go without data before being labeled stale, and scheduled to be removed
- Type:
float
- last_refresh
Time when the provider was last refreshed (either through data or agent heartbeat).
- Type:
time
- last_block_received
String of the last block_name received.
- Type:
str
- log
txaio logger
- Type:
txaio.Logger
- save_to_block(data)[source]
Saves a list of data points into blocks. A block will be created for any new block_name.
Examples
The format of data is shown in the following example:
>>> data = {'test': {'block_name': 'test', 'timestamps': [time.time()], 'data': {'key1': [1], 'key2': [2]}, } } >>> prov.save_to_block(data)
Note the block name shows up twice, once as the dict key in the outer data dictionary, and again under the ‘block_name’ value. These must match – in this instance both the word ‘test’.
- Parameters:
data (dict) – data dictionary from incoming data queue
- class ocs.agents.aggregator.drivers.G3FileRotator(*args: Any, **kwargs: Any)[source]
Bases:
G3ModuleG3 module which handles file rotation. After time_per_file has elapsed, the rotator will end that file and create a new file with the filename function. It will write the last_session and last_status frame to any new file if they exist.
This class should only be accessed via a single thread.
- Parameters:
time_per_file (float) – time (seconds) before a new file should be written
filename (callable) – function that generates new filenames.
- filename
Function to call to create new filename on rotation
- Type:
function
- file_start_time
Start time for current file
- Type:
int
- writer
G3Writer object for current file. None if no file is open.
- Type:
core.G3Writer
- last_session
Last session frame written to disk. This is stored and written as the first frame on file rotation.
- Type:
core.G3Frame
- last_status
Last status frame written to disk. Stored and written as the second frame on file rotation.
- Type:
core.G3Frame
- current_file
Path to the current file being written.
- Type:
str, optional
- class ocs.agents.aggregator.drivers.Aggregator(incoming_data, time_per_file, data_dir, session=None)[source]
Bases:
objectData aggregator. This manages a collection of providers, and contains methods to write them to disk.
This class should only be accessed by a single thread. Data can be passed to it by appending it to the referenced incoming_data queue.
- Parameters:
incoming_data (queue.Queue) – A thread-safe queue of (data, feed) pairs.
time_per_file (float) – Time (sec) before a new file should be written to disk.
data_dir (path) – Base directory for new files to be written.
session (OpSession, optional) – Session object of current agent process. If not specified, session data will not be written.
- log
txaio logger
- Type:
txaio.Logger
- hksess
HKSession helper that assigns provider id’s to providers, and constructs so3g frames.
- Type:
so3g.HKSessionHelper
- writer
Module to use to write frames to disk.
- Type:
G3Module
- providers
dictionary of active providers, indexed by the hksess’s assigned provider id.
- Type:
Dict[Provider]
- pids
Dictionary of provider id’s assigned by the hksession. Indexed by (prov address, session_id).
- Type:
Dict[Int]
- write_status
If true, a status frame will be written next time providers are written to disk. This is set to True whenever a provider is added or removed.
- Type:
bool
- process_incoming_data()[source]
Takes all data from the incoming_data queue, and puts them into provider blocks.
- add_provider(prov_address, prov_sessid, **prov_kwargs)[source]
Registers a new provider and writes a status frame.
- Parameters:
prov_address (str) – full address of provider
prov_sessid (str) – session id of provider
- Optional Arguments:
Additional kwargs are passed directly to the Provider constructor, so defaults are set there.
- remove_provider(prov)[source]
Writes remaining provider data to frame and removes provider.
- Parameters:
prov (Provider) – provider object that should be removed.
- remove_stale_providers()[source]
Loops through all providers and check if they’ve gone stale. If they have, write their remaining data to disk (they shouldn’t have any) and delete them.
- write_to_disk(clear=True, write_all=False)[source]
Loop through all providers, and write their data to the frame_queue if they have surpassed their frame_time, or if write_all is True.
- Parameters:
clear (bool) – If True, provider data is cleared after write
write_all (bool) – If true all providers are written to disk regardless of whether frame_time has passed.
agents.barebones
- class ocs.agents.barebones.agent.BarebonesAgent(agent)[source]
Bases:
objectBarebone Agent demonstrating writing an Agent from scratch.
This Agent is meant to be an example for Agent development, and provides a clean starting point when developing a new Agent.
- Parameters:
agent (OCSAgent) – OCSAgent object from
ocs.ocs_agent.init_site_agent().
- agent
OCSAgent object from
ocs.ocs_agent.init_site_agent().- Type:
- log
Logger object used to log events within the Agent.
- Type:
txaio.tx.Logger
- lock
TimeoutLock object used to prevent simultaneous commands being sent to hardware.
- Type:
- _count
Internal tracking of whether the Agent should be counting or not. This is used to exit the Process loop by changing it to False via the count.stop() command. Your Agent won’t use this exact attribute, but might have a similar one.
- Type:
bool
- count(test_mode=False)[source]
Process - Count up from 0.
The count will restart if the process is stopped and restarted.
Notes
The most recent value is stored in the session data object in the format:
>>> response.session['data'] {"value": 0, "timestamp":1600448753.9288929}
- print(text='hello world')[source]
Task - Print some text passed to a Task.
- Parameters:
text (str) – Text to print out. Defaults to ‘hello world’.
Notes
The session data will be updated with the text:
>>> response.session['data'] {'text': 'hello world', 'last_updated': 1660249321.8729222}
- ocs.agents.barebones.agent.add_agent_args(parser_in=None)[source]
- ocs.agents.barebones.agent.main(args=None)[source]
agents.fake_data
- class ocs.agents.fake_data.agent.FakeDataAgent(agent, num_channels=2, sample_rate=10.0, frame_length=60)[source]
Bases:
object- try_set_job(job_name)[source]
- set_job_done()[source]
- acq(test_mode=False, degradation_period=None)[source]
Process - Acquire data and write to the feed.
- Parameters:
test_mode (bool, optional) – Run the acq Process loop only once. This is meant only for testing. Default is False.
degradation_period (float, optional) – If set, then alternately mark self as degraded / not degraded with this period (in seconds).
Notes
The most recent fake values are stored in the session data object in the format:
>>> response.session['data'] {"fields": {"channel_00": 0.10250430068515494, "channel_01": 0.08550903376216404, "channel_02": 0.10481891991693446, "channel_03": 0.10793263271024509}, "timestamp":1600448753.9288929}
The channels kept in fields are the ‘faked’ data, in a similar structure to the Lakeshore agents. ‘timestamp’ is the last time these values were updated.
- count_seconds(session, params)[source]
- set_heartbeat(heartbeat=True)[source]
Task - Set the state of the agent heartbeat.
- Parameters:
heartbeat (bool, optional) – True for on (the default), False for off
- delay_task(delay=5, succeed=True)[source]
Task (abortable) - Sleep (delay) for the requested number of seconds.
This can run simultaneously with the acq Process. This Task should run in the reactor thread.
- Parameters:
delay (float, optional) – Time to wait before returning, in seconds. Defaults to 5.
succeed (bool, optional) – Whether to return success or not. Defaults to True.
Notes
The session data will be updated with the requested delay as well as the time elapsed so far, for example:
>>> response.session['data'] {'requested_delay': 5., 'delay_so_far': 1.2}
- ocs.agents.fake_data.agent.add_agent_args(parser_in=None)[source]
- ocs.agents.fake_data.agent.main(args=None)[source]
agents.host_manager
- class ocs.agents.host_manager.agent.HostManager(agent, docker_composes=[], docker_service_prefix='ocs-')[source]
Bases:
objectThis Agent is used to start and stop OCS-relevant services on a particular host. If the HostManager is launched automatically when a system boots, it can then be used to start up the rest of OCS on that host (either automatically or on request).
- manager(requests=[], reload_config=True)[source]
Process - The “manager” Process maintains a list of child Agents for which it is responsible. In response to requests from a client, the Process will launch or terminate child Agents.
- Parameters:
requests (list) – List of agent instance target state requests; e.g. [(‘instance1’, ‘down’)]. See description in
update()Task.reload_config (bool) – When starting up, discard any cached database of tracked agents and rescan the Site Config File. This is mostly for debugging.
Notes
If an Agent process exits unexpectedly, it will be relaunched within a few seconds.
When this Process is started (or restarted), the list of tracked agents and their status is completely reset, and the Site Config File is read in.
Once this process is running, the target states for managed Agents can be manipulated through the
update()task.Note that when a stop is requested on this Process, all managed Agents will be moved to the “down” state and an attempt will be made to terminate them before the Process exits.
The session.data is a dict, with entries: - ‘child_states’ - ‘config_parse_status’ - indicates how recently the various
input files havebeen parsed.
‘orphans’ - lists any orphaned (in the sense of docker compose) containers.
‘new_tags’ - dict mapping instance_id to new docker image tag, if that tag is not known to docker system. Only populated for tracked instances where the tag is not known.
The ‘child_states’ entry is a list of managed Agent status; for example:
[ {'next_action': 'up', 'target_state': 'up', 'stability': 1.0, 'agent_class': 'Lakeshore372Agent', 'instance_id': 'thermo1'}, {'next_action': 'down', 'target_state': 'down', 'stability': 1.0, 'agent_class': 'ACUAgent', 'instance_id': 'acu-1'}, {'next_action': 'up', 'target_state': 'up', 'stability': 1.0, 'agent_class': 'FakeDataAgent[d]', 'instance_id': 'faker6'}, ]
If you are looking for the “current state”, it’s called “next_action” here.
The agent_class may include a suffix [d] or [d?], indicating that the agent is configured to run within a docker container. (The question mark indicates that the HostManager cannot actually identify the docker-compose service associated with the agent description in the SCF.)
The ‘config_parse_status’ is a dict where the key is a docker compose filename, or “[SCF]” for the site config file, and the value is a tuple (success, timestamp, message).
The ‘orphans’ entry is as a dict mapping docker container ID to some information about the container. E.g.:
{ "30027f37e0ef4b...": { "compose_file": "/home/ocs/config/docker-compose.yml", "service": "ocs-faker3", "container_id": "30027f37e0ef4b...", "running": true, "exit_code": 0, "container_found": true, "running_image": "sha256:7eaa6d6f6..." } }
The ‘new_tags’ entry looks like this:
{ "faker1": "simonsobs/ocs:v0.11.3", "faker2": "simonsobs/ocs:v0.11.3" }
- update(requests=[], reload_config=False)[source]
Task - Update the target state for any subset of the managed agent instances. Optionally, trigger a full reload of the Site Config File first.
- Parameters:
requests (list) – Default is []. Each entry must be a tuple of the form
(instance_id, target_state). Theinstance_idmust be a string that matches an item in the current list of tracked agent instances, or be the string ‘all’, which will match all items being tracked. Thetarget_statemust be ‘up’ or ‘down’.reload_config (bool) – Default is False. If True, the site config file and docker-compose files are reparsed in order to (re-)populate the database of child Agent instances.
Examples
update(requests=[('thermo1', 'down')]) update(requests=[('all', 'up')]) update(reload_config=True)
Notes
Starting and stopping agent instances is handled by the
manager()Process; if that Process is not running then no action is taken by this Task and it will exit with an error.The entries in the
requestslist are processed in order. For example, if the requests were [(‘all’, ‘up’), (‘data1’, ‘down’)]. This would result in setting all known children to have target_state “up”, except for “data1” which would be given target state of “down”.If
reload_configis True, the Site Config File will be reloaded (as described in_reload_config()) before any of the requests are processed.Managed docker-compose.yaml files are reparsed, continously, by the manager process – no specific action is taken with those in this Task. Note that adding/changing the list of docker-compose.yaml files requires restarting the agent.
- remove_orphans(stop_time=10.)[source]
Task - Use docker stop and docker rm to remove orphaned containers associated with managed docker compose files.
This does not really do any error checking.
- docker_pull()[source]
Task - Use docker compose to pull any (new) images for the managed docker compose files.
- die(disown_dockers=False)[source]
Task - trigger a shutdown of the manage process and then stop the reactor, causing the HostManager to exit.
- Parameters:
disown_dockers (bool) – If True, then all tracked docker services will be put in “passive tracking” mode, meaning that they will not be stopped and removed during this shutdown process. This can be used to restart HostManager without needing to also restart all (docker-based) agents on the system.
- ocs.agents.host_manager.agent.make_parser(parser=None)[source]
- ocs.agents.host_manager.agent.main(args=None)[source]
- class ocs.agents.host_manager.drivers.ManagedInstance(management: str, agent_class: str, instance_id: str, full_name: str, operable: bool = False, retired: bool = False, passive_tracking: bool = False, agent_script: str | None = None, prot: object | None = None, restart_required: bool = False, target_state: str = 'down', next_action: str = 'down', at: float = 0, fail_times: ~typing.List = <factory>)[source]
Bases:
objectTracks the properties of a managed Agent-instance, including how to launch it, the current run state, target state, etc.
- management: str
How host is managed; either “host”, “docker”, or “retired”.
- agent_class: str
Agent class name (which may include suffix “[d]” or “[d?]” for docker-managed instances; or simply “[docker]” for services that do not seem to be registered in the SCF.
- instance_id: str
The agent instance’s instance_id, or else the docker service name associated with entry in the SCF.
- full_name: str
instance_id.
- Type:
Indentier constructed as agent_class
- operable: bool = False
Indicates whether the instance can be manipulated (whether calls to up/down should be expected to work).
- retired: bool = False
Indicates if instance is retired and can be removed from tracking.
- passive_tracking: bool = False
Indicates if instance should be “passively” managed, e.g. not be enforced other than ephemerally to attempt a start / stop. This is expected to only be used for docker-based instances.
- agent_script: str = None
The docker service name, if docker-managed; otherwisre the string
__plugin__to indicate it is host managed.
- prot: object = None
The Twisted ProcessProtocol object, if host system managed; or else the DockerContainerHelper if docker-based.
- restart_required: bool = False
Indicates a restart is in order, due to change of docker tag or other new software version.
- target_state: str = 'down'
The run state HostManager is trying to enforce (up, down, passive).
- next_action: str = 'down'
The thing HostManager plans to do next; this will sometimes mirror the current state (up or down) and will sometimes carry a transitional state, such as “wait_start”.
- at: float = 0
Unix timestamp, used by transitional states to indicate time at which some subsequent action should be taken.
- fail_times: List
List of unix timestamps for recent events where an instance stopped unexpectedly; used to identify “unstable” agents.
- property is_running
- property exit_code
- ocs.agents.host_manager.drivers.resolve_child_state(minst)[source]
- Parameters:
minst (ManagedInstance) – the instance state information. This will be modified in place.
- Returns:
‘messages’ (list of str): messages for the session.
’launch’ (bool): whether to launch a new instance.
’terminate’ (bool): whether to terminate the instance.
’sleep’ (float): maximum delay before checking back, or None if this machine doesn’t care.
- Return type:
Dict with important actions for caller to take. Content is
- ocs.agents.host_manager.drivers.stability_factor(times, window=120)[source]
Given an increasing list of failure times, quantify the stability of the activity.
A single failure, 10 seconds in the past, has a stability factor of 0.5; if there were additional failures before that, the stability factor will be lower.
Returns a culled list of stop times and a stability factor (0 - 1).
- class ocs.agents.host_manager.drivers.AgentProcessHelper(instance_id, cmd)[source]
Bases:
ProcessProtocol- up()[source]
- down()[source]
- property is_running
- connectionMade()[source]
Called when a connection is made.
This may be considered the initializer of the protocol, because it is called when the connection is completed. For clients, this is called once the connection to the server has been established; for servers, this is called after an accept() call stops blocking and a socket has been received. If you need to send any greeting or initial message, do it here.
- inConnectionLost()[source]
This will be called when stdin is closed.
- processExited(status)[source]
This will be called when the subprocess exits.
@type reason: L{twisted.python.failure.Failure}
- outReceived(data)[source]
Some data was received from stdout.
- errReceived(data)[source]
Some data was received from stderr.
- class ocs.agents.host_manager.drivers.DockerContainerHelper(service, docker_bin=None)[source]
Bases:
objectClass for managing the docker container associated with some service. Provides some of the same interface as AgentProcessHelper. Pass in a service description dict (such as the ones returned by parse_docker_state).
- update(service)[source]
Update self.status based on service info (in format returned by parse_docker_state).
- up()[source]
- down()[source]
- ocs.agents.host_manager.drivers.parse_docker_state(docker_compose_file)[source]
Analyze a docker compose.yaml file to get a list of services. Using docker compose ps and docker inspect, determine whether each service is running or not.
- Returns:
- A dict where the key is the service name and each value is a
dict with the following entries:
’compose_file’: the path to the docker compose file
’service’: service name
’image_tag’: the tag listed for the image in the compose file (this may differ from the running image).
’image_id’: the docker image ID corresponding to ‘image_tag’; will be “unknown” if, e.g., listed tag is not yet pulled to the running system.
’container_found’: bool, indicates whether a container for this service was found (whether or not it was running).
’container_id’: the docker ID of the container (if found).
’running’: bool, indicating that the found container is in state “Running”.
’running_image’: the ID of the image for the container (if found; e.g. “sha:0f…”).
’exit_code’: int, which is either extracted from the docker inspect output or is set to 127. (This should never be None.)
- orphans:
A dict (by container id) of dicts describing running containers that are associated with this compose file but have apparently been removed from the service list. Key is the service name.
- Return type:
services
agents.influxdb_publisher
- class ocs.agents.influxdb_publisher.agent.InfluxDBAgent(agent, args)[source]
Bases:
objectThis class provide a WAMP wrapper for the data publisher. The run function and the data handler are thread-safe, as long as multiple run functions are not started at the same time, which should be prevented through OCSAgent.
- Parameters:
agent (OCSAgent) – OCS Agent object
args (namespace) – args from the function’s argparser.
- data_dir
Path to the base directory where data should be written.
- Type:
path
- aggregate
Specifies if the agent is currently aggregating data.
- Type:
bool
- incoming_data
Thread-safe queue where incoming (data, feed) pairs are stored before being passed to the Publisher.
- Type:
queue.Queue
- loop_time
Time between iterations of the run loop.
- Type:
float
- record()[source]
Process - This process will create an Publisher instance, which will collect and write provider data to disk as long as this process is running.
- Parameters:
test_mode (bool, optional) – Run the record Process loop only once. This is meant only for testing. Default is False.
Notes
An example of the session data:
>>> response.session['data'] {'connected': True, 'last_updated': 1774389203.53926}
- ocs.agents.influxdb_publisher.agent.make_parser(parser=None)[source]
- ocs.agents.influxdb_publisher.agent.main(args=None)[source]
- class ocs.agents.influxdb_publisher.drivers.Publisher(host, database, incoming_data, port=8086, protocol='line', ssl=False, verify_ssl=False, gzip=False, operate_callback=None)[source]
Bases:
objectData publisher. This manages data to be published to the InfluxDB.
This class should only be accessed by a single thread. Data can be passed to it by appending it to the referenced incoming_data queue.
- Parameters:
incoming_data (queue.Queue) – A thread-safe queue of (data, feed) pairs.
host (str) – host for InfluxDB instance.
database (str) – database name within InfluxDB to publish to
port (int, optional) – port for InfluxDB instance, defaults to 8086.
protocol (str, optional) – Protocol for writing data. Either ‘line’ or ‘json’.
ssl (bool, optional) – Use https instead of http to connect to InfluxDB, defaults to False.
verify_ssl (bool, optional) – Verify SSL certificates for HTTPS requests, defaults to False.
gzip (bool, optional) – compress influxdb requsts with gzip
operate_callback (callable, optional) – Function to call to see if failed connections should be retried (to prevent a thread from locking).
- db
database name within InfluxDB to publish to (from database arg)
- Type:
str
- protocol
Protocol for writing data. Either ‘line’ or ‘json’.
- Type:
str, optional
- incoming_data
data to be published
- client_args
arguments passed to InfluxDB client
- client
InfluxDB client connection
- connected
True if connected to InfluxDB, False if not.
- Type:
bool
- process_incoming_data()[source]
Takes all data from the incoming_data queue, and writes them to the InfluxDB.
agents.registry
- class ocs.agents.registry.agent.RegisteredAgent(feed)[source]
Bases:
objectContains data about registered agents.
- Parameters:
feed (dict) – Encoded
ocs.ocs_feed.Feed.
- expired
True if agent has not been updated in Registry.agent_timeout seconds.
- Type:
bool
- time_expired
ctime at which the agent expired. This will be None if the agent is not expired.
- Type:
float, optional
- last_updated
ctime at which the agent was last updated
- Type:
float
- op_codes
Dictionary of operation codes for each of the agent’s operations. For details on what the operation codes mean, see docs from the
ocs_agentmodule- Type:
dict
- refresh(op_codes=None)[source]
- expire()[source]
- encoded()[source]
- class ocs.agents.registry.agent.Registry(agent, args)[source]
Bases:
objectThe Registry agent is in charge of keeping track of which agents are currently running. It has a single process “main” that loops and keeps track of when agents expire. This agent subscribes to all heartbeat feeds, so no additional function calls are required to register an agent.
A list of agent statuses is maintained in the “main” process’s session.data object.
- Parameters:
agent (OCSAgent) – the ocs agent object
- registered_agents
A defaultdict of RegisteredAgent objects, which contain whether the agent has expired, the time_expired, and the last_updated time.
- Type:
defaultdict
- agent_timeout
The time an agent has between heartbeats before being marked as expired.
- Type:
float
- main(test_mode=False)[source]
Process - Main run process for the Registry agent. This will loop and keep track of which agents have expired. It will keep track of current active agents in the session.data variable so it can be seen by clients.
- Parameters:
test_mode (bool, optional) – Run the main Process loop only once. This is meant only for testing. Default is False.
Notes
The session data object for this process will be a dictionary containing the encoded RegisteredAgent objects for each agent observed during the lifetime of the Registry. For instance, this might look like:
>>> response.session['data'] { "observatory.aggregator": { "expired": False, "time_expired": None, "last_updated": 1669925713.4082503, "op_codes": { "record": 3 }, "agent_class": "AggregatorAgent", "agent_address": "observatory.aggregator" }, "observatory.fake-hk-agent-01": { "expired": False, "time_expired": None, "last_updated": 1669925945.7575383, "op_codes": { "acq": 3, "set_heartbeat": 1, "delay_task": 1 }, "agent_class": "FakeDataAgent", "agent_address": "observatory.fake-hk-agent-01" } }
- ocs.agents.registry.agent.make_parser(parser=None)[source]
- ocs.agents.registry.agent.main(args=None)[source]
ocs.agent_cli
- ocs.agent_cli.build_agent_list()[source]
Builds a list of all Agents available across all ocs plugins installed on the system.
Note
Currently if two plugins provide the same Agent the one loaded first is used. This should be improved somehow if we expect overlapping Agents to be provided by plugins.
Examples
An example agent list:
>>> build_agent_list() {'RegistryAgent': {'module': 'ocs.agents.registry.agent', 'entry_point': 'main'}, 'AggregatorAgent': {'module': 'ocs.agents.aggregator.agent', 'entry_point': 'main'}, 'HostManager': {'module': 'ocs.agents.host_manager.agent', 'entry_point': 'main'}, 'FakeDataAgent': {'module': 'ocs.agents.fake_data.agent', 'entry_point': 'main'}, 'InfluxDBAgent': {'module': 'ocs.agents.influxdb_publisher.agent', 'entry_point': 'main'}, 'BarebonesAgent': {'module': 'ocs.agents.barebones.agent', 'entry_point': 'main'}}
- Returns:
Dictionary of available agents, with agent names as the keys, and dicts containing the module and entry_point as values.
- Return type:
dict
ocs.access
Access Control is a system for restricting access to some Agents or Agent functions to certain clients.
Functions in this module are prefixed with one of:
agent_– functions used by an Agent to determine access controls and verify client access credentials.client_– functions used by client programs to interact with Agents that implement access control.director_– functions used by Access Director to process access rules and grant requests.
- ocs.access.AC_VERSION = 1
The protocol version (“access_control”) that agents report themselves as using via get_api.
- class ocs.access.CredLevel(value)[source]
Bases:
IntEnumRepresentation of the credential level of a client, or the required credential level for some operation.
- BLOCKED = 0
- BASIC = 1
- ADVANCED = 2
- FULL = 3
- SUPERUSER = 4
- ocs.access.CRED_KEYS = [('password_1', CredLevel.BASIC), ('password_2', CredLevel.ADVANCED), ('password_3', CredLevel.FULL), ('password_4', CredLevel.SUPERUSER)]
Keys to use for passwords associated with each CredLevel. These are used in config blocks for Access Director and OCS clients, for example.
- class ocs.access.HashedPass(hash: str, value: str = '')[source]
Bases:
objectContainer for hashed password values, for internal storage in Access Director agent as well as for distribution to Agents that need to validate clients.
- hash: str
The identifier for the hash being used (use “none”) for clear text, and “blocked” to refuse to match any user password.
- value: str = ''
The hashed string. If this string is empty, and hash != “blocked”, then any provided password will match.
- class ocs.access.AccessPasswordItem(default: bool | None = False, agent_class: str | None = None, instance_id: str | None = None, password_1: str | HashedPass | None = None, password_2: str | HashedPass | None = None, password_3: str | HashedPass | None = None, password_4: str | HashedPass | None = None)[source]
Bases:
objectA single entry in a password configuration file.
This is used both in the Access Configuration File, which configures the Access Director, and in the OCS Password File, which clients reference to find passwords to use for specific agents.
Each field in this object is either a selector or a password.
The selectors are used to determine whether this rule applies to a particular agent instance (described in terms of its agent_class and instance_id). Those keys are:
default(bool): True if this selector should apply to all agent instances – when this is set, all other selector keys are ignored.agent_class(str): This is a pattern to match against the provided agent_class. If absent or set to None, this is ignored.instance_id(str): This is a pattern to match against the provided instance_id. If absent or set to None, this is ignored.
Patterns and matching are as described in this module’s
pattern_match().The passwords consist of:
password_1: The password associated with credential level 1.password_2: The password … level 2.password_3: The password … level 3.password_4: The password … level 4.
When used in the Access Configuration File, the password entries define passwords that should be accepted, to grant clients that level of access. The passwords may be strings (which will be interpreted as either clear-text, or pre-hashed values, depending on the
passwords_block_default_hashfuncsetting). When the Agent is checking a password, it considers all the rules and returns the highest credential level matched by the password and target.When used in the OCS Password File, only
password_2and/orpassword_3keys should be used – and the values should be strings, which will be interpreted as the clear text passwords to use in the Client.- default: bool | None = False
Selector variables
- agent_class: str | None = None
- instance_id: str | None = None
- password_1: str | HashedPass | None = None
Password variables
- password_2: str | HashedPass | None = None
- password_3: str | HashedPass | None = None
- password_4: str | HashedPass | None = None
- class ocs.access.AgentSpec(instance_id: str, agent_class: str, superuser_key: object | None = None)[source]
Bases:
objectIdentifying information for a specific Agent Instance. Info here is compared against a ScopeSpec for an AccessRule, to see if rule should be applied to the Instance.
- instance_id: str
- agent_class: str
- superuser_key: object | None = None
The superuser_key is used for an Agent to start its own operations, internally, such as on startup.
- class ocs.access.ActionContext(op_name: str | None = None, action: str | None = None)[source]
Bases:
objectPlaceholder class for future fine-grain access control / lockout of individual operations at the Access Director level.
- op_name: str | None = None
- action: str | None = None
- ocs.access.pattern_match(target: str, pattern: str, raw=False)[source]
Pattern-matching of a target against a pattern is defined as follows:
A pattern consists of one or more sub-patterns, separated by commas. The pattern matches the target if any of the positive sub-patterns match the target, as long as none of the negative sub-patterns matches the target.
If a sub-pattern does not start with “!”, it is considered a positive sub-pattern and fnmatch is used to test the sub-pattern against the target.
If a sub-pattern starts with “!”, then it is a negative sub-pattern and the remainder of the sub-pattern text is used with fnmatch to test against the target.
Examples
The pattern
"Director,Act*,*Producer*"matches the string “Director” or any string that starts with “Act” or contains “Producer”.The pattern
"*Agent,!FakeDataAgent"matches any string that ends with “Agent”, except the string “FakeDataAgent”.The pattern
"compute*,login*,!*9,!*8"matches any string that starts with “compute” or “login” and does not end with “8” or “9”.
- class ocs.access.ScopeSpec(default: bool | None = False, agent_class: str | None = None, instance_id: str | None = None)[source]
Bases:
objectA specification of the scope of an AccessRule. See
checkfunction for matching details.- default: bool | None = False
- agent_class: str | None = None
- instance_id: str | None = None
- check(agent: AgentSpec)[source]
Determine whether
agentmatches the present ScopeSpec.If
self.defaultis True, then this function returns True. Otherwise, ifself.agent_classandself.instance_idare both None, then this function returns False. Otherwise the agent must match the pattern in self.agent_class, and the pattern in self.instance_id.See
pattern_match()in this module for a description of patterns and matching.
- get_specificity()[source]
Return the specificity of this rule, for sorting. The principles are that positive patterns are more specific than negative patterns; and then that instance_id selection is more specific than agent_class selection.
If default=True, then 0 is returned. If not default, but agent_class and instance_id are both None, then 0 is returned.
Otherwise, the specificity is the sum of the following contributions:
8 if instance_id includes any positive subpatterns.
4 if agent_class includes any positive subpatterns.
2 if instance_id includes any negative subpatterns.
1 if agent_class includes any negative subpatterns.
- class ocs.access.AccessRule(hashed_pass: ~ocs.access.HashedPass | None, cred_level: ~ocs.access.CredLevel | None, scope_spec: ~ocs.access.ScopeSpec = <factory>, lockout_id: str | None = None, lockout_owner: str | None = None, lockout_levels: ~typing.List[~ocs.access.CredLevel] | None = <factory>)[source]
Bases:
objectA Rule for consumption by an Agent to grant or revoke access. The
scope_specdetermines what Agent Instances the rule should be applied to. The cred_level is the level granted by this rule, if password in hashed_pass has been provided.The lockout_* entries are populated in the case that this rule arises from an Exclusive Access Grant. The lockout_id is the name given to some specific lockout definition. The lockout_owner is some identifier provided by whoever requested the lockout. And lockout_levels is the list of CredLevels that other callers (those without the present lockout password) are forbidden from having.
- hashed_pass: HashedPass | None
- lockout_id: str | None = None
- lockout_owner: str | None = None
- class ocs.access.AgentAccessRules(policy: str | None = None, director_id: str | None = None, agent: ~ocs.access.AgentSpec | None = None, rules: ~typing.List[~ocs.access.AccessRule] = <factory>)[source]
Bases:
objectA container used by Agents to hold configuration, including AccessRule (whether generated in place or received from Access Director agent).
- policy: str | None = None
- director_id: str | None = None
- rules: List[AccessRule]
- ocs.access.agent_get_policy_default(policy: str) [<class 'ocs.access.AgentAccessRules'>][source]
Get the default access rules, based on a “policy” string (probably from the –access-policy Agent argument).
The policy passed in by the user should be one of the following:
“none” (or “”, or None). This effectively disables Access Control, as the returned rules will give a caller maximum privileges, no matter what password.
“override:pw2,pw3”, where “pw2” and “pw3” represent the desired level 2 and level 3 passwords (unhashed).
“director:access-dir”, where “access-dir” represents the instance-id of an Access Director agent. The rules returned in this case will, initially, block all access; it’s expected these rules will be updated by messages from the Access Director.
- ocs.access.agent_filter_rules(rules: List[AccessRule], agent: AgentSpec) [List[ocs.access.AccessRule]][source]
Filter a list of AccessRules, keeping only ones pertinent to agent.
- ocs.access.agent_get_creds(password: str | None, access_rules: AgentAccessRules, agent: AgentSpec, action: ActionContext | None = None) [<enum 'CredLevel'>][source]
Based on the access_rules, and the provided password, determine the credential level for the specified agent and action.
- ocs.access.agent_rejection_message(cred_level: CredLevel, required_level: CredLevel, lockout_detail: str = '')[source]
Get a helpful message about what privs are needed to access a resource protected at required_level.
- ocs.access.client_get_password(privs, agent_class, instance_id)[source]
For OCSClient use – determine the best client password to use. This may lead to inspection of OCS password files.
- Parameters:
privs (str, int) – If this is a string, it will be used as the password. If this argument is an integer, it represents a desired credential level and the local password configuration will be inspected to find a password associated with access at that level.
agent_class (str or None) – If specified, will be used to match rules in the password config file.
instance_id (str or None) – If specified, will be used to match rules in the password config file.
- Returns:
A string representing the password to use in all requests the client makes (these are passed to the agent in the “password=…” argument of the _ops_handler).
Notes
If privs is a string, then this password is used directly and no inspection of config files is performend.
If privs is 1, then the password ‘’ is returned.
If privs is 2 or 3, then the OCS password file is loaded and inspected for a suitable password. The password file will be loaded from
~/.ocs-passwords.yaml, unless overridden by the environment variableOCS_PASSWORDS_FILE.The password file is in yaml format, containing a single list. Each entry in the list is a dictionary referred to as a rule. Each rule is a dict with the schema described in
ocs.access.AccessPasswordItem.To find a suitable password, all rules are considered. When multiple rules have selectors that match the target and contain a password of the required credential level or higher, then the one that is most specific is taken. When multiple rules are tied for specificity, the one occurring latest in the list is taken. “Specificity” is outlined in
ScopeSpec.get_specificity().Here’s an example passwords file:
- default: true password_3: "general-access-password" - agent_class: "FakeDataAgent" instance_id: "!faker4,!faker1" password_2: "normal-faker-password" - instance_id: "faker*" password_2: "special-faker-password" - instance_id: "faker4" password_2: "special-faker4-password"
Suppose faker1 and faker4 both have agent_class “FakeDataAgent”. For level 2 access, “faker1” matches the rules at index 0 and 2, but the most specific rule is 2 so that is used. For “faker4”, rules 0, 2, and 3 match. Rules 2 and 3 are equally specific but rule 3 occurs later so it is used.
- class ocs.access.ExclusiveAccessClient(target: str, grantee: str, grant_name: str, password: str | None = None)[source]
Bases:
objectManager class for Exclusive Access grants.
- Parameters:
target – instance_id of the Access Director agent (or an OCSClient to use).
grantee – Identifier for the requester (for consumer information).
grant_name – The grant name, as defined in the grant config block.
password – The password for the grant config block, if needed.
The acquire, renew and release methods all return a tuple with (ok, detail), where ok is a boolean indicating that things seem to have worked, and detail is the full “useful info” result returned by the call to
AccessDirector.request_exclusive. See that function for details.- acquire(expire_at: float | None = None) Tuple[bool, dict][source]
Try to acquire the exclusive access grant.
expire_atis an optional unix timestamp to suggest as the grant expiry time.If this succeeds, the access password, and timestamp at which the grant will expire, are stored in
self.passwordandself.expire_at.
- class ocs.access.GrantConfigItem(cred_level: ~ocs.access.CredLevel | None, default: bool | None = False, agent_class: str | None = None, instance_id: str | None = None, lockout_levels: ~typing.List[~ocs.access.CredLevel] | None = <factory>)[source]
Bases:
objectA single entry from Access Director config file.
- default: bool | None = False
- agent_class: str | None = None
- instance_id: str | None = None
- class ocs.access.DistributedAccessGrant(name: str, grants: List[GrantConfigItem], password: str | None = None, hash: str | None = 'md5')[source]
Bases:
objectClass for decoding a “grant” block of Access Director config file.
- name: str
- grants: List[GrantConfigItem]
- password: str | None = None
- hash: str | None = 'md5'
- class ocs.access.AccessDirectorConfig(passwords_block_default_hashfunc: str = 'none', distrib_hashfunc: str = 'md5', passwords: ~typing.List[~ocs.access.AccessPasswordItem] = <factory>, exclusive_access_blocks: ~typing.List[~ocs.access.DistributedAccessGrant] = <factory>)[source]
Bases:
object- passwords_block_default_hashfunc: str = 'none'
- distrib_hashfunc: str = 'md5'
- passwords: List[AccessPasswordItem]
- exclusive_access_blocks: List[DistributedAccessGrant]
- ocs.access.director_parse_config(config: dict) dict[source]
Parse a config file and return the config dict. This includes some validation and some translation.
The elements at the top level of the returned dict will become attributes of the AD instance, so be careful what you add in there.
- ocs.access.director_get_access_rules(grant_def: DistributedAccessGrant, grant_owner: str) Tuple[str, List[AccessRule]][source]
Generate password for a grant block, and the corresponding access rule blocks to be passed to agents for processing credentials.
- Returns:
- password
The password granting access.
- rules
The list of AccessRule objects for distribution to agents.
ocs.base
- class ocs.base.ResponseCode(value)[source]
Bases:
EnumEnumeration of response codes from the Operation API (start, stop, wait, …).
These response codes indicate only whether the API call was successful, in that the request was propagated all the way to the Agent’s Operation code. They are not used to represent success or failure of the Operation itself.
- OK = 0
OK indicates the request was successful.
- ERROR = -1
ERROR indicates that the request could not be propagated fully. This may occur, for example, if an invalid Operation name is passed, if a request is made that conflicts with an Operation’s current state (e.g. .start() is called on an already-running Operation), if an API call is made to a Operation of an incompatible type (e.g. .stop() on a Task), or due to API syntax error (e.g. misspelled keyword argument to .wait()).
- TIMEOUT = 1
TIMEOUT is returned in the case that a Client issued a blocking call with timeout (.wait()), and the timeout expired before the Operation completed.
- class ocs.base.OpCode(value)[source]
Bases:
EnumEnumeration of OpSession “op_code” values.
The op_code corresponds to the session.status, with the following extensions:
If the session.status == “done” then the op_code will be assigned a value of either SUCCEEDED or FAILED based on session.success.
If the session.status == “running”, and session.degraded is True, then the op_code will be DEGRADED rather than RUNNING.
- NONE = 1
NONE is used to represent an uninitialized OpSession, and does not correspond to some attempt to run the Operation.
- STARTING = 2
STARTING indicates that start() has been successfully called, but the Operation has not yet marked itself as successfully launched. If this state is reached, then at the very least the start request was not rejected because it was already running.
- RUNNING = 3
RUNNING indicates that the Operation has performed its basic initialization and parameter checking and is performing its task. Operation codes need to explicitly mark themselves as running by calling session.set_state(‘running’).
- STOPPING = 4
STOPPING indicates that the Agent has received a stop or abort request for this Operation and will try to wrap things up ASAP.
- SUCCEEDED = 5
SUCCEEDED indicates that the Operation has terminated and has indicated the Operation was successful. This includes the case of a Process that was asked to stop and has shut down cleanly.
- FAILED = 6
FAILED indicates that the Operation has terminated with some kind of error.
- EXPIRED = 7
EXPIRED may used to mark session information as invalid in cases where the state cannot be determined.
- DEGRADED = 8
DEGRADED indicates that an operation meets the requirements for state RUNNING, but is self-reporting as being in a problematic state where it is unable to perform its primary functions (for example, if a Process to operate some hardware is trying to re-establish connection to that hardware).
ocs.client_http
- class ocs.client_http.ControlClient(agent_addr, **kwargs)[source]
Bases:
object- get_api(simple=False)[source]
Query the API and other info from the Agent; this includes lists of Processes, Tasks, and Feeds, docstrings, operation session structures, and info about the Agent instance (class, PID, host).
- Parameters:
simple (bool) – If True, then return just the lists of the op and feed names without accompanying detail.
- Returns:
A dict, see
ocs.ocs_agent.OCSAgent._management_handler()for detail.
- get_tasks()[source]
Query the list of Tasks from the Agent management interface.
Returns a list of items of the form (task_name, info_dict).
- get_processes()[source]
Query the list of Processes from the Agent management interface.
Returns a list of items of the form (process_name, info_dict).
- get_feeds()[source]
Query the list of Feeds from the Agent management interface.
Returns a list of items of the form (feed_name, info_dict).
ocs.client_t
- ocs.client_t.run_control_script(function, parser=None, *args, **kwargs)[source]
Run a function in a ControlClientSession, within a site_config paradigm, assuming that additional configuration will be passed through command line arguments.
- Parameters:
function – The function to call.
parser – argparse.ArgumentParser, with control script options pre-loaded. If None, this will be created internally in the usual way.
The command line is parsed and site configuration information is loaded. The WAMP server information is used to configure the Control Client’s connection before launching the function. The function is invoked as:
function(app, parser_args, *args, **kwargs)
where app is the ControlClientSession and parser_args is the argparse.Namespace object including processing done by ocs.site_config. Note that the observatory root_address is contained in parser_args.root_address. Any additional arguments defined in the parser will also be present.
This function can be invoked with parser=None, or else with a parser that has been initialized for control client purposes, like this:
from ocs import client_t, site_config parser = site_control.add_arguments() # initialized ArgParser parser.add_option('--target') # Options for this client client_t.run_control_script(my_script, parser=parser)
In the my_script function, use parser_args.target to get the target.
- class ocs.client_t.ControlClientSession(config, script, script_args, script_kwargs)[source]
Bases:
ApplicationSessionImplements
autobahn.wamp.interfaces.ISession()
- class ocs.client_t.OperationClient(app, root_address, op_name)[source]
Bases:
objectThe endpoint client is associated with a single operation rather than with an operation server.
- class ocs.client_t.TaskClient(app, root_address, op_name)[source]
Bases:
OperationClient
- class ocs.client_t.ProcessClient(app, root_address, op_name)[source]
Bases:
OperationClient
ocs.common
The common/ directory contains driver code shared among agents.
ocs.common.influxdb_drivers
- ocs.common.influxdb_drivers.timestamp2influxtime(time, protocol)[source]
Convert timestamp for influx, always in UTC.
- Parameters:
time – Unix ‘ctime’ timestamp, i.e.
1775500953.5108523protocol – InfluxDB protocol to format timestamp for. Either ‘json’ or line’.
- class ocs.common.influxdb_drivers.InfluxTags(shared_tags: dict, field_tags: dict | None = None)[source]
Bases:
objectStores tags to apply to a set of data within an InfluxBlock.
Examples
>>> tags = InfluxTags(shared_tags={'feed': 'example_fed'}, ... field_tags={'key1': 1, '_field': 'value'})
Tags to apply to all data points.
- field_tags: dict = None
Tags to apply per field, along with ‘_field’ value to use.
- class ocs.common.influxdb_drivers.InfluxBlock(block_name: str, data: dict, timestamps: list, measurement: str, tags: InfluxTags)[source]
Bases:
objectHolds and can convert the data and feed information into a format suitable for publishing to InfluxDB.
- block_name: str
OCS Block name.
- data: dict
OCS data, as it comes across the feed.
- timestamps: list
Corresponding timestamps for the data.
- measurement: str
Measurement name to publish to in InfluxDB.
- tags: InfluxTags
Tags to apply to the measurements.
- encode(protocol='line')[source]
Encode Block data into InfluxDB compatible format for the given protocol.
- Parameters:
protocol (str) – Protocol to use to publish to InfluxDB. Either ‘line’ or ‘json’. Defaults to ‘line’.
- Returns:
List of encoded data points, formatted to be used by the InfluxDB client.
- Return type:
list
- ocs.common.influxdb_drivers.format_data(data, feed, protocol)[source]
Format the data from an OCS feed for publishing to InfluxDB.
The scheme used depends on whether ‘influxdb_tags’ are published to the Feed.
Without ‘influxdb_tags’ the measurement consists of the agent address, i.e.
address_root.instance_id, there is a single tag for the feed name, and each data field from the OCS feed is used directly as the field name in InfluxDB. This structure, however, is not ideal for InfluxDB query performance.When ‘influxdb_tags’ are provided by the agent then the measurement becomes the agent class, and the address root and instance-id are added as tags. The ‘influxdb_tags’ are also used to add additional tags and provide a simple field name. See the examples below.
- Parameters:
data (dict) – Data from the OCS Feed subscription.
feed (dict) – Feed from the OCS Feed subscription, contains feed information used to structure our influxdb query.
protocol (str) – Protocol for writing data. Either ‘line’ or ‘json’.
- Returns:
Data ready to publish to influxdb, in the specified protocol.
- Return type:
list
Examples
>>> # without 'influxdb_tags' >>> format_data(data, feed, protocol='line') ['observatory.fake-data1,feed=false_temperatures channel_00=0.20307 1775502374078489088', 'observatory.fake-data1,feed=false_temperatures channel_01=0.35795 1775502374078489088', 'observatory.fake-data1,feed=false_temperatures channel_00=0.20548 1775502375078489088', 'observatory.fake-data1,feed=false_temperatures channel_01=0.36313 1775502375078489088']
>>> # with 'influxdb_tags' >>> format_data(data, feed, protocol='line') ['FakeDataAgent,feed=false_temperatures,address_root=observatory,instance_id=fake-data1,channel=0 temperature=0.20307 1775502374078489088', 'FakeDataAgent,feed=false_temperatures,address_root=observatory,instance_id=fake-data1,channel=1 temperature=0.35795 1775502374078489088', 'FakeDataAgent,feed=false_temperatures,address_root=observatory,instance_id=fake-data1,channel=0 temperature=0.20548 1775502375078489088', 'FakeDataAgent,feed=false_temperatures,address_root=observatory,instance_id=fake-data1,channel=1 temperature=0.36313 1775502375078489088']
ocs.ocs_agent
- ocs.ocs_agent.init_site_agent(args, address=None)[source]
Create ApplicationSession and ApplicationRunner instances, set up to communicate on the chosen WAMP realm.
- Parameters:
args (argparse.Namespace) – The arguments, as processed by ocs.site_config.
Returns: (agent, runner).
- class ocs.ocs_agent.OCSAgent(config, site_args, address=None, class_name=None)[source]
Bases:
ApplicationSessionOCSAgent is used to connect blocking device control code to the OCS. OCSAgent is an ApplicationSession and its methods are all run in the twisted main Reactor thread.
To make use of OCSAgent, the user instantiates it (perhaps through init_ocs_agent) with a particular agent_address. Then the user registers task and process functions by calling register_task() and register_process(). These are (blocking) functions that will be called, in their own twisted thread, when the “start” method is request.
The OCSAgent automatically registers handlers in WAMP, namely:
- {agent_address} - the management_handler function, which
responds to queries about what tasks and processes are exposed by this agent.
- {agent_address}.ops - the device_handler function, which accepts
Operation commands (start, status, etc.) for all Tasks and Processes.
The OCSAgent also makes use of pubsub channels:
- {agent_address}.feed - a channel to which any session status
updates are published (written by the Agent; subscribed by any interested Control Tools).
Implements
autobahn.wamp.interfaces.ISession()- encoded()[source]
Returns a dict describing this Agent. Includes ‘agent_address’, and lists of ‘feeds’, ‘tasks’, and ‘processes’.
- _gather_sessions(parent)[source]
Gather the session data for self.tasks or self.sessions, for return through the management_handler.
- Parameters:
parent – either self.tasks or self.processes.
- Returns:
A list of Operation description tuples, one per registered Task or Process. Each tuple consists of elements (name, session, op_info):
name: The name of the operation.
session: dict with OpSession.encode(() info for the active or most recent session. If no such session exists the result will have member ‘status’ set to ‘no_history’.
op_info: information registered about the operation, such as op_type, docstring and blocking.
- _management_handler(q, **kwargs)[source]
Get a description of this Agent’s API. This is for adaptive clients (such as OCSClient) to construct their interfaces.
- Parameters:
q (string) – One of ‘get_api’, ‘get_tasks’, ‘get_processes’, ‘get_feeds’, ‘get_agent_class’.
- Returns:
api_description – If the argument is ‘get_api’, then a dict with the following entries is returned:
’agent_class’: The class name of this agent.
’instance_hostname’: The host name where the Agent is running, as returned by socket.gethostname().
’instance_pid’: The PID of the Agent interpreter session, as returned by os.getpid().
’feeds’: The list of encoded feed information, tuples (feed_name, feed_info).
’processes’: The list of Process api description info, as returned by
_gather_sessions().’tasks’: The list of Task api description info, as returned by
_gather_sessions().’access_control’: if present, contains some basic info about the access control version, agent configuration, and update count. If this isn’t present (i.e. in older ocs), passing “password” argument to API calls will likely produce an error.
Passing get_X will, for some values of X, return only that subset of the full API; treat that as deprecated.
- Return type:
dict
- register_task(name, func, aborter=None, blocking=True, aborter_blocking=None, startup=False, min_privs=0)[source]
Register a Task for this agent.
- Parameters:
name (string) – The name of the Task.
func (callable) – The function that will be called to handle the “start” operation of the Task.
aborter (callable) – The function that will be called to handle the “abort” operation of the Task (optional).
blocking (bool) – Indicates that
funcshould be launched in a worker thread, rather than running in the main reactor thread.aborter_blocking (bool or None) – Indicates that
abortershould be run in a worker thread, rather than running in the main reactor thread. Defaults to value ofblocking.startup (bool or dict) – Controls if and how the Operation is launched when the Agent successfully starts up and connects to the WAMP realm. If False, the Operation does not auto-start. Otherwise, the Operation is launched on startup. If the
startupargument is a dictionary, this is passed to the Operation’s start function.min_privs (int) – Minimum privilege level required to start or abort this operation (1, 2, 3). See Access Control.
Notes
The functions func and aborter will be called with arguments (session, params) where session is the active OpSession and params is passed from the client.
(Passing params to the aborter might not be supported in the client library so don’t count on that being useful.)
- register_process(name, start_func, stop_func, blocking=True, stopper_blocking=None, startup=False, min_privs=0)[source]
Register a Process for this agent.
- Parameters:
name (string) – The name of the Process.
start_func (callable) – The function that will be called to handle the “start” operation of the Process.
stop_func (callable) – The function that will be called to handle the “stop” operation of the Process.
blocking (bool) – Indicates that
start_funcshould be launched in a worker thread, rather than running in the reactor.stopper_blocking (bool or None) – Indicates that
stop_funcshould be launched in a worker thread, rather than running in the reactor. Defaults to the value ofblocking.startup (bool or dict) – Controls if and how the Operation is launched when the Agent successfully starts up and connects to the WAMP realm. If False, the Operation does not auto-start. Otherwise, the Operation is launched on startup. If the
startupargument is a dictionary, this is passed to the Operation’s start function.min_privs (int) – Minimum privilege level required to start or stop this operation (1, 2, 3). See Access Control.
Notes
The functions start_func and stop_func will be called with arguments (session, params) where session is the active OpSession and params is passed from the client.
(Passing params to the stop_func might not be supported in the client library so don’t count on that being useful.)
- call_op(agent_address, op_name, action, params=None, timeout=None)[source]
Calls ocs_agent operation.
- Parameters:
agent_address (string) – Address of the agent who registered operation
op_name (string) – Name of the operation
action (string) – Action of operation. start, stop , wait, etc.
params (dict) – Params passed to operation
timeout (float) – timeout for operation
- register_feed(feed_name, **kwargs)[source]
Initializes a new feed with name
feed_name.- Parameters:
feed_name (string) – name of the feed
record (bool, optional) – Determines if feed should be aggregated. At the moment, each agent can have at most one aggregated feed. Defaults to False
agg_params (dict, optional) – Parameters used by the aggregator and influx publisher. See the
ocs.ocs_feed.Feeddocstring for the full list of aggregator params.buffer_time (int, optional) – Specifies time that messages should be buffered in seconds. If 0, message will be published immediately. Defaults to 0.
max_messages (int, optional) – Max number of messages stored. Defaults to 20.
- Returns:
The Feed object (which is also cached in self.feeds).
- publish_to_feed(feed_name, message, from_reactor=None)[source]
Publish data to named feed.
- Parameters:
feed_name (str) – should match the name of a registered feed.
message (serializable) – data to publish. Acceptable format depends on feed configuration; see Feed.publish_message.
from_reactor (bool or None) – This is deprecated; the code will check whether you’re in a thread or not.
Notes
If an unknown feed_name is passed in, an error is printed to the log and that’s all.
If you are running a “blocking” operation, in a thread, then it is best if the message is not a persistent data structure from your thread (especially something you might modify soon after this call). The code will take a copy of your structure and pass that to the reactor thread, but the copy may not be deep enough!
- subscribe(handler, topic, options=None, force_subscribe=False)[source]
Subscribes to a topic for receiving events. Identical to ApplicationSession subscribe, but by default prevents re-subscription to the same topic multiple times unless force_subscribe=True.
For full documentation see: https://autobahn.readthedocs.io/en/latest/reference/autobahn.wamp.html#autobahn.wamp.interfaces.ISession.subscribe
- Parameters:
handler (callable) – handler called with message data
topic (string) – uri of topic to subscribe to
options (dict) – Dict of subscribe options. To set prefix or wildcard matching, set match to prefix or wildcard respectively. For more info, see https://autobahn.readthedocs.io/en/latest/reference/autobahn.wamp.html#autobahn.wamp.types.SubscribeOptions
force_subscribe (bool) – If true, force resubscribe to an already susbscribed topic. Defaults to False.
- subscribe_to_feed(agent_addr, feed_name, handler, options=None, force_subscribe=False)[source]
Constructs topic feed from agent address and feedname, and subscribes to it.
- Parameters:
agent_addr (str) – Full agent address, e.g. observatory.LS12345
feed_name (str) – Feed name, e.g. temperatures
handler (callable) – handler called with message data
options (dict) – Dict or subscribe options. See https://autobahn.readthedocs.io/en/latest/reference/autobahn.wamp.html#autobahn.wamp.types.SubscribeOptions
force_subscribe (bool) – If true, force resubscribe to an already susbscribed topic. Defaults to False.
- subscribe_on_start(handler, topic, options=None, force_subscribe=None)[source]
Schedules a topic to be subscribed to OnJoin. See OCSAgent.subscribe’s docstring.
- start(op_name, params=None, password=None)[source]
Launch an operation. Note that successful return of this function does not mean that the operation is running; it only means that the system has requested the operation to run.
Returns tuple (status, message, session).
Possible values for status:
- ocs.ERROR: the specified op_name is not known, or the op is
already running (has an active session).
ocs.OK: the Operation start routine has been launched.
- wait(op_name, timeout=None, password=None)[source]
Wait for the specified Operation to become idle, or for timeout seconds to elapse. If timeout==None, the timeout is disabled and the function will not return until the Operation terminates. If timeout<=0, then the function will return immediately.
Returns (status, message, session).
Possible values for status:
- ocs.TIMEOUT: the timeout expired before the Operation became
idle.
ocs.ERROR: the specified op_name is not known.
ocs.OK: the Operation has become idle.
- _stop_helper(stop_type, op_name, params, password)[source]
Common stopper/aborter code for Process stop and Task abort.
- Parameters:
stop_type (str) – either ‘stop’ or ‘abort’.
op_name (str) – the op_name.
params (dict or None) – Params to be passed to stopper function.
password – Password of the client.
- stop(op_name, params=None, password=None)[source]
Initiate a Process stop routine.
Returns (status, message, session).
Possible values for status:
- ocs.ERROR: the specified op_name is not known, or refers to
a Task. Also returned if Process is known but not running.
ocs.OK: the Process stop routine has been launched.
- abort(op_name, params=None, password=None)[source]
Initiate a Task abort routine.
Returns (status, message, session).
Possible values for status:
- ocs.ERROR: the specified op_name is not known, or refers to
a Process. Also returned if Task is known but not running.
ocs.OK: the Process stop routine has been launched.
- status(op_name, params=None, password=None)[source]
Get an Operation’s session data.
Returns (status, message, session). When there is no session data available, an empty dictionary is returned instead.
Possible values for status:
ocs.ERROR: the specified op_name is not known.
ocs.OK: the op_name was recognized.
- class ocs.ocs_agent.AgentTask(launcher, blocking=None, aborter=None, aborter_blocking=None, min_privs=1)[source]
Bases:
AgentOp
- class ocs.ocs_agent.AgentProcess(launcher, stopper, blocking=None, stopper_blocking=None, min_privs=0)[source]
Bases:
AgentOp
- ocs.ocs_agent.SESSION_STATUS_CODES = [None, 'starting', 'running', 'stopping', 'done']
These are the valid values for session.status. Use like this:
None: uninitialized.
starting: the Operation code has been launched and is performing basic quick checks in anticipation of moving to the (longer term) “running” state.running: the Operation code has performed basic quick checks and has started to do the requested thing.stopping: the Operation code has acknowledged receipt of a “stop” or “abort” request.done: the Operation has exited, either succesfully or not.
- class ocs.ocs_agent.OpSession(session_id, op_name, status='starting', app=None, purge_policy=None, cred_level=1)[source]
Bases:
objectWhen a caller requests that an Operation (Process or Task) is started, an OpSession object is created and is associated with that run of the Operation. The Operation codes are given access to the OpSession object, and may update the status and post messages to the message buffer. This is the preferred means for communicating Operation status to the caller.
In the OCSAgent model, Operations may run in the main, “reactor” thread or in a worker “pool” thread. Services provided by OpSession must support both these contexts (see, for example, add_message).
Control Clients are given a copy of the latest session information in each response from the Operation API. The format of that information is described in
.encoded().The message buffer is purged periodically.
- encoded()[source]
Encode the session data in a dict. This is the data structure that is returned to Control Clients using the Operation API, as the “session” information. Note the returned object is a dict with entries described below.
- Returns:
session_id (int) – A unique identifier for a single session (a single “run” of the Operation). When an Operation is initiated, a new session object is created and can be distinguished from other times the Operation has been run using this id.
op_name (str) – The OCS Operation name.
op_code (int) – The OpCode, which combines information from status, success, and degraded; see
ocs.base.OpCode.cred_level (int) – The Credential Level (see Access Control) with which the operation was started.
status (str) – The Operation run status (e.g. ‘starting’, ‘done’, …). See
ocs.ocs_agent.SESSION_STATUS_CODES.degraded (bool) – A boolean flag (defaults to False) that an operation may set to indicate that it is not achieving its primary function (e.g. if it cannot establish connection to hardware).
success (bool or None) – If the Operation Session has completed (status == ‘done’), this indicates that the Operation was deemed successful. Prior to the completion of the operation, the value is None. The value could be False if the Operation reported failure, or if it crashed and failure was marked by the encapsulating OCS code.
start_time (float) – The time the Operation Session started, as a unix timestamp.
end_time (float or None) – The time the Operation Session ended, as a unix timestamp. While the Session is still on-going, this is None.
data (dict) – This is an area for the Operation code to store custom information for Control Clients to consume. See notes below. This structure will be tested for strict JSON compliance, and certain translations performed (such as converting NaN to None/null).
messages (list) – A buffer of messages posted by the Operation. Each element of the list is a tuple, (timestamp, message) where timestamp is a unix timestamp and message is a string.
Notes
The
datafield may be used by Agent code to provide data that might be of interest to a user (whether human or automated), such as the most recent readings from a device, structured information about the configuration and progress of the Operation, or diagnostics.Please see developer documentation (session.data) for advice on structuring your Agent session data.
- property op_code
Returns the OpCode for the given session. This is what will be published to the registry’s
operation_statusfeed.
- set_status(status, timestamp=None)[source]
Update the OpSession status and possibly post a message about it.
- Parameters:
status (string) – New value for status (see below).
timestamp (float) – timestamp for the operation.
The possible values for status are:
- ‘starting’
This status object has just been created, and the Operation launch code has yet to run.
- ‘running’
The Operation is running.
- ‘stopping’
The Operation is running, but a stop or abort has been requested.
- ‘done’
The Operation is has terminated. (Failure / success must be determined separately.)
The only valid transitions are forward in the sequence [starting, running, stopping, done]; i.e. it is forbidden for the status of an OpSession to move from stopping to running.
If this function is called from a worker thread, it will be scheduled to run in the reactor, and will block until that is complete.
- add_message(message, timestamp=None)[source]
Add a log message to the OpSession messages buffer.
- Parameters:
message (string) – Message to append.
timestamp (float) – timestamp to tag the message. The default, which is None, will cause the timestamp to be computed here and should be used in most cases.
- class ocs.ocs_agent.ParamHandler(params)[source]
Bases:
objectHelper for Agent Operation codes to extract params. Supports type checking, has casting, and will raise errors that are automatically added to the session log and propagated to the caller in a useful way.
There are two ways to use this. The first and recommended way is to use the @param decorator. Example:
from ocs import ocs_agent class MyAgent: ... @ocs_agent.param('voltage', type=float) @ocs_agent.param('delay_time', default=1., type=float) @ocs_agent.param('other_action', default=None, cast=str) def my_task(self, session, params): # (Type checking and default substitution have been done already) voltage = params['voltage'] delay_time = params['delay_time'] other_action = params['other_action'] ...
When you use the @param decorator, the OCS code can check the parameters immediately when they are received from the client, and immediately return an error message to the client’s start request (without even calling the Op start function):
OCSReply: ERROR : Param 'delay'=two_seconds is not of required type (<class 'float'>) (no session -- op has never run)
A second possibility is to instantiate a ParamHandler at the start of your Op start function, and use it to extract parameters. Example:
from ocs import ocs_agent class MyAgent: ... def my_task(self, session, params): params = ocs_agent.ParamHandler(params) # Mandatory, and cannot be None. voltage = params.get('voltage', type=float) # Optional, defaults to 1. delay_time = params.get('delay_time', default=1., type=float) # Optional, interpret as string, but defaults to None. other_action = params.get('other_action', default=None, cast=str) ...
In this case, errors will not be immediatley returned to the user, but the Operation will quickly fail, and the error message will show up in the message log:
OCSReply: OK : Operation "my_task" is currently not running (FAILED). my_task[session=1]; status=done with ERROR 0.115665 s ago, took 0.000864 s messages (4 of 4): 1629464204.780 Status is now "starting". 1629464204.780 Status is now "running". 1629464204.781 ERROR: Param 'delay'=two_seconds is not of required type (<class 'float'>) 1629464204.781 Status is now "done". other keys in .session: op_code, data- get(key, default=ParamError(''), check=None, cast=None, type=None, choices=None, treat_none_as_missing=True)[source]
Retrieve a value from the wrapped params dict, with optional type checking, casting, and validity checks. If a parameter is found to be missing, or its value not valid, then a ParamError is raised.
In Agent Op implementations, the ParamError will be caught by the API wrapper and automatically propagated to the caller. The Operation session will be marked as “done”, with success=False.
This works best if the implementation validates all parameters before beginning any Operation activities!
- Parameters:
key (str) – The name of the parameter to extract.
default (any) – The value to use if the value is not set. If this isn’t explicitly set, then a missing key causes an error to be raised (see also the treat_none_as_missing arg).
check (callable) – A function that will validate the argument; if the function returns False then a ParamError will be raised.
cast (callable) – A function to run on the value to convert it. For example
cast=str.lowerwould help convert user argument “Voltage” to value “voltage”.type (type) – Class to which the result will be compared, unless it is
None. Note that if you passtype=float,intvalues will automatically be cast tofloatand accepted as valid.choices (list) – Acceptable values for the parameter. This is checked after casting.
treat_none_as_missing (bool) – Determines whether a value of
Nonefor a parameter should be treated in the same way as if the parameter were not set at all. See notes.
- Return type:
The fully processed value.
Notes
The default behavior is to treat
{'param': None}as the same as{}; i.e. passingNoneas the value for a parameter is the same as leaving the parameter unset. In both of these cases, unless adefault=...is specified, aParamErrorwill be raised. Note this doesn’t preclude you from settingdefault=None, which would effectively convert{}to{'param': None}. If you really need to block{}while allowing{'param': None}to be taken at face value, then settreat_none_as_missing=False.The cast function, if specified, is applied before the type, choices, and check arguments are processed. If the value (or the substituted default value) is
None, then any specified cast and checks will not be performed.
- ocs.ocs_agent.param(key, **kwargs)[source]
Decorator for Agent operation functions to assist with checking params prior to actually trying to execute the code. Example:
class MyAgent: ... @param('voltage', type=float) @param('delay', default=0., type=float) @inlineCallbacks def set_voltage(self, session, params): ...
Note the
@paramdecorators should be all together, and outermost (listed first). This is because the current implementation caches data in the decorated function (or generator) directly, and additional decorators will conceal that.See
ocs.ocs_agent.ParamHandlerfor more details. Note the signature for @param is the same as forParamHandler.get().
ocs.ocs_client
- ocs.ocs_client._get_op(op_type, name, encoded, client, password)[source]
Factory for generating matched operations. This will make sure op.start’s docstring is the docstring of the operation.
- Parameters:
op_type (str) – Operation type, either ‘task’ or ‘process’.
name (str) – Operation name
encoded (dict) – Encoded
ocs.ocs_agent.AgentTaskorocs.ocs_agent.AgentProcessdictionary.client (ControlClient) – Client object, which will be used to issue the requests for operation actions.
password (str) – Client-supplied password, or None to not use a password.
- class ocs.ocs_client.OCSClient(instance_id, privs=None, **kwargs)[source]
Bases:
objectThe simple OCS Client, facilitating task/process calls.
OCSClient makes an Agent’s tasks/processes available as class attributes, making it easy to setup a client instance and call the associated Agent’s tasks and processes.
Example
This example sets up an OCSClient object and calls a FakeDataAgent’s Task (delay_task) and process (acq):
>>> client = OCSClient('fake-data-1') >>> client.delay_task(delay=5) >>> client.acq.start()
- instance_id
instance-id for agent to run
- Type:
str
- Parameters:
instance_id (str) – Instance id for agent to run
privs (str or int) – If str, a password to use for all interactions with Agent operations. If int, the system will check the password configuration for a password that can yield that credential level, and apply it.
args (list or args object, optional) – Takes in the parser arguments for the client. If None, pass an empty list. If list, reads in list elements as arguments. Defaults to None.
Note
For additional
**kwargssee site_config.get_control_client.Note
For details on
privsprocessing, seeocs.access.client_get_password().
ocs.ocs_feed
- class ocs.ocs_feed.Block(name, keys)[source]
Bases:
objectStructure of block for a so3g IrregBlockDouble.
- class ocs.ocs_feed.Feed(agent, feed_name, record=False, agg_params={}, buffer_time=0, max_messages=0)[source]
Bases:
objectManages publishing to a specific feed and storing of messages.
- Parameters:
agent (OCSAgent) – agent that is registering the feed
feed_name (string) – name of the feed
record (bool, optional) – Determines if feed should be aggregated. At the moment, each agent can have at most one aggregated feed. Defaults to False
agg_params (dict, optional) –
Parameters used by the aggregator and influx publisher.
- Params:
- frame_length (float):
Deterimes the amount of time each G3Frame should be (in seconds).
- fresh_time (float):
Time until feed is considered stale by aggregator.
- exclude_aggregator (bool):
If True, the HK Aggregator will not record feed to g3.
- exclude_influx (bool):
If True, the InfluxPublisher will not write the feed to Influx.
buffer_time (int, optional) – Specifies time that messages should be buffered in seconds. If 0, message will be published immediately. Defaults to 0.
max_messages (int, optional) – Max number of messages stored. Defaults to 20.
- publish_message(message, timestamp=None)[source]
Publishes message to feed. If this is an aggregatable feed (record=True), then it may be buffered. Otherwise it is dispatched immediately.
- Parameters:
message – Data to be published (see notes about acceptable formats).
timestamp (float) – timestamp given to the message. Defaults to time.time()
If this feed is not intended to provide structured data for aggregation, then the format of the message is unrestricted as long as it is WAMP-serializable.
For aggregated feeds, the message should be a dict with one of the following formats:
A single sample for several co-sampled channels. The structure is:
message = { 'block_name': Key given to the block in blocking param 'timestamp': timestamp of data 'data': { key1: datapoint1 key2: datapoint2 } }
Samples recorded in this way may be buffered, if self.sample_time > 0.
Multiple or more samples for several co-sampled channels. The structure is:
message = { 'block_name': Key given to the block in blocking param 'timestamps': [timestamp, timestamp...] 'data': { key1: [datapoint, datapoint...] key2: [datapoint, datapoint...] } }
Note that the code distinguishes between these cases based on the presence of the key ‘timestamps’ rather than ‘timestamp’. These data can be buffered, too, if self.sample_time > 0.
- static verify_influxdb_tags(message)[source]
Check the ‘influxdb_tags’ to make sure all the needed information is provided.
This checks to make sure each tag has a ‘_field’ provided, and that each field has a corresponding tag.
Note
‘influxdb_tags’ can be
None. This will be the case for any agents that do not implement tagging.- Parameters:
message (dict) – ‘message’ dictionary value published (see Feed.publish_message for details).
- Raises:
ValueError – If the ‘influxdb_tags’ provided in the message do not meet the required format.
- static verify_message_data_type(value)[source]
Aggregated Feeds can only store certain types of data. Here we check that the type of all data contained in a message’s ‘data’ dictionary are supported types.
- Parameters:
value (list, float, int, bool) – ‘data’ dictionary value published (see Feed.publish_message for details).
- static verify_data_field_string(field)[source]
There are strict rules for the characters allowed in field names. This function verifies the names in a message are valid.
A valid name:
contains only letters (a-z, A-Z; case sensitive), decimal digits (0-9), and the underscore (_).
begins with a letter, or with any number of underscores followed by a letter.
is at least one, but no more than 255, character(s) long.
- Parameters:
field (str) – Field name string to verify.
- Returns:
True if field name is valid.
- Return type:
bool
- Raises:
ValueError – If field name is invalid.
- static enforce_field_name_rules(field_name)[source]
Enforce naming rules for field names.
A valid name:
contains only letters (a-z, A-Z; case sensitive), decimal digits (0-9), and the underscore (_).
begins with a letter, or with any number of underscores followed by a letter.
is at least one, but no more than 255, character(s) long.
- Parameters:
field_name (str) – Field name string to check and modify if needed.
- Returns:
- New field name, meeting all above rules. Note this isn’t
guarenteed to not collide with other field names passed through this method, and that should be checked.
- Return type:
str
ocs.ocs_twisted
- class ocs.ocs_twisted.TimeoutLock(default_timeout=0)[source]
Bases:
objectLocking mechanism to be used by OCS Agents.
- Parameters:
default_timeout (float, optional) – Sets the default timeout value for acquire calls. Defaults to 0.
- acquire(timeout=None, job=None)[source]
Acquires main lock.
- Parameters:
timeout (float, optional) –
Sets the timeout for lock acquisition.
If set to 0 the acquire calls will be non-blocking and will immediately return the result.
If set to any value greater than 0, this will block until the lock is acquired or the timeout (in seconds) has been reached.
If set to -1 this call will block indefinitely until the lock has been acquired.
If not set (default), it will use the TimeoutLock’s default_value (which itself defaults to 0).
job (string, optional) – Job name to be associated with current lock acquisition. The current job is stored in self.job so that it can be inspected by other threads.
- Returns:
Whether or not lock acquisition was successful.
- Return type:
result (bool)
- release_and_acquire(timeout=None)[source]
Releases and immediately reacquires a lock. Because this uses a two-lock system, it is guaranteed that at least one blocking acquire call will be able to take the active lock before this function is able to re-acquire it. However no other ordering is guaranteed.
- acquire_timeout(timeout=None, job='unnamed')[source]
Context manager to acquire and hold a lock.
- Parameters:
timeout (float, optional) – Sets the timeout for lock acquisition. See the
acquiremethod documentation for details.job (string, optional) – Job name to be associated with current lock acquisition.
- Returns:
Whether or not lock acquisition was successful.
- Return type:
result (bool)
The following example will attempt to acquire the lock with a timeout of three seconds:
lock = TimeoutLock() with lock.acquire_timeout(timeout=3.0, job='acq') as acquired: if not acquired: print(f"Lock could not be acquired because it is held by {lock.job}") return False print("Lock acquired!")
- ocs.ocs_twisted.in_reactor_context()[source]
Determine whether the current threading context is the twisted main (reactor) thread, or a worker pool thread. Returns True if it’s the main thread. Will raise RuntimeError if the thread name is confusing.
- class ocs.ocs_twisted.Pacemaker(sample_freq, quantize=False)[source]
Bases:
objectThe Pacemaker is a class to help Agents maintain a regular sampling rate in their processes. The Pacemaker class will correct for the time spent in the body of the process loop in it’s sleep function. Additionally, if run with the
quantizeoptions, the pacemaker will attempt to snap samples to a temporal grid (starting on the second) so that different agents can remain relatively synchronized.- Parameters:
sample_freq (float) – The sampling frequency for the pacemaker to enforce. This can be a float, however in order to use the
quantizeoption it must be a whole number.quantize (bool) – If True, the pacemaker will snap to a grid starting on the second. For instance, if
sample_freqis 4 andquantizeis set to True, the pacemaker will make it so samples will land close toint(second) + (0, 0.25, 0.5, 0.75).
Here is an example of how the Pacemaker can be used keep a 3 Hz quantized sample rate:
pm = Pacemaker(3, quantize=True) take_data = True: while take_data: pm.sleep() print("Acquiring thermometry data...") time.sleep(np.random.uniform(0, .3))
ocs.site_config
- class ocs.site_config.SiteConfig[source]
Bases:
object- classmethod from_dict(data)[source]
- Parameters:
data – The configuration dictionary.
The configuration dictionary should have the following elements:
hub(required)Describes what WAMP server and realm Agents and Clients should use.
hosts(required)A dictionary of HostConfig descriptions. The keys in this dictionary can be real host names on the network, pseudo-host names, or the special value “localhost”.
A HostConfig marked for “localhost” will match any host that does not have an exact match in the hosts dictionary. This should normally be used only in single-host test systems or examples.
Client programs will normally (i.e., by default) try to load the HostConfig associated with the system hostname (that which is returned by socket.gethostname()). But this can be overridden easily, for example by using the
--site-hostcommand line argument. It is thus quite reasonable to use the hosts dictionary to hold a set of useful configurations indexed by a user-specified string (a pseudo-host).
- class ocs.site_config.HostConfig(name=None)[source]
Bases:
object- classmethod from_dict(data, parent=None, name=None)[source]
- Parameters:
data – The configuration dictionary.
parent – the SiteConfig from which this data was extracted (this is stored as self.parent, but not used).
The configuration dictionary should have the following elements:
agent-instances(required)A list of AgentConfig descriptions.
crossbar(optional)Settings to assist with starting / stopping / monitoring a crossbar server running on this host. There is a single crossbar server for an OCS network and thus this entry should be defined for at most one of the hosts in the site config file. Note that setting this to None (or null) will disable host crossbar control, while setting it to an empty dictionary, {}, will enable local host crossbar control with default settings.
log-dir(optional)Path at which to write log files. Relative paths will be interpreted relative to the “working directory”; see –working-dir command line option.
- class ocs.site_config.CrossbarConfig[source]
Bases:
object- classmethod from_dict(data, parent=None)[source]
- Parameters:
data – The configuration dictionary, or None.
parent – the HostConfig from which this data was extracted (this is stored as self.parent, but not used).
The configuration dictionary should have the following elements:
config-dir(optional): Location of crossbar config.json;this gets passed to
--cbdir, if specified..bin(optional): The path to the crossbar executable.This defaults to shutil.which(‘crossbar’).
If data is None, returns None. Otherwise returns a CrossbarConfig object.
- class ocs.site_config.HubConfig[source]
Bases:
object- classmethod from_dict(data, parent=None)[source]
- Parameters:
data – The configuration dictionary.
parent – the SiteConfig from which this data was extracted (this is stored as self.parent, but not used).
The configuration dictionary should have the following elements:
wamp_server(required): URL to the WAMP router’s websocketaccess point for ocs. E.g.,
ws://host-2:8001/ws. WAMP routers can have multiple access points, with different protocols, security layers, and permissions. (Command line override:--site-hub.)wamp_http(optional): URL to the WAMP router’s http bridgeinterface. This is the best interface for simple clients to use. E.g.,
http://host-2:8001/call.wamp_realm(required): The WAMP realm to use. WAMPclients operating in a particular realm are isolated from clients connected to other realms. Example and test code will often use
debug_realmhere. (Command line override:--site-realm.)address_root(required): The base address to be used byall OCS Agents. This is normally something simple like
observatoryordetlab. (Command line override:--address-root.)access_policy(optional): Specify the default accesspolicy for all agents. To tell all agents to contact an Access Director Agent with instance_id
access-dir, set this to the string “director:access-dir”. (Command line override:--access-policy.)
- class ocs.site_config.InstanceConfig[source]
Bases:
object- classmethod from_dict(data, parent=None)[source]
- Parameters:
data – The configuration dictionary.
parent – the HostConfig from which this data was extracted (this is stored as self.parent, but not used).
The configuration dictionary should have the following elements:
instance-id(str, required)This string is used to set the Agent instance’s base address. This may also be matched against the instance-id provided by the Agent instance, as a way of finding the right InstanceConfig.
agent-class(str, optional)Name of the Agent class. This may be matched against the agent_class name provided by the Agent instance, as a way of finding the right InstanceConfig.
arguments(list, optional):A list of arguments that should be passed back to the agent.
manage(str, optional):A string describing how a HostManager should manage this agent. See notes.
Notes
The
managevalue is only relevant if a HostManager is configured to operate on the host. In that case, the HostManager’s treatment of the agent instance depends on the value ofmanage:“ignore”: HostManager will not attempt to manage the agent instance.
“host/up”: HostManager will manage the agent instance, launching it on the host system. On startup, the instance will be set to target_state “up” (i.e. the HostManager will try to start it).
“host/down”: like host/up, but HostManager will not start up the agent instance until explicitly requested to do.
“docker/up”: HostManager will manage the agent instance through Docker. On Startup, the instance will be set to target_state “up”.
“docker/down”: Like docker/up, but the instance will be forced to target_state “down” on startup.
In earlier versions of OCS, the acceptable values were “yes”, “no”, and “docker”. Those were equivalent to current values of “host/down”, “ignore”, and “docker/down”.
Those values are still accepted, but note that “yes” and “docker” are now equivalent to “host/up” and “docker/up”.
The following abbreviated values are also accepted:
“host”: same as “host/up”
“up”: same as “host/up”
“down”: same as “host/down”
- class ocs.site_config.ArgContainer(args)[source]
Bases:
objectA container to store a list of args as a dictionary, with the argument names (beginning with a hyphen) as keys, and list of arguments as values. Any arguments passed before an argument key is put under the ‘__positional__’ key, even though positional arguments aren’t really supported by ocs agents or the site-config….
- Parameters:
args (list) – Argument list (each item should be a single word)
- arg_dict
Dictionary of arguments, indexed by argument keyword.
- Type:
dict
- update(arg_container2)[source]
Updates the arg_dict with the arg_dict from another ArgContainer
- Parameters:
arg_container2 (ArgContainer) – The other ArgContainer with which you want to update the arg_dict.
- ocs.site_config.add_arguments(parser=None)[source]
Add OCS site_config options to an ArgumentParser.
- Parameters:
parser – an ArgumentParser. If this is None, a new parser is created.
- Returns:
The ArgumentParser that was passed in, or the new one.
Arguments include the
--site-*family. See code or online documentation for details.
- ocs.site_config.get_config(args, agent_class=None)[source]
- Parameters:
args – The argument object returned by ArgumentParser.parse_args(), or equivalent. It is assumed that all properties defined by “add_arguments” are present in this object.
agent_class – Class name passed in to match against the list of device classes in each host’s list.
Special values accepted for agent_class: - ‘control’: do not insist on matching host or device. - ‘host’: do not insist on matching device (but do match host).
- Returns:
The tuple (site_config, host_config, device_config).
- ocs.site_config.add_site_attributes(args, site, host=None)[source]
Adds site and host attributes to namespace if they do not exist.
- Parameters:
args – namespace to add attributes to.
site – Site config object.
host – Host config object.
- ocs.site_config.get_control_client(instance_id, site=None, args=None, start=True, client_type='http')[source]
Instantiate and return a client_http.ControlClient, targeting the specified instance_id.
- Parameters:
site (SiteConfig) – All configuration will be taken from this object, if it is not None.
args – Arguments from which to derive the site configuration. If this is None, then the arguments from the command line are parsed through the usual site_config system. If this is a list of strings, then these arguments will be parsed instead of sys.argv[1:]. Note that to use the default configuration (without looking at sys.argv), pass args=[]. It is also permitted to pass a pre-parsed argparse.Namespace object (or similar).
start (bool) – Determines whether to call .start() on the client before returning it.
client_type (str) – Select the client type, currently only ‘http’. wamp_http address must be known. Note that ‘wampy’ used to be a supported type, but was dropped in OCS v0.8.0.
Returns a ControlClient.
- ocs.site_config.parse_args(agent_class=None, parser=None, args=None)[source]
Function to parse site-config and agent arguments. This function takes site, host, and instance arguments into account by making sure the instance arguments get passed through the arg_parse parser. This helps make sure units and options are consistent with those defined by the argparse argument, even when the arguments come from the site-config file and not the command line.
- Parameters:
agent_class (str, optional) – Name of the Agent class. This may be matched against the agent_class name provided by the Agent instance, as a way of finding the right InstanceConfig.
parser (argparse.ArgumentParser, optional) – Argument parser containing agent-specific arguments. If None, an empty parser will be created.
args (list of str) – Arguments to parse; defaults to sys.argv[1:].
- Returns:
An argparse.Namespace, as you would get from parser.parse_args().