from ocs import ocs_agent, site_config, ocs_feed
import time
import threading
import txaio
from os import environ
import numpy as np
from twisted.internet.defer import inlineCallbacks
from autobahn.twisted.util import sleep as dsleep
# For logging
txaio.use_twisted()
LOG = txaio.make_logger()
# Agent implementation.
class FakeDataAgent:
    """Agent that publishes randomly generated "temperature" data.

    The agent registers a single feed, 'false_temperatures', and provides
    the ``acq`` and ``count_seconds`` Processes plus the ``set_heartbeat``
    and ``delay_task`` Tasks.

    Args:
        agent: The OCS agent object; its ``log``, ``register_feed`` and
            ``feeds`` members are used here.
        num_channels (int, optional): Number of fake channels. They are
            named 'channel_00', 'channel_01', ... Defaults to 2.
        sample_rate (float, optional): Samples per second generated for
            each channel. Values below 1e-6 are raised to 1e-6 so that the
            rate is never zero or negative. Defaults to 10.
        frame_length (int, optional): Passed to the feed as the
            'frame_length' aggregator parameter. Defaults to 60.
    """

    def __init__(self, agent,
                 num_channels=2,
                 sample_rate=10.,
                 frame_length=60):
        self.agent = agent
        self.log = agent.log
        # A Semaphore with the default count of 1 is used as a mutex
        # protecting self.job.
        self.lock = threading.Semaphore()
        # Name of the job currently holding exclusive access, or None.
        # The value '!acq' means "acq has been asked to stop".
        self.job = None

        self.channel_names = ['channel_%02i' % i for i in range(num_channels)]
        self.sample_rate = max(1e-6, sample_rate)  # no zeros

        # Register feed
        agg_params = {
            'frame_length': frame_length
        }
        print('registering')
        self.agent.register_feed('false_temperatures',
                                 record=True,
                                 agg_params=agg_params,
                                 buffer_time=1.)

    # Exclusive access management.

    def try_set_job(self, job_name):
        """Claim exclusive access for ``job_name``.

        Args:
            job_name (str): Name to record as the running job.

        Returns:
            tuple: ``(True, 'ok.')`` if no job was set and the claim
            succeeded; otherwise ``(False, message)`` naming the job that
            is already recorded.
        """
        with self.lock:
            if self.job is None:
                self.job = job_name
                return True, 'ok.'
            return False, 'Conflict: "%s" is already running.' % self.job

    def set_job_done(self):
        """Release exclusive access by clearing the recorded job."""
        with self.lock:
            self.job = None

    # Process functions.

    @ocs_agent.param('test_mode', default=False, type=bool)
    @ocs_agent.param('degradation_period', default=None, type=float)
    def acq(self, session, params):
        """acq(test_mode=False, degradation_period=None)

        **Process** - Acquire data and write to the feed.

        Parameters:
            test_mode (bool, optional): Run the acq Process loop only once.
                This is meant only for testing. Default is False.
            degradation_period (float, optional): If set, then
                alternately mark self as degraded / not degraded with
                this period (in seconds).

        Returns:
            tuple: ``(ok, message)``. ``ok`` is False if another job
            already holds exclusive access, or if the job marker changes
            to something other than 'acq' / '!acq' while running.

        Notes:
            The most recent fake values are stored in the session data
            object in the format::

                >>> response.session['data']
                {"fields":
                    {"channel_00": 0.10250430068515494,
                     "channel_01": 0.08550903376216404,
                     "channel_02": 0.10481891991693446,
                     "channel_03": 0.10793263271024509},
                 "timestamp":1600448753.9288929}

            The channels kept in fields are the 'faked' data, in a similar
            structure to the Lakeshore agents. 'timestamp' is the last time
            these values were updated.

        """
        ok, msg = self.try_set_job('acq')
        if not ok:
            return ok, msg

        # Per-channel baseline values; these random-walk on every report.
        T = [.100] * len(self.channel_names)
        block = ocs_feed.Block('temps', self.channel_names)

        next_timestamp = time.time()
        reporting_interval = 1.
        next_report = next_timestamp + reporting_interval

        # A flip time of 0 makes the first degraded toggle happen on the
        # first pass through the loop.
        next_deg_flip = None
        if params['degradation_period'] is not None:
            next_deg_flip = 0

        self.log.info("Starting acquisition")

        while True:
            with self.lock:
                if self.job == '!acq':
                    # _stop_acq has requested a stop.
                    break
                elif self.job != 'acq':
                    # The job marker was changed by something other than
                    # _stop_acq; report failure in the same (ok, message)
                    # form used by every other exit from this function.
                    return False, ('Acquisition aborted: job is "%s", '
                                   'expected "acq".' % self.job)

            now = time.time()

            if next_deg_flip is not None and now > next_deg_flip:
                session.degraded = not session.degraded
                next_deg_flip = now + params['degradation_period']

            delay_time = next_report - now
            if delay_time > 0:
                # Sleep at most 1 s so a stop request is noticed promptly.
                time.sleep(min(delay_time, 1.))
                continue

            # Safety: if we ever get waaaay behind, reset.
            if delay_time / reporting_interval < -3:
                # delay_time is negative here; report the lag as a
                # positive number of seconds.
                self.log.info('Got way behind in reporting: %.1f seconds. '
                              'Dropping fake data.' % -delay_time)
                next_timestamp = now
                next_report = next_timestamp + reporting_interval
                continue

            # Pretend we got it exactly.
            n_data = int((next_report - next_timestamp) * self.sample_rate)

            # Set the next report time, before checking n_data.
            next_report += reporting_interval

            # This is to handle the (acceptable) case of a sample_rate so
            # low that no sample falls in this interval. next_timestamp is
            # left alone, so the span (and n_data) grows on later passes.
            if n_data <= 0:
                time.sleep(.1)
                continue

            # New data bundle.
            t = next_timestamp + np.arange(n_data) / self.sample_rate
            block.timestamps = list(t)

            # Unnecessary realism: 1/f.
            T = [_t + np.random.uniform(-1, 1) * .003 for _t in T]
            for _t, _c in zip(T, self.channel_names):
                block.data[_c] = list(_t + np.random.uniform(
                    -1, 1, size=len(t)) * .002)

            # This will keep good fractional time.
            next_timestamp += n_data / self.sample_rate

            # self.log.info('Sending %i data on %i channels.' % (len(t), len(T)))
            # One tag set per channel; the channel number is parsed back
            # out of the 'channel_NN' name.
            tags = {channel: {'channel': int(channel.split('_')[1]),
                              '_field': 'temperature'}
                    for channel in self.channel_names}

            publish_block = block.encoded()
            publish_block.update(influxdb_tags=tags)
            session.app.publish_to_feed('false_temperatures', publish_block)

            # Update session.data with the last sample of each channel.
            data_cache = {
                "fields": {channel: samples[-1]
                           for channel, samples in block.data.items()},
                "timestamp": block.timestamps[-1],
            }
            session.data.update(data_cache)

            if params['test_mode']:
                break

        self.agent.feeds['false_temperatures'].flush_buffer()
        self.set_job_done()
        return True, 'Acquisition exited cleanly.'

    def _stop_acq(self, session, params):
        """Ask the acq Process to stop.

        If 'acq' is the recorded job, marks the session as 'stopping' and
        sets the job marker to '!acq', which the acq loop checks on each
        pass.

        Returns:
            tuple: ``(ok, message)``; ``ok`` is False when acq is not the
            recorded job.
        """
        ok = False
        with self.lock:
            if self.job == 'acq':
                session.set_status('stopping')
                self.job = '!acq'
                ok = True
        return (ok, {True: 'Requested process stop.',
                     False: 'Failed to request process stop.'}[ok])

    @inlineCallbacks
    def count_seconds(self, session, params):
        """**Process** - Increment a counter roughly once per second.

        While the session status is 'running', sleeps for one second and
        then updates ``session.data`` with an incremented 'counter' and
        the current time as 'last_update'.

        Returns:
            tuple: ``(True, 'Exited on request.')`` once the session
            status is no longer 'running'.
        """
        # This process runs entirely in the reactor, as does its stop function.
        session.data = {'counter': 0,
                        'last_update': time.time()}
        while session.status == 'running':
            yield dsleep(1)
            session.data['last_update'] = time.time()
            session.data['counter'] += 1
        return True, 'Exited on request.'

    @inlineCallbacks
    def _stop_count_seconds(self, session, params):
        """Ask count_seconds to stop by marking the session 'stopping'."""
        yield  # Make this a generator.
        session.set_status('stopping')

    # Tasks

    @ocs_agent.param('heartbeat', default=True, type=bool)
    def set_heartbeat(self, session, params):
        """set_heartbeat(heartbeat=True)

        **Task** - Set the state of the agent heartbeat.

        Args:
            heartbeat (bool, optional): True for on (the default), False for off

        Returns:
            tuple: ``(True, message)`` echoing the new state.
        """
        heartbeat_state = params['heartbeat']
        self.agent._heartbeat_on = heartbeat_state
        self.log.info("Setting heartbeat_on: {}...".format(heartbeat_state))
        return True, "Set heartbeat_on: {}".format(heartbeat_state)

    @ocs_agent.param('delay', default=5., type=float, check=lambda x: x >= 0)
    @ocs_agent.param('succeed', default=True, type=bool)
    @inlineCallbacks
    def delay_task(self, session, params):
        """delay_task(delay=5, succeed=True)

        **Task** (abortable) - Sleep (delay) for the requested number of
        seconds.

        This can run simultaneously with the acq Process.  This Task
        should run in the reactor thread.

        Args:
            delay (float, optional): Time to wait before returning, in seconds.
                Defaults to 5.
            succeed (bool, optional): Whether to return success or not.
                Defaults to True.

        Returns:
            tuple: ``(ok, message)``. ``ok`` is False if the task was
            aborted or the testing-only access check failed; otherwise it
            is the ``succeed`` parameter.

        Notes:
            The session data will be updated with the requested delay as
            well as the time elapsed so far, for example::

                >>> response.session['data']
                {'requested_delay': 5.,
                 'delay_so_far': 1.2}

        """
        delay = params['delay']
        succeed = params['succeed'] is True

        if session.cred_level == 2 and delay == 3.33:
            # For testing.
            return False, 'Actually you need access level 3+ to delay for 3.33 seconds.'

        session.data = {'requested_delay': delay,
                        'delay_so_far': 0}
        t0 = time.time()
        while session.status == 'running':
            session.data['delay_so_far'] = time.time() - t0
            # Sleep in steps of at most 0.5 s so an abort is noticed quickly.
            sleep_time = min(0.5, delay - session.data['delay_so_far'])
            if sleep_time < 0:
                break
            yield dsleep(sleep_time)

        if session.status != 'running':
            return False, 'Aborted after %.1f seconds' % session.data['delay_so_far']

        return succeed, 'Exited after %.1f seconds' % session.data['delay_so_far']

    @inlineCallbacks
    def _abort_delay_task(self, session, params):
        """Abort delay_task by marking a running session as 'stopping'."""
        if session.status == 'running':
            session.set_status('stopping')
        yield
# Command-line options.
def add_agent_args(parser_in=None):
    """Add the FakeDataAgent command-line options to an argument parser.

    Args:
        parser_in (argparse.ArgumentParser, optional): Parser to extend.
            If None, a new ArgumentParser is created.

    Returns:
        argparse.ArgumentParser: The parser that was passed in (or the
        newly created one), with an 'Agent Options' group added.
    """
    if parser_in is None:
        from argparse import ArgumentParser
        parser_in = ArgumentParser()
    pgroup = parser_in.add_argument_group('Agent Options')
    pgroup.add_argument("--mode", default="idle", choices=['idle', 'acq'],
                        help="Starting mode. With 'acq' the processes are "
                             "registered to launch at startup; with 'idle' "
                             "they are not.")
    pgroup.add_argument('--num-channels', default=2, type=int,
                        help='Number of fake readout channels to produce. '
                        'Channels are co-sampled.')
    pgroup.add_argument('--sample-rate', default=9.5, type=float,
                        help='Frequency at which to produce data.')
    pgroup.add_argument('--frame-length', default=60, type=int,
                        help='Frame length to pass to the aggregator parameters.')
    return parser_in
# Entry point.
def main(args=None):
    """Configure and run the FakeDataAgent.

    Starts logging at the level named by the LOGLEVEL environment variable
    (default "info"), parses the site and agent arguments, builds the
    agent, registers its Processes and Tasks, and hands control to the
    runner.

    Args:
        args (optional): Passed through to ``site_config.parse_args``.
    """
    # Start logging
    log_level = environ.get("LOGLEVEL", "info")
    txaio.start_logging(level=log_level)

    args = site_config.parse_args(agent_class='FakeDataAgent',
                                  parser=add_agent_args(),
                                  args=args)

    # Both processes are launched at startup only in 'acq' mode.
    launch_on_start = (args.mode == 'acq')

    agent, runner = ocs_agent.init_site_agent(args)

    # If user specifies an Access Policy, we will make "delay_task"
    # require "Advanced" access to help with testing.
    policy_unset = args.access_policy in [None, 'none', '']
    delay_privs = 1 if policy_unset else 2

    fdata = FakeDataAgent(agent,
                          num_channels=args.num_channels,
                          sample_rate=args.sample_rate,
                          frame_length=args.frame_length)

    # Processes
    agent.register_process('acq', fdata.acq, fdata._stop_acq,
                           blocking=True, startup=launch_on_start)
    agent.register_process('count', fdata.count_seconds,
                           fdata._stop_count_seconds,
                           blocking=False, startup=launch_on_start)

    # Tasks
    agent.register_task('set_heartbeat', fdata.set_heartbeat)
    agent.register_task('delay_task', fdata.delay_task, blocking=False,
                        aborter=fdata._abort_delay_task,
                        min_privs=delay_privs)

    runner.run(agent, auto_reconnect=True)
# Run the agent only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()