import os
import binascii
import time
from typing import Dict
import txaio
txaio.use_twisted()
from ocs.ocs_feed import Block, Feed
import so3g
from spt3g import core
# Frame-format version handed to so3g's HKSessionHelper (``hkagg_version``).
HKAGG_VERSION = 2

# Python scalar type -> G3 scalar wrapper, used by g3_cast() for single values.
_g3_casts = {
    str: core.G3String,
    int: core.G3Int,
    float: core.G3Double,
    bool: core.G3Bool,
}

# Python element type -> G3 vector wrapper, used by g3_cast() for lists.
_g3_list_casts = {
    str: core.G3VectorString,
    int: core.G3VectorInt,
    float: core.G3VectorDouble,
    bool: core.G3VectorBool,
}

# Module-level logger, used by the free functions in this module.
LOG = txaio.make_logger()
[docs]
def g3_cast(data, time=False):
    """
    Casts a generic datatype into a corresponding G3 type. With:

        int   -> G3Int
        str   -> G3String
        float -> G3Double
        bool  -> G3Bool

    and lists of type X will go to G3VectorX. If ``time`` is set to True, will
    convert to G3Time or G3VectorTime with the assumption that ``data``
    consists of unix timestamps.

    Args:
        data (int, str, float, bool, or list):
            Generic data to be converted to a corresponding G3Type. Lists
            must be homogeneous.
        time (bool, optional):
            If True, will assume data contains unix timestamps and try to cast
            to G3Time or G3VectorTime.

    Returns:
        g3_data:
            Corresponding G3 datatype.

    Raises:
        ValueError: If ``data`` is an empty list and ``time`` is False, since
            the element type (and therefore the G3 vector type) cannot be
            determined.
        TypeError: If a list mixes element types, or the (element) type is not
            one of the supported types.
    """
    is_list = isinstance(data, list)

    if is_list:
        if not data:
            # There is no first element to infer a type from. An empty list of
            # timestamps is still unambiguous; anything else is not.
            if time:
                return core.G3VectorTime([])
            raise ValueError("g3_cast cannot infer the G3 vector type of an "
                             "empty list")
        dtype = type(data[0])
        if not all(isinstance(d, dtype) for d in data):
            raise TypeError("Data list contains varying types!")
    else:
        dtype = type(data)

    if dtype not in _g3_casts:
        raise TypeError("g3_cast does not support type {}. Type must "
                        "be one of {}".format(dtype, _g3_casts.keys()))

    if time:
        # Unix timestamps are in seconds; scale them into G3 time units.
        if is_list:
            return core.G3VectorTime(
                [core.G3Time(t * core.G3Units.s) for t in data])
        return core.G3Time(data * core.G3Units.s)

    cast = _g3_list_casts[dtype] if is_list else _g3_casts[dtype]
    return cast(data)
[docs]
def generate_id(hksess):
    """
    Build a session id by packing three values into a single integer.

    From most- to least-significant bits the id holds: the low 32 bits of
    the integer start time, the low 14 bits of the current process id, and
    the low 14 bits of the CRC32 of the session description.

    Args:
        hksess (so3g.HKSessionHelper):
            Session helper; its ``start_time`` and ``description`` attributes
            are read.

    Returns:
        int: The packed session id.
    """
    # Maybe this should go directly into HKSessionHelper
    description_crc = binascii.crc32(bytes(hksess.description, 'utf8'))
    fields = (
        (int(hksess.start_time), 32),
        (os.getpid(), 14),
        (description_crc, 14),
    )

    session_id = 0
    for value, width in fields:
        # Keep only the low ``width`` bits of each value before appending it.
        mask = (1 << width) - 1
        session_id = (session_id << width) | (value & mask)
    return session_id
[docs]
def make_filename(base_dir, make_subdirs=True):
    """
    Creates a new filename based on the time and base_dir.

    The file is placed in a sub-directory of ``base_dir`` named after the
    first five characters of the current unix timestamp, and is itself named
    ``<integer timestamp>.g3``.

    If make_subdirs is True, all subdirectories will be automatically created.
    I don't think there's any reason that this shouldn't be true...

    Args:
        base_dir (path):
            Base path where data should be written.
        make_subdirs (bool):
            True if func should automatically create non-existing subdirs.

    Returns:
        str: Path of the new file. The file itself is not created here.

    Raises:
        FileNotFoundError: If the sub-directory does not exist and
            ``make_subdirs`` is False.
        PermissionError: If the sub-directory cannot be created (logged, then
            re-raised).
    """
    start_time = time.time()
    subdir = os.path.join(base_dir, str(start_time)[:5])

    if not os.path.exists(subdir):
        if not make_subdirs:
            raise FileNotFoundError("Subdir {} does not exist"
                                    .format(subdir))
        try:
            # exist_ok guards against another writer creating the directory
            # between the existence check above and this call.
            os.makedirs(subdir, exist_ok=True)
        except PermissionError as e:
            LOG.error("Permission error encountered while trying to create "
                      "data sub-directory: {e}", e=e)
            raise

    return os.path.join(subdir, "{}.g3".format(int(start_time)))
[docs]
class Provider:
    """
    Stores data for a single provider (OCS Feed).

    This class should only be accessed via a single thread.

    Args:
        address (string):
            Full address of the provider
        sessid (string):
            session_id of the provider
        prov_id:
            id assigned to the provider by the HKSessionHelper
        frame_length (float, optional):
            Time (in seconds) before data should be written into a frame.
            Defaults to 5 min.
        fresh_time (float, optional):
            Time (in seconds) before provider should be considered stale.
            Defaults to 3 min.

    Attributes:
        blocks (dict):
            All blocks that are written by provider.
        frame_start_time (float):
            Start time of current frame. None while no frame is in progress.
        fresh_time (float):
            time (in seconds) that the provider can go without data before
            being labeled stale, and scheduled to be removed
        last_refresh (time):
            Time when the provider was last refreshed (either through data or
            agent heartbeat).
        last_block_received (str):
            String of the last block_name received.
        log (txaio.Logger):
            txaio logger
    """

    def __init__(self, address, sessid, prov_id, frame_length=5 * 60,
                 fresh_time=3 * 60):
        self.address = address
        self.sessid = sessid
        self.frame_length = frame_length
        self.prov_id = prov_id
        self.log = txaio.make_logger()

        self.blocks = {}

        # Set when the first data of a frame arrives; reset by clear().
        self.frame_start_time = None

        self.fresh_time = fresh_time
        self.last_refresh = time.time()
        self.last_block_received = None

    def encoded(self):
        """Returns a dict summarising the provider's current status."""
        return {
            'last_refresh': self.last_refresh,
            'sessid': self.sessid,
            'stale': self.stale(),
            'last_block_received': self.last_block_received
        }

    def refresh(self):
        """Refresh provider"""
        self.last_refresh = time.time()

    def stale(self):
        """Returns true if provider is stale and should be removed"""
        return (time.time() - self.last_refresh) > self.fresh_time

    def new_frame_time(self):
        """Returns true if its time for a new frame to be written"""
        if self.frame_start_time is None:
            return False

        return (time.time() - self.frame_start_time) > self.frame_length

    def empty(self):
        """Returns true if all blocks are empty"""
        return all(b.empty() for b in self.blocks.values())

    def _verify_provider_data(self, data):
        """Check the provider data for invalid field names. Meant to be used in
        combination with Provider._rebuild_invalid_data().

        Every invalid field name is logged, so the whole dictionary is always
        scanned rather than stopping at the first problem.

        Args:
            data (dict): data dictionary passed to Provider.save_to_block()

        Returns:
            bool: True if all field names valid. False if any invalid names
            found
        """
        verified = True
        for block_dict in data.values():
            for field_name in block_dict['data']:
                try:
                    Feed.verify_data_field_string(field_name)
                except ValueError:
                    self.log.error("data field name '{field}' is "
                                   + "invalid, removing invalid characters.",
                                   field=field_name)
                    verified = False

        return verified

    @staticmethod
    def _check_for_duplicate_names(field_name, name_list):
        """Check name_list for matching field names and modify field_name if
        matches are found.

        The results of ocs_feed.Feed.enforce_field_name_rules() are not
        guaranteed to be unique. This method will check field_name against a
        collection of existing field names and try to append '_N', with N
        being a zero padded integer up to 99. Longer integers, though not
        expected to see use, are also supported, though will not be zero
        padded.

        In the event the field name is at the maximum allowed length, we remove
        some characters before appending the additional underscore and integer.

        Examples:
            >>> current_field_names = ['test', 'test_01']
            >>> name = 'test'
            >>> new_name = Provider._check_for_duplicate_names(name, current_field_names)
            >>> print(new_name)
            test_02

        Args:
            field_name (str): field name to check against name_list
            name_list (list): field names already in a Block. Any container
                supporting ``in`` works.

        Returns:
            str: A new field name that is not already in name_list
        """
        # Always derive candidates from the original name; suffixing the
        # previous candidate would yield 'test_01_02' instead of 'test_02'.
        base_name = field_name
        name_index = 1
        while field_name in name_list:
            suffix = f'_{name_index:02}'
            field_name = base_name[:255 - len(suffix)] + suffix
            name_index += 1

        return field_name

    def _rebuild_invalid_data(self, data):
        """Rebuild an invalid data dictionary.

        Args:
            data (dict): data dictionary passed to Provider.save_to_block().

        Returns:
            dict: A rebuilt data dictionary with invalid characters stripped
            from the field names, limited to 255 characters in length.
        """
        new_data = {}
        for block_name, block_dict in data.items():
            new_block = {}
            for k, v in block_dict.items():
                if k != 'data':
                    new_block[k] = v
                    continue

                new_fields = {}
                for field_name, field_values in v.items():
                    new_field_name = Feed.enforce_field_name_rules(field_name)

                    # Catch instance where rule enforcement strips all
                    # characters
                    if not new_field_name:
                        new_field_name = Feed.enforce_field_name_rules(
                            "invalid_field_" + field_name)

                    # Two different raw names may sanitise to the same name.
                    new_field_name = Provider._check_for_duplicate_names(
                        new_field_name, new_fields)

                    new_fields[new_field_name] = field_values
                new_block['data'] = new_fields

            new_data[block_name] = new_block

        return new_data

    def save_to_block(self, data):
        """Saves a list of data points into blocks. A block will be created
        for any new block_name.

        Examples:
            The format of data is shown in the following example:

            >>> data = {'test': {'block_name': 'test',
            ...                  'timestamps': [time.time()],
            ...                  'data': {'key1': [1],
            ...                           'key2': [2]},
            ...                  }
            ...         }
            >>> prov.save_to_block(data)

            Note the block name shows up twice, once as the dict key in the
            outer data dictionary, and again under the 'block_name' value.
            These must match -- in this instance both the word 'test'.

        Args:
            data (dict): data dictionary from incoming data queue
        """
        self.refresh()

        if self.frame_start_time is None:
            # Start the frame at the earliest first-timestamp among the
            # incoming blocks, but never later than now.
            self.frame_start_time = time.time()
            for b in data.values():
                if b['timestamps']:
                    self.frame_start_time = min(self.frame_start_time,
                                                b['timestamps'][0])

        self.log.debug('data passed to block: {d}', d=data)
        verified = self._verify_provider_data(data)
        if not verified:
            self.log.info('rebuilding data containing invalid field name')
            data = self._rebuild_invalid_data(data)
            self.log.debug('data after rebuild: {d}', d=data)

        for key, block in data.items():
            if key not in self.blocks:
                self.blocks[key] = Block(key, block['data'].keys())
            self.blocks[key].extend(block)
            self.last_block_received = key

    def clear(self):
        """Clears all blocks and resets the frame_start_time"""
        for b in self.blocks.values():
            b.clear()

        self.frame_start_time = None

    def to_frame(self, hksess=None, clear=False):
        """
        Returns a G3Frame based on the provider's blocks.

        Blocks without timestamps are skipped. A block whose data cannot be
        cast to G3 types is logged and skipped.

        Args:
            hksess (optional):
                If provided, the frame will be based off of hksession's data
                frame. If not provided, the data will be put into a clean
                frame.
            clear (bool):
                Clears provider data if True.

        Returns:
            The frame holding 'address', 'provider_session_id', 'blocks' and
            'block_names'.
        """
        if hksess is not None:
            frame = hksess.data_frame(prov_id=self.prov_id)
        else:
            frame = core.G3Frame(core.G3FrameType.Housekeeping)

        # A clean frame carries no 'blocks' container, so create one before
        # appending to it below.
        if 'blocks' not in frame:
            frame['blocks'] = core.G3VectorFrameObject()

        frame['address'] = self.address
        frame['provider_session_id'] = self.sessid

        block_names = []
        for block_name, block in self.blocks.items():
            if not block.timestamps:
                continue
            try:
                m = core.G3TimesampleMap()
                m.times = g3_cast(block.timestamps, time=True)
                for key, ts in block.data.items():
                    m[key] = g3_cast(ts)
            except Exception as e:
                self.log.warn("Error received when casting timestream! {e}",
                              e=e)
                continue
            frame['blocks'].append(m)
            block_names.append(block_name)

        if 'block_names' in frame:
            frame['block_names'].extend(block_names)
        else:
            frame['block_names'] = core.G3VectorString(block_names)

        if clear:
            self.clear()
        return frame
[docs]
class G3FileRotator(core.G3Module):
    """
    G3 module which handles file rotation.

    Once a file has been open for longer than time_per_file, the rotator ends
    it; the next frame then starts a new file whose name comes from the
    `filename` callable. The most recent session and status frames, if any,
    are replayed at the top of each new file.

    This class should only be accessed via a single thread.

    Args:
        time_per_file (float):
            time (seconds) before a new file should be written
        filename (callable):
            function that generates new filenames.

    Attributes:
        filename (function):
            Function to call to create new filename on rotation
        file_start_time (int):
            Start time for current file
        writer (core.G3Writer):
            G3Writer object for current file. None if no file is open.
        last_session (core.G3Frame):
            Last session frame written to disk. This is stored and written as
            the first frame on file rotation.
        last_status (core.G3Frame):
            Last status frame written to disk. Stored and written as the
            second frame on file rotation.
        current_file (str, optional):
            Path to the current file being written.
    """

    def __init__(self, time_per_file, filename):
        self.time_per_file = time_per_file
        self.filename = filename
        self.log = txaio.make_logger()

        self.file_start_time = None
        self.writer = None
        self.last_session = None
        self.last_status = None
        self.current_file = None

    def close_file(self):
        """Ends the current file, if one is open."""
        if self.writer is None:
            return
        self.writer(core.G3Frame(core.G3FrameType.EndProcessing))
        self.writer = None

    def flush(self):
        """Flushes current g3 file to disk"""
        if self.writer is None:
            return
        self.writer.Flush()

    def _start_file(self, ftype):
        """Opens a new file and replays the stored session/status frames.

        ``ftype`` is the type of the frame about to be written; only the
        stored frames that frame depends on are replayed ahead of it.
        """
        self.current_file = self.filename()
        self.log.info("Creating file: {}".format(self.current_file))
        self.writer = core.G3Writer(self.current_file)
        self.file_start_time = time.time()

        is_data = ftype == so3g.HKFrameType.data
        is_status = ftype == so3g.HKFrameType.status

        # Status and data frames are preceded by the session frame.
        if (is_data or is_status) and self.last_session is not None:
            self.writer(self.last_session)
        # Data frames are additionally preceded by the status frame.
        if is_data and self.last_status is not None:
            self.writer(self.last_status)

    def Process(self, frames):
        """
        Writes each frame to the current file, opening a file first if none
        is open. After each write, the file is closed if it has been open
        longer than time_per_file, so the next frame starts a new file named
        by the `filename` function passed to the constructor.
        """
        for frame in frames:
            ftype = frame['hkagg_type']

            # Remember the latest session/status frame for future rotations.
            if ftype == so3g.HKFrameType.session:
                self.last_session = frame
            elif ftype == so3g.HKFrameType.status:
                self.last_status = frame

            if self.writer is None:
                self._start_file(ftype)

            self.writer(frame)

            file_age = time.time() - self.file_start_time
            if file_age > self.time_per_file:
                self.close_file()

        return frames
[docs]
class Aggregator:
    """Data aggregator. This manages a collection of providers, and contains
    methods to write them to disk.

    This class should only be accessed by a single thread. Data can be passed
    to it by appending it to the referenced `incoming_data` queue.

    Args:
        incoming_data (queue.Queue):
            A thread-safe queue of (data, feed) pairs.
        time_per_file (float):
            Time (sec) before a new file should be written to disk.
        data_dir (path):
            Base directory for new files to be written.
        session (OpSession, optional):
            Session object of current agent process. If not specified, session
            data will not be written.

    Attributes:
        log (txaio.Logger):
            txaio logger
        hksess (so3g.HKSessionHelper):
            HKSession helper that assigns provider id's to providers,
            and constructs so3g frames.
        writer (G3Module):
            Module to use to write frames to disk.
        providers (Dict[int, Provider]):
            dictionary of active providers, indexed by the hksess's assigned
            provider id.
        pids (dict):
            Dictionary of provider id's assigned by the hksession. Indexed
            by (prov address, session_id).
        provider_archive (Dict[str, Provider]):
            Most recently added provider for each address. Entries are kept
            after the provider is removed, and are used to report provider
            status in the session data.
        write_status (bool):
            If true, a status frame will be written next time providers are
            written to disk. This is set to True whenever a provider is added
            or removed.
    """

    def __init__(self, incoming_data, time_per_file, data_dir, session=None):
        self.log = txaio.make_logger()

        self.hksess = so3g.hk.HKSessionHelper(description="HK data",
                                              hkagg_version=HKAGG_VERSION)
        self.hksess.start_time = time.time()
        self.hksess.session_id = generate_id(self.hksess)

        self.incoming_data = incoming_data

        self.writer = G3FileRotator(
            time_per_file,
            lambda: make_filename(data_dir, make_subdirs=True),
        )
        # Writing the session frame right away also opens the first file.
        self.writer.Process([self.hksess.session_frame()])

        self.providers: Dict[int, Provider] = {}  # by prov_id
        self.pids = {}  # By (address, sessid)
        self.provider_archive: Dict[str, Provider] = {}  # by address
        self.write_status = False
        self.session = session

    def process_incoming_data(self):
        """
        Takes all data from the incoming_data queue, and puts them into
        provider blocks. Feeds whose ``agg_params`` set ``exclude_aggregator``
        are skipped. A provider is registered the first time an
        (address, session_id) pair is seen.
        """
        while not self.incoming_data.empty():
            data, feed = self.incoming_data.get()

            agg_params = feed['agg_params']
            if agg_params.get('exclude_aggregator', False):
                continue

            address = feed['address']
            sessid = feed['session_id']

            pid = self.pids.get((address, sessid))
            if pid is None:
                # Only forward the settings the feed actually specifies, so
                # the Provider defaults apply otherwise.
                prov_kwargs = {
                    key: agg_params[key]
                    for key in ('frame_length', 'fresh_time')
                    if key in agg_params
                }
                pid = self.add_provider(address, sessid, **prov_kwargs)

            self.providers[pid].save_to_block(data)

    def add_provider(self, prov_address, prov_sessid, **prov_kwargs):
        """
        Registers a new provider and schedules a status frame to be written.

        Args:
            prov_address (str):
                full address of provider
            prov_sessid (str):
                session id of provider

        Optional Arguments:
            Additional kwargs are passed directly to the Provider constructor,
            so defaults are set there.

        Returns:
            The provider id assigned by the hksess.
        """
        pid = self.hksess.add_provider(description=prov_address)

        prov = Provider(prov_address, prov_sessid, pid, **prov_kwargs)
        self.providers[pid] = prov
        self.provider_archive[prov_address] = prov

        self.log.info("Adding provider {}".format(prov_address))

        self.pids[(prov_address, prov_sessid)] = pid
        self.write_status = True
        return pid

    def remove_provider(self, prov):
        """
        Writes remaining provider data to frame and removes provider.

        Args:
            prov (Provider):
                provider object that should be removed.
        """
        pid = prov.prov_id
        addr, sessid = prov.address, prov.sessid

        if not prov.empty():
            # Clear after writing: the provider object stays referenced from
            # provider_archive, so uncleared blocks would be kept in memory.
            self.writer.Process([prov.to_frame(self.hksess, clear=True)])

        self.log.info("Removing provider {}".format(prov.address))
        self.hksess.remove_provider(pid)
        del self.providers[pid]
        del self.pids[(addr, sessid)]
        self.write_status = True

    def remove_stale_providers(self):
        """
        Loops through all providers and check if they've gone stale. If they
        have, write their remaining data to disk (they shouldn't have any)
        and delete them.
        """
        # Collect first; remove_provider mutates self.providers.
        stale_provs = []
        for prov in self.providers.values():
            if prov.stale():
                self.log.info("Provider {} went stale".format(prov.address))
                stale_provs.append(prov)

        for prov in stale_provs:
            self.remove_provider(prov)

    def write_to_disk(self, clear=True, write_all=False):
        """
        Loop through all providers, and write their data to the writer
        if they have surpassed their frame_time, or if write_all is True.
        A pending status frame, if any, is written first.

        Args:
            clear (bool):
                If True, provider data is cleared after write
            write_all (bool):
                If true all providers are written to disk regardless of whether
                frame_time has passed.
        """
        frames = []

        if self.write_status:
            frames.append(self.hksess.status_frame())
            self.write_status = False

        for prov in self.providers.values():
            if prov.empty():
                continue
            if write_all or prov.new_frame_time():
                frames.append(prov.to_frame(self.hksess, clear=clear))

        self.writer.Process(frames)

    def run(self):
        """
        Main run iterator for the aggregator. This processes all incoming data,
        removes stale providers, and writes active providers to disk. If a
        session was given, its data is updated with the current file and the
        status of every provider seen so far.
        """
        self.process_incoming_data()
        self.remove_stale_providers()
        self.write_to_disk()
        self.writer.flush()

        if self.session is not None:
            self.session.data = {
                'current_file': self.writer.current_file,
                'providers': {}
            }
            for addr, prov in self.provider_archive.items():
                self.session.data['providers'][addr] = prov.encoded()

    def close(self):
        """Flushes all remaining providers and closes file."""
        self.write_to_disk(write_all=True)
        self.writer.close_file()