Source code for ocs.ocs_feed

from ocs.ocs_agent import in_reactor_context
from twisted.internet import reactor
from autobahn.exception import Disconnected
from autobahn.wamp.exception import TransportLost
import time
import re


[docs] class Block: def __init__(self, name, keys): """ Structure of block for a so3g IrregBlockDouble. """ self.name = name self.timestamps = [] self.tags = None self.data = { k: [] for k in keys }
[docs] def empty(self): """ Returns true if block is empty""" return self.timestamps == []
[docs] def clear(self): """ Empties block's buffers """ self.timestamps = [] for key in self.data: self.data[key] = []
[docs] def append(self, d): """ Adds a single data point to the block """ if d['data'].keys() != self.data.keys(): raise Exception("Block structure does not match: {}".format(self.name)) self.timestamps.append(d['timestamp']) tags = d.get('influxdb_tags') if tags is not None: self.tags = d['influxdb_tags'] for k in self.data: self.data[k].append(d['data'][k])
[docs] def extend(self, block): """ Extends the data block by an encoded block """ if block['data'].keys() != self.data.keys(): raise Exception("Block structure does not match: {}".format(self.name)) self.timestamps.extend(block['timestamps']) tags = block.get('influxdb_tags') if tags is not None: self.tags = block['influxdb_tags'] for k in self.data: self.data[k].extend(block['data'][k])
[docs] def encoded(self): n = len(self.timestamps) assert (all([n == len(v) for v in self.data.values()])) return { 'block_name': self.name, 'data': {k: self.data[k] for k in self.data.keys()}, 'influxdb_tags': self.tags, 'timestamps': self.timestamps, }
[docs] class Feed: """ Manages publishing to a specific feed and storing of messages. Args: agent (OCSAgent): agent that is registering the feed feed_name (string): name of the feed record (bool, optional): Determines if feed should be aggregated. At the moment, each agent can have at most one aggregated feed. Defaults to False agg_params (dict, optional): Parameters used by the aggregator and influx publisher. Params: **frame_length** (float): Deterimes the amount of time each G3Frame should be (in seconds). **fresh_time** (float): Time until feed is considered stale by aggregator. **exclude_aggregator** (bool): If True, the HK Aggregator will not record feed to g3. **exclude_influx** (bool): If True, the InfluxPublisher will not write the feed to Influx. buffer_time (int, optional): Specifies time that messages should be buffered in seconds. If 0, message will be published immediately. Defaults to 0. max_messages (int, optional): Max number of messages stored. Defaults to 20. """ def __init__(self, agent, feed_name, record=False, agg_params={}, buffer_time=0, max_messages=0): self.agent = agent self.feed_name = feed_name self.record = record self.agg_params = agg_params self.buffer_time = buffer_time self.buffer_start_time = None self.blocks = {} self.agent_address = self.agent.agent_address self.address = "{}.feeds.{}".format(self.agent_address, self.feed_name)
[docs] def encoded(self): return { "agent_address": self.agent_address, "agg_params": self.agg_params, "feed_name": self.feed_name, "address": self.address, "record": self.record, "session_id": self.agent.agent_session_id, "agent_class": self.agent.class_name }
[docs] def flush_buffer(self): """Publishes all messages in buffer and empties it.""" if not in_reactor_context(): return reactor.callFromThread(self.flush_buffer) if self.buffer_start_time is None: return if self.record: try: self.agent.publish(self.address, ( {k: b.encoded() for k, b in self.blocks.items() if b.timestamps}, self.encoded() )) except (Disconnected, TransportLost): self.agent.log.error('Could not publish to Feed. ' + 'crossbar server likely unreachable.') for k, b in self.blocks.items(): b.clear()
[docs] def publish_message(self, message, timestamp=None): """ Publishes message to feed. If this is an aggregatable feed (record=True), then it may be buffered. Otherwise it is dispatched immediately. Args: message: Data to be published (see notes about acceptable formats). timestamp (float): timestamp given to the message. Defaults to time.time() If this feed is not intended to provide structured data for aggregation, then the format of the message is unrestricted as long as it is WAMP-serializable. For aggregated feeds, the message should be a dict with one of the following formats: 1. A single sample for several co-sampled channels. The structure is:: message = { 'block_name': Key given to the block in blocking param 'timestamp': timestamp of data 'data': { key1: datapoint1 key2: datapoint2 } } Samples recorded in this way may be buffered, if self.sample_time > 0. 2. Multiple or more samples for several co-sampled channels. The structure is:: message = { 'block_name': Key given to the block in blocking param 'timestamps': [timestamp, timestamp...] 'data': { key1: [datapoint, datapoint...] key2: [datapoint, datapoint...] } } Note that the code distinguishes between these cases based on the presence of the key 'timestamps' rather than 'timestamp'. These data can be buffered, too, if self.sample_time > 0. """ current_time = time.time() if timestamp is None: timestamp = current_time if not in_reactor_context(): # Take a copy, for thread-safety. message = message.copy() return reactor.callFromThread(self.publish_message, message, timestamp=timestamp) if self.record: # check message contents for k, v in message['data'].items(): Feed.verify_data_field_string(k) Feed.verify_message_data_type(v) # check influxdb_tags Feed.verify_influxdb_tags(message) # Data is stored in Block objects block_name = message['block_name'] try: b = self.blocks[block_name] except KeyError: b = Block(block_name, message['data'].keys()) self.blocks[block_name] = b if 'timestamp' in message: b.append(message) elif 'timestamps' in message: b.extend(message) else: raise RuntimeError('Invalid message when record=True. keys=%s' % message.keys()) if self.buffer_start_time is None: self.buffer_start_time = current_time if (current_time - self.buffer_start_time) >= self.buffer_time: self.flush_buffer() self.buffer_start_time = None else: # Publish message immediately try: self.agent.publish(self.address, (message, self.encoded())) except TransportLost: self.agent.log.error('Could not publish to Feed. TransportLost. ' + 'crossbar server likely unreachable.')
[docs] @staticmethod def verify_influxdb_tags(message): """Check the 'influxdb_tags' to make sure all the needed information is provided. This checks to make sure each tag has a '_field' provided, and that each field has a corresponding tag. Note: 'influxdb_tags' can be ``None``. This will be the case for any agents that do not implement tagging. Args: message (dict): 'message' dictionary value published (see Feed.publish_message for details). Raises: ValueError: If the 'influxdb_tags' provided in the message do not meet the required format. """ tags = message.get('influxdb_tags') if tags is None: return # check '_field' supplied with each tag for v in tags.values(): if '_field' not in v: error_msg = f"'_field' not supplied with 'influxdb_tags' for tag set {v}" raise ValueError(error_msg) # check that all fields have a corresponding tag tag_fields = tags.keys() for k in message['data'].keys(): if k not in tag_fields: error_msg = f"'influxdb_tags' does not contain tags for '{k}'" raise ValueError(error_msg) # check for extra tags for k in tag_fields: if k not in message['data'].keys(): error_msg = f"'influxdb_tags' contains extra tag '{k}'" raise ValueError(error_msg)
[docs] @staticmethod def verify_message_data_type(value): """Aggregated Feeds can only store certain types of data. Here we check that the type of all data contained in a message's 'data' dictionary are supported types. Args: value (list, float, int, bool): 'data' dictionary value published (see Feed.publish_message for details). """ valid_types = (float, int, str, bool) # multi-sample check if isinstance(value, list): if not all(isinstance(x, valid_types) for x in value): type_set = set([type(x) for x in value]) invalid_types = type_set.difference(valid_types) raise TypeError("message 'data' block contains invalid data" + f"types: {invalid_types}") # single sample check else: if not isinstance(value, valid_types): invalid_type = type(value) raise TypeError("message 'data' block contains invalid " + f"data type: {invalid_type}")
[docs] @staticmethod def verify_data_field_string(field): """There are strict rules for the characters allowed in field names. This function verifies the names in a message are valid. A valid name: * contains only letters (a-z, A-Z; case sensitive), decimal digits (0-9), and the underscore (_). * begins with a letter, or with any number of underscores followed by a letter. * is at least one, but no more than 255, character(s) long. Args: field (str): Field name string to verify. Returns: bool: True if field name is valid. Raises: ValueError: If field name is invalid. """ # Check for empty field name if not field: raise ValueError("Empty field name encountered, please enter " + "a valid field name.") # Complement (^) the set, matching any unlisted characters check_invalid = re.compile('[^a-zA-Z0-9_]') # Similar to check_invalid, search for non letter characters # Leading ^ matches the start of string, so following numbers are valid check_start = re.compile('^[^a-zA-Z]') # check for invalid characters result = check_invalid.search(field) if result: raise ValueError(f"message 'data' block contains the key {field} " f"with the invalid character '{result.group(0)}'. " "Valid characters are a-z, A-Z, 0-9, and underscore.") # check for non-letter start, even after underscores stripped_key = field.strip("_") if check_start.search(stripped_key): raise ValueError(f"message 'data' block contains the key {field}, " + "which does not start with a letter (after any " + "number of leading underscores.)") # check key length if len(field) > 255: raise ValueError(f"message 'data' block contains key {field} which " + "exceeds the valid length of 255 characters.") return True
[docs] @staticmethod def enforce_field_name_rules(field_name): """Enforce naming rules for field names. A valid name: * contains only letters (a-z, A-Z; case sensitive), decimal digits (0-9), and the underscore (_). * begins with a letter, or with any number of underscores followed by a letter. * is at least one, but no more than 255, character(s) long. Args: field_name (str): Field name string to check and modify if needed. Returns: str: New field name, meeting all above rules. Note this isn't guarenteed to not collide with other field names passed through this method, and that should be checked. """ # check for empty string if field_name == "": new_field_name = "invalid_field" else: new_field_name = field_name # replace spaces with underscores new_field_name = new_field_name.replace(' ', '_') # replace invalid characters new_field_name = re.sub('[^a-zA-Z0-9_]', '', new_field_name) # grab leading underscores underscore_search = re.compile('^_*') underscores = underscore_search.search(new_field_name).group() # remove leading underscores new_field_name = re.sub('^_*', '', new_field_name) # remove leading non-letters new_field_name = re.sub('^[^a-zA-Z]*', '', new_field_name) # add underscores back new_field_name = underscores + new_field_name # limit to 255 characters new_field_name = new_field_name[:255] return new_field_name