from dataclasses import dataclass
from datetime import datetime, timezone
[docs]
def timestamp2influxtime(time, protocol):
    """Convert timestamp for influx, always in UTC.

    Args:
        time (float):
            Unix 'ctime' timestamp, i.e. ``1775500953.5108523``
        protocol (str):
            InfluxDB protocol to format timestamp for. Either 'json' or
            'line'.

    Returns:
        str or int: For 'json', the UTC time formatted as
        ``%Y-%m-%dT%H:%M:%S.%f``. For 'line', the timestamp as integer
        nanoseconds.

    Raises:
        ValueError: If ``protocol`` is neither 'json' nor 'line'.

    Notes:
        Exceptions raised by the underlying conversions (for example
        ``OverflowError`` for timestamps that are out of range) are not
        handled here and propagate to the caller.

    """
    if protocol == 'json':
        # InfluxDB expects UTC by default, so build the datetime directly in
        # UTC rather than going through the machine's local timezone.
        t_dt = datetime.fromtimestamp(time, tz=timezone.utc)
        return t_dt.strftime("%Y-%m-%dT%H:%M:%S.%f")
    if protocol == 'line':
        return int(time * 1e9)  # ns
    # Fail with a clear message instead of an UnboundLocalError.
    raise ValueError(
        f"Protocol '{protocol}' not supported. Expected 'json' or 'line'.")
def _format_field_line(field_key, field_value):
"""Format key-value pair for InfluxDB line protocol."""
# Strings must be in quotes for line protocol
if isinstance(field_value, str):
line = f'{field_key}="{field_value}"'
else:
line = f"{field_key}={field_value}"
# Don't append 'i' to bool, which is a subclass of int
if isinstance(field_value, int) and not isinstance(field_value, bool):
line += "i"
return line
[docs]
@dataclass
class InfluxBlock:
    """Holds and can convert the data and feed information into a format
    suitable for publishing to InfluxDB.

    """
    #: OCS Block name.
    block_name: str
    #: OCS data, as it comes across the feed.
    data: dict
    #: Corresponding timestamps for the data.
    timestamps: list
    #: Measurement name to publish to in InfluxDB.
    measurement: str
    #: Tags to apply to the measurements. Read through its ``shared_tags``
    #: and ``field_tags`` attributes. Quoted so the annotation is treated as
    #: a forward reference and is not evaluated when the class is created.
    tags: "InfluxTags"

    def _group_data(self):
        """Takes the block structured data and groups each data point in a set
        of fields so they can be combined with the corresponding timestamp.

        Returns:
            list: One dict of ``{field: value}`` per timestamp.

        Example:
            This takes something of the form::

                {'channel_00': [-0.0943, -0.0965],
                 'channel_01': [-0.0082, -0.0086]}

            and shapes it into the form::

                [{'channel_00': -0.0943, 'channel_01': -0.0082},
                 {'channel_00': -0.0965, 'channel_01': -0.0086}]

            This is the form needed to pass directly to the 'json' format.

        """
        # Index by timestamp position so a field with fewer samples than
        # timestamps raises instead of being silently truncated.
        return [
            {key: values[i] for key, values in self.data.items()}
            for i in range(len(self.timestamps))
        ]

    def _group_fields_lines(self):
        """Takes the block structured data and groups each data point into a
        set of fields formatted for the 'line' protocol that can be combined
        with the corresponding timestamp.

        Returns:
            list: One comma separated field string per timestamp.

        Example:
            This takes something of the form::

                {'channel_00': [-0.1170, -0.1180, -0.1153],
                 'channel_01': [-0.0241, -0.0267, -0.0226]}

            and shapes it into the form::

                ['channel_00=-0.1170,channel_01=-0.0241',
                 'channel_00=-0.1180,channel_01=-0.0267',
                 'channel_00=-0.1153,channel_01=-0.0226']

            This is the form needed to pass directly to the 'line' format.

        """
        return [
            ','.join(_format_field_line(key, value)
                     for key, value in fields.items())
            for fields in self._group_data()
        ]

    @staticmethod
    def _influx_time_or_none(timestamp, protocol):
        """Convert a timestamp for InfluxDB, or warn and return None if the
        conversion fails.

        Args:
            timestamp (float): Unix timestamp.
            protocol (str): Either 'line' or 'json'.

        Returns:
            The converted time, or None if the timestamp cannot be converted.

        """
        try:
            return timestamp2influxtime(timestamp, protocol=protocol)
        except (OverflowError, ValueError, OSError):
            # Besides OverflowError, the conversion can fail with ValueError
            # (e.g. a NaN timestamp) or OSError; none of them should take
            # down the publisher, so drop just this point.
            print(f"Warning: Cannot convert {timestamp} to an InfluxDB compatible time. "
                  + "Dropping this data point.")
            return None

    def _encode_line(self, fields, timestamp):
        """Given the fields and timestamps, encode them for use in the 'line'
        protocol.

        Args:
            fields (str): Comma separated list of fields. When unique field
                tags are in use this must be a single ``key=value`` field.
            timestamp (float): Unix timestamp.

        Returns:
            str: Complete 'line' protocol string, or None if the timestamp
            could not be converted.

        Example:
            An example of the formatting::

                >>> block._encode_line('channel_00=-0.0341,channel_01=0.0612', 1775162753.7786078)
                observatory.fake-data1,feed=false_temperatures channel_00=-0.0341,channel_01=0.0612 1775162753778607872

        """
        # Convert json format tags to line format
        tag_list = [f'{k}={v}' for k, v in self.tags.shared_tags.items()]

        # Add unique field tags to the list and overwrite the field key
        if self.tags.field_tags:
            # Split on the first '=' only: a quoted string value may itself
            # contain '=' characters.
            field_name, _, field_value = fields.partition('=')
            # A field without an entry simply gets no extra tags.
            tags_to_add = self.tags.field_tags.get(field_name) or {}
            tag_list.extend(
                f'{k}={v}' for k, v in tags_to_add.items() if k != '_field')

            # Overwrite field name with _field from tags_to_add
            # (influxdb_tags), keeping the original name if there is none.
            new_field_key = tags_to_add.get('_field')
            if new_field_key is None:
                new_field_key = field_name
            fields = f'{new_field_key}={field_value}'

        t_influx = self._influx_time_or_none(timestamp, 'line')
        if t_influx is None:
            return None

        # Only separate measurement and tags with a comma when there are
        # tags; a dangling comma is not valid line protocol.
        series = ','.join([self.measurement, *tag_list])
        return f"{series} {fields} {t_influx}"

    def _encode_json(self, fields, timestamp):
        """Given the fields and timestamps, encode them for use in the 'json'
        protocol.

        Args:
            fields (dict): Dictionary with the fields and their associated
                values. When unique field tags are in use this must hold
                exactly one field.
            timestamp (float): Unix timestamp.

        Returns:
            dict: Complete 'json' protocol dict, or None if the timestamp
            could not be converted.

        Example:
            An example of the formatting::

                >>> block._encode_json({'channel_00': -0.1149, 'channel_01': -0.0038}, 1775163570.7786078)
                {'measurement': 'observatory.fake-data1',
                 'time': '2026-04-02T20:59:30.778608',
                 'fields': {'channel_00': -0.1149, 'channel_01': -0.0038},
                 'tags': {'feed': 'false_temperatures'}}

        """
        # Add unique field tags to the list and overwrite the field key
        if self.tags.field_tags:
            (field_name, field_value), = fields.items()  # Unpack single (k, v)
            # A field without an entry simply gets no extra tags.
            tags_to_add = self.tags.field_tags.get(field_name) or {}
            tags = {k: v for k, v in tags_to_add.items() if k != '_field'}
            # Shared tags are applied last, so they win on a name clash.
            tags.update(self.tags.shared_tags)

            # Overwrite field name with _field from tags_to_add
            # (influxdb_tags), keeping the original name if there is none.
            new_field_key = tags_to_add.get('_field')
            if new_field_key is None:
                new_field_key = field_name
            fields = {new_field_key: field_value}
        else:
            tags = self.tags.shared_tags

        t_influx = self._influx_time_or_none(timestamp, 'json')
        if t_influx is None:
            return None

        return {
            "measurement": self.measurement,
            "time": t_influx,
            "fields": fields,
            "tags": tags,
        }

    def _iter_payloads(self, protocol):
        """Yield ``(fields, timestamp)`` pairs shaped for the given protocol's
        encoder ('line' yields field strings, 'json' yields field dicts).

        """
        # If we don't have unique field tags, group the fields together
        if self.tags.field_tags is None:
            if protocol == 'line':
                grouped = self._group_fields_lines()
            else:
                grouped = self._group_data()
            yield from zip(grouped, self.timestamps)
            return

        # If we do have field_tags, encode each field separately
        for fields, time_ in zip(self._group_data(), self.timestamps):
            for field, value in fields.items():
                if protocol == 'line':
                    yield _format_field_line(field, value), time_
                else:
                    yield {field: value}, time_

    def encode(self, protocol='line'):
        """Encode Block data into InfluxDB compatible format for the given
        protocol.

        Args:
            protocol (str): Protocol to use to publish to InfluxDB. Either
                'line' or 'json'. Defaults to 'line'.

        Returns:
            list:
                List of encoded data points, formatted to be used by the
                InfluxDB client. Points whose timestamp cannot be converted
                are dropped. Empty if the protocol is not supported.

        """
        if protocol == 'line':
            encoder = self._encode_line
        elif protocol == 'json':
            encoder = self._encode_json
        else:
            print(f"Protocol '{protocol}' not supported.")
            return []

        encoded_list = []
        for fields, time_ in self._iter_payloads(protocol):
            point = encoder(fields, time_)
            if point is not None:
                encoded_list.append(point)
        return encoded_list
def _convert_single_to_group(message):
"""Convert a single sample 'timestamp' message to the co-sampled group
'timestamps' format, which we can handle already.
Args:
message (dict): Single sample 'timestamp' message from the OCS feed.
Notes:
This doesn't take the data directly from the feed, but the block dict
only.
"""
message['timestamps'] = [message['timestamp']]
new_data = {}
for field, value in message['data'].items():
new_data[field] = [value]
message['data'] = new_data
message.pop('timestamp')
return message