Source code for ocs.agents.influxdb_publisher.agent

import time
import queue
import argparse
import txaio

from os import environ

from ocs import ocs_agent, site_config
from ocs.base import OpCode

from ocs.agents.influxdb_publisher.drivers import Publisher

# For logging
txaio.use_twisted()
LOG = txaio.make_logger()


[docs] class InfluxDBAgent: """ This class provide a WAMP wrapper for the data publisher. The run function and the data handler **are** thread-safe, as long as multiple run functions are not started at the same time, which should be prevented through OCSAgent. Args: agent (OCSAgent): OCS Agent object args (namespace): args from the function's argparser. Attributes: data_dir (path): Path to the base directory where data should be written. aggregate (bool): Specifies if the agent is currently aggregating data. incoming_data (queue.Queue): Thread-safe queue where incoming (data, feed) pairs are stored before being passed to the Publisher. loop_time (float): Time between iterations of the run loop. """ def __init__(self, agent, args): self.agent: ocs_agent.OCSAgent = agent self.log = agent.log self.args = args self.aggregate = False self.incoming_data = queue.Queue() self.loop_time = 1 self.agent.subscribe_on_start(self._enqueue_incoming_data, f'{args.address_root}..feeds.', options={'match': 'wildcard'}) record_on_start = (args.initial_state == 'record') self.agent.register_process('record', self.record, self._stop_record, startup=record_on_start) def _enqueue_incoming_data(self, _data): """Data handler for all feeds. This checks to see if the feeds should be recorded, and if they are it puts them into the incoming_data queue to be processed by the Publisher during the next run iteration. """ data, feed = _data if not feed['record'] or not self.aggregate: return # LOG.debug("data: {d}", d=data) # LOG.debug("feed: {f}", f=feed) self.incoming_data.put((data, feed))
[docs] @ocs_agent.param('test_mode', default=False, type=bool) def record(self, session: ocs_agent.OpSession, params): """record() **Process** - This process will create an Publisher instance, which will collect and write provider data to disk as long as this process is running. Parameters: test_mode (bool, optional): Run the record Process loop only once. This is meant only for testing. Default is False. Notes: An example of the session data:: >>> response.session['data'] {'connected': True, 'last_updated': 1774389203.53926} """ self.aggregate = True self.log.debug("Instatiating Publisher class") try: publisher = Publisher(self.args.host, self.args.database, self.incoming_data, port=self.args.port, protocol=self.args.protocol, ssl=self.args.ssl, verify_ssl=self.args.verify_ssl, gzip=self.args.gzip, operate_callback=lambda: self.aggregate, ) except ConnectionError: return False, "Failed to connect to InfluxDB" while self.aggregate: time.sleep(self.loop_time) self.log.debug(f"Approx. queue size: {self.incoming_data.qsize()}") publisher.run() if not publisher.connected and not session.degraded: session.degraded = True if publisher.connected and session.degraded: session.degraded = False data = {"connected": publisher.connected, "last_updated": time.time()} session.data.update(data) if params['test_mode']: break publisher.close() return True, "Aggregation has ended"
def _stop_record(self, session, params): if OpCode(session.op_code) in [OpCode.STARTING, OpCode.RUNNING]: session.set_status('stopping') self.aggregate = False return True, "Stopping aggregation" elif OpCode(session.op_code) == OpCode.STOPPING: return True, "record process status is already 'stopping'" else: return False, "record process not currently running"
[docs] def make_parser(parser=None): if parser is None: parser = argparse.ArgumentParser() pgroup = parser.add_argument_group('Agent Options') pgroup.add_argument('--initial-state', default='record', choices=['idle', 'record'], help="Initial state of argument parser. Can be either " "idle or record") pgroup.add_argument('--host', default='influxdb', help="InfluxDB host address.") pgroup.add_argument('--port', default=8086, help="InfluxDB port.") pgroup.add_argument('--database', default='ocs_feeds', help="Database within InfluxDB to publish data to.") pgroup.add_argument('--protocol', default='line', choices=['json', 'line'], help="Protocol for writing data. Either 'line' or " "'json'.") pgroup.add_argument('--ssl', type=bool, default=False, help="Use https instead of http to connect to InfluxDB, defaults to False.") pgroup.add_argument('--verify-ssl', type=bool, default=False, help="Verify SSL certificates for HTTPS requests, defaults to False.") pgroup.add_argument('--gzip', type=bool, default=False, help="Use gzip content encoding to compress requests.") return parser
[docs] def main(args=None): # Start logging txaio.start_logging(level=environ.get("LOGLEVEL", "info")) parser = make_parser() args = site_config.parse_args(agent_class='InfluxDBAgent', parser=parser, args=args) agent, runner = ocs_agent.init_site_agent(args) InfluxDBAgent(agent, args) runner.run(agent, auto_reconnect=True)
if __name__ == '__main__': main()