"""Access Control is a system for restricting access to some Agents or
Agent functions to certain clients.
Functions in this module are prefixed with one of:
- ``agent_`` -- functions used by an Agent to determine access
controls and verify client access credentials.
- ``client_`` -- functions used by client programs to interact with
Agents that implement access control.
- ``director_`` -- functions used by Access Director to process access
rules and grant requests.
"""
import hashlib
import os
import secrets
import string
import time
import yaml
from enum import IntEnum
from fnmatch import fnmatch
from typing import List, Optional, Tuple, Union
# "asdict" imported so module users can easily serialize dataclasses.
from dataclasses import field, fields, asdict # noqa
# Use pydantic dataclass in order to deserialize complex hierarchical
# objects, such as a dataclass that has a member that is a list of
# some other dataclass.
from pydantic.dataclasses import dataclass
from . import ocs_client
#: The protocol version ("access_control") that agents report
#: themselves as using via get_api.
AC_VERSION = 1
[docs]
class CredLevel(IntEnum):
"""Representation of the credential level of a client, or the
required credential level for some operation."""
BLOCKED = 0
BASIC = 1
ADVANCED = 2
FULL = 3
SUPERUSER = 4
def __str__(self):
return f'{self.value}-{self.name}'
#: Keys to use for passwords associated with each CredLevel. These are
#: used in config blocks for Access Director and OCS clients, for
#: example.
CRED_KEYS = [
('password_1', CredLevel(1)),
('password_2', CredLevel(2)),
('password_3', CredLevel(3)),
('password_4', CredLevel(4)),
]
# Password and hashing support
def _get_random_string(length=8):
return ''.join([secrets.choice(string.ascii_letters) for i in range(length)])
def _hashfunc_no_hash(x):
return x
def _hashfunc_short_md5(x):
return hashlib.md5(x.encode('utf-8')).hexdigest()[:16]
HASHFUNCS = {
'none': _hashfunc_no_hash,
'blocked': lambda x: ValueError('Should not call this hash func.'),
'md5': _hashfunc_short_md5,
}
[docs]
@dataclass
class HashedPass:
"""Container for hashed password values, for internal storage in
Access Director agent as well as for distribution to Agents that
need to validate clients.
"""
#: The identifier for the hash being used (use "none") for clear
#: text, and "blocked" to refuse to match any user password.
hash: str
#: The hashed string. If this string is empty, and hash !=
#: "blocked", then *any* provided password will match.
value: str = ''
[docs]
def check(self, value: str) -> bool:
"""Check if the provided clear-text password matches this
object's stored password, after hashing.
"""
if value is None:
value = ''
if self.hash == 'blocked':
return False
if self.value == '':
return True
hashfunc = HASHFUNCS[self.hash]
return hashfunc(value) == self.value
[docs]
@classmethod
def create_blocked(cls):
"""Return an object representing "blocked" access."""
return cls(hash='blocked')
[docs]
@classmethod
def create_free(cls):
"""Return an object representing passwordless access."""
return cls(hash='none')
[docs]
@classmethod
def create_from_value(cls, hash, clear_text):
"""Return on object constructed from the hashed value of
clear_text password.
"""
hashfunc = HASHFUNCS[hash]
return cls(hash=hash, value=hashfunc(clear_text))
[docs]
@dataclass
class AccessPasswordItem:
"""A single entry in a password configuration file.
This is used both in the Access Configuration File, which
configures the Access Director, and in the OCS Password File,
which clients reference to find passwords to use for specific
agents.
Each field in this object is either a **selector** or a
**password**.
The **selectors** are used to determine whether this rule applies
to a particular agent instance (described in terms of its *agent_class* and
*instance_id*). Those keys are:
- ``default`` (bool): True if this selector should apply to all
agent instances -- when this is set, all other selector keys
are ignored.
- ``agent_class`` (str): This is a pattern to match against the
provided *agent_class*. If absent or set to None, this is
ignored.
- ``instance_id`` (str): This is a pattern to match against the
provided *instance_id*. If absent or set to None, this is
ignored.
Patterns and matching are as described in this module's
:func:`pattern_match`.
The **passwords** consist of:
- ``password_1``: The password associated with credential level 1.
- ``password_2``: The password ... level 2.
- ``password_3``: The password ... level 3.
- ``password_4``: The password ... level 4.
When used in the Access Configuration File, the *password* entries
define passwords that should be accepted, to grant clients that
level of access. The passwords may be strings (which will be
interpreted as either clear-text, or pre-hashed values, depending
on the ``passwords_block_default_hashfunc`` setting). When the
Agent is checking a password, it considers all the rules and
returns the highest credential level matched by the password and
target.
When used in the OCS Password File, only ``password_2`` and/or
``password_3`` keys should be used -- and the values should be
strings, which will be interpreted as the clear text passwords to
use in the Client.
"""
#: Selector variables
default: Optional[bool] = False
agent_class: Optional[str] = None
instance_id: Optional[str] = None
#: Password variables
password_1: Optional[Union[str, HashedPass]] = None
password_2: Optional[Union[str, HashedPass]] = None
password_3: Optional[Union[str, HashedPass]] = None
password_4: Optional[Union[str, HashedPass]] = None
[docs]
def get_scope_spec(self):
return ScopeSpec(default=self.default,
agent_class=self.agent_class,
instance_id=self.instance_id)
[docs]
@dataclass
class AgentSpec:
"""Identifying information for a specific Agent Instance. Info
here is compared against a ScopeSpec for an AccessRule, to see if
rule should be applied to the Instance.
"""
instance_id: str
agent_class: str
#: The superuser_key is used for an Agent to start its own
#: operations, internally, such as on startup.
superuser_key: Optional[object] = None
[docs]
@dataclass
class ActionContext:
"""Placeholder class for future fine-grain access control /
lockout of individual operations at the Access Director level.
"""
op_name: Optional[str] = None
action: Optional[str] = None
[docs]
def pattern_match(target: str, pattern: str, raw=False):
"""Pattern-matching of a target against a pattern is defined as
follows:
- A pattern consists of one or more sub-patterns, separated by
commas. The pattern matches the target if any of the
positive sub-patterns match the target, as long as none of
the negative sub-patterns matches the target.
- If a sub-pattern does not start with "!", it is considered a
positive sub-pattern and fnmatch is used to test the
sub-pattern against the target.
- If a sub-pattern starts with "!", then it is a negative
sub-pattern and the remainder of the sub-pattern text is
used with fnmatch to test against the target.
Examples:
- The pattern ``"Director,Act*,*Producer*"`` matches the string
"Director" or any string that starts with "Act" or contains
"Producer".
- The pattern ``"*Agent,!FakeDataAgent"`` matches any string
that ends with "Agent", except the string "FakeDataAgent".
- The pattern ``"compute*,login*,!*9,!*8"`` matches any string
that starts with "compute" or "login" and does not end with
"8" or "9".
"""
assert (',' not in target) and ('!' not in target)
pos, neg = [], []
for subpat in pattern.split(','):
dest = pos
if subpat.startswith('!'):
dest = neg
subpat = subpat[1:]
if len(subpat) == 0:
continue
dest.append(fnmatch(target, subpat))
if raw:
return pos, neg
return (len(pos) == 0 or any(pos)) and not any(neg)
[docs]
@dataclass
class ScopeSpec:
"""A specification of the scope of an AccessRule. See ``check``
function for matching details.
"""
default: Optional[bool] = False
agent_class: Optional[str] = None
instance_id: Optional[str] = None
[docs]
def check(self, agent: AgentSpec):
"""Determine whether ``agent`` matches the present ScopeSpec.
If ``self.default`` is True, then this function returns True.
Otherwise, if ``self.agent_class`` and ``self.instance_id``
are both None, then this function returns False. Otherwise
the agent must match the pattern in self.agent_class, and the
pattern in self.instance_id.
See :func:`pattern_match` in this module for a description of
patterns and matching.
"""
if self.default:
return True
if self.agent_class is None and self.instance_id is None:
return False
if (self.agent_class is not None
and not pattern_match(agent.agent_class, self.agent_class)):
return False
if (self.instance_id is not None
and not pattern_match(agent.instance_id, self.instance_id)):
return False
return True
[docs]
def get_specificity(self):
"""Return the *specificity* of this rule, for sorting. The
principles are that positive patterns are more specific than
negative patterns; and then that instance_id selection is more
specific than agent_class selection.
If default=True, then 0 is returned. If not default, but
agent_class and instance_id are both None, then 0 is returned.
Otherwise, the specificity is the sum of the following
contributions:
- 8 if instance_id includes any positive subpatterns.
- 4 if agent_class includes any positive subpatterns.
- 2 if instance_id includes any negative subpatterns.
- 1 if agent_class includes any negative subpatterns.
"""
if self.default:
return 0
spec_bits = []
if self.instance_id is not None:
pos, neg = pattern_match('', self.instance_id, raw=True)
if len(pos):
spec_bits.append(3)
if len(neg):
spec_bits.append(1)
if self.agent_class is not None:
pos, neg = pattern_match('', self.agent_class, raw=True)
if len(pos):
spec_bits.append(2)
if len(neg):
spec_bits.append(0)
if len(spec_bits) == 0:
return -1
return sum([(1 << b) for b in spec_bits])
[docs]
@dataclass
class AccessRule:
"""A Rule for consumption by an Agent to grant or revoke access.
The ``scope_spec`` determines what Agent Instances the rule should
be applied to. The cred_level is the level granted by this rule,
if password in hashed_pass has been provided.
The lockout_* entries are populated in the case that this rule
arises from an Exclusive Access Grant. The lockout_id is the name
given to some specific lockout definition. The lockout_owner is
some identifier provided by whoever requested the lockout. And
lockout_levels is the list of CredLevels that other callers (those
without the present lockout password) are forbidden from having.
"""
hashed_pass: Optional[HashedPass]
cred_level: Optional[CredLevel]
scope_spec: ScopeSpec = field(default_factory=lambda: ScopeSpec(default=True))
lockout_id: Optional[str] = None
lockout_owner: Optional[str] = None
lockout_levels: Optional[List[CredLevel]] = field(default_factory=list)
[docs]
@dataclass
class AgentAccessRules:
"""A container used by Agents to hold configuration, including
AccessRule (whether generated in place or received from Access
Director agent).
"""
policy: Optional[str] = None
director_id: Optional[str] = None
agent: Optional[AgentSpec] = None
rules: List[AccessRule] = field(default_factory=list)
[docs]
def agent_get_policy_default(policy: str) -> [AgentAccessRules]:
"""Get the default access rules, based on a "policy" string
(probably from the --access-policy Agent argument).
The policy passed in by the user should be one of the following:
- "none" (or "", or None). This effectively disables Access
Control, as the returned rules will give a caller maximum
privileges, no matter what password.
- "override:pw2,pw3", where "pw2" and "pw3" represent the desired
level 2 and level 3 passwords (unhashed).
- "director:access-dir", where "access-dir" represents the
instance-id of an Access Director agent. The rules returned in
this case will, initially, block all access; it's expected these
rules will be updated by messages from the Access Director.
"""
default_scope = ScopeSpec(default=True)
free_pass = HashedPass.create_free()
blocked_pass = HashedPass.create_blocked()
if policy in [None, '', 'none']:
return AgentAccessRules(
policy='none',
rules=[AccessRule(scope_spec=default_scope,
hashed_pass=free_pass,
cred_level=CredLevel(level))
for level in [1, 2, 3]])
policy, args = policy.split(':', 1)
if policy == 'director':
if args == '':
args = 'access-director'
return AgentAccessRules(
policy='director',
director_id=args,
rules=[AccessRule(scope_spec=default_scope,
hashed_pass=blocked_pass,
cred_level=CredLevel(level))
for level in [1, 2, 3]])
elif policy == 'override':
pws = args.split(',')
return AgentAccessRules(
policy='override',
rules=[
AccessRule(scope_spec=default_scope,
hashed_pass=free_pass,
cred_level=CredLevel(1)),
AccessRule(scope_spec=default_scope,
hashed_pass=HashedPass(hash='none', value=pws[0]),
cred_level=CredLevel(2)),
AccessRule(scope_spec=default_scope,
hashed_pass=HashedPass(hash='none', value=pws[1]),
cred_level=CredLevel(3)),
])
raise ValueError(f'Invalid policy "{policy}"')
[docs]
def agent_filter_rules(rules: List[AccessRule],
agent: AgentSpec) -> [List[AccessRule]]:
"""Filter a list of AccessRules, keeping only ones pertinent to
agent.
"""
return [rule for rule in rules
if rule.scope_spec.check(agent)]
[docs]
def agent_get_creds(
password: Union[str, None],
access_rules: AgentAccessRules,
agent: AgentSpec,
action: ActionContext = None,
) -> [CredLevel]:
"""Based on the access_rules, and the provided password, determine
the credential level for the specified agent and action.
"""
if access_rules.policy == 'none':
return CredLevel(1), "No access policy -- level 1 for all."
if agent.superuser_key is not None and password is agent.superuser_key:
return CredLevel(4), "Super-user."
cred_levels = [CredLevel(0)]
blocked_levels = []
lockout_owners = set()
for rule in access_rules.rules:
if not rule.scope_spec.check(agent):
continue
if rule.hashed_pass.check(password):
cred_levels.append(rule.cred_level)
elif len(rule.lockout_levels):
blocked_levels.extend(rule.lockout_levels)
lockout_owners.add(f"{rule.lockout_id}[{rule.lockout_owner}]")
privs_diminished = False
cred_level = max(cred_levels)
while cred_level > CredLevel(0) and cred_level in blocked_levels:
cred_level = CredLevel(cred_level.value - 1)
privs_diminished = True
return cred_level, ("" if not privs_diminished else
f"Privileges diminished; lockouts active: {list(lockout_owners)}")
[docs]
def agent_rejection_message(cred_level: CredLevel, required_level: CredLevel,
lockout_detail: str = ''):
"""Get a helpful message about what privs are needed to access a
resource protected at required_level.
"""
assert cred_level.value < required_level.value
text = (f'The action requires credential level {required_level} but the '
f'client has only level {cred_level}')
if lockout_detail != '':
text += ' ' + lockout_detail
return text
#
# Client support
#
[docs]
def client_get_password(privs, agent_class, instance_id):
"""For OCSClient use -- determine the best client password to use.
This may lead to inspection of OCS password files.
Args:
privs (str, int): If this is a string, it will be used as the
password. If this argument is an integer, it represents a
desired credential level and the local password configuration
will be inspected to find a password associated with access at
that level.
agent_class (str or None): If specified, will be used to match
rules in the password config file.
instance_id (str or None): If specified, will be used to match
rules in the password config file.
Returns:
A string representing the password to use in all requests the
client makes (these are passed to the agent in the
"password=..." argument of the _ops_handler).
Notes:
If privs is a string, then this password is used directly and no
inspection of config files is performend.
If privs is 1, then the password '' is returned.
If privs is 2 or 3, then the OCS password file is loaded and
inspected for a suitable password. The password file will be
loaded from ``~/.ocs-passwords.yaml``, unless overridden by the
environment variable ``OCS_PASSWORDS_FILE``.
The password file is in yaml format, containing a single list.
Each entry in the list is a dictionary referred to as a rule.
Each rule is a dict with the schema described in
:class:`ocs.access.AccessPasswordItem`.
To find a suitable password, all rules are considered. When
multiple rules have selectors that match the target and contain
a password of the required credential level or higher, then the
one that is most *specific* is taken. When multiple rules are
tied for specificity, the one occurring latest in the list is
taken. "Specificity" is outlined in
:func:`ScopeSpec.get_specificity`.
Here's an example passwords file::
- default: true
password_3: "general-access-password"
- agent_class: "FakeDataAgent"
instance_id: "!faker4,!faker1"
password_2: "normal-faker-password"
- instance_id: "faker*"
password_2: "special-faker-password"
- instance_id: "faker4"
password_2: "special-faker4-password"
Suppose faker1 and faker4 both have agent_class "FakeDataAgent".
For level 2 access, "faker1" matches the rules at index 0 and 2,
but the most specific rule is 2 so that is used. For "faker4",
rules 0, 2, and 3 match. Rules 2 and 3 are equally specific but
rule 3 occurs later so it is used.
"""
if isinstance(privs, str):
return privs
if privs is None:
privs = 1
if not isinstance(privs, int):
raise ValueError("privs argument should be int or str.")
assert 1 <= privs <= 3
if privs == 1:
return ''
if os.getenv('OCS_PASSWORDS_FILE'):
pw_file = os.getenv('OCS_PASSWORDS_FILE')
else:
pw_file = os.path.expanduser('~/.ocs-passwords.yaml')
agent = AgentSpec(agent_class=agent_class, instance_id=instance_id)
creds = yaml.safe_load(open(pw_file, 'rb'))
candidates = []
for i, row in enumerate(creds):
scope_kw = {k: row.get(k) for k in ['default', 'agent_class', 'instance_id']}
scope = ScopeSpec(**scope_kw)
if scope.check(agent):
specificity = scope.get_specificity()
if 'password_2' in row and privs <= 2:
candidates.append((specificity, i, row['password_2']))
elif 'password_3' in row and privs <= 3:
candidates.append((specificity, i, row['password_3']))
if len(candidates) == 0:
return ''
return max(candidates)[2]
[docs]
class ExclusiveAccessClient:
"""Manager class for Exclusive Access grants.
Args:
target:
instance_id of the Access Director agent (or an
OCSClient to use).
grantee:
Identifier for the requester (for consumer information).
grant_name:
The grant name, as defined in the grant config block.
password:
The password for the grant config block, if needed.
The `acquire`, `renew` and `release` methods all return a tuple
with `(ok, detail)`, where `ok` is a boolean indicating that
things seem to have worked, and `detail` is the full "useful info"
result returned by the call to :func:`AccessDirector.request_exclusive
<ocs.agents.access_director.agent.AccessDirector.request_exclusive>`.
See that function for details.
"""
def __init__(self, target: str,
grantee: str, grant_name: str, password: str = None):
if isinstance(target, str):
self._client = ocs_client.OCSClient(target)
else:
self._client = target
self._gargs = {
'grantee': grantee,
'grant_name': grant_name,
'password': password,
}
self.password = None
self.expire_at = None
self.last_response = {'error': 'no history'}
[docs]
def acquire(self, expire_at: float = None) -> Tuple[bool, dict]:
"""Try to acquire the exclusive access grant. ``expire_at``
is an optional unix timestamp to suggest as the grant expiry
time.
If this succeeds, the access password, and timestamp at which
the grant will expire, are stored in ``self.password`` and
``self.expire_at``.
"""
if expire_at is None:
expire_at = time.time() + 100.
resp = self._client._client.special('request_exclusive',
action='acquire',
expire_at=expire_at,
**self._gargs)
self.last_response = resp
if 'error' in resp:
return False, resp
self.password = resp['password']
self.expire_at = resp['expire_at']
return True, resp
[docs]
def renew(self, expire_at: float = None) -> Tuple[bool, dict]:
"""Renew the Exclusive Access grant."""
if expire_at is None:
expire_at = time.time() + 100.
resp = self._client._client.special('request_exclusive',
action='renew',
expire_at=expire_at,
**self._gargs)
if 'error' in resp:
return False, resp
self.expire_at = resp['expire_at']
return True, resp
[docs]
def release(self) -> Tuple[bool, dict]:
"""Release the Exclusive Access grant."""
resp = self._client._client.special('request_exclusive',
action='release',
**self._gargs)
self.password = None
self.expire_at = None
return 'error' not in resp, resp
#
# Access Director Agent support
#
[docs]
@dataclass
class GrantConfigItem:
"""A single entry from Access Director config file.
"""
cred_level: Optional[CredLevel]
# For ScopeSpec...
default: Optional[bool] = False
agent_class: Optional[str] = None
instance_id: Optional[str] = None
# Levels to which this grant prevents regular access.
lockout_levels: Optional[List[CredLevel]] = field(default_factory=list)
[docs]
def get_scope_spec(self):
return ScopeSpec(default=self.default,
agent_class=self.agent_class,
instance_id=self.instance_id)
[docs]
@dataclass
class DistributedAccessGrant:
"""Class for decoding a "grant" block of Access Director config
file.
"""
name: str
grants: List[GrantConfigItem]
password: Optional[str] = None
hash: Optional[str] = 'md5'
[docs]
@dataclass
class AccessDirectorConfig:
passwords_block_default_hashfunc: str = 'none'
distrib_hashfunc: str = 'md5'
passwords: List[AccessPasswordItem] = field(default_factory=list)
exclusive_access_blocks: List[DistributedAccessGrant] = field(default_factory=list)
[docs]
def director_parse_config(config: dict) -> dict:
"""Parse a config file and return the config dict. This includes
some validation and some translation.
The elements at the top level of the returned dict will become
attributes of the AD instance, so be careful what you add in
there.
"""
# Do a bunch of validation ...
adc = AccessDirectorConfig(**config)
out = {
'_hashname': adc.distrib_hashfunc,
'_hashfunc': HASHFUNCS[adc.distrib_hashfunc],
}
def promote_pw(pw):
"""Encode a password with the chosen default hash; unless it
is the empty password in which case just leave it unhashed.
"""
if isinstance(pw, str):
if pw == '':
pw = HashedPass.create_free()
else:
pw = HashedPass(hash=adc.passwords_block_default_hashfunc,
value=pw)
if pw.hash != 'none':
return pw
if pw.hash == 'none' and pw.value == '':
return pw
return HashedPass.create_from_value(out['_hashname'], pw.value)
rules = []
for entry in adc.passwords:
scope = entry.get_scope_spec()
for k, level in CRED_KEYS:
if getattr(entry, k) is None:
continue
hashed = promote_pw(getattr(entry, k))
rules.append(AccessRule(
hashed_pass=hashed, cred_level=level, scope_spec=scope))
out.update({
'_rules': rules,
'_grant_blocks': adc.exclusive_access_blocks,
})
return out
[docs]
def director_get_access_rules(
grant_def: DistributedAccessGrant,
grant_owner: str) -> Tuple[str, List[AccessRule]]:
"""Generate password for a grant block, and the corresponding
access rule blocks to be passed to agents for processing
credentials.
Returns:
password
The password granting access.
rules
The list of AccessRule objects for distribution to agents.
"""
pw = _get_random_string()
hashed_pass = HashedPass.create_from_value(grant_def.hash, pw)
rules = []
for grant in grant_def.grants:
scope = ScopeSpec(default=grant.default,
agent_class=grant.agent_class,
instance_id=grant.instance_id)
rule = AccessRule(scope_spec=scope,
cred_level=grant.cred_level,
hashed_pass=hashed_pass,
lockout_id=grant_def.name,
lockout_owner=grant_owner,
lockout_levels=grant.lockout_levels)
rules.append(rule)
return pw, rules