Registry Agent

The Registry Agent tracks all currently running Agents on the OCS network, providing the ability to monitor the status of each Agent’s Tasks and Processes through the Operation Monitor.

usage: agent.py [-h] [--wait-time WAIT_TIME]

Agent Options

--wait-time

Sleep time for main loop

Default: 30.0

Configuration File Examples

Below are configuration examples for the OCS site config file and for running the Agent in a Docker container.

OCS Site Config

An example site-config-file block:

{ 'agent-class': 'RegistryAgent',
  'instance-id': 'registry',
  'arguments': [
    ['--wait-time', 30]
  ]},

Docker Compose

An example docker compose configuration:

ocs-registry:
    image: simonsobs/ocs:latest
    container_name: ocs-registry
    hostname: ocs-docker
    user: "9000"
    environment:
      - INSTANCE_ID=registry
    volumes:
      - ${OCS_CONFIG_DIR}:/config

Description

The Registry Agent keeps track of the agents currently running on the network. It listens to the heartbeat feeds of all agents on the crossbar server and records, for each agent, the time of its last heartbeat and whether the agent has “expired” (gone 5 seconds without a heartbeat).

This check happens in the registry’s single “main” process. The session.data object of this process is set to a dict of agents on the system, including their last heartbeat time, whether they have expired, the time at which they expired, and a dictionary of their operation codes. This data can then be viewed by checking the session data of the main process.

For instance, the following code prints the agents that have been seen on the system since the registry started running:

from ocs.ocs_client import OCSClient

registry_client = OCSClient('registry')
status, msg, session = registry_client.main.status()

print(session['data'])

which will print a dictionary that might look like:

{
  "observatory.aggregator": {
    "expired": False,
    "time_expired": None,
    "last_updated": 1669925713.4082503,
    "op_codes": {
      "record": 3
    },
    "agent_class": "AggregatorAgent",
    "agent_address": "observatory.aggregator"
  },
  "observatory.fake-hk-agent-01": {
    "expired": False,
    "time_expired": None,
    "last_updated": 1669925945.7575383,
    "op_codes": {
      "acq": 3,
      "set_heartbeat": 1,
      "delay_task": 1
    },
    "agent_class": "FakeDataAgent",
    "agent_address": "observatory.fake-hk-agent-01"
  }
}
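As a rough sketch of how a client might use this dictionary, the snippet below splits agents into active and expired groups and reports the seconds since each agent’s last heartbeat. The function name summarize_agents and the sample values are hypothetical and chosen only for illustration; the sample dict stands in for session['data'] so the example runs without a live OCS network.

```python
import time


def summarize_agents(data, now=None):
    """Split registry data into (active, expired) lists.

    `data` is a dict shaped like the registry's session['data'].
    Each entry is (agent_address, seconds_since_last_heartbeat).
    """
    now = time.time() if now is None else now
    active, expired = [], []
    for address, info in sorted(data.items()):
        age = round(now - info["last_updated"], 1)
        target = expired if info["expired"] else active
        target.append((address, age))
    return active, expired


# Stand-in for session['data']; values are illustrative only.
sample = {
    "observatory.aggregator": {
        "expired": True,
        "time_expired": 1669925718.5,
        "last_updated": 1669925713.5,
    },
    "observatory.fake-hk-agent-01": {
        "expired": False,
        "time_expired": None,
        "last_updated": 1669925945.5,
    },
}

active, expired = summarize_agents(sample, now=1669925946.5)
print("active:", active)
print("expired:", expired)
```

With a running registry, the same function could be called on the session['data'] returned by registry_client.main.status().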

Operation Monitor

The registry is also used to track the status of each agent’s tasks and processes. Operation codes for each operation are regularly passed through an agent’s heartbeat feed; the registry assembles these codes and publishes them through its own OCS feed. This makes it possible to monitor individual operation states in Grafana and to easily set alerts when a process stops running or when a task fails.

By mapping the enumeration values described in the OpCode documentation in the ocs_base API, one can make a Grafana panel to monitor all operations on a network as pictured below:

../_images/operation_monitor_screenshot.png
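The same mapping from numeric codes to state names can be sketched in Python. The enumeration below is an assumption: it is written to mirror the OpCode values as recalled from the OCS documentation and should be checked against the OpCode documentation before being relied on. The helper names op_states and failed_ops, and the sample codes, are hypothetical.

```python
from enum import Enum


class OpCode(Enum):
    # Assumed to mirror the OCS OpCode enumeration; verify against
    # the OpCode documentation before use.
    NONE = 1
    STARTING = 2
    RUNNING = 3
    STOPPING = 4
    SUCCEEDED = 5
    FAILED = 6
    EXPIRED = 7


def op_states(data):
    """Flatten registry data into (agent, operation, state name) rows."""
    rows = []
    for address, info in sorted(data.items()):
        for op_name, code in sorted(info["op_codes"].items()):
            rows.append((address, op_name, OpCode(code).name))
    return rows


def failed_ops(data):
    """Return (agent, operation) pairs whose state is FAILED."""
    return [(a, op) for a, op, state in op_states(data) if state == "FAILED"]


# Illustrative data only; delay_task is set to 6 to show a failure.
sample = {
    "observatory.aggregator": {"op_codes": {"record": 3}},
    "observatory.fake-hk-agent-01": {
        "op_codes": {"acq": 3, "set_heartbeat": 1, "delay_task": 6},
    },
}

for row in op_states(sample):
    print(row)
print("failed:", failed_ops(sample))
```

A check like failed_ops is roughly what an alert rule on the monitoring panel would express: notify when any operation reports the failed state.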

Agent API

class ocs.agents.registry.agent.Registry(agent, args)

The Registry agent is in charge of keeping track of which agents are currently running. It has a single process “main” that loops and keeps track of when agents expire. This agent subscribes to all heartbeat feeds, so no additional function calls are required to register an agent.

A list of agent statuses is maintained in the “main” process’s session.data object.

Parameters:

agent (OCSAgent) – the ocs agent object

registered_agents

A defaultdict of RegisteredAgent objects, each recording whether the agent has expired, its time_expired, and its last_updated time.

Type:

defaultdict

agent_timeout

The maximum time, in seconds, allowed between an agent’s heartbeats before the agent is marked as expired.

Type:

float

main(test_mode=False)

Process - Main run process for the Registry agent. This will loop and keep track of which agents have expired. It records the agents observed so far in the session.data variable so that they can be seen by clients.

Parameters:

test_mode (bool, optional) – Run the main Process loop only once. This is meant only for testing. Default is False.

Notes

The session data object for this process will be a dictionary containing the encoded RegisteredAgent objects for each agent observed during the lifetime of the Registry. For instance, this might look like:

>>> response.session['data']
{
  "observatory.aggregator": {
    "expired": False,
    "time_expired": None,
    "last_updated": 1669925713.4082503,
    "op_codes": {
      "record": 3
    },
    "agent_class": "AggregatorAgent",
    "agent_address": "observatory.aggregator"
  },
  "observatory.fake-hk-agent-01": {
    "expired": False,
    "time_expired": None,
    "last_updated": 1669925945.7575383,
    "op_codes": {
      "acq": 3,
      "set_heartbeat": 1,
      "delay_task": 1
    },
    "agent_class": "FakeDataAgent",
    "agent_address": "observatory.fake-hk-agent-01"
  }
}

Supporting APIs

class ocs.agents.registry.agent.RegisteredAgent(feed)

Contains data about registered agents.

Parameters:

feed (dict) – Encoded ocs.ocs_feed.Feed.

expired

True if agent has not been updated in Registry.agent_timeout seconds.

Type:

bool

time_expired

ctime at which the agent expired. This will be None if the agent is not expired.

Type:

float, optional

last_updated

ctime at which the agent was last updated.

Type:

float

op_codes

Dictionary of operation codes for each of the agent’s operations. For details on what the operation codes mean, see the docs for the ocs_agent module.

Type:

dict
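To illustrate how the expired, time_expired, and last_updated attributes relate to the timeout, here is a minimal sketch of the bookkeeping. It is not the OCS implementation: AgentRecord, refresh, and check_expired are hypothetical names, and the 5 second default is taken from the description of expiry earlier on this page.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class AgentRecord:
    """Hypothetical stand-in for RegisteredAgent; not the OCS class."""

    last_updated: float
    expired: bool = False
    time_expired: Optional[float] = None
    op_codes: dict = field(default_factory=dict)

    def refresh(self, now, op_codes=None):
        """Record a heartbeat and clear any expired state."""
        self.last_updated = now
        self.expired = False
        self.time_expired = None
        if op_codes is not None:
            self.op_codes = dict(op_codes)

    def check_expired(self, now, timeout=5.0):
        """Mark the record expired after `timeout` seconds of silence."""
        if not self.expired and now - self.last_updated > timeout:
            self.expired = True
            self.time_expired = now
        return self.expired


rec = AgentRecord(last_updated=100.0)
print(rec.check_expired(now=103.0))  # within the timeout
print(rec.check_expired(now=106.0))  # more than 5 s since last heartbeat
rec.refresh(now=107.0, op_codes={"acq": 3})
print(rec.expired, rec.time_expired)
```

Passing the current time in as an argument, rather than reading the clock inside the methods, keeps the logic easy to test with fixed timestamps.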