Source code for ocs.agents.influxdb_publisher_v2.agent

import time
import queue
import argparse
import txaio

from os import environ

from ocs import ocs_agent, site_config
from ocs.base import OpCode

from ocs.agents.influxdb_publisher_v2.drivers import Publisher

# For logging
txaio.use_twisted()
LOG = txaio.make_logger()


[docs] class InfluxDBAgentv2: """ This class provide a WAMP wrapper for the data publisher. The run function and the data handler **are** thread-safe, as long as multiple run functions are not started at the same time, which should be prevented through OCSAgent. Args: agent (OCSAgent): OCS Agent object args (namespace): args from the function's argparser. Attributes: data_dir (path): Path to the base directory where data should be written. aggregate (bool): Specifies if the agent is currently aggregating data. incoming_data (queue.Queue): Thread-safe queue where incoming (data, feed) pairs are stored before being passed to the Publisher. loop_time (float): Time between iterations of the run loop. """ def __init__(self, agent, args): self.agent: ocs_agent.OCSAgent = agent self.log = agent.log self.args = args self.aggregate = False self.incoming_data = queue.Queue() self.loop_time = 1 self.agent.subscribe_on_start(self._enqueue_incoming_data, f'{args.address_root}..feeds.', options={'match': 'wildcard'}) record_on_start = (args.initial_state == 'record') self.agent.register_process('record', self.record, self._stop_record, startup=record_on_start) def _enqueue_incoming_data(self, _data): """Data handler for all feeds. This checks to see if the feeds should be recorded, and if they are it puts them into the incoming_data queue to be processed by the Publisher during the next run iteration. """ data, feed = _data if not feed['record'] or not self.aggregate: return self.incoming_data.put((data, feed))
[docs] @ocs_agent.param('test_mode', default=False, type=bool) def record(self, session: ocs_agent.OpSession, params): """record() **Process** - This process will create an Publisher instance, which will collect and write provider data to disk as long as this process is running. Parameters: test_mode (bool, optional): Run the record Process loop only once. This is meant only for testing. Default is False. Notes: An example of the session data:: >>> response.session['data'] {'connected': True, 'last_updated': 1774389203.53926} """ self.aggregate = True self.log.debug("Instatiating Publisher class") try: publisher = Publisher(self.incoming_data, protocol=self.args.protocol, gzip=self.args.gzip, operate_callback=lambda: self.aggregate, ) except ConnectionError: return False, "Failed to connect to InfluxDB" while self.aggregate: time.sleep(self.loop_time) self.log.debug(f"Approx. queue size: {self.incoming_data.qsize()}") publisher.run() if not publisher.connection.connected and not session.degraded: session.degraded = True self.log.error("Disconnected from InfluxDB.") if publisher.connection.connected and session.degraded: session.degraded = False self.log.error("Reconnected to InfluxDB.") data = {"connected": publisher.connection.connected, "last_updated": time.time()} session.data.update(data) if params['test_mode']: break publisher.close() return True, "Aggregation has ended"
def _stop_record(self, session, params): if OpCode(session.op_code) in [OpCode.STARTING, OpCode.RUNNING]: session.set_status('stopping') self.aggregate = False return True, "Stopping aggregation" elif OpCode(session.op_code) == OpCode.STOPPING: return True, "record process status is already 'stopping'" else: return False, "record process not currently running"
def make_parser(parser=None): if parser is None: parser = argparse.ArgumentParser() pgroup = parser.add_argument_group('Agent Options') pgroup.add_argument('--initial-state', default='record', choices=['idle', 'record'], help="Initial state of argument parser. Can be either " "idle or record") pgroup.add_argument('--protocol', default='line', choices=['json', 'line'], help="Protocol for writing data. Either 'line' or " "'json'.") pgroup.add_argument('--gzip', type=bool, default=False, help="Use gzip content encoding to compress requests.") return parser def main(args=None): # Start logging txaio.start_logging(level=environ.get("LOGLEVEL", "info")) parser = make_parser() args = site_config.parse_args(agent_class='InfluxDBAgentv2', parser=parser, args=args) agent, runner = ocs_agent.init_site_agent(args) InfluxDBAgentv2(agent, args) runner.run(agent, auto_reconnect=True) if __name__ == '__main__': main()