import ocs
from ocs import ocs_agent, site_config, agent_cli
from ocs.agents.host_manager import drivers as hm_utils
import time
import argparse
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue, DeferredList
from autobahn.twisted.util import sleep as dsleep
import os
import sys
# Label used as the "agent_class" for docker services that do not
# appear to have a matching entry in the site config file.
NONAGENT_DOCKER = '[docker]'

# Target states that may be requested for a managed instance.
VALID_TARGETS = ['up', 'down']
class HostManager:
    """
    This Agent is used to start and stop OCS-relevant services on a
    particular host. If the HostManager is launched automatically when
    a system boots, it can then be used to start up the rest of OCS on
    that host (either automatically or on request).
    """

    def __init__(self, agent, docker_composes=None,
                 docker_service_prefix='ocs-'):
        """
        Args:
            agent: The agent object (as returned by
                ``ocs_agent.init_site_agent``) that this HostManager
                registers its operations with.
            docker_composes (list of str): Paths of docker-compose
                files whose services should be tracked.  Defaults to
                an empty list.
            docker_service_prefix (str): Prefix which, combined with
                an instance-id, gives the docker service name expected
                for a docker-managed agent.
        """
        # Avoid a shared mutable default argument.
        if docker_composes is None:
            docker_composes = []
        self.agent = agent
        self.running = False
        # The database maps instance_id (or docker service name, if
        # it's an unmanaged docker container) to a ManagedInstance.
        self.database = {}
        self.orphans = {}
        self.new_tags = {}
        # Maps a docker compose filename (or '[SCF]') to a tuple
        # (success, timestamp, message).  The manager Process resets
        # this; it is created here too so the helper methods never hit
        # an AttributeError if invoked before the manager has started.
        self.config_parse_status = {}
        self.docker_composes = docker_composes
        self.docker_service_prefix = docker_service_prefix

    @inlineCallbacks
    def _get_local_instances(self):
        """Parse the site config and return a list of this host's Agent
        instances.

        As a side effect, sets self.site_config_file, self.host_name
        and self.working_dir from the parsed configuration.

        Returns:
          success (bool): True if config was successfully scanned.
            False otherwise, indicating perhaps we should go into a bit
            of a lock down while operator sorts that out.
          agent_dict (dict): Maps instance-id to a dict describing the
            agent config.  The config is as contained in
            HostConfig.instances but where 'instance-id',
            'agent-class', and 'manage' are all guaranteed populated
            (and manage is a valid full description, e.g. "host/down").
          warnings: A list of strings, each of which corresponds to
            some problem found in the config.
        """
        warnings = []
        instances = {}

        # Load site config file.
        try:
            site, hc, _ = site_config.get_config(
                self.agent.site_args, '*host*')
            self.site_config_file = site.source_file
            self.host_name = hc.name
            self.working_dir = hc.working_dir
        except Exception as e:
            warnings.append('Failed to read site config file -- '
                            f'likely syntax error: {e}')
            return False, instances, warnings

        # Gather managed items from site config.
        for inst in hc.instances:
            if 'instance-id' not in inst:
                warnings.append('Ignoring an entry with no instance-id!')
                continue
            iid = inst['instance-id']
            if iid in instances:
                warnings.append(
                    f'Configuration problem, instance-id={iid} '
                    f'has multiple entries. Ignoring repeats.')
                continue
            if 'agent-class' not in inst:
                # Previously this raised KeyError and aborted the whole
                # scan; treat it like the other per-entry problems.
                warnings.append(
                    f'Configuration problem, no agent-class given '
                    f'for instance_id={iid}. Ignoring it.')
                continue
            if inst['agent-class'] == 'HostManager':
                inst['manage'] = 'ignore'
            else:
                # Make sure 'manage' is set, and valid.
                inst['manage'] = inst.get('manage', None)
                try:
                    inst['manage'] = site_config.InstanceConfig._MANAGE_MAP[inst['manage']]
                except KeyError:
                    warnings.append(
                        f'Configuration problem, invalid manage={inst["manage"]} '
                        f'for instance_id={iid}.')
                    continue
            instances[iid] = inst
        return True, instances, warnings
        # Unreachable; the yield only exists to make this function a
        # generator, as required by @inlineCallbacks.
        yield

    @inlineCallbacks
    def _update_docker_services(self, session):
        """Parse the docker-compose.yaml files and return the current
        status of all services.  For any services matching
        self.database entries, the corresponding DockerContainerHelper
        is updated with the new info.

        Side effects: may add entries to self.database (for unmatched,
        "non-agent" services) or remove them (when such a service
        disappears); updates self.orphans in place; replaces
        self.new_tags; records parse results in
        self.config_parse_status.

        Returns:
          docker_services (dict): state information for all detected
            services, keyed by the service name.
        """
        # Read services from all docker-compose files.
        docker_services = {}
        orphans = {}
        new_tags = {}
        for compose in self.docker_composes:
            try:
                services, _orphans = yield hm_utils.parse_docker_state(compose)
                orphans.update(_orphans)
                this_ok = True
                this_msg = f'Successfully parsed {compose} and its service states.'
            except Exception as e:
                this_ok = False
                this_msg = (f'Failed to interpret {compose} and/or '
                            f'its service states: {e}')

            # Don't issue the same complaint more than once per minute or so
            compose_was_ok, timestamp, last_msg = self.config_parse_status.get(
                compose, (False, 0, ''))
            if (this_ok != compose_was_ok) \
               or (not this_ok and time.time() - timestamp > 60) \
               or (not this_ok and this_msg != last_msg):
                session.add_message(this_msg)
                self.config_parse_status[compose] = (this_ok, time.time(), this_msg)

            if this_ok:
                docker_services.update(services)

        # Update all docker things in the database.
        retirees = []
        assigned_services = []
        for key, minst in self.database.items():
            if minst.management != 'docker':
                continue
            service_name = minst.agent_script
            service_data = docker_services.get(service_name)
            assigned_services.append(service_name)

            prot = minst.prot
            if prot is None:
                if service_data is not None:
                    # Create a prot entry with the service info.
                    minst.prot = hm_utils.DockerContainerHelper(service_data)
                    minst.operable = True
                    if minst.agent_class != NONAGENT_DOCKER:
                        minst.agent_class = _clsname_tool(minst.agent_class, '[d]')
                    # Update current_state to not trigger a "docker
                    # up" on a running container.  Normally that would
                    # be a no-op, but not if image tag has changed.
                    if service_data['running']:
                        session.add_message(f'Docker-based instance {key} seems to be running.')
                        minst.next_action = 'up'
            else:
                if service_data is not None:
                    prot.update(service_data)
                else:
                    # service_data is missing, but there used to be a
                    # service there.  Close it out.
                    minst.prot = None
                    minst.operable = False
                    if minst.agent_class == NONAGENT_DOCKER:
                        session.add_message(f'Deleting non-agent service {key}')
                        retirees.append(key)
                    else:
                        session.add_message(f'Marking missing service for {key}')
                        minst.agent_class = _clsname_tool(minst.agent_class, '[d?]')

            # A running container whose image differs from the image the
            # service currently resolves to needs a restart.
            restart_required = False
            if minst.prot is not None and service_data is not None \
               and service_data.get('running') \
               and (service_data.get('running_image') != service_data.get('image_id')):
                restart_required = True
            if service_data is not None and service_data.get('image_id') == 'unknown':
                new_tags[key] = service_data['image_tag']
            minst.restart_required = restart_required

        # If a non-agent [docker] service has disappeared, there's no
        # reason to show it in a list, and no persistent state /
        # operations to worry about.  So just delete it.
        for r in retirees:
            self.database.pop(r)

        # Create entries for any new un-matched docker services.
        # Sorted so the database (and child_states) order is stable.
        unassigned_services = set(docker_services.keys()) \
            .difference(assigned_services)
        for srv in sorted(unassigned_services):
            session.add_message(f'Adding non-agent service "{srv}"')
            minst = hm_utils.ManagedInstance(
                management='docker',
                instance_id=srv,
                agent_class=NONAGENT_DOCKER,
                full_name=f'[docker]:{srv}',
                agent_script=srv,
                operable=True,
                passive_tracking=True,
                target_state='passive',
            )
            service_data = docker_services[srv]
            minst.prot = hm_utils.DockerContainerHelper(service_data)
            # If it's up, leave it up.
            if service_data['running']:
                minst.next_action = 'up'
            self.database[srv] = minst

        # Update the list of orphans (in place, since session.data
        # holds a reference to this dict) ...
        orphans_gone = set(self.orphans.keys()).difference(orphans.keys())
        for k in orphans_gone:
            del self.orphans[k]
        self.orphans.update(orphans)

        # And new tags that need a docker pull ...
        self.new_tags = new_tags

        return docker_services

    @inlineCallbacks
    def _reload_config(self, session):
        """This helper function is called by both the ``manager``
        Process at startup, and the ``update`` Task.

        The Site Config File is parsed and used to update the internal
        database of child instances.  Any previously unknown child
        Agent is added to the internal tracking database, and assigned
        whatever target state is specified for that instance.  Any
        previously known child Agent instance is not modified.

        If any child Agent instances in the internal database appear
        to have been removed from the SCF, then they are set to have
        target_state "down" and will be deleted from the database when
        that state is reached.

        Returns:
          The list of warning strings produced while parsing the SCF.
        """
        def retire(db_key):
            # Mark an entry for shutdown; the manager loop deletes it
            # once it has come down.
            minst = self.database.get(db_key, None)
            if minst is None:
                return
            minst.management = 'retired'
            minst.at = time.time()
            minst.target_state = 'down'

        def _full_name(cls, iid):
            if cls != NONAGENT_DOCKER:
                cls, _ = _clsname_tool(cls)
            return f'{cls}:{iid}'

        def same_base_class(a, b):
            return _clsname_tool(a)[0] == _clsname_tool(b)[0]

        # Parse the site config.
        parse_ok, agent_dict, warnings = yield self._get_local_instances()
        for w in warnings:
            session.add_message(w)
        # Separate the messages so they don't run together.
        self.config_parse_status['[SCF]'] = (parse_ok, time.time(), '\n'.join(warnings))
        if not parse_ok:
            return warnings

        # Any agents in the database that are not listed in the latest
        # agent_dict should be immediately retired.  That includes
        # things that are suddenly marked as manage=no.  Ignore docker
        # non-agents.
        for iid, minst in self.database.items():
            if minst.agent_class != NONAGENT_DOCKER and (
                    iid not in agent_dict or agent_dict[iid].get('manage') == 'ignore'):
                session.add_message(
                    f'Retiring {minst.full_name}, which has disappeared from '
                    f'configuration file(s) or has manage:no.')
                retire(iid)

        # Create / update entries for every agent in the host
        # description, unless it is explicitly marked as ignore.
        for iid, hinst in agent_dict.items():
            if hinst['manage'] == 'ignore':
                continue
            cls = hinst['agent-class']
            srv = None  # The expected docker service name, if any
            mgmt, start_state = hinst['manage'].split('/')
            if mgmt == 'docker':
                cls = _clsname_tool(cls, '[d?]')
                srv = self.docker_service_prefix + iid

            # See if we already tracking this agent.
            minst = self.database.get(iid)
            if minst is not None:
                # Already tracking; just check for major config change.
                _cls = minst.agent_class
                _mgmt = minst.management
                if not same_base_class(_cls, cls) or _mgmt != mgmt:
                    session.add_message(
                        f'Managed agent "{iid}" changed agent_class '
                        f'({_cls} -> {cls}) or management '
                        f'({_mgmt} -> {mgmt}) and is being reset!')
                    # Bring down existing instance
                    self._terminate_instance(iid)
                    # Start a new one
                    minst = None

            # Do we have an unmatched docker entry for this?
            if minst is None and srv in self.database:
                session.add_message(
                    f'Unmanaged docker service {srv} will now be managed as '
                    f'instance_id={iid}.')
                # Re-register it under instance_id
                minst = self.database.pop(srv)
                minst.instance_id = iid
                minst.agent_class = _clsname_tool(cls, '[d]')
                minst.full_name = _full_name(cls, iid)
                minst.passive_tracking = False
                if minst.target_state == 'passive':
                    minst.target_state = \
                        minst.next_action if minst.next_action in ['up', 'down'] else 'down'
                self.database[iid] = minst

            if minst is None:
                minst = hm_utils.ManagedInstance(
                    management=mgmt,
                    instance_id=iid,
                    agent_class=cls,
                    full_name=_full_name(cls, iid),
                    target_state=start_state,
                )
                self.database[iid] = minst

        # Get agent class list from modern plugin system.
        agent_plugins = agent_cli.build_agent_list()

        # Assign plugins / scripts / whatever to any new instances.
        for iid, minst in self.database.items():
            if minst.agent_script is not None:
                continue
            if minst.management == 'host':
                cls = minst.agent_class
                # Check for the agent class in the plugin system
                if cls in agent_plugins:
                    session.add_message(f'Found plugin for "{cls}"')
                    minst.agent_script = '__plugin__'
                    minst.operable = True
                else:
                    session.add_message('No plugin '
                                        f'found for agent_class "{cls}"!')
            elif minst.management == 'docker':
                minst.agent_script = self.docker_service_prefix + iid

        # Read the compose files; query container states; updater stuff.
        yield self._update_docker_services(session)
        return warnings

    def _launch_instance(self, minst):
        """Launch an Agent instance (whether 'host' or 'docker' managed) using
        hm_utils.

        For 'host' managed agents: hm_utils will use
        reactor.spawnProcess, and store the AgentProcessHelper (which
        inherits from twisted ProcessProtocol) in minst.prot.

        The site_file and instance_id are passed on the command line;
        this means that any weird config overrides passed to this
        HostManager are not propagated.  One exception is working_dir,
        which is propagated in order that relative paths can make any
        sense.

        For 'docker' managed agents: hm_utils will try to start the
        right service container, and minst.prot will hold a
        DockerContainerHelper (which has some common interface with
        AgentProcessHelper).  If no docker service has been matched to
        the instance (minst.prot is None), nothing is launched.
        """
        if minst.management == 'docker':
            prot = minst.prot
            if prot is None:
                # No matching docker service is known (see
                # _update_docker_services), so there is nothing to
                # start; previously this raised AttributeError.
                return
        else:
            iid = minst.instance_id
            pyth = sys.executable
            cmd = [pyth, '-m', 'ocs.agent_cli',
                   '--instance-id', iid,
                   '--site-file', self.site_config_file,
                   '--site-host', self.host_name,
                   '--working-dir', self.working_dir]
            prot = hm_utils.AgentProcessHelper(iid, cmd)
        prot.up()
        minst.prot = prot

    def _terminate_instance(self, key):
        """
        Use the ProcessProtocol to request the Agent instance to exit.

        Returns:
          (True, message) describing what was done.
        """
        prot = self.database[key].prot  # Get the ProcessProtocol.
        if prot is None:
            return True, 'Instance was not running.'
        if prot.killed:
            return True, 'Instance already has kill set.'
        # Note the .down call does not block -- the thing will be
        # stopped from a thread.
        prot.down()
        return True, 'Kill requested.'

    def _process_target_states(self, session, requests=None):
        """This is a helper function for parsing target_state change
        requests; see the update Task.

        Args:
          session: Session to which informational messages are added.
          requests (list): Sequence of (instance_id, target_state)
            pairs; instance_id may be 'all'.  Malformed entries and
            unknown states/targets are reported and skipped.
        """
        if requests is None:
            requests = []
        # Special requests will target specific instance_id; make a map for that.
        addressable = {k: minst for k, minst in self.database.items()
                       if minst.management != 'retired'}

        for request in requests:
            try:
                key, state = request
            except (TypeError, ValueError):
                # Don't let one bad entry abort the remaining requests.
                session.add_message(f'Ignoring malformed request {request!r}; '
                                    'expected (instance_id, target_state).')
                continue
            if state not in VALID_TARGETS:
                session.add_message('Ignoring request for "%s" -> invalid state "%s".' %
                                    (key, state))
                continue
            if key == 'all':
                for minst in addressable.values():
                    minst.target_state = state
            elif key in addressable:
                addressable[key].target_state = state
            else:
                session.add_message(f'Ignoring invalid target, {key}')

    @ocs_agent.param('requests', default=[])
    @ocs_agent.param('reload_config', default=True, type=bool)
    @inlineCallbacks
    def manager(self, session, params):
        """manager(requests=[], reload_config=True)

        **Process** - The "manager" Process maintains a list of child
        Agents for which it is responsible.  In response to requests
        from a client, the Process will launch or terminate child
        Agents.

        Args:
          requests (list): List of agent instance target state
            requests; e.g. [('instance1', 'down')].  See description
            in :meth:`update` Task.
          reload_config (bool): When starting up, discard any cached
            database of tracked agents and rescan the Site Config
            File.  This is mostly for debugging.

        Notes:

          If an Agent process exits unexpectedly, it will be
          relaunched within a few seconds.

          When this Process is started (or restarted), the list of
          tracked agents and their status is completely reset, and the
          Site Config File is read in.

          Once this process is running, the target states for managed
          Agents can be manipulated through the :meth:`update` task.

          Note that when a stop is requested on this Process, all
          managed Agents will be moved to the "down" state and an
          attempt will be made to terminate them before the Process
          exits.

          The session.data is a dict, with entries:

          - 'child_states'
          - 'config_parse_status' - indicates how recently the various
            input files have been parsed.
          - 'orphans' - lists any orphaned (in the sense of docker
            compose) containers.
          - 'new_tags' - dict mapping instance_id to new docker image
            tag, if that tag is not known to docker system.  Only
            populated for tracked instances where the tag is not
            known.

          The 'child_states' entry is a list of managed Agent status;
          for example::

            [
              {'next_action': 'up',
               'target_state': 'up',
               'stability': 1.0,
               'agent_class': 'Lakeshore372Agent',
               'instance_id': 'thermo1'},
              {'next_action': 'down',
               'target_state': 'down',
               'stability': 1.0,
               'agent_class': 'ACUAgent',
               'instance_id': 'acu-1'},
              {'next_action': 'up',
               'target_state': 'up',
               'stability': 1.0,
               'agent_class': 'FakeDataAgent[d]',
               'instance_id': 'faker6'},
            ]

          If you are looking for the "current state", it's called
          "next_action" here.

          The agent_class may include a suffix [d] or [d?], indicating
          that the agent is configured to run within a docker
          container.  (The question mark indicates that the
          HostManager cannot actually identify the docker-compose
          service associated with the agent description in the SCF.)

          The 'config_parse_status' is a dict where the key is a
          docker compose filename, or "[SCF]" for the site config
          file, and the value is a tuple (success, timestamp,
          message).

          The 'orphans' entry is a dict mapping docker container ID
          to some information about the container.  E.g.::

            {
              "30027f37e0ef4b...": {
                "compose_file": "/home/ocs/config/docker-compose.yml",
                "service": "ocs-faker3",
                "container_id": "30027f37e0ef4b...",
                "running": true,
                "exit_code": 0,
                "container_found": true,
                "running_image": "sha256:7eaa6d6f6..."
              }
            }

          The 'new_tags' entry looks like this::

            {
              "faker1": "simonsobs/ocs:v0.11.3",
              "faker2": "simonsobs/ocs:v0.11.3"
            }

        """
        self.config_parse_status = {}
        session.data = {
            'child_states': [],
            'config_parse_status': self.config_parse_status,
            'orphans': self.orphans,
            'new_tags': self.new_tags,
        }

        self.running = True
        if params['reload_config']:
            self.database = {}
            yield self._reload_config(session)
        self._process_target_states(session, params['requests'])

        next_docker_update = time.time()

        any_jobs = False
        # Keep looping after a stop request until every child is down.
        while self.running or any_jobs:
            if time.time() >= next_docker_update:
                yield self._update_docker_services(session)
                next_docker_update = time.time() + 2

            sleep_times = [1.]
            any_jobs = False

            for key, minst in self.database.items():
                # If Process exit is requested, force all targets to down.
                if not self.running and not minst.passive_tracking:
                    minst.target_state = 'down'

                actions = hm_utils.resolve_child_state(minst)
                for msg in actions['messages']:
                    session.add_message(msg)
                if actions['terminate']:
                    self._terminate_instance(key)
                if actions['launch']:
                    reactor.callFromThread(self._launch_instance, minst)
                if actions['sleep']:
                    sleep_times.append(actions['sleep'])
                if minst.passive_tracking:
                    this_job = minst.next_action != 'passive'
                else:
                    this_job = minst.next_action != 'down'
                any_jobs = (any_jobs or this_job)

                # Criteria for stability:
                minst.fail_times, minst.stability = hm_utils.stability_factor(
                    minst.fail_times)

            # Clean up retired items.
            self.database = {
                k: minst for k, minst in self.database.items()
                if minst.management != 'retired' or minst.next_action not in ['down', '?']}

            # Update session info.
            child_states = []
            for minst in self.database.values():
                child_states.append({_k: getattr(minst, _k) for _k in
                                     ['next_action',
                                      'target_state',
                                      'stability',
                                      'agent_class',
                                      'instance_id',
                                      'operable',
                                      'restart_required',
                                      ]})
            session.data['child_states'] = child_states
            session.data['new_tags'] = self.new_tags

            yield dsleep(max(min(sleep_times), .001))
        return True, 'Exited.'

    @inlineCallbacks
    def _stop_manager(self, session, params):
        """Stop function for the manager Process: clear self.running so
        the manager loop brings children down and exits."""
        yield
        if session.status == 'done':
            return
        session.set_status('stopping')
        self.running = False
        return True, 'Stop initiated.'

    @ocs_agent.param('requests', default=[])
    @ocs_agent.param('reload_config', default=False, type=bool)
    @inlineCallbacks
    def update(self, session, params):
        """update(requests=[], reload_config=False)

        **Task** - Update the target state for any subset of the
        managed agent instances.  Optionally, trigger a full reload of
        the Site Config File first.

        Args:
          requests (list): Default is [].  Each entry must be a tuple
            of the form ``(instance_id, target_state)``.  The
            ``instance_id`` must be a string that matches an item in
            the current list of tracked agent instances, or be the
            string 'all', which will match all items being tracked.
            The ``target_state`` must be 'up' or 'down'.
          reload_config (bool): Default is False.  If True, the site
            config file and docker-compose files are reparsed in order
            to (re-)populate the database of child Agent instances.

        Examples:
          ::

            update(requests=[('thermo1', 'down')])
            update(requests=[('all', 'up')])
            update(reload_config=True)


        Notes:
          Starting and stopping agent instances is handled by the
          :meth:`manager` Process; if that Process is not running then
          no action is taken by this Task and it will exit with an
          error.

          The entries in the ``requests`` list are processed in order.
          For example, if the requests were [('all', 'up'), ('data1',
          'down')].  This would result in setting all known children
          to have target_state "up", except for "data1" which would be
          given target state of "down".

          If ``reload_config`` is True, the Site Config File will be
          reloaded (as described in :meth:`_reload_config`) before
          any of the requests are processed.

          Managed docker-compose.yaml files are reparsed, continuously,
          by the manager process -- no specific action is taken with
          those in this Task.  Note that adding/changing the list of
          docker-compose.yaml files requires restarting the agent.

        """
        if not self.running:
            return False, 'Manager process is not running; params not updated.'
        if params['reload_config']:
            yield self._reload_config(session)
        self._process_target_states(session, params['requests'])
        return True, 'Update requested.'

    @inlineCallbacks
    def remove_orphans(self, session, params):
        """remove_orphans()

        **Task** - Use docker stop and docker rm to remove orphaned
        containers associated with managed docker compose files.

        Failures of individual docker commands are reported in the
        session messages but do not abort the Task.
        """
        containers = list(self.orphans.values())
        session.add_message(f'Attempting stop and remove of {len(containers)} containers.')
        for verb, label in [('stop', 'Stopping'), ('rm', 'Removing')]:
            defs = []
            for cont in containers:
                print(f'{label} {cont["container_id"][:16]} ...')
                defs.append(hm_utils._run_docker([verb, cont['container_id']]))
            # consumeErrors so failures are reported here rather than
            # logged later as unhandled errors in Deferred.
            results = yield DeferredList(defs, consumeErrors=True)
            for cont, (success, result) in zip(containers, results):
                if not success:
                    session.add_message(
                        f'docker {verb} failed for {cont["container_id"][:16]}: '
                        f'{result.getErrorMessage()}')
        return True, 'Done.'

    @inlineCallbacks
    def docker_pull(self, session, params):
        """docker_pull()

        **Task** - Use docker compose to pull any (new) images for the
        managed docker compose files.

        """
        for compose in self.docker_composes:
            session.add_message(f'Running pull for {compose} ...')
            yield hm_utils._run_docker(['compose', '-f', compose, 'pull'])
        return True, 'Done.'

    @ocs_agent.param('disown_dockers', default=False, type=bool)
    @inlineCallbacks
    def die(self, session, params):
        """die(disown_dockers=False)

        **Task** - trigger a shutdown of the manage process and then
        stop the reactor, causing the HostManager to exit.

        Args:
          disown_dockers (bool): If True, then all tracked docker
            services will be put in "passive tracking" mode, meaning
            that they will not be stopped and removed during this
            shutdown process.  This can be used to restart HostManager
            without needing to also restart all (docker-based) agents
            on the system.

        """
        if params['disown_dockers']:
            for minst in self.database.values():
                if minst.management == 'docker':
                    minst.passive_tracking = True
                    if minst.target_state in ['up', 'down']:
                        minst.target_state = 'passive'

        if not self.running:
            session.add_message('Manager process is not running.')
        else:
            session.add_message('Requesting exit of manager process.')
            ok, msg, mp_session = self.agent.stop('manager')
            ok, msg, mp_session = yield self.agent.wait('manager', timeout=10.)
            if ok == ocs.OK:
                session.add_message('... manager Process has exited.')
            else:
                session.add_message('... timed-out waiting for manager Process exit!')

        # Schedule program exit.
        reactor.callLater(1., reactor.stop)

        return True, 'This HostManager should terminate in about 1 second.'
def _clsname_tool(name, new_suffix=None):
try:
i = name.index('[')
except ValueError:
i = len(name)
base, suffix = name[:i], name[i:]
if new_suffix is None:
return base, suffix
return base + new_suffix
def make_parser(parser=None):
    """Build (or extend) an argument parser with the HostManager options.

    Args:
        parser (argparse.ArgumentParser or None): Parser to extend; a new
            one is created when None.

    Returns:
        The parser, with an "Agent Options" argument group added.
    """
    if parser is None:
        parser = argparse.ArgumentParser()
    pgroup = parser.add_argument_group('Agent Options')
    pgroup.add_argument('--initial-state', default=None,
                        choices=['up', 'down'],
                        help="Force a single target state for all agents, "
                        "on start-up.")
    pgroup.add_argument('--docker-compose', default=None,
                        help="Comma-separated list of docker-compose files "
                        "to parse and manage.")
    pgroup.add_argument('--docker-service-prefix', default='ocs-',
                        help="Prefix, to be used in combination with "
                        "instance-id, for recognizing docker services "
                        "that correspond to entries in site config.")
    # NOTE(review): main() does not read args.docker_compose_bin; docker
    # commands are issued through hm_utils._run_docker.  Confirm whether
    # this option is still needed.
    pgroup.add_argument('--docker-compose-bin', default=None,
                        help="Path to docker-compose binary. This "
                        "will be interpreted as a path relative to "
                        "current working directory. If not specified, "
                        "will try to use `which docker-compose`.")
    pgroup.add_argument('--quiet', action='store_true',
                        help="Suppress output to stdout/stderr.")
    return parser
def main(args=None):
    """Entry point: parse arguments, create the HostManager, register its
    operations with the agent, and run the agent.

    Args:
        args (list of str or None): Command-line arguments, passed on to
            ``site_config.parse_args`` (presumably None means sys.argv).
    """
    parser = make_parser()
    args = site_config.parse_args(agent_class='HostManager',
                                  parser=parser,
                                  args=args)

    if args.quiet:
        # For launch-to-background, disconnect stdio.
        null = os.open(os.devnull, os.O_RDWR)
        for stream in [sys.stdin, sys.stdout, sys.stderr]:
            os.dup2(null, stream.fileno())
        os.close(null)

    agent, runner = ocs_agent.init_site_agent(args)

    docker_composes = []
    if args.docker_compose:
        # Tolerate whitespace around commas and empty items (e.g. a
        # trailing comma), which would otherwise become bogus filenames.
        docker_composes = [c.strip() for c in args.docker_compose.split(',')
                           if c.strip()]

    host_manager = HostManager(agent, docker_composes=docker_composes,
                               docker_service_prefix=args.docker_service_prefix)

    startup_params = {}
    if args.initial_state:
        startup_params = {'requests': [('all', args.initial_state)]}

    agent.register_process('manager',
                           host_manager.manager,
                           host_manager._stop_manager,
                           blocking=False,
                           startup=startup_params)
    agent.register_task('update', host_manager.update, blocking=False)
    agent.register_task('remove_orphans', host_manager.remove_orphans, blocking=False)
    agent.register_task('docker_pull', host_manager.docker_pull, blocking=False)
    agent.register_task('die', host_manager.die, blocking=False)

    # Make sure managed children are stopped if the reactor shuts down.
    reactor.addSystemEventTrigger('before', 'shutdown', agent._stop_all_running_sessions)
    runner.run(agent, auto_reconnect=True)


if __name__ == '__main__':
    main()