Source code for ocs.client_http

# The crossbar router must be configured to expose http access.
import json
import requests


[docs] class ControlClientError(RuntimeError): pass
[docs] class ControlClient(): def __init__(self, agent_addr, **kwargs): self.agent_addr = agent_addr self.realm = kwargs['realm'] self.call_url = kwargs['url'] # start and stop are just to imitate wampy client...
[docs] def start(self, *args, **kwargs): pass
[docs] def stop(self, *args, **kwargs): pass
[docs] def call(self, procedure, *args, **kwargs): # curl -H "Content-Type: application/json" # -d '{"procedure": "observatory.acu1", # "args": ["get_tasks"]}' # http://127.0.0.1:8001/call params = json.dumps({'procedure': procedure, 'args': args, 'kwargs': kwargs}) try: r = requests.post(self.call_url, data=params) except requests.exceptions.ConnectionError: raise ControlClientError([0, 0, 0, 0, 'client_http.error.connection_error', ['Failed to connect to %s' % self.call_url], {}]) if r.status_code != 200: raise ControlClientError([0, 0, 0, 0, 'client_http.error.request_error', ['Server replied with code %i' % r.status_code], {}]) decoded = r.json() if 'error' in decoded: # Return errors in the same way wampy does, roughly. raise ControlClientError([0, 0, 0, 0, decoded['error'], decoded['args'], decoded['kwargs']]) return decoded['args'][0]
[docs] def get_api(self, simple=False): """Query the API and other info from the Agent; this includes lists of Processes, Tasks, and Feeds, docstrings, operation session structures, and info about the Agent instance (class, PID, host). Args: simple (bool): If True, then return just the lists of the op and feed names without accompanying detail. Returns: A dict, see :func:`ocs.ocs_agent.OCSAgent._management_handler` for detail. """ data = self.call(self.agent_addr, 'get_api') if not simple: return data return {k: [_v[0] for _v in v] for k, v in data.items() if isinstance(v, dict)}
[docs] def get_tasks(self): """ Query the list of Tasks from the Agent management interface. Returns a list of items of the form (task_name, info_dict). """ return self.call(self.agent_addr, 'get_tasks')
[docs] def get_processes(self): """ Query the list of Processes from the Agent management interface. Returns a list of items of the form (process_name, info_dict). """ return self.call(self.agent_addr, 'get_processes')
[docs] def get_feeds(self): """ Query the list of Feeds from the Agent management interface. Returns a list of items of the form (feed_name, info_dict). """ return self.call(self.agent_addr, 'get_feeds')
[docs] def request(self, action, op_name, params={}, **kw): """ Issue a request on an Agent's .ops interface. Args: action (string): The action name (start, status, etc). params (dict): Parameters to pass to the action. Returns: Tuple (status, message, session). """ return self.call(self.agent_addr + '.ops', action, op_name, params, **kw)
[docs] def special(self, subaddr, *args, **kwargs): """Execute a an arbitrary method associated with an Agent. This is intended for use with special, centralized services, such as to communicate with the Access Director. """ return self.call(self.agent_addr + '.' + subaddr, *args, **kwargs)