Source code for ocs.agents.registry.agent

from ocs import ocs_agent, site_config
from ocs.base import OpCode
import time
from twisted.internet.defer import inlineCallbacks
from autobahn.twisted.util import sleep as dsleep
from ocs.ocs_feed import Feed
import argparse


[docs] class RegisteredAgent: """ Contains data about registered agents. Args: feed (dict): Encoded :class:`ocs.ocs_feed.Feed`. Attributes: expired (bool): True if agent has not been updated in Registry.agent_timeout seconds. time_expired (float, optional): ctime at which the agent expired. This will be None if the agent is not expired. last_updated (float): ctime at which the agent was last updated op_codes (dict): Dictionary of operation codes for each of the agent's operations. For details on what the operation codes mean, see docs from the ``ocs_agent`` module """ def __init__(self, feed): self.expired = False self.time_expired = None self.last_updated = time.time() self.op_codes = {} self.agent_class = feed.get('agent_class') self.agent_address = feed['agent_address']
[docs] def refresh(self, op_codes=None): self.expired = False self.time_expired = None self.last_updated = time.time() if op_codes: self.op_codes.update(op_codes)
[docs] def expire(self): self.expired = True self.time_expired = time.time() for k in self.op_codes: self.op_codes[k] = OpCode.EXPIRED.value
[docs] def encoded(self): return { 'expired': self.expired, 'time_expired': self.time_expired, 'last_updated': self.last_updated, 'op_codes': self.op_codes, 'agent_class': self.agent_class, 'agent_address': self.agent_address, }
[docs] class Registry: """ The Registry agent is in charge of keeping track of which agents are currently running. It has a single process "main" that loops and keeps track of when agents expire. This agent subscribes to all heartbeat feeds, so no additional function calls are required to register an agent. A list of agent statuses is maintained in the "main" process's session.data object. Args: agent (OCSAgent): the ocs agent object Attributes: registered_agents (defaultdict): A defaultdict of RegisteredAgent objects, which contain whether the agent has expired, the time_expired, and the last_updated time. agent_timeout (float): The time an agent has between heartbeats before being marked as expired. """ def __init__(self, agent, args): self.log = agent.log self.agent = agent self.wait_time = args.wait_time # Tracking for 'main' Process self._run = False # Dict containing agent_data for each registered agent self.registered_agents = {} self.agent_timeout = 5.0 # Removes agent after 5 seconds of no heartbeat. self.agent.subscribe_on_start( self._register_heartbeat, f'{args.address_root}..feeds.heartbeat', options={'match': 'wildcard'} ) agg_params = { 'frame_length': 60, } self.agent.register_feed('agent_operations', record=True, agg_params=agg_params, buffer_time=0) def _register_heartbeat(self, _data): """ Function that is called whenever a heartbeat is received from an agent. It will update that agent in the Registry's registered_agent dict. """ op_codes, feed = _data addr = feed['agent_address'] if addr not in self.registered_agents: self.registered_agents[addr] = RegisteredAgent(feed) reg_agent = self.registered_agents[addr] publish = op_codes != reg_agent.op_codes self.registered_agents[addr].refresh(op_codes=op_codes) if publish: self._publish_agent_ops(reg_agent) def _publish_agent_ops(self, reg_agent): """Publish a registered agent's OpCodes. Args: reg_agent (RegisteredAgent): The registered agent. """ addr = reg_agent.agent_address self.log.debug(addr) for op_name, op_code in reg_agent.op_codes.items(): field = f'{addr}_{op_name}' field = field.replace('.', '_') field = field.replace('-', '_') field = Feed.enforce_field_name_rules(field) try: Feed.verify_data_field_string(field) except ValueError as e: self.log.warn(f"Improper field name: {field}\n{e}") continue msg = {'block_name': field, 'timestamp': time.time(), 'data': {field: op_code}} self.agent.publish_to_feed('agent_operations', msg)
[docs] @ocs_agent.param('test_mode', default=False, type=bool) @inlineCallbacks def main(self, session: ocs_agent.OpSession, params): """main(test_mode=False) **Process** - Main run process for the Registry agent. This will loop and keep track of which agents have expired. It will keep track of current active agents in the session.data variable so it can be seen by clients. Parameters: test_mode (bool, optional): Run the main Process loop only once. This is meant only for testing. Default is False. Notes: The session data object for this process will be a dictionary containing the encoded RegisteredAgent objects for each agent observed during the lifetime of the Registry. For instance, this might look like:: >>> response.session['data'] { "observatory.aggregator": { "expired": False, "time_expired": None, "last_updated": 1669925713.4082503, "op_codes": { "record": 3 }, "agent_class": "AggregatorAgent", "agent_address": "observatory.aggregator" }, "observatory.fake-hk-agent-01": { "expired": False, "time_expired": None, "last_updated": 1669925945.7575383, "op_codes": { "acq": 3, "set_heartbeat": 1, "delay_task": 1 }, "agent_class": "FakeDataAgent", "agent_address": "observatory.fake-hk-agent-01" } } """ self._run = True last_publish = time.time() while self._run: yield dsleep(1) now = time.time() for k, agent in self.registered_agents.items(): if now - agent.last_updated > self.agent_timeout: agent.expire() session.data = { k: agent.encoded() for k, agent in self.registered_agents.items() } if now - last_publish >= self.wait_time: last_publish = now for agent in self.registered_agents.values(): self._publish_agent_ops(agent) if params['test_mode']: break return True, "Stopped registry main process"
@inlineCallbacks def _stop_main(self, session, params): """Stop function for the 'main' process.""" yield if self._run: session.set_status('stopping') self._run = False return True, 'requested to stop main process' else: return False, 'main process not currently running' def _register_agent(self, session, agent_data): self.log.warn( "Warning!!! The register_agent task has been deprecated. Agent '{}' " "is using an out of date version of ocs or socs!!" .format(agent_data['agent_address']) ) return True, "'register_agent' is deprecated"
[docs] def make_parser(parser=None): if parser is None: parser = argparse.ArgumentParser() pgroup = parser.add_argument_group('Agent Options') pgroup.add_argument('--wait-time', type=float, default=30., help='Sleep time for main loop') return parser
[docs] def main(args=None): parser = make_parser() args = site_config.parse_args(agent_class='RegistryAgent', parser=parser, args=args) agent, runner = ocs_agent.init_site_agent(args) registry = Registry(agent, args) agent.register_process('main', registry.main, registry._stop_main, blocking=False, startup=True) agent.register_task('register_agent', registry._register_agent, blocking=False) runner.run(agent, auto_reconnect=True)
if __name__ == '__main__': main()