Adding a Task
Functionality is added to Agents by adding Tasks and Processes. In this section
we will add a simple Task that prints a string passed to it. We do this by
adding a method to the BarebonesAgent class. The method itself is shown here:
 1 @ocs_agent.param('text', default='hello world', type=str)
 2 def print(self, session, params):
 3     """print(text='hello world')
 4
 5     **Task** - Print some text passed to the Task.
 6
 7     Args:
 8         text (str): Text to print out. Defaults to 'hello world'.
 9
10     Notes:
11         The session data will be updated with the text::
12
13             >>> response.session['data']
14             {'text': 'hello world',
15              'last_updated': 1660249321.8729222}
16
17     """
18     # Print the text provided
19     print(f"{params['text']}")
20
21     # Store the text provided in session.data
22     session.data = {'text': params['text'],
23                     'last_updated': time.time()}
24
25     # bool, 'descriptive text message'
26     # True if task succeeds, False if not
27     return True, 'Printed text'
Let’s look first at the method definition on line 2. We can name the Task
function whatever we would like, but Tasks always take these three arguments:
(self, session, params). self is present because this is a method within a
class; session is an ocs.ocs_agent.OpSession, which is used to track the
status, post messages to the message buffer, and pass data to clients (for more
info on this last part see session.data); and params holds all of the arguments
that will be passed to the Task (even if there are none).
Next, we have the docstring. This generally follows the Google style for Python docstrings, as supported by napoleon. There are also some conventions for writing Task docstrings within OCS, which are detailed on the Documentation page.
As described in the docstring, our Task has a single argument, text. OCS
provides the param decorator, which ensures that the required parameters are
passed in and that they meet the checks specified in the decorator. For details
on how to use the decorator see Operation Parameters.
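To make the decorator's role concrete, here is a minimal sketch of a parameter-checking decorator. It is a hypothetical stand-in written for illustration, not the implementation of ocs_agent.param: the names simple_param, expected_type, and EchoAgent, and the choice to raise TypeError, are all assumptions. It only shows the general idea of filling in a default and validating a value before the Task body runs.

```python
import functools


def simple_param(name, default=None, expected_type=None):
    """Hypothetical stand-in for a parameter-checking decorator.

    This is NOT ocs_agent.param; it only illustrates the general idea.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, session, params):
            # Copy so the caller's dict is left untouched.
            params = dict(params or {})
            # Fill in the default when the caller omitted the parameter.
            params.setdefault(name, default)
            # Reject values of the wrong type before the Task body runs.
            if expected_type is not None and not isinstance(params[name], expected_type):
                raise TypeError(
                    f"param '{name}' must be {expected_type.__name__}, "
                    f"got {params[name].__class__.__name__}")
            return func(self, session, params)
        return wrapper
    return decorator


class EchoAgent:
    """Hypothetical class used only to exercise the decorator."""

    @simple_param('text', default='hello world', expected_type=str)
    def echo(self, session, params):
        # By the time we get here, params['text'] is present and is a str.
        return True, params['text']
```

With this sketch, calling EchoAgent().echo(None, {}) falls back to the default text, while passing a non-string value for text raises before the method body is reached.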
Lastly we have the body of the method. The code above does not set any status
itself: when the Task first starts it automatically gets the ‘starting’ status,
then ‘running’ while the body executes (both transitions appear in the Client
output later in this section), and on completion it has a status indicative of
how the Task completed, e.g. ‘failed’ or ‘succeeded’. For other states see
ocs.base.OpCode. Next, the code performs the main function of this Task by
printing the text passed in as a parameter. In practice your Agent will do
something much more useful here. The text is then stored in the session.data
object. Again, for more info on using session.data, see session.data. Lastly,
the Task returns. A Task always returns a boolean indicating success, together
with a string message that will be inserted into the session message buffer.
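Because a Task is an ordinary method with a fixed signature, its body can be exercised on its own, without a running Agent. The sketch below does so under stated assumptions: DemoAgent is a hypothetical class that mirrors the print Task without the param decorator, and a SimpleNamespace stands in for the session object that OCS would normally supply.

```python
import time
from types import SimpleNamespace


class DemoAgent:
    """Hypothetical class mirroring the print Task, without any OCS imports."""

    def print(self, session, params):
        # Do the work of the Task. Inside the method body, print is the builtin.
        print(params['text'])
        # Publish results for clients through session.data.
        session.data = {'text': params['text'],
                        'last_updated': time.time()}
        # (success flag, message for the session message buffer)
        return True, 'Printed text'


# Stand-in for the session object (an assumption for illustration only).
session = SimpleNamespace(data={})
ok, message = DemoAgent().print(session, {'text': 'hello world'})
```

After the call, ok and message hold the two return values, and session.data holds the dictionary that a Client would see under the session's data key.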
Before a Client on the network can call our Task we must first register the Task with the crossbar server. This is commonly done outside the Agent after the Agent is first instantiated. By convention (and for the documentation to look good) we will keep the method name the same as the name we use to register the Task with crossbar.
agent.register_task('print', barebone.print)
Aborting a Task
‘print’ is a very short Task that runs very quickly. If we have a long-running Task, however, we might need the ability to stop it before it would normally complete. OCS supports aborting a Task, but this mechanism needs to be implemented within the Agent code. This requires adding an aborter function, which typically looks like this:
def _abort_print(self, session, params):
    if session.status == 'running':
        session.set_status('stopping')
Within the Task function, at points where it is reasonable to honor an abort
request, you must add a check of session.status that exits the Task with an
error (i.e. returns False) if the status is no longer ‘running’. For example:
if session.status != 'running':
    return False, 'Aborted print'
Where you insert this interrupt code will vary from Agent to Agent. Tasks that run quickly do not need an abort to be implemented at all. For long-running Tasks, however, an abort should be implemented. (We will see this interruption implementation again in the next step, where we discuss Adding a Process.)
When registering the Task, the aborter must be specified:
agent.register_task('print', barebone.print, aborter=barebone._abort_print)
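Putting the aborter and the status check together, the following self-contained sketch shows the pattern in a long-running loop. It does not use OCS: FakeSession is a hypothetical stand-in for the session object (only status and set_status are modelled), and SlowAgent, count, and _abort_count are illustrative names. A plain thread stands in for whatever runs the Task in a real Agent.

```python
import threading
import time


class FakeSession:
    """Hypothetical stand-in for the session object, for illustration only."""

    def __init__(self):
        self.status = 'running'

    def set_status(self, status):
        self.status = status


class SlowAgent:
    def count(self, session, params):
        """Long-running Task that checks for an abort between steps."""
        for i in range(params['steps']):
            # A reasonable point to stop: between units of work.
            if session.status != 'running':
                return False, f'Aborted count after {i} steps'
            time.sleep(params['delay'])
        return True, 'Finished counting'

    def _abort_count(self, session, params):
        # Same shape as the aborter shown above.
        if session.status == 'running':
            session.set_status('stopping')


agent = SlowAgent()
session = FakeSession()
result = {}


def run_task():
    # Would take about 10 s if left to run to completion.
    result['value'] = agent.count(session, {'steps': 1000, 'delay': 0.01})


worker = threading.Thread(target=run_task)
worker.start()
time.sleep(0.05)                    # let the Task make some progress
agent._abort_count(session, None)   # request the abort
worker.join()                       # the Task notices and returns early
```

The aborter only changes the status; it is the check inside the loop that actually ends the Task, which is why the check has to be placed where stopping leaves things in a sensible state.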
Note
By default the aborter will run in the same threading pattern as the Task. If
your Task runs in the main reactor (i.e. is decorated with @inlineCallbacks),
then the aborter should also run in the reactor, and so needs to yield at the
end of the method. In our example this would look like:
@inlineCallbacks
def _abort_print(self, session, params):
    if session.status == 'running':
        session.set_status('stopping')
    yield
Again, since ‘print’ runs quickly, we do not implement an aborter for it here.
For an example of an abortable Task, see
ocs.agents.fake_data.agent.FakeDataAgent.delay_task().
Agent Code
Our full Agent so far should look like this:
import time

from ocs import ocs_agent, site_config


class BarebonesAgent:
    """Barebone Agent demonstrating writing an Agent from scratch.

    This Agent is meant to be an example for Agent development, and provides a
    clean starting point when developing a new Agent.

    Parameters:
        agent (OCSAgent): OCSAgent object from :func:`ocs.ocs_agent.init_site_agent`.

    Attributes:
        agent (OCSAgent): OCSAgent object from :func:`ocs.ocs_agent.init_site_agent`.

    """

    def __init__(self, agent):
        self.agent = agent

    @ocs_agent.param('text', default='hello world', type=str)
    def print(self, session, params):
        """print(text='hello world')

        **Task** - Print some text passed to the Task.

        Args:
            text (str): Text to print out. Defaults to 'hello world'.

        Notes:
            The session data will be updated with the text::

                >>> response.session['data']
                {'text': 'hello world',
                 'last_updated': 1660249321.8729222}

        """
        # Print the text provided
        print(f"{params['text']}")

        # Store the text provided in session.data
        session.data = {'text': params['text'],
                        'last_updated': time.time()}

        # bool, 'descriptive text message'
        # True if task succeeds, False if not
        return True, 'Printed text'


def main(args=None):
    args = site_config.parse_args(agent_class='BarebonesAgent', args=args)
    agent, runner = ocs_agent.init_site_agent(args)
    barebone = BarebonesAgent(agent)
    agent.register_task('print', barebone.print)
    runner.run(agent, auto_reconnect=True)


if __name__ == '__main__':
    main()
Running the Agent
We can now run our Agent and interact with it via a Client. First, start the Agent:
$ OCS_CONFIG_DIR=/path/to/your/ocs-site-config/ ocs-agent-cli --agent barebones_agent.py --entrypoint main --instance-id barebones1
Args: ['--instance-id', 'barebones1']
2022-07-22T10:55:46-0400 Using OCS version 0.9.3+3.gfc30f3d.dirty
2022-07-22T10:55:46-0400 ocs: starting <class 'ocs.ocs_agent.OCSAgent'> @ observatory.barebones1
2022-07-22T10:55:46-0400 log_file is apparently None
2022-07-22T10:55:46-0400 transport connected
2022-07-22T10:55:46-0400 session joined:
SessionDetails(realm="test_realm",
session=3109556471169828,
authid="RJ9J-EP5Y-LP5H-RSWC-GCLW-LSRJ",
authrole="iocs_agent",
authmethod="anonymous",
authprovider="static",
authextra={'x_cb_node': '7eedf90409d6-1', 'x_cb_worker': 'worker001', 'x_cb_peer': 'tcp4:192.168.240.1:33046', 'x_cb_pid': 16},
serializer="cbor.batched",
transport="websocket",
resumed=None,
resumable=None,
resume_token=None)
Next, run a Client, calling the print task:
$ OCS_CONFIG_DIR=/path/to/your/ocs-site-config/ python3
Python 3.10.5 (main, Jun 6 2022, 18:49:26) [GCC 12.1.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from ocs.ocs_client import OCSClient
>>> client = OCSClient('barebones1')
>>> client.print.start()
OCSReply: OK : Started task "print".
print[session=0]; status=starting for 0.003074 s
messages (1 of 1):
1658501763.027 Status is now "starting".
other keys in .session: op_code, data
>>> client.print.status()
OCSReply: OK : Session active.
print[session=0]; status=done without error 3.7 s ago, took 0.001974 s
messages (4 of 4):
1658501763.027 Status is now "starting".
1658501763.028 Status is now "running".
1658501763.029 Printed text
1658501763.029 Status is now "done".
other keys in .session: op_code, data
In the terminal running your Agent you should see:
2022-07-22T10:56:03-0400 start called for print
2022-07-22T10:56:03-0400 print:0 Status is now "starting".
2022-07-22T10:56:03-0400 hello world
2022-07-22T10:56:03-0400 print:0 Status is now "running".
2022-07-22T10:56:03-0400 print:0 Printed text
2022-07-22T10:56:03-0400 print:0 Status is now "done".