import argparse
import importlib
import os
import setproctitle
import sys
import warnings
if sys.version_info < (3, 10):
from importlib_metadata import entry_points
else:
from importlib.metadata import entry_points
from ocs import site_config
DESCRIPTION = """
This script provides a quick way to start an OCS Agent. You will need to set
``OCS_CONFIG_DIR`` environment variable to the directory containing
default.yaml, or else use ``--site-*`` options to specify your configuration.
To start an Agent, run::
ocs-agent-cli --instance-id INSTANCE_ID
``ocs-agent-cli`` will also inspect environment variables for commonly
passed arguments to facilitate configuration within Docker containers.
Those environment variables, if defined and non-trivial, will be
passed to the agent script unless they are overridden explicitly on
the ``ocs-agent-cli`` command line. The environment variables and
arguments are:
- ``INSTANCE_ID``, will be the default value for ``--instance-id``
- ``SITE_HOST``, for ``--site-host``
- ``SITE_HUB``, for ``--site-hub``
- ``SITE_HTTP``, for ``--site-http``
- ``CROSSBAR_TIMEOUT``, for ``--crossbar-timeout``
``ocs-agent-cli`` relies on the Agent being run belonging to an OCS Plugin. If
the Agent is not an OCS Plugin it can be run directly using both the
``--agent`` and ``--entrypoint`` flags. For example::
ocs-agent-cli --agent my_agent.py --entrypoint main --instance-id my-agent-1
"""
def _get_parser():
parser = argparse.ArgumentParser(
description=DESCRIPTION,
formatter_class=argparse.RawDescriptionHelpFormatter)
# Passed through to Agent
parser.add_argument('--instance-id', default=None, help="Agent unique instance-id. E.g. 'aggregator' or 'fakedata-1'.")
parser.add_argument('--site-hub', default=None, help="Site hub address.")
parser.add_argument('--site-http', default=None, help="Site HTTP address.")
parser.add_argument('--site-host', default=None, help="Declare the host the instance is configured in.")
# Default set in site_config.py within add_arguments()
parser.add_argument('--crossbar-timeout', type=int, help="Length of time in seconds "
"that the Agent will try to reconnect to the crossbar server before "
"shutting down. Disable the timeout by setting to 0.")
# Not passed through to Agent
parser.add_argument('--agent', default=None, help="Path to non-plugin OCS Agent.")
parser.add_argument('--entrypoint', default=None, help="Agent module entrypoint function.")
return parser
[docs]
def build_agent_list():
"""Builds a list of all Agents available across all ocs plugins installed
on the system.
Note:
Currently if two plugins provide the same Agent the one loaded first is
used. This should be improved somehow if we expect overlapping Agents
to be provided by plugins.
Examples:
An example agent list:
>>> build_agent_list()
{'RegistryAgent': {'module': 'ocs.agents.registry.agent', 'entry_point': 'main'},
'AggregatorAgent': {'module': 'ocs.agents.aggregator.agent', 'entry_point': 'main'},
'HostManager': {'module': 'ocs.agents.host_manager.agent', 'entry_point': 'main'},
'FakeDataAgent': {'module': 'ocs.agents.fake_data.agent', 'entry_point': 'main'},
'InfluxDBAgent': {'module': 'ocs.agents.influxdb_publisher.agent', 'entry_point': 'main'},
'BarebonesAgent': {'module': 'ocs.agents.barebones.agent', 'entry_point': 'main'}}
Returns:
dict: Dictionary of available agents, with agent names as the keys, and
dicts containing the module and entry_point as values.
"""
discovered_plugins = entry_points(group='ocs.plugins')
print("Installed OCS Plugins:", [x.name for x in discovered_plugins])
agents = {}
for name in discovered_plugins.names:
(plugin, ) = discovered_plugins.select(name=name)
try:
loaded = plugin.load()
except Exception as e:
print(f"Could not load plugin: {name}")
print(" Error:", e)
continue
# Remove any duplicate agent classes from newly loaded plugin
current_agents = set(agents)
conflicts = current_agents.intersection(set(loaded.agents))
for con in conflicts:
warnings.warn(
f'Found duplicate agent-class {con} provided by {name}. '
+ f'Using {agents[con]}.')
del loaded.agents[con]
agents.update(loaded.agents)
return agents
def main(args=None):
# Grab commandline arguments
if args is None:
args = sys.argv[1:]
parser = _get_parser()
# pre_args relevant to ocs-agent-cli, post_args passed to Agent entrypoint
pre_args, post_args = parser.parse_known_args(args)
# Required arguments
if pre_args.instance_id is not None:
# Re-add instance-id to post_args
post_args.extend(["--instance-id", pre_args.instance_id])
else:
# Grab instance-id from ENV for running in Docker
id_env = os.environ.get("INSTANCE_ID", None)
# Inject ENV based instance-id only if not passed on cli
post_args.extend(["--instance-id", id_env])
# instance-id is always required
if id_env is None:
print("--instance-id (or $INSTANCE_ID) not provided. Exiting.")
sys.exit(1)
# Optional arguments
# Add additional optional arguments by appending to optional_env
# Format is {"arg name within argparse": "ENVIRONMENT VARIABLE NAME"}
# E.g. --my-new-arg should be {"my_new_arg": "MY_NEW_ARG"}
optional_env = {"site_hub": "SITE_HUB",
"site_http": "SITE_HTTP",
"site_host": "SITE_HOST",
"crossbar_timeout": "CROSSBAR_TIMEOUT",
}
for _name, _var in optional_env.items():
# Args passed on cli take priority
_arg = vars(pre_args)[_name]
_flag = f"--{_name}".replace('_', '-')
if _arg is not None:
post_args.extend([_flag, str(_arg)])
continue
# Get from ENV if not passed w/flag
set_var = os.environ.get(_var, None)
if set_var is not None:
post_args.extend([_flag, set_var])
print('Args:', post_args) # mostly to debug, but also useful
# Handle running agent directly (outside of plugin package)
if pre_args.agent:
agent_file = pre_args.agent
# Entrypoint required if running outside of plugin system
if pre_args.entrypoint is None:
print("--entrypoint not provided. Exiting")
sys.exit(1)
entrypoint = pre_args.entrypoint
# Insert agent into path for import
script_path = os.path.dirname(agent_file)
sys.path.insert(0, script_path)
mod = importlib.import_module(os.path.basename(agent_file)[:-3])
else:
# Note this is only used to lookup the agent-class, post_args are
# passed to the agent's entrypoint below when calling start(), which
# includes injected ENV based arguments.
args = site_config.parse_args(agent_class='*host*',
parser=None, args=post_args)
# Determine agent-class from instance-id
(site_, host_, instance) = site_config.get_config(args)
agent_class = instance.data['agent-class']
# Import agent's entrypoint and execute
agents = build_agent_list()
agent_info = agents[agent_class]
_module = agent_info["module"]
entrypoint = agent_info.get("entry_point", "main")
mod = importlib.import_module(_module)
title = f'ocs-agent:{instance.data["instance-id"]}'
print(f'Renaming this process to: "{title}"')
setproctitle.setproctitle(title)
start = getattr(mod, entrypoint) # This is the start function.
start(args=post_args)
if __name__ == '__main__':
# This __main__ checker allows ocs-agent-cli to be invoked through
# python -m ocs.agent_cli; this is helpful with, e.g., conda when
# one is trying to use a particular interpreter and its packages.
main()