Adding Agent Arguments

Often you will need to pass configuration information to the Agent. For simple configurations (e.g. the IP address of a device the Agent interfaces with) this is done via command-line arguments to the Agent. Within OCS the argparse module is used for this. These arguments can be written to the SCF, and will be passed to the Agent automatically at startup.

To start, we will add a function to our Agent file that creates an ArgumentParser. Here we define a single argument for our Agent, --mode, which selects whether the Agent starts in the 'idle' state or in the 'count' state. The 'count' state causes the count Process to run immediately upon startup, and is the default if --mode is not given.

def add_agent_args(parser_in=None):
    if parser_in is None:
        from argparse import ArgumentParser as A
        parser_in = A()
    pgroup = parser_in.add_argument_group('Agent Options')
    pgroup.add_argument('--mode', type=str, default='count',
                        choices=['idle', 'count'],
                        help="Starting action for the Agent.")

    return parser_in
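
To see how this parser behaves on its own, the short sketch below exercises it with plain argparse, outside of OCS. The function is repeated so the example runs standalone; the `bogus` value is only an illustrative invalid input, and nothing here goes through site_config.parse_args().

```python
from argparse import ArgumentParser


def add_agent_args(parser_in=None):
    # Same logic as the function above, repeated so this runs standalone.
    if parser_in is None:
        parser_in = ArgumentParser()
    pgroup = parser_in.add_argument_group('Agent Options')
    pgroup.add_argument('--mode', type=str, default='count',
                        choices=['idle', 'count'],
                        help="Starting action for the Agent.")
    return parser_in


parser = add_agent_args()

# No flag given: argparse falls back to the default, 'count'.
default_args = parser.parse_args([])

# Explicit flag, written the same way as in the SCF 'arguments' list.
idle_args = parser.parse_args(['--mode', 'idle'])

# A value outside `choices` is rejected: argparse prints a usage error
# to stderr and raises SystemExit.
try:
    parser.parse_args(['--mode', 'bogus'])
    rejected = False
except SystemExit:
    rejected = True

print(default_args.mode, idle_args.mode, rejected)
```

Because `choices` is set, a typo in the SCF is caught when the arguments are parsed rather than surfacing later as unexpected Agent behavior.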

In the if __name__ == '__main__': block we then add:

parser = add_agent_args()
args = site_config.parse_args(agent_class='BarebonesAgent', parser=parser)

startup = False
if args.mode == 'count':
    startup = True

Lastly, we modify the registration of the count process:

agent.register_process(
    'count',
    barebone.count,
    barebone._stop_count,
    startup=startup)

We can now set the --mode argument in our SCF:

{'agent-class': 'BarebonesAgent',
 'instance-id': 'barebones1',
 'arguments': ['--mode', 'idle']},

Agent Code

Our Agent in full now looks like this:

import time
import txaio

from os import environ

from ocs import ocs_agent, site_config
from ocs.ocs_twisted import TimeoutLock


class BarebonesAgent:
    """Barebone Agent demonstrating writing an Agent from scratch.

    This Agent is meant to be an example for Agent development, and provides a
    clean starting point when developing a new Agent.

    Parameters:
        agent (OCSAgent): OCSAgent object from :func:`ocs.ocs_agent.init_site_agent`.

    Attributes:
        agent (OCSAgent): OCSAgent object from :func:`ocs.ocs_agent.init_site_agent`.
        log (txaio.tx.Logger): Logger object used to log events within the
            Agent.
        lock (TimeoutLock): TimeoutLock object used to prevent simultaneous
            commands being sent to hardware.
        _count (bool): Internal tracking of whether the Agent should be
            counting or not. This is used to exit the Process loop by changing
            it to False via the count.stop() command. Your Agent won't use this
            exact attribute, but might have a similar one.

    """

    def __init__(self, agent):
        self.agent = agent
        self.log = agent.log
        self.lock = TimeoutLock(default_timeout=5)
        self._count = False

        # Register OCS feed
        agg_params = {
            'frame_length': 10 * 60  # [sec]
        }
        self.agent.register_feed('feed_name',
                                 record=True,
                                 agg_params=agg_params,
                                 buffer_time=1.)

    def count(self, session, params):
        """count()

        **Process** - Count up from 0.

        The count will restart if the process is stopped and restarted.

        Notes:
            The most recent value is stored in the session data object in the
            format::

                >>> response.session['data']
                {"value": 0,
                 "timestamp":1600448753.9288929}

        """
        with self.lock.acquire_timeout(timeout=0, job='count') as acquired:
            if not acquired:
                self.log.warn("Lock could not be acquired because it "
                              + f"is held by {self.lock.job}")
                return False, 'Could not acquire lock'

            # Initialize last release time for lock
            last_release = time.time()

            # Initialize the counter
            self._count = True
            counter = 0

            self.log.info("Starting the count!")

            # Main process loop
            while self._count:
                # About every second, release and acquire the lock
                if time.time() - last_release > 1.:
                    last_release = time.time()
                    if not self.lock.release_and_acquire(timeout=10):
                        self.log.warn("Could not re-acquire lock now held by "
                                      + f"{self.lock.job}.")
                        return False, 'Could not re-acquire lock'

                # Perform the process actions
                counter += 1
                self.log.debug(f"{counter}! Ah! Ah! Ah!")
                now = time.time()
                session.data = {"value": counter,
                                "timestamp": now}

                # Format message for publishing to Feed
                message = {'block_name': 'count',
                           'timestamp': now,
                           'data': {'value': counter}}
                self.agent.publish_to_feed('feed_name', message)
                time.sleep(1)

        self.agent.feeds['feed_name'].flush_buffer()

        return True, 'Acquisition exited cleanly.'

    def _stop_count(self, session, params):
        """Stop the count Process."""
        if self._count:
            self._count = False
            return True, 'requested to stop taking data.'
        else:
            return False, 'count is not currently running'

    @ocs_agent.param('text', default='hello world', type=str)
    def print(self, session, params):
        """print(text='hello world')

        **Task** - Print some text passed to a Task.

        Args:
            text (str): Text to print out. Defaults to 'hello world'.

        Notes:
            The session data will be updated with the text::

                >>> response.session['data']
                {'text': 'hello world',
                 'last_updated': 1660249321.8729222}

        """
        with self.lock.acquire_timeout(timeout=3.0, job='print') as acquired:
            if not acquired:
                self.log.warn("Lock could not be acquired because it "
                              + f"is held by {self.lock.job}")
                return False, 'Could not acquire lock'

            # Log the text provided to the Agent logs
            self.log.info(f"{params['text']}")

            # Store the text provided in session.data
            session.data = {'text': params['text'],
                            'last_updated': time.time()}

        # bool, 'descriptive text message'
        # True if task succeeds, False if not
        return True, 'Printed text to logs'


def add_agent_args(parser_in=None):
    if parser_in is None:
        from argparse import ArgumentParser as A
        parser_in = A()
    pgroup = parser_in.add_argument_group('Agent Options')
    pgroup.add_argument('--mode', type=str, default='count',
                        choices=['idle', 'count'],
                        help="Starting action for the Agent.")

    return parser_in


def main(args=None):
    # For logging
    txaio.use_twisted()
    txaio.make_logger()

    # Start logging
    txaio.start_logging(level=environ.get("LOGLEVEL", "info"))

    parser = add_agent_args()
    args = site_config.parse_args(agent_class='BarebonesAgent',
                                  parser=parser,
                                  args=args)

    startup = False
    if args.mode == 'count':
        startup = True

    agent, runner = ocs_agent.init_site_agent(args)

    barebone = BarebonesAgent(agent)
    agent.register_process(
        'count',
        barebone.count,
        barebone._stop_count,
        startup=startup)
    agent.register_task('print', barebone.print)

    runner.run(agent, auto_reconnect=True)


if __name__ == '__main__':
    main()

Running the Agent

We can now pass the --mode argument on the command line, and it will take precedence over the value in the configuration file:

$ OCS_CONFIG_DIR=/path/to/your/ocs-site-config/ python3 barebones_agent.py --mode count
2022-07-27T01:52:11+0000 Using OCS version 0.9.3
2022-07-27T01:52:11+0000 Log directory does not exist: /home/<user>/log/ocs/
2022-07-27T01:52:11+0000 ocs: starting <class 'ocs.ocs_agent.OCSAgent'> @ observatory.barebones1
2022-07-27T01:52:11+0000 log_file is apparently None
2022-07-27T01:52:11+0000 transport connected
2022-07-27T01:52:11+0000 session joined: {'authextra': {'x_cb_node': '7aa0c07345de-1',
               'x_cb_peer': 'tcp4:172.20.0.1:41912',
               'x_cb_pid': 11,
               'x_cb_worker': 'worker001'},
 'authid': '3VRA-35PF-VWRG-GNC9-LA4E-JQT6',
 'authmethod': 'anonymous',
 'authprovider': 'static',
 'authrole': 'iocs_agent',
 'realm': 'test_realm',
 'resumable': False,
 'resume_token': None,
 'resumed': False,
 'serializer': 'msgpack.batched',
 'session': 2822082204009934,
 'transport': {'channel_framing': 'websocket',
               'channel_id': {},
               'channel_serializer': None,
               'channel_type': 'tcp',
               'http_cbtid': None,
               'http_headers_received': None,
               'http_headers_sent': None,
               'is_secure': False,
               'is_server': False,
               'own': None,
               'own_fd': -1,
               'own_pid': 36581,
               'own_tid': 36581,
               'peer': 'tcp4:127.0.0.1:8001',
               'peer_cert': None,
               'websocket_extensions_in_use': None,
               'websocket_protocol': None}}
2022-07-27T01:52:11+0000 startup-op: launching count
2022-07-27T01:52:11+0000 start called for count
2022-07-27T01:52:11+0000 count:0 Status is now "starting".
2022-07-27T01:52:11+0000 Starting the count!
2022-07-27T01:52:11+0000 count:0 Status is now "running".
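
One way to picture the precedence rule is with plain argparse, which keeps the last value when an option is repeated. The sketch below is only an illustration under that assumption; it does not describe how site_config.parse_args() is implemented, and the `scf_arguments` and `cli_arguments` names are hypothetical.

```python
from argparse import ArgumentParser

parser = ArgumentParser()
parser.add_argument('--mode', type=str, default='count',
                    choices=['idle', 'count'])

# Hypothetical inputs: the 'arguments' list from the SCF entry, and the
# flags typed on the command line when launching the Agent by hand.
scf_arguments = ['--mode', 'idle']
cli_arguments = ['--mode', 'count']

# With only the SCF values, the Agent would start idle.
scf_only = parser.parse_args(scf_arguments)

# If command-line values are placed after the SCF values, the later
# occurrence of --mode overwrites the earlier one.
merged = parser.parse_args(scf_arguments + cli_arguments)

print(scf_only.mode, merged.mode)
```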

For more information on site_config.parse_args() see Agent Site-related Command Line Parameters.

We are almost done! In the next step we will build a Docker image for the Agent to facilitate deploying the Agent.