session.data
Data Feeds make use of the crossbar Pub/Sub functionality to pass data around
the network, however, sometimes you might not want to receive all data, just
the most recent values. For this purpose there is the session.data
attribute.
This is a per OCS operation location to store recent data of interest to OCS
Agent users. The session
argument passed to each Operation function is an
object of class ocs.ocs_agent.OpSession
.
Often this is used to store the most recent values that are queried by the Agent, for example the temperature of thermometers on a Lakeshore device. This information can then be retrieved by a running OCS client and used to inform operation of the Client, for instance waiting until a certain temperature set point in your cryostat before starting detector data acquisition.
Agent Access
To understand how to add data to the session.data
object let’s look at the
Fake Data Agent as an example, specifically at its primary Process,
start_acq
:
1 def start_acq(self, session, params=None):
2 """**Process:** Acquire data and write to the feed.
3
4 This Process has no useful parameters.
5
6 The most recent fake values are stored in the session.data object in
7 the format::
8
9 {"fields":
10 {"channel_00": 0.10250430068515494,
11 "channel_01": 0.08550903376216404,
12 "channel_02": 0.10481891991693446,
13 "channel_03": 0.1060597011760155,
14 "channel_04": 0.1019265554541543,
15 "channel_05": 0.09389479275963578,
16 "channel_06": 0.10071855402986646,
17 "channel_07": 0.09601271802732826,
18 "channel_08": 0.09760831143883832,
19 "channel_09": 0.11345360178932645,
20 "channel_10": 0.10047676575328081,
21 "channel_11": 0.09534462609141414,
22 "channel_12": 0.09654199950059912,
23 "channel_13": 0.11051763608358373,
24 "channel_14": 0.1062686192067794,
25 "channel_15": 0.10793263271024509},
26 "last_updated":1600448753.9288929}
27
28 """
29 ok, msg = self.try_set_job('acq')
30 if not ok: return ok, msg
31
32 if params is None:
33 params = {}
34
35 T = [.100 for c in self.channel_names]
36 block = ocs_feed.Block('temps', self.channel_names)
37
38 next_timestamp = time.time()
39 reporting_interval = 1.
40 next_report = next_timestamp + reporting_interval
41
42 self.log.info("Starting acquisition")
43
44 while True:
45 with self.lock:
46 if self.job == '!acq':
47 break
48 elif self.job == 'acq':
49 pass
50 else:
51 return 10
52
53 now = time.time()
54 delay_time = next_report - now
55 if delay_time > 0:
56 time.sleep(min(delay_time, 1.))
57 continue
58
59 # Safety: if we ever get waaaay behind, reset.
60 if delay_time / reporting_interval < -3:
61 self.log.info('Got way behind in reporting: %.1s seconds. '
62 'Dropping fake data.' % delay_time)
63 next_timestamp = now
64 next_report = next_timestamp + reporting_interval
65 continue
66
67 # Pretend we got it exactly.
68 n_data = int((next_report - next_timestamp) * self.sample_rate)
69
70 # Set the next report time, before checking n_data.
71 next_report += reporting_interval
72
73 # This is to handle the (acceptable) case of sample_rate < 0.
74 if (n_data <= 0):
75 time.sleep(.1)
76 continue
77
78 # New data bundle.
79 t = next_timestamp + np.arange(n_data) / self.sample_rate
80 block.timestamps = list(t)
81
82 # Unnecessary realism: 1/f.
83 T = [_t + np.random.uniform(-1, 1) * .003 for _t in T]
84 for _t, _c in zip(T, self.channel_names):
85 block.data[_c] = list(_t + np.random.uniform(
86 -1, 1, size=len(t)) * .002)
87
88 # This will keep good fractional time.
89 next_timestamp += n_data / self.sample_rate
90
91 # self.log.info('Sending %i data on %i channels.' % (len(t), len(T)))
92 session.app.publish_to_feed('false_temperatures', block.encoded())
93
94 # Update session.data
95 data_cache = {"fields": {}, "last_updated": None}
96 for channel, samples in block.data.items():
97 data_cache['fields'][channel] = samples[-1]
98 data_cache['last_updated'] = block.timestamps[-1]
99 session.data.update(data_cache)
100
101 self.agent.feeds['false_temperatures'].flush_buffer()
102 self.set_job_done()
103 return True, 'Acquisition exited cleanly.'
There’s a lot going on here, which mostly has to do with generating the random data that the Agent produces for testing. The part relevant for our discussion is lines 95-100:
# Update session.data
data_cache = {"fields": {}, "last_updated": None}
for channel, samples in block.data.items():
data_cache['fields'][channel] = samples[-1]
data_cache['last_updated'] = block.timestamps[-1]
session.data.update(data_cache)
This block formats the latest values for each “channel” into a dictionary and
stores it in session.data
.
Structure and Content
The structure of the data
entry is not strictly defined, but
please observe the following guidelines:
Document your
data
structure in the Operation docstring.Provide a timestamp with the readings, or with each group of readings, so that the consumer can confirm they’re recent.
The session data is passed to clients with every API response, so avoid storing a lot of data in there (as a rule of thumb, try to keep it < 100 kB).
Fight the urge to store timestreams (i.e. a history of recent readings) – try to use data feeds for that.
When data are so useful that they are used by other clients / control scripts to make decisions in automated contexts, then they should also be pushed out to a data feed, so that there is a full record of all variables that were affecting system behavior.
Note
You should consider the desired structure carefully, as future changes the
data structure may cause existing clients that make use of the session.data
object to break. Changes that do take place should be announced in the
change logs of new OCS versions.
There are some restrictions on what data can be carried in
session.data
:
The session.data will ultimately be converted and transported using JSON, so some containers will be automatically converted into JSON-compatible forms. Specifically note that:
dict keys will be converted to strings.
there is no distinction between lists and tuples.
there is no (standardized) support for non-finite floats such as inf, -inf, or NaN.
If your session.data contains any NaN, they will be converted to None (which is transported as JSON null).
If your session.data contains inf/-inf, or other JSON-encodable entities, OCS will raise an error. To have those transported as None/null, you should convert inf/-inf to NaN before storing the data in session.data.
If your session.data includes numpy arrays (or scalars), these will be converted to serializable types automatically using
numpy.tolist
.
Client Access
Once your Agent is storing information in the session.data
object you
likely want to access it via an OCS client. The session
object is returned
by all Operation Methods, for instance the status
method, as shown in this small example:
from ocs.ocs_client import OCSClient
therm_client = OCSClient('fake-data1')
therm_client.acq.start()
response = therm_client.acq.status()
After running the client we can examine the data dict stored within the response:
>>> print(response.session.get('data'))
{'fields': {'channel_00': 0.11220355191080153, 'channel_01':
0.0850365880364649, 'channel_02': 0.16598799420080332, 'channel_03':
0.26583693634591293, 'channel_04': 0.24601374140729332, 'channel_05':
0.17319844739787155, 'channel_06': 0.1289138204655707, 'channel_07':
0.21682049008200877, 'channel_08': 0.15539914447393058, 'channel_09':
0.18161931031171688, 'channel_10': 0.040315857256297216, 'channel_11':
0.06916760928468035, 'channel_12': 0.11291917165165984, 'channel_13':
0.0996764253503196, 'channel_14': 0.019171783828962213, 'channel_15':
0.06879881165286862}, 'last_updated': 1600717477.2989068}