Source code for ocs.ocs_client

import collections
import time

import ocs
from ocs import site_config, access

import logging
logger = logging.getLogger(__name__)


[docs] def _get_op(op_type, name, encoded, client, password): """Factory for generating matched operations. This will make sure op.start's docstring is the docstring of the operation. Parameters: op_type (str): Operation type, either 'task' or 'process'. name (str): Operation name encoded (dict): Encoded :class:`ocs.ocs_agent.AgentTask` or :class:`ocs.ocs_agent.AgentProcess` dictionary. client (ControlClient): Client object, which will be used to issue the requests for operation actions. password (str): Client-supplied password, or None to not use a password. """ # It's important to not pass password='', if the agent doesn't # support password API. feature_kw = {} if password is not None and password != '': feature_kw['password'] = password class MatchedOp: def start(self, **kwargs): return OCSReply(*client.request('start', name, params=kwargs, **feature_kw)) def wait(self, timeout=None): return OCSReply(*client.request('wait', name, timeout=timeout, **feature_kw)) def status(self): return OCSReply(*client.request('status', name, **feature_kw)) class MatchedTask(MatchedOp): def abort(self): return OCSReply(*client.request('abort', name, **feature_kw)) def __call__(self, **kw): """Runs self.start(**kw) and, if that succeeds, self.wait().""" result = self.start(**kw) if result[0] != ocs.OK: return result return self.wait() class MatchedProcess(MatchedOp): def stop(self): return OCSReply(*client.request('stop', name, **feature_kw)) def __call__(self): """Equivalent to self.status().""" return self.status() MatchedOp.start.__doc__ = encoded['docstring'] if op_type == 'task': return MatchedTask() elif op_type == 'process': return MatchedProcess() else: raise ValueError("op_type must be either 'task' or 'process'")
def _opname_to_attr(name): for c in ['-', ' ']: name = name.replace(c, '_') return name
[docs] class OCSClient: """The simple OCS Client, facilitating task/process calls. OCSClient makes an Agent's tasks/processes available as class attributes, making it easy to setup a client instance and call the associated Agent's tasks and processes. Example: This example sets up an OCSClient object and calls a FakeDataAgent's Task (delay_task) and process (acq):: >>> client = OCSClient('fake-data-1') >>> client.delay_task(delay=5) >>> client.acq.start() Attributes: instance_id (str): instance-id for agent to run """ def __init__(self, instance_id, privs=None, **kwargs): """ Args: instance_id (str): Instance id for agent to run privs (str or int): If str, a password to use for all interactions with Agent operations. If int, the system will check the password configuration for a password that can yield that credential level, and apply it. args (list or args object, optional): Takes in the parser arguments for the client. If None, pass an empty list. If list, reads in list elements as arguments. Defaults to None. .. note:: For additional ``**kwargs`` see site_config.get_control_client. .. note:: For details on ``privs`` processing, see :func:`ocs.access.client_get_password`. """ if kwargs.get('args') is None: kwargs['args'] = [] self._client = site_config.get_control_client(instance_id, **kwargs) self.instance_id = instance_id self._api = self._client.get_api() # If target agent doesn't support AC, don't pass password arg. use_password = (self._api.get('access_control') is not None) self._password = access.client_get_password( privs, self._api['agent_class'], instance_id) if self._password not in [None, ''] and not use_password: logger.warning('WARNING: client wants to pass a password to ' 'agent instance, but it does not support passwords.') self._password = None for name, _, encoded in self._api['tasks']: setattr(self, _opname_to_attr(name), _get_op('task', name, encoded, self._client, self._password)) for name, _, encoded in self._api['processes']: setattr(self, _opname_to_attr(name), _get_op('process', name, encoded, self._client, self._password)) def __repr__(self): return f"OCSClient('{self.instance_id}')"
def _humanized_time(t): if abs(t) < 1.: return '%.6f s' % t if abs(t) < 120: return '%.1f s' % t if abs(t) < 120 * 60: return '%.1f mins' % (t / 60) if abs(t) < 48 * 3600: return '%.1f hrs' % (t / 3600) return '%.1f days' % (t / 86400)
[docs] class OCSReply(collections.namedtuple('_OCSReply', ['status', 'msg', 'session'])): def __repr__(self): try: ok_str = ocs.ResponseCode(self.status).name except ValueError: ok_str = '???' text = 'OCSReply: %s : %s\n' % (ok_str, self.msg) if self.session is None or len(self.session.keys()) == 0: return text + ' (no session -- op has never run)' # try/fail in here so we can make assumptions about key # presence and bail out to a full dump if anything is weird. try: handled = ['op_name', 'session_id', 'status', 'start_time', 'end_time', 'messages', 'success'] s = self.session run_str = 'status={status}'.format(**s) if s['status'] in ['starting', 'running']: run_str += ' for %s' % _humanized_time( time.time() - s['start_time']) elif s['status'] == 'done': if s['success']: run_str += ' without error' else: run_str += ' with ERROR' run_str += ' %s ago, took %s' % ( _humanized_time(time.time() - s['end_time']), _humanized_time(s['end_time'] - s['start_time'])) text += (' {op_name}[session={session_id}]; ' '{run_str}\n'.format(run_str=run_str, **s)) messages = s.get('messages', []) if len(messages): to_show = min(5, len(messages)) text += (' messages (%i of %i):\n' % (to_show, len(messages))) for m in messages: text += ' %.3f %s\n' % (m[0], m[1]) also = [k for k in s.keys() if k not in handled] if len(also): text += (' other keys in .session: ' + ', '.join(also)) except Exception as e: text += ('\n [session decode failed with exception: %s\n' ' Here is everything in .session:\n %s\n]') \ % (e.args, self.session) return text