Source code for ocs.agents.aggregator.agent

import time
import queue
import argparse
import txaio

from twisted.internet import reactor

from os import environ
from ocs import ocs_agent, site_config
from ocs.base import OpCode

from ocs.agents.aggregator.drivers import Aggregator

# For logging
txaio.use_twisted()
LOG = txaio.make_logger()


[docs] class AggregatorAgent: """ This class provide a WAMP wrapper for the data aggregator. The run function and the data handler **are** thread-safe, as long as multiple run functions are not started at the same time, which should be prevented through OCSAgent. Args: agent (OCSAgent): OCS Agent object args (namespace): args from the function's argparser. Attributes: time_per_file (int): Time (sec) before files should be rotated. data_dir (path): Path to the base directory where data should be written. aggregate (bool): Specifies if the agent is currently aggregating data. incoming_data (queue.Queue): Thread-safe queue where incoming (data, feed) pairs are stored before being passed to the Aggregator. loop_time (float): Time between iterations of the run loop. """ def __init__(self, agent, args): self.agent: ocs_agent.OCSAgent = agent self.log = agent.log self.time_per_file = int(args.time_per_file) self.data_dir = args.data_dir self.aggregate = False self.incoming_data = queue.Queue() self.loop_time = 1 # SUBSCRIBES TO ALL FEEDS!!!! # If this ends up being too much data, we can add a tag '.record' # at the end of the address of recorded feeds, and filter by that. self.agent.subscribe_on_start(self._enqueue_incoming_data, f'{args.address_root}..feeds.', options={'match': 'wildcard'}) record_on_start = (args.initial_state == 'record') self.agent.register_process('record', self.record, self._stop_record, startup=record_on_start) def _enqueue_incoming_data(self, _data): """ Data handler for all feeds. This checks to see if the feeds should be recorded, and if they are it puts them into the incoming_data queue to be processed by the Aggregator during the next run iteration. """ data, feed = _data if not feed['record'] or not self.aggregate: return self.incoming_data.put((data, feed)) self.log.debug("Enqueued {d} from Feed {f}", d=data, f=feed)
[docs] @ocs_agent.param('test_mode', default=False, type=bool) def record(self, session: ocs_agent.OpSession, params): """record(test_mode=False) **Process** - This process will create an Aggregator instance, which will collect and write provider data to disk as long as this process is running. Parameters: test_mode (bool, optional): Run the record Process loop only once. This is meant only for testing. Default is False. Notes: The most recent file and active providers will be returned in the session data:: >>> response.session['data'] {"current_file": "/data/16020/1602089117.g3", "providers": { "observatory.fake-data1.feeds.false_temperatures": { "last_refresh": 1602089118.8225083, "sessid": "1602088928.8294137", "stale": false, "last_block_received": "temps"}, "observatory.LSSIM.feeds.temperatures": { "last_refresh": 1602089118.8223345, "sessid": "1602088932.335811", "stale": false, "last_block_received": "temps"}}} """ self.aggregate = True try: aggregator = Aggregator( self.incoming_data, self.time_per_file, self.data_dir, session=session ) except PermissionError: self.log.error("Unable to intialize Aggregator due to permission " "error, stopping twisted reactor") reactor.callFromThread(reactor.stop) return False, "Aggregation not started" while self.aggregate: time.sleep(self.loop_time) aggregator.run() if params['test_mode']: break aggregator.close() return True, "Aggregation has ended"
def _stop_record(self, session, params): if OpCode(session.op_code) in [OpCode.STARTING, OpCode.RUNNING]: session.set_status('stopping') self.aggregate = False return True, "Stopping aggregation" elif OpCode(session.op_code) == OpCode.STOPPING: return True, "record process status is already 'stopping'" else: return False, "record process not currently running"
[docs] def make_parser(parser=None): if parser is None: parser = argparse.ArgumentParser() pgroup = parser.add_argument_group('Agent Options') pgroup.add_argument('--data-dir', required=True, help="Base directory to store data. " "Subdirectories will be made here.") pgroup.add_argument('--initial-state', default='idle', choices=['idle', 'record'], help="Initial state of argument parser. Can be either" "idle or record") pgroup.add_argument('--time-per-file', default='3600', help="Time per file in seconds. Defaults to 1 hr") return parser
[docs] def main(args=None): # Start logging txaio.start_logging(level=environ.get("LOGLEVEL", "info")) parser = make_parser() args = site_config.parse_args(agent_class='AggregatorAgent', parser=parser, args=args) agent, runner = ocs_agent.init_site_agent(args) AggregatorAgent(agent, args) runner.run(agent, auto_reconnect=True)
if __name__ == '__main__': main()