Source code for ocs.agents.host_manager.drivers

import os
import time
import yaml

from twisted.internet import reactor, utils, protocol
from twisted.internet.defer import inlineCallbacks

from dataclasses import dataclass, field
from typing import List


WAIT_DEAD_TIME = 11
WAIT_START_TIME_INIT = 15
WAIT_START_TIME_FOLLOWUP = 5


[docs] @dataclass class ManagedInstance: """Tracks the properties of a managed Agent-instance, including how to launch it, the current run state, target state, etc. """ #: How host is managed; either "host", "docker", or "retired". management: str #: Agent class name (which may include suffix "[d]" or "[d?]" for #: docker-managed instances; or simply "[docker]" for services #: that do not seem to be registered in the SCF. agent_class: str #: The agent instance's instance_id, or else the docker service #: name associated with entry in the SCF. instance_id: str #: Indentier constructed as agent_class:instance_id. full_name: str #: Indicates whether the instance can be manipulated (whether #: calls to up/down should be expected to work). operable: bool = False #: Indicates if instance is retired and can be removed from #: tracking. retired: bool = False #: Indicates if instance should be "passively" managed, e.g. not #: be enforced other than ephemerally to attempt a start / stop. #: This is expected to only be used for docker-based instances. passive_tracking: bool = False #: The docker service name, if docker-managed; otherwisre the #: string ``__plugin__`` to indicate it is host managed. agent_script: str = None #: The Twisted ProcessProtocol object, if host system managed; or #: else the DockerContainerHelper if docker-based. prot: object = None #: Indicates a restart is in order, due to change of docker tag or #: other new software version. restart_required: bool = False #: The run state HostManager is trying to enforce (up, down, passive). target_state: str = 'down' #: The thing HostManager plans to do next; this will sometimes #: mirror the current state (up or down) and will sometimes carry a #: transitional state, such as "wait_start". next_action: str = 'down' #: Unix timestamp, used by transitional states to indicate time at #: which some subsequent action should be taken. at: float = 0 #: List of unix timestamps for recent events where an instance #: stopped unexpectedly; used to identify "unstable" agents. fail_times: List = field(default_factory=list) @property def is_running(self): if self.prot is None: return False return self.prot.is_running @property def exit_code(self): if self.prot is None: return None return self.prot.status[0]
[docs] def resolve_child_state(minst): """Args: minst (ManagedInstance): the instance state information. This will be modified in place. Returns: Dict with important actions for caller to take. Content is: - 'messages' (list of str): messages for the session. - 'launch' (bool): whether to launch a new instance. - 'terminate' (bool): whether to terminate the instance. - 'sleep' (float): maximum delay before checking back, or None if this machine doesn't care. """ actions = { 'launch': False, 'terminate': False, 'sleep': None, } messages = [] sleeps = [] # If the entry is not "operable", send next_action to '?' and # don't try to do anything else. if not minst.operable: minst.next_action = '?' # The uninterruptible transition state(s) are most easily handled # in the same way regardless of target state. # Transitional: wait_start, which bridges from start -> up. elif minst.next_action == 'wait_start': if minst.is_running: messages.append('Launched {0.full_name}'.format(minst)) minst.next_action = 'up' if minst.passive_tracking: minst.target_state = 'passive' elif time.time() >= minst.at: if minst.passive_tracking: messages.append( 'Launch not detected for {0.full_name}! ' 'Passive tracking so will not try again.' .format(minst)) minst.target_state = 'passive' minst.next_action = 'down' else: messages.append('Launch not detected for ' '{0.full_name}! Will retry.'.format(minst)) minst.next_action = 'start_at' minst.at = time.time() + WAIT_START_TIME_FOLLOWUP # Transitional: wait_dead, which bridges from kill -> idle. elif minst.next_action == 'wait_dead': if not minst.is_running: minst.next_action = 'down' if minst.passive_tracking: minst.target_state = 'passive' messages.append('Agent instance {0.full_name} has exited' .format(minst)) elif time.time() >= minst.at: messages.append('Agent instance {0.full_name} ' 'refused to die.'.format(minst)) minst.next_action = 'down' else: sleeps.append(minst.at - time.time()) # State handling when target is to be 'up'. elif minst.target_state == 'up': if minst.next_action == 'start_at': if time.time() >= minst.at: minst.next_action = 'start' else: sleeps.append(minst.at - time.time()) elif minst.next_action == 'start': messages.append( 'Requested launch for {0.full_name}'.format(minst)) actions['launch'] = True minst.next_action = 'wait_start' now = time.time() minst.at = now + WAIT_START_TIME_INIT elif minst.next_action == 'up': if not minst.is_running: messages.append('Detected exit of {0.full_name} ' 'with code {0.exit_code}.'.format(minst)) if hasattr(minst.prot, 'lines'): note = '' lines = minst.prot.lines['stderr'] if len(lines) > 50: note = ' (trimmed)' lines = lines[-20:] messages.append('stderr output from {minst.full_name}{note}: {}' .format('\n'.join(lines), note=note, minst=minst)) minst.next_action = 'start_at' minst.at = time.time() + 3 minst.fail_times.append(time.time()) else: # 'down' minst.next_action = 'start' # State handling when target is to be 'down'. elif minst.target_state == 'down': if minst.next_action == 'down': # In fully managed mode, force a termination. if minst.is_running: messages.append('Detected unexpected session for {0.full_name} ' '(probably docker); it will be shut down.'.format(minst)) minst.next_action = 'up' elif minst.next_action == 'up': messages.append('Requesting termination of ' '{0.full_name}'.format(minst)) actions['terminate'] = True minst.next_action = 'wait_dead' minst.at = time.time() + WAIT_DEAD_TIME else: # 'start_at', 'start' messages.append('Modifying state of {0.full_name} from ' '{0.next_action} to idle'.format(minst)) minst.next_action = 'down' elif minst.passive_tracking: # For passive tracking, next_action always reflects the # current running state. if minst.is_running and minst.next_action != 'up': messages.append(f'Passively tracked {minst.full_name} is now up.') minst.next_action = 'up' elif not minst.is_running and minst.next_action != 'down': messages.append(f'Passively tracked {minst.full_name} is now down.') minst.next_action = 'down' minst.target_state = 'passive' # Should not get here. else: messages.append( 'State machine failure: state={0.next_action}, target_state' '={0.target_state}'.format(minst)) actions['messages'] = messages if len(sleeps): actions['sleep'] = min(sleeps) return actions
[docs] def stability_factor(times, window=120): """Given an increasing list of failure times, quantify the stability of the activity. A single failure, 10 seconds in the past, has a stability factor of 0.5; if there were additional failures before that, the stability factor will be lower. Returns a culled list of stop times and a stability factor (0 - 1). """ now = time.time() if len(times) == 0: return times, 1. # Only keep the last few failures, within our time window. times = [t for t in times[-200:-1] if t >= now - window] + times[-1:] dt = [5. / (now - t) for t in times] return times, max(1 - sum(dt), 0.)
[docs] class AgentProcessHelper(protocol.ProcessProtocol): def __init__(self, instance_id, cmd): super().__init__() self.status = None, None self.killed = False self.instance_id = instance_id self.cmd = cmd self.lines = {'stderr': [], 'stdout': []}
[docs] def up(self): reactor.spawnProcess(self, self.cmd[0], self.cmd[:], env=os.environ)
[docs] def down(self): self.killed = True # race condition, but it could be worse. if self.status[0] is None: reactor.callFromThread(self.transport.signalProcess, 'INT')
@property def is_running(self): return self.status[0] is None # See https://twistedmatrix.com/documents/current/core/howto/process.html # # These notes, and the useless prototypes below them, are to get # us started when we come back here later to feed the process # output to high level logging somehow. # # In a successful launch, we see: # - connectionMade (at which point we closeStdin) # - inConnectionLost (which is then expected) # - childDataReceived(counter, message), output from the script. # - later, when process exits: processExited(status). Status is some # kind of object that knows the return code... # In a failed launch, it's the same except note that: # - The childDataReceived message contains the python traceback, on, # e.g. realm error. +1 - Informative. # - The processExited(status) knows the return code was not 0. # # Note that you implement childDataReceived instead of # "outReceived" and "errReceived".
[docs] def connectionMade(self): self.transport.closeStdin()
[docs] def inConnectionLost(self): pass
[docs] def processExited(self, status): # print('%s.status:' % self.instance_id, status) self.status = status, time.time()
[docs] def outReceived(self, data): self.lines['stdout'].extend(data.decode('utf8').split('\n')) if len(self.lines['stdout']) > 100: self.lines['stdout'] = self.lines['stdout'][-100:]
[docs] def errReceived(self, data): self.lines['stderr'].extend(data.decode('utf8').split('\n')) if len(self.lines['stderr']) > 100: self.lines['stderr'] = self.lines['stderr'][-100:]
def _decode(args): out, err, code = args return (out.decode('utf8'), err.decode('utf8'), code) def _deyaml(args): out, err, code = args return (yaml.safe_load(out), err, code) def _run_docker(args, decode=False, deyaml=False): d = utils.getProcessOutputAndValue( 'docker', args, env=os.environ) if decode or deyaml: d = d.addCallback(_decode) if deyaml: d = d.addCallback(_deyaml) return d
[docs] class DockerContainerHelper: """Class for managing the docker container associated with some service. Provides some of the same interface as AgentProcessHelper. Pass in a service description dict (such as the ones returned by parse_docker_state). """ def __init__(self, service, docker_bin=None): self.service = {} self.is_running = False self.status = None, time.time() self.killed = False self.instance_id = service['service'] self.d = None self.update(service)
[docs] def update(self, service): """Update self.status based on service info (in format returned by parse_docker_state). """ self.service.update(service) if service['running']: self.is_running = True self.status = None, time.time() else: self.is_running = False self.status = service['exit_code'], time.time() self.killed = False
[docs] def up(self): self.d = _run_docker( ['compose', '-f', self.service['compose_file'], 'up', '--remove-orphans', '-d', self.service['service']]) self.status = None, time.time()
[docs] def down(self): self.d = _run_docker( ['compose', '-f', self.service['compose_file'], 'rm', '--stop', '--force', self.service['service']]) self.killed = True
[docs] @inlineCallbacks def parse_docker_state(docker_compose_file): """Analyze a docker compose.yaml file to get a list of services. Using docker compose ps and docker inspect, determine whether each service is running or not. Returns: services: A dict where the key is the service name and each value is a dict with the following entries: - 'compose_file': the path to the docker compose file - 'service': service name - 'image_tag': the tag listed for the image in the compose file (this may differ from the running image). - 'image_id': the docker image ID corresponding to 'image_tag'; will be "unknown" if, e.g., listed tag is not yet pulled to the running system. - 'container_found': bool, indicates whether a container for this service was found (whether or not it was running). - 'container_id': the docker ID of the container (if found). - 'running': bool, indicating that the found container is in state "Running". - 'running_image': the ID of the image for the container (if found; e.g. "sha:0f..."). - 'exit_code': int, which is either extracted from the docker inspect output or is set to 127. (This should never be None.) orphans: A dict (by container id) of dicts describing running containers that are associated with this compose file but have apparently been removed from the service list. Key is the service name. """ summary = {} orphans = {} compose, err, code = yield _run_docker(['compose', '-f', docker_compose_file, 'config'], deyaml=True) for key, cfg in compose.get('services', {}).items(): summary[key] = { 'compose_file': docker_compose_file, 'service': key, 'image_tag': cfg['image'], 'image_id': 'unknown', 'container_found': False, 'container_id': None, 'running': False, 'running_image': None, 'exit_code': 127, } # Look up each tag; create map from tag to image_id. to_inspect = list(set([cfg['image_tag'] for cfg in summary.values()])) image_ids = {} if len(to_inspect): # Output from inspect is not neccessarily one-to-one with # items on command line, if image of a tag is not yet known. out, err, code = yield _run_docker(['inspect'] + to_inspect, deyaml=True) for image in out: image_ids.update({k: image['Id'] for k in image['RepoTags']}) for cfg in summary.values(): cfg['image_id'] = image_ids.get(cfg['image_tag'], 'unknown') # Query docker compose for container ids... out, err, code = yield _run_docker( ['compose', '-f', docker_compose_file, 'ps', '-q'], decode=True) if code != 0: raise RuntimeError("Could not run docker compose or could not parse " "compose.yaml file; exit code %i, error text: %s" % (code, err)) cont_ids = [line.strip() for line in out.split('\n') if line.strip() != ''] # Run docker inspect. for cont_id in cont_ids: try: info = yield _inspectContainer(cont_id, docker_compose_file) except RuntimeError as e: print(f'Warning, failed to inspect container {cont_id}; {e}.') continue if info is None: continue service = info.pop('service') if service not in summary: orphans[cont_id] = { 'compose_file': docker_compose_file, 'service': service, 'container_id': cont_id, } | info else: summary[service].update(info) return summary, orphans
@inlineCallbacks def _inspectContainer(cont_id, docker_compose_file): """Run docker inspect on cont_id, return dict with the results.""" info, err, code = yield _run_docker( ['inspect', cont_id], deyaml=True) if code != 0 and 'No such object' in err.decode('utf8'): # This is likely due to a race condition where some # container was brought down since we ran docker compose. # Return None to indicate this -- caller should just ignore for now. print(f'(_inspectContainer: warning, no such object: {cont_id}') return None elif code != 0 or len(info) != 1: raise RuntimeError( f'Trouble running "docker inspect {cont_id}".\n' f'stdout: {info}\n stderr {err}') # Reconcile config against docker compose ... info = info[0] config = info['Config']['Labels'] _dc_file = os.path.join(config['com.docker.compose.project.working_dir'], config['com.docker.compose.project.config_files']) if not os.path.samefile(docker_compose_file, _dc_file): raise RuntimeError("Consistency problem: container started from " "some other compose file?\n%s\n%s" % (docker_compose_file, _dc_file)) service = config['com.docker.compose.service'] # Note returned dict is merged into summary output for # parse_docker_state, so use keys documented there. return { 'service': service, 'running': info['State']['Running'], 'exit_code': info['State'].get('ExitCode', 127), 'container_found': True, 'running_image': info['Image'], }