from ocs import ocs_agent, site_config, access
import argparse
import os
import time
import yaml
from twisted.internet.defer import inlineCallbacks
from autobahn.twisted.util import sleep as dsleep
class AccessDirector:
    """Agent for distributing Access Control information to all Agents
    in a system.

    Access rules are read from a YAML config file and published on the
    ``controls`` feed.  Exclusive access grants, obtained through
    :meth:`request_exclusive`, contribute additional rules for as long
    as they are active.

    Args:
        agent: The agent object.  It must provide ``register_feed``,
            ``register``, ``publish_to_feed``, ``agent_address`` and
            ``log``.
        config_file (str): Path to the access config file (YAML).  The
            file is loaded during construction.

    """

    def __init__(self, agent, config_file):
        self.agent = agent
        self.agent.register_feed('controls', record=False)
        self.log = agent.log

        # Set once the special access points have been registered by
        # the manager process, so a restart does not register twice.
        self._registered = False

        # The _step variable increments with every change in
        # state.  API calls (for getting grants or refreshing config)
        # will increment this variable and make the new value
        # available to callers.  One place this is used is in
        # integration tests.
        self._step = 1

        self._config_file = config_file

        # Queue of pending publication requests (dicts with a 'type'
        # of 'reset' or 'single'), consumed by the manager process.
        self._requests = []

        # AccessGrant objects for the currently held exclusive grants.
        self._active_grants = []

        self._load_config()

    @ocs_agent.param('_')
    @inlineCallbacks
    def manager(self, session, params):
        """manager()

        **Process** - Update the main access control feed with new
        access information.  This occurs in response to agent queries,
        or if new grants require updates to access.

        The ``session.data`` encodes information about any active
        exclusive grants.  The entry for ``grants`` contains a list of
        encoded :class:`AccessGrant` entries.  It looks like this::

          'grants': [
            {
              "name": "fake-subsystem",
              "expire_at": 1771356419.8087718,
              "grantee": "test-grant.py",
              "rules": [
                {
                  "hashed_pass": {
                    "hash": "md5",
                    "value": "03ba0072e0d32376"
                  },
                  "cred_level": 2,
                  "scope_spec": {
                    "default": false,
                    "agent_class": "FakeDataAgent",
                    "instance_id": null
                  },
                  "lockout_id": "fake-subsystem",
                  "lockout_owner": "test-grant.py",
                  "lockout_levels": [
                    2,
                    3
                  ]
                },
                {
                  "hashed_pass": {
                    "hash": "md5",
                    "value": "03ba0072e0d32376"
                  },
                  "cred_level": 1,
                  "scope_spec": {
                    "default": true,
                    "agent_class": null,
                    "instance_id": null
                  },
                  "lockout_id": "fake-subsystem",
                  "lockout_owner": "test-grant.py",
                  "lockout_levels": [
                    3
                  ]
                }
              ]
            }
          ]

        """
        session.set_status('running')
        session.data = {}

        if not self._registered:
            yield self.agent.register(
                self.agent_poll,
                f'{self.agent.agent_address}.agent_poll')
            yield self.agent.register(
                self.request_exclusive,
                f'{self.agent.agent_address}.request_exclusive')
            self._registered = True

        # Time of the most recent full ("reset") publication.
        last_blast = 0

        while session.status in ['running']:
            yield dsleep(1)

            # Expired grants change the rule set, so a full update
            # (with a new step) is requested when any are dropped.
            if self._expire_grants(time.time()):
                self._update_all()

            session.data['grants'] = [
                grant.encode() for grant in self._active_grants]

            while self._requests:
                request = self._requests.pop(0)
                if request['type'] == 'reset':
                    last_blast = time.time()
                self.agent.publish_to_feed(
                    'controls', self._build_feed_message(request))

            # Re-publish everything at least once a minute; step=0
            # because nothing has actually changed.
            if time.time() - last_blast > 60:
                self._update_all(0)

        return True, 'Exited.'

    def _expire_grants(self, now):
        """Drop active grants whose ``expire_at`` is at or before
        ``now``, logging each one.  Returns True if any were dropped.

        """
        keepers = []
        for grant in self._active_grants:
            if grant.expire_at <= now:
                self.log.info(f'Grant expiring: {grant.name}')
            else:
                keepers.append(grant)
        if len(keepers) == len(self._active_grants):
            return False
        self._active_grants = keepers
        return True

    def _build_feed_message(self, request):
        """Build the ``controls`` feed message for one queued request.

        A request of type 'single' yields a message targeted at one
        instance, carrying only the rules selected by
        ``access.agent_filter_rules``.  Any other request yields a
        full 'reset' message with the config rules plus the rules of
        every active grant.

        """
        rules = list(self._rules)
        for grant in self._active_grants:
            rules.extend(grant.rules)

        if request['type'] == 'single':
            agent = access.AgentSpec(
                agent_class=request['agent_class'],
                instance_id=request['instance_id'])
            msg = {'target': request['instance_id'],
                   'ac_version': access.AC_VERSION,
                   'step': self._step,
                   'rules': access.agent_filter_rules(rules, agent)}
        else:
            msg = {'reset': True,
                   'ac_version': access.AC_VERSION,
                   'step': self._step,
                   'rules': rules}

        # Rules are converted to plain dicts for publication.
        msg['rules'] = [access.asdict(rule) for rule in msg['rules']]
        return msg

    def _load_config(self):
        """Read the config file, install its contents on this object
        and queue a full update.

        """
        # Read the file and convert to internal rep.  The context
        # manager makes sure the file handle is closed.
        with open(self._config_file, 'rb') as fin:
            config_raw = yaml.safe_load(fin)
        config = access.director_parse_config(config_raw)

        # Transfer new config to self.
        for k, v in config.items():
            setattr(self, k, v)

        # Request a complete update.
        self._update_all()

    def _update_all(self, step=1):
        """Advance the step counter by ``step`` and queue a full
        ('reset') publication for the manager process.

        """
        self._step += step
        self._requests.append({'type': 'reset'})

    @ocs_agent.param('_')
    @inlineCallbacks
    def reload_config(self, session, params):
        """reload_config()

        **Task** - Reload access config file.

        """
        # _load_config is synchronous; the yield is what makes this
        # function a generator, as inlineCallbacks requires.
        yield self._load_config()
        return True, 'Update requested.'

    def agent_poll(self, instance_id, agent_class):
        """*Special access point.*  This is used for agents to request
        an announcement of their password rules on the control feed.
        The instance_id and agent_class arguments must both be
        specified (and not None).  Returns True if arguments are
        sufficiently valid and request was slated; otherwise False.

        """
        self.log.info(f'agent-poll received from {agent_class}:{instance_id}')
        r = {
            'type': 'single',
            'instance_id': instance_id,
            'agent_class': agent_class,
        }
        # Validate that before putting it into the request queue.
        try:
            access.AgentSpec(
                agent_class=r['agent_class'],
                instance_id=r['instance_id'])
        except Exception as e:
            self.log.error('Failed validation: {e}', e=e)
            return False
        self._requests.append(r)
        return True

    def request_exclusive(self, grant_name=None, password=None,
                          action=None, expire_at=None, grantee=None,
                          strict=None):
        """*Special access point.*  Request, renew, or release an
        exclusive access grant.

        Args:
          grant_name (str): Name of the grant, to match an entry in
            the "grant-blocks" section of the config file.
          password (str): The password, to be checked against the
            password specified in the grant block of the config.
          action (str): One of "acquire", "renew" or "release".
          expire_at (float): Unix timestamp for the desired expiry
            time of the grant.
          grantee (str): A string representing the client that has
            requested the lock.  When passed with "acquire", it is
            stored for distribution to clients so they can explain who
            has locked them out.
          strict (bool): If True, reject all requests except acquire
            when a grant is inactive, renew when a grant is active,
            and release when a grant is active.  Defaults to True
            when passed as None.

        Returns:
          A dict with useful info.

          On error, the dict has only an entry "error" with an error
          message in it.  This includes the case where ``action`` is
          missing or not one of the supported values.

          On success, the returned dict has at least the items
          'grant_name' (which matches the requested grant_name) and
          'message'; the 'message' is just "grant acquired" / "grant
          renewed" / "grant released".  It also has the 'step' at
          which this event took effect in the director.

          Additionally, if the 'action' is 'acquire' or 'renew' then
          the dict will include an entry 'expire_at' with the unix
          timestamp that the grant will be cancelled automatically.
          This timestamp may be earlier (but not later) than the time
          requested with the expire_at parameter.

          If the action is 'acquire', then the dict also has an entry
          'password', containing the password client should use to
          access the exclusive access targets for the duration of the
          access grant.

        """
        if grant_name is None:
            return {'error': 'No grant_name specified.'}
        if strict is None:
            strict = True

        for block in self._grant_blocks:
            if block.name == grant_name:
                break
        else:
            return {'error': f'Named grant not found ({grant_name})'}

        if block.password is not None and block.password != password:
            return {'error': f'Credential failed to access ({grant_name})'}

        # Does this grant already exist somewhere?
        for grant_idx, g in enumerate(self._active_grants):
            if g.name == grant_name:
                break
        else:
            grant_idx = None

        if action == 'acquire':
            if grant_idx is not None:
                if strict:
                    return {'error': ('Grant is already held (grantee: '
                                      + f'{g.grantee}); release it first.')}
                else:
                    # Non-strict: silently replace the existing grant.
                    self._active_grants.pop(grant_idx)

            # Generate passwords for this grant.
            password, rules = access.director_get_access_rules(block, grantee)
            new_grant = AccessGrant(grant_name, rules, expire_at, grantee)
            self._active_grants.append(new_grant)
            self._update_all()
            self.log.info(f'Exclusive access granted: {grant_name}')
            return {'message': 'grant acquired', 'password': password,
                    'grant_name': grant_name,
                    'expire_at': new_grant.expire_at,
                    'step': self._step}

        elif action == 'renew':
            if grant_idx is None:
                return {'error': 'Grant is not currently held; cannot renew.'}
            # Only the expiry time changes, so no update is queued and
            # the step is not advanced.
            self._active_grants[grant_idx].renew(expire_at)
            return {'message': 'grant renewed',
                    'grant_name': grant_name,
                    'expire_at': self._active_grants[grant_idx].expire_at,
                    'step': self._step}

        elif action == 'release':
            if grant_idx is None:
                if strict:
                    return {'error': 'Grant is not currently held; cannot release.'}
                # Non-strict: releasing a grant that is not held is a
                # no-op that still reports success.
            else:
                self._active_grants.pop(grant_idx)
                self._update_all()
                self.log.info(f'Exclusive access relinquished: {grant_name}')
            return {'message': 'grant released',
                    'grant_name': grant_name,
                    'step': self._step,
                    }

        # Unknown or missing action: report it, rather than returning
        # None, so the caller always receives a dict.
        return {'error': f'Invalid action ({action!r}); expected '
                         '"acquire", "renew" or "release".'}

    @inlineCallbacks
    def _simple_stop(self, session, params):
        """Stop function for the manager process: flag the session as
        'stopping' unless it is already stopping or done.

        """
        yield
        if session.status not in ['stopping', 'done']:
            session.set_status('stopping')
            return True, 'Stop initiated.'
        return False, 'Already done/stopping.'
class AccessGrant:
    """Record of a single active exclusive access grant.

    Args:
        name (str): Name of the grant.
        rules (list): Access rules that apply while the grant is held.
        expire_at (float): Unix timestamp at which the grant lapses;
            None selects 60 seconds from now.
        grantee (str): Label for the client holding the grant.

    """

    def __init__(self, name, rules, expire_at, grantee):
        self.name, self.rules, self.grantee = name, rules, grantee
        self.renew(expire_at)

    def renew(self, expire_at):
        """Set a new expiry time.  Passing None sets the expiry to 60
        seconds after the current time.

        """
        self.expire_at = (time.time() + 60) if expire_at is None else expire_at

    def encode(self):
        """Return this grant as a plain dict, with each rule converted
        through ``access.asdict``.

        """
        encoded_rules = []
        for rule in self.rules:
            encoded_rules.append(access.asdict(rule))
        return dict(
            name=self.name,
            expire_at=self.expire_at,
            grantee=self.grantee,
            rules=encoded_rules,
        )
def make_parser(parser=None):
    """Add the AccessDirector options to an argument parser.

    Args:
        parser (argparse.ArgumentParser): Parser to extend.  A new
            one is created if this is None.

    Returns:
        The parser, with an "Agent Options" group containing
        ``--config-file``.

    """
    parser = argparse.ArgumentParser() if parser is None else parser
    options = parser.add_argument_group('Agent Options')
    options.add_argument(
        '--config-file',
        default=None,
        help="AccessDirector config file.",
    )
    return parser
def main(args=None):
    """Entry point: parse arguments, create the AccessDirector agent
    and run it.

    Args:
        args (list): Command-line arguments, passed through to
            ``site_config.parse_args``.

    If no config file has been specified, the parser reports an error
    and the program exits (argparse behaviour) instead of failing
    with an obscure TypeError.

    """
    parser = make_parser()
    args = site_config.parse_args(agent_class='AccessDirector',
                                  parser=parser,
                                  args=args)

    config_file = args.config_file
    if not config_file:
        # --config-file defaults to None; the director cannot start
        # without a config file, so fail with a clear message.
        parser.error('No AccessDirector config file specified '
                     '(--config-file).')
    if not os.path.isabs(config_file):
        # Relative to SCF location.
        config_file = os.path.join(
            os.path.dirname(args.site_file), config_file)

    # Force the access_policy arg to "none", to prevent OCSAgent from
    # stalling, trying to get access passwords from this instance ...
    args.access_policy = 'none'

    agent, runner = ocs_agent.init_site_agent(args)
    access_director = AccessDirector(agent, config_file)

    agent.register_process('manager',
                           access_director.manager,
                           access_director._simple_stop,
                           blocking=False,
                           startup=True)
    agent.register_task('reload_config',
                        access_director.reload_config,
                        blocking=False)

    runner.run(agent, auto_reconnect=True)
if __name__ == '__main__':
    # Run the agent only when this file is executed as a script.
    main()