import ocs
import txaio
txaio.use_twisted()
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from twisted.internet.error import ReactorNotRunning
from autobahn.wamp.types import ComponentConfig
from autobahn.twisted.wamp import ApplicationSession, ApplicationRunner
import deprecation
[docs]
@deprecation.deprecated(
deprecated_in='v0.6.1',
details="Renamed to run_control_script"
)
def run_control_script2(*args, **kwargs):
run_control_script(*args, **kwargs)
[docs]
def run_control_script(function, parser=None, *args, **kwargs):
"""
Run a function in a ControlClientSession, within a site_config
paradigm, assuming that additional configuration will be passed
through command line arguments.
Args:
function: The function to call.
parser: argparse.ArgumentParser, with control script options
pre-loaded. If None, this will be created internally in
the usual way.
The command line is parsed and site configuration information is
loaded. The WAMP server information is used to configure the
Control Client's connection before launching the function. The
function is invoked as::
function(app, parser_args, *args, **kwargs)
where app is the ControlClientSession and parser_args is the
argparse.Namespace object including processing done by
ocs.site_config. Note that the observatory root_address is
contained in parser_args.root_address. Any additional arguments
defined in the parser will also be present.
This function can be invoked with parser=None, or else with a
parser that has been initialized for control client purposes, like
this::
from ocs import client_t, site_config
parser = site_control.add_arguments() # initialized ArgParser
parser.add_option('--target') # Options for this client
client_t.run_control_script2(my_script, parser=parser)
In the my_script function, use parser_args.target to get the
target.
"""
if parser is None:
parser = ocs.site_config.add_arguments()
pargs = parser.parse_args()
ocs.site_config.reparse_args(pargs, '*control*')
server, realm = pargs.site_hub, pargs.site_realm
session = ControlClientSession(ComponentConfig(realm, {}), function,
[pargs] + list(args), kwargs)
runner = ApplicationRunner(server, realm)
runner.run(session, auto_reconnect=True)
[docs]
class ControlClientSession(ApplicationSession):
def __init__(self, config, script, script_args, script_kwargs):
ApplicationSession.__init__(self, config)
self.script = (script, script_args, script_kwargs)
[docs]
def onConnect(self):
self.log.info('transport connected')
self.join(self.config.realm)
[docs]
def onChallenge(self, challenge):
self.log.info('authentication challenge received')
# The Operations API...
[docs]
def onLeave(self, details):
self.log.info('session left: {}'.format(details))
self.disconnect()
[docs]
def onDisconnect(self):
self.log.info('transport disconnected')
try:
reactor.stop()
except ReactorNotRunning:
pass
[docs]
@inlineCallbacks
def onJoin(self, details):
self.log.info('session joined: {}'.format(details))
script, a, kw = self.script
yield from script(self, *a, **kw)
yield self.leave()
[docs]
class OperationClient:
"""The endpoint client is associated with a single operation rather
than with an operation server."""
def __init__(self, app, root_address, op_name):
self.app = app
self.root = root_address
self.op_name = op_name
[docs]
def request(self, action, params=None, timeout=None):
return self.app.call(self.root + '.ops', action, self.op_name,
params=params, timeout=timeout)
[docs]
def status(self, params=None):
return self.request('status', params=params)
[docs]
def start(self, params=None):
return self.request('start', params=params)
[docs]
def wait(self, params=None, timeout=None):
return self.request('wait', params=params, timeout=timeout)
[docs]
class TaskClient(OperationClient):
[docs]
def abort(self, params=None):
return self.request('abort', params=params)
[docs]
class ProcessClient(OperationClient):
[docs]
def stop(self, params=None):
return self.request('stop', params=params)