Source code for ocs.agents.host_manager.drivers

import os
import time
import yaml

from twisted.internet import reactor, utils, protocol
from twisted.internet.defer import inlineCallbacks


[docs] class ManagedInstance(dict): """Track properties of a managed Agent-instance. This is just a dict with a schema docstring and an "init" function to set defaults. Properties that must be set explicitly by user: - 'management' (str): Either 'host', 'docker', or 'retired'. - 'agent_class' (str): The agent class name, which may include a suffix ([d] or [d?]) if the agent is managed through Docker. For instances corresponding to docker services that do not have a corresponding SCF entry, the value here will be '[docker]'. - 'instance_id' (str): The agent instance-id, or the docker service name if the instance is an unmatched docker service. - 'full_name' (str): agent_class:instance_id Properties that are given a default value by init function: - 'operable' (bool): indicates whether the instance can be manipulated (whether calls to up/down should be expected to work). - 'agent_script' (str): The docker service_name, if docker-managed. Otherwise, the string ``__plugin__`` to indicate it is host managed. - 'prot': The twisted ProcessProtocol object (if host system managed), or the DockerContainerHelper (if a docker container). - 'target_state' (state): The state we're trying to achieve (up or down). - 'next_action' (state): The thing HostManager needs to do next; this will sometimes indicate the "current state" (up or down), but sometimes it will carry a transitional state, such as "wait_start". - 'at' (float): a unix timestamp for transitional states (e.g. used to set how long to wait for something). - 'fail_times' (list of floats): unix timestamps when the instance process has stopped unexpectedly (used to identify "unstable" agents). """
[docs] @classmethod def init(cls, **kwargs): # Note some core things are not included. self = cls({ 'agent_script': None, 'operable': False, 'prot': None, 'next_action': 'down', 'target_state': 'down', 'fail_times': [], 'at': 0, }) self.update(kwargs) return self
[docs] def resolve_child_state(db): """Args: db (ManagedInstance): the instance state information. This will be modified in place. Returns: Dict with important actions for caller to take. Content is: - 'messages' (list of str): messages for the session. - 'launch' (bool): whether to launch a new instance. - 'terminate' (bool): whether to terminate the instance. - 'sleep' (float): maximum delay before checking back, or None if this machine doesn't care. """ actions = { 'launch': False, 'terminate': False, 'sleep': None, } messages = [] sleeps = [] # State machine. prot = db['prot'] # If the entry is not "operable", send next_action to '?' and # don't try to do anything else. if not db['operable']: db['next_action'] = '?' # The uninterruptible transition state(s) are most easily handled # in the same way regardless of target state. # Transitional: wait_start, which bridges from start -> up. elif db['next_action'] == 'wait_start': if prot is not None: messages.append('Launched {full_name}'.format(**db)) db['next_action'] = 'up' else: if time.time() >= db['at']: messages.append('Launch not detected for ' '{full_name}! Will retry.'.format(**db)) db['next_action'] = 'start_at' db['at'] = time.time() + 5. # Transitional: wait_dead, which bridges from kill -> idle. elif db['next_action'] == 'wait_dead': if prot is None: stat, t = 0, None else: stat, t = prot.status if stat is not None: db['next_action'] = 'down' elif time.time() >= db['at']: if stat is None: messages.append('Agent instance {full_name} ' 'refused to die.'.format(**db)) db['next_action'] = 'down' else: sleeps.append(db['at'] - time.time()) # State handling when target is to be 'up'. elif db['target_state'] == 'up': if db['next_action'] == 'start_at': if time.time() >= db['at']: db['next_action'] = 'start' else: sleeps.append(db['at'] - time.time()) elif db['next_action'] == 'start': messages.append( 'Requested launch for {full_name}'.format(**db)) actions['launch'] = True db['next_action'] = 'wait_start' now = time.time() db['at'] = now + 1. elif db['next_action'] == 'up': if prot is None: stat, t = 0, None else: stat, t = prot.status if stat is not None: messages.append('Detected exit of {full_name} ' 'with code {stat}.'.format(stat=stat, **db)) if hasattr(prot, 'lines'): note = '' lines = prot.lines['stderr'] if len(lines) > 50: note = ' (trimmed)' lines = lines[-20:] messages.append('stderr output from {full_name}{note}: {}' .format('\n'.join(lines), note=note, **db)) db['next_action'] = 'start_at' db['at'] = time.time() + 3 db['fail_times'].append(time.time()) else: # 'down' db['next_action'] = 'start' # State handling when target is to be 'down'. elif db['target_state'] == 'down': if db['next_action'] == 'down': # The lines below will prevent HostManager from killing # Agents that suddenly seem to be alive. With these # lines commented out, someone running "up" on a managed # docker-compose.yaml will see their Agents immediately # be brought down by HostManager. # if prot is not None and prot.status[0] is None: # messages.append('Detected unexpected session for {full_name} ' # '(probably docker); changing target state to "up".'.format(**db)) # db['target_state'] = 'up' # In fully managed mode, force a termination. if prot is not None and prot.status[0] is None: messages.append('Detected unexpected session for {full_name} ' '(probably docker); it will be shut down.'.format(**db)) db['next_action'] = 'up' elif db['next_action'] == 'up': messages.append('Requesting termination of ' '{full_name}'.format(**db)) actions['terminate'] = True db['next_action'] = 'wait_dead' db['at'] = time.time() + 5 else: # 'start_at', 'start' messages.append('Modifying state of {full_name} from ' '{next_action} to idle'.format(**db)) db['next_action'] = 'down' # Should not get here. else: messages.append( 'State machine failure: state={next_action}, target_state' '={target_state}'.format(**db)) actions['messages'] = messages if len(sleeps): actions['sleep'] = min(sleeps) return actions
[docs] def stability_factor(times, window=120): """Given an increasing list of failure times, quantify the stability of the activity. A single failure, 10 seconds in the past, has a stability factor of 0.5; if there were additional failures before that, the stability factor will be lower. Returns a culled list of stop times and a stability factor (0 - 1). """ now = time.time() if len(times) == 0: return times, 1. # Only keep the last few failures, within our time window. times = [t for t in times[-200:-1] if t >= now - window] + times[-1:] dt = [5. / (now - t) for t in times] return times, max(1 - sum(dt), 0.)
[docs] class AgentProcessHelper(protocol.ProcessProtocol): def __init__(self, instance_id, cmd): super().__init__() self.status = None, None self.killed = False self.instance_id = instance_id self.cmd = cmd self.lines = {'stderr': [], 'stdout': []}
[docs] def up(self): reactor.spawnProcess(self, self.cmd[0], self.cmd[:], env=os.environ)
[docs] def down(self): self.killed = True # race condition, but it could be worse. if self.status[0] is None: reactor.callFromThread(self.transport.signalProcess, 'INT')
# See https://twistedmatrix.com/documents/current/core/howto/process.html # # These notes, and the useless prototypes below them, are to get # us started when we come back here later to feed the process # output to high level logging somehow. # # In a successful launch, we see: # - connectionMade (at which point we closeStdin) # - inConnectionLost (which is then expected) # - childDataReceived(counter, message), output from the script. # - later, when process exits: processExited(status). Status is some # kind of object that knows the return code... # In a failed launch, it's the same except note that: # - The childDataReceived message contains the python traceback, on, # e.g. realm error. +1 - Informative. # - The processExited(status) knows the return code was not 0. # # Note that you implement childDataReceived instead of # "outReceived" and "errReceived".
[docs] def connectionMade(self): self.transport.closeStdin()
[docs] def inConnectionLost(self): pass
[docs] def processExited(self, status): # print('%s.status:' % self.instance_id, status) self.status = status, time.time()
[docs] def outReceived(self, data): self.lines['stdout'].extend(data.decode('utf8').split('\n')) if len(self.lines['stdout']) > 100: self.lines['stdout'] = self.lines['stdout'][-100:]
[docs] def errReceived(self, data): self.lines['stderr'].extend(data.decode('utf8').split('\n')) if len(self.lines['stderr']) > 100: self.lines['stderr'] = self.lines['stderr'][-100:]
def _run_docker(args): return utils.getProcessOutputAndValue( 'docker', args, env=os.environ)
[docs] class DockerContainerHelper: """Class for managing the docker container associated with some service. Provides some of the same interface as AgentProcessHelper. Pass in a service description dict (such as the ones returned by parse_docker_state). """ def __init__(self, service, docker_bin=None): self.service = {} self.status = -1, time.time() self.killed = False self.instance_id = service['service'] self.d = None self.update(service)
[docs] def update(self, service): """Update self.status based on service info (in format returned by parse_docker_state). """ self.service.update(service) if service['running']: self.status = None, time.time() else: self.status = service['exit_code'], time.time() self.killed = False
[docs] def up(self): self.d = _run_docker( ['compose', '-f', self.service['compose_file'], 'up', '-d', self.service['service']]) self.status = None, time.time()
[docs] def down(self): self.d = _run_docker( ['compose', '-f', self.service['compose_file'], 'rm', '--stop', '--force', self.service['service']]) self.killed = True
[docs] @inlineCallbacks def parse_docker_state(docker_compose_file): """Analyze a docker compose.yaml file to get a list of services. Using docker compose ps and docker inspect, determine whether each service is running or not. Returns: A dict where the key is the service name and each value is a dict with the following entries: - 'compose_file': the path to the docker compose file - 'service': service name - 'container_found': bool, indicates whether a container for this service was found (whether or not it was running). - 'running': bool, indicating that a container for this service is currently in state "Running". - 'exit_code': int, which is either extracted from the docker inspect output or is set to 127. (This should never be None.) """ summary = {} compose = yaml.safe_load(open(docker_compose_file, 'r')) for key, cfg in compose.get('services', []).items(): summary[key] = { 'service': key, 'running': False, 'exit_code': 127, 'container_found': False, 'compose_file': docker_compose_file, } # Query docker compose for container ids... out, err, code = yield _run_docker( ['compose', '-f', docker_compose_file, 'ps', '-q']) if code != 0: raise RuntimeError("Could not run docker compose or could not parse " "compose.yaml file; exit code %i, error text: %s" % (code, err)) cont_ids = [line.strip() for line in out.decode('utf8').split('\n') if line.strip() != ''] # Run docker inspect. for cont_id in cont_ids: try: info = yield _inspectContainer(cont_id, docker_compose_file) except RuntimeError as e: print(f'Warning, failed to inspect container {cont_id}; {e}.') continue if info is None: continue service = info.pop('service') if service not in summary: raise RuntimeError("Consistency problem: image does not self-report " "as a listed service? (%s)" % (service)) summary[service].update(info) return summary
@inlineCallbacks def _inspectContainer(cont_id, docker_compose_file): """Run docker inspect on cont_id, return dict with the results.""" out, err, code = yield _run_docker( ['inspect', cont_id]) if code != 0 and 'No such object' in err.decode('utf8'): # This is likely due to a race condition where some # container was brought down since we ran docker compose. # Return None to indicate this -- caller should just ignore for now. print(f'(_inspectContainer: warning, no such object: {cont_id}') return None elif code != 0: raise RuntimeError( f'Trouble running "docker inspect {cont_id}".\n' f'stdout: {out}\n stderr {err}') # Reconcile config against docker compose ... info = yaml.safe_load(out)[0] config = info['Config']['Labels'] _dc_file = os.path.join(config['com.docker.compose.project.working_dir'], config['com.docker.compose.project.config_files']) if not os.path.samefile(docker_compose_file, _dc_file): raise RuntimeError("Consistency problem: container started from " "some other compose file?\n%s\n%s" % (docker_compose_file, _dc_file)) service = config['com.docker.compose.service'] return { 'service': service, 'running': info['State']['Running'], 'exit_code': info['State'].get('ExitCode', 127), 'container_found': True, }