diff options
| -rw-r--r-- | src/pyramid/authorization.py | 79 | ||||
| -rw-r--r-- | src/pyramid/security.py | 145 | ||||
| -rw-r--r-- | tests/test_security.py | 275 |
3 files changed, 415 insertions, 84 deletions
diff --git a/src/pyramid/authorization.py b/src/pyramid/authorization.py index 6056a8d25..19b96e3d1 100644 --- a/src/pyramid/authorization.py +++ b/src/pyramid/authorization.py @@ -2,11 +2,7 @@ from zope.interface import implementer from pyramid.interfaces import IAuthorizationPolicy -from pyramid.location import lineage - -from pyramid.security import ACLAllowed, ACLDenied, Allow, Deny, Everyone - -from pyramid.util import is_nonstr_iter +from pyramid.security import ACLHelper @implementer(IAuthorizationPolicy) @@ -61,80 +57,21 @@ class ACLAuthorizationPolicy(object): :class:`pyramid.interfaces.IAuthorizationPolicy` interface. """ + def __init__(self): + self.helper = ACLHelper() + def permits(self, context, principals, permission): """ Return an instance of :class:`pyramid.security.ACLAllowed` instance if the policy permits access, return an instance of :class:`pyramid.security.ACLDenied` if not.""" - - acl = '<No ACL found on any object in resource lineage>' - - for location in lineage(context): - try: - acl = location.__acl__ - except AttributeError: - continue - - if acl and callable(acl): - acl = acl() - - for ace in acl: - ace_action, ace_principal, ace_permissions = ace - if ace_principal in principals: - if not is_nonstr_iter(ace_permissions): - ace_permissions = [ace_permissions] - if permission in ace_permissions: - if ace_action == Allow: - return ACLAllowed( - ace, acl, permission, principals, location - ) - else: - return ACLDenied( - ace, acl, permission, principals, location - ) - - # default deny (if no ACL in lineage at all, or if none of the - # principals were mentioned in any ACE we found) - return ACLDenied( - '<default deny>', acl, permission, principals, context - ) + return self.helper.permits(context, principals, permission) def principals_allowed_by_permission(self, context, permission): """ Return the set of principals explicitly granted the permission named ``permission`` according to the ACL directly attached to the ``context`` as well as inherited ACLs based on the :term:`lineage`.""" - allowed = set() - - for location in reversed(list(lineage(context))): - # NB: we're walking *up* the object graph from the root - try: - acl = location.__acl__ - except AttributeError: - continue - - allowed_here = set() - denied_here = set() - - if acl and callable(acl): - acl = acl() - - for ace_action, ace_principal, ace_permissions in acl: - if not is_nonstr_iter(ace_permissions): - ace_permissions = [ace_permissions] - if (ace_action == Allow) and (permission in ace_permissions): - if ace_principal not in denied_here: - allowed_here.add(ace_principal) - if (ace_action == Deny) and (permission in ace_permissions): - denied_here.add(ace_principal) - if ace_principal == Everyone: - # clear the entire allowed set, as we've hit a - # deny of Everyone ala (Deny, Everyone, ALL) - allowed = set() - break - elif ace_principal in allowed: - allowed.remove(ace_principal) - - allowed.update(allowed_here) - - return allowed + return self.helper.principals_allowed_by_permission( + context, permission + ) diff --git a/src/pyramid/security.py b/src/pyramid/security.py index 9088a9746..bfd505a98 100644 --- a/src/pyramid/security.py +++ b/src/pyramid/security.py @@ -9,6 +9,10 @@ from pyramid.interfaces import ( IViewClassifier, ) +from pyramid.location import lineage + +from pyramid.util import is_nonstr_iter + from pyramid.threadlocal import get_current_registry Everyone = 'system.Everyone' @@ -36,22 +40,12 @@ DENY_ALL = (Deny, Everyone, ALL_PERMISSIONS) NO_PERMISSION_REQUIRED = '__no_permission_required__' -def _get_registry(request): - try: - reg = request.registry - except AttributeError: - reg = get_current_registry() # b/c - return reg - - def _get_security_policy(request): - registry = _get_registry(request) - return registry.queryUtility(ISecurityPolicy) + return request.registry.queryUtility(ISecurityPolicy) def _get_authentication_policy(request): - registry = _get_registry(request) - return registry.queryUtility(IAuthenticationPolicy) + return request.registry.queryUtility(IAuthenticationPolicy) def remember(request, userid, **kw): @@ -154,7 +148,7 @@ def view_execution_permitted(context, request, name=''): An exception is raised if no view is found. """ - reg = _get_registry(request) + reg = request.registry provides = [IViewClassifier] + [providedBy(x) for x in (request, context)] # XXX not sure what to do here about using _find_views or analogue; # for now let's just keep it as-is @@ -421,3 +415,128 @@ class LegacySecurityPolicy: authz = self._get_authz_policy(request) principals = authn.effective_principals(request) return authz.permits(context, principals, permission) + + +class ACLHelper: + """ A helper for use with constructing a :term:`security policy` which + consults an :term:`ACL` object attached to a :term:`context` to determine + authorization information about a :term:`principal` or multiple principals. + If the context is part of a :term:`lineage`, the context's parents are + consulted for ACL information too. + + """ + + def permits(self, context, principals, permission): + """ Return an instance of :class:`pyramid.security.ACLAllowed` if the + ACL allows access a user with the given principals, return an instance + of :class:`pyramid.security.ACLDenied` if not. + + When checking if principals are allowed, the security policy consults + the ``context`` for an ACL first. If no ACL exists on the context, or + one does exist but the ACL does not explicitly allow or deny access for + any of the effective principals, consult the context's parent ACL, and + so on, until the lineage is exhausted or we determine that the policy + permits or denies. + + During this processing, if any :data:`pyramid.security.Deny` + ACE is found matching any principal in ``principals``, stop + processing by returning an + :class:`pyramid.security.ACLDenied` instance (equals + ``False``) immediately. If any + :data:`pyramid.security.Allow` ACE is found matching any + principal, stop processing by returning an + :class:`pyramid.security.ACLAllowed` instance (equals + ``True``) immediately. If we exhaust the context's + :term:`lineage`, and no ACE has explicitly permitted or denied + access, return an instance of + :class:`pyramid.security.ACLDenied` (equals ``False``). + + """ + acl = '<No ACL found on any object in resource lineage>' + + for location in lineage(context): + try: + acl = location.__acl__ + except AttributeError: + continue + + if acl and callable(acl): + acl = acl() + + for ace in acl: + ace_action, ace_principal, ace_permissions = ace + if ace_principal in principals: + if not is_nonstr_iter(ace_permissions): + ace_permissions = [ace_permissions] + if permission in ace_permissions: + if ace_action == Allow: + return ACLAllowed( + ace, acl, permission, principals, location + ) + else: + return ACLDenied( + ace, acl, permission, principals, location + ) + + # default deny (if no ACL in lineage at all, or if none of the + # principals were mentioned in any ACE we found) + return ACLDenied( + '<default deny>', acl, permission, principals, context + ) + + def principals_allowed_by_permission(self, context, permission): + """ Return the set of principals explicitly granted the permission + named ``permission`` according to the ACL directly attached to the + ``context`` as well as inherited ACLs based on the :term:`lineage`. + + When computing principals allowed by a permission, we compute the set + of principals that are explicitly granted the ``permission`` in the + provided ``context``. We do this by walking 'up' the object graph + *from the root* to the context. During this walking process, if we + find an explicit :data:`pyramid.security.Allow` ACE for a principal + that matches the ``permission``, the principal is included in the allow + list. However, if later in the walking process that principal is + mentioned in any :data:`pyramid.security.Deny` ACE for the permission, + the principal is removed from the allow list. If a + :data:`pyramid.security.Deny` to the principal + :data:`pyramid.security.Everyone` is encountered during the walking + process that matches the ``permission``, the allow list is cleared for + all principals encountered in previous ACLs. The walking process ends + after we've processed the any ACL directly attached to ``context``; a + set of principals is returned. + + """ + allowed = set() + + for location in reversed(list(lineage(context))): + # NB: we're walking *up* the object graph from the root + try: + acl = location.__acl__ + except AttributeError: + continue + + allowed_here = set() + denied_here = set() + + if acl and callable(acl): + acl = acl() + + for ace_action, ace_principal, ace_permissions in acl: + if not is_nonstr_iter(ace_permissions): + ace_permissions = [ace_permissions] + if (ace_action == Allow) and (permission in ace_permissions): + if ace_principal not in denied_here: + allowed_here.add(ace_principal) + if (ace_action == Deny) and (permission in ace_permissions): + denied_here.add(ace_principal) + if ace_principal == Everyone: + # clear the entire allowed set, as we've hit a + # deny of Everyone ala (Deny, Everyone, ALL) + allowed = set() + break + elif ace_principal in allowed: + allowed.remove(ace_principal) + + allowed.update(allowed_here) + + return allowed diff --git a/tests/test_security.py b/tests/test_security.py index fae9db76f..b91aa7682 100644 --- a/tests/test_security.py +++ b/tests/test_security.py @@ -611,3 +611,278 @@ def _makeRequest(): request.registry = Registry() request.context = object() return request + + +class TestACLHelper(unittest.TestCase): + def test_no_acl(self): + from pyramid.security import ACLHelper + + context = DummyContext() + helper = ACLHelper() + result = helper.permits(context, ['foo'], 'permission') + self.assertEqual(result, False) + self.assertEqual(result.ace, '<default deny>') + self.assertEqual( + result.acl, '<No ACL found on any object in resource lineage>' + ) + self.assertEqual(result.permission, 'permission') + self.assertEqual(result.principals, ['foo']) + self.assertEqual(result.context, context) + + def test_acl(self): + from pyramid.security import ACLHelper + from pyramid.security import Deny + from pyramid.security import Allow + from pyramid.security import Everyone + from pyramid.security import Authenticated + from pyramid.security import ALL_PERMISSIONS + from pyramid.security import DENY_ALL + + helper = ACLHelper() + root = DummyContext() + community = DummyContext(__name__='community', __parent__=root) + blog = DummyContext(__name__='blog', __parent__=community) + root.__acl__ = [(Allow, Authenticated, VIEW)] + community.__acl__ = [ + (Allow, 'fred', ALL_PERMISSIONS), + (Allow, 'wilma', VIEW), + DENY_ALL, + ] + blog.__acl__ = [ + (Allow, 'barney', MEMBER_PERMS), + (Allow, 'wilma', VIEW), + ] + + result = helper.permits( + blog, [Everyone, Authenticated, 'wilma'], 'view' + ) + self.assertEqual(result, True) + self.assertEqual(result.context, blog) + self.assertEqual(result.ace, (Allow, 'wilma', VIEW)) + self.assertEqual(result.acl, blog.__acl__) + + result = helper.permits( + blog, [Everyone, Authenticated, 'wilma'], 'delete' + ) + self.assertEqual(result, False) + self.assertEqual(result.context, community) + self.assertEqual(result.ace, (Deny, Everyone, ALL_PERMISSIONS)) + self.assertEqual(result.acl, community.__acl__) + + result = helper.permits( + blog, [Everyone, Authenticated, 'fred'], 'view' + ) + self.assertEqual(result, True) + self.assertEqual(result.context, community) + self.assertEqual(result.ace, (Allow, 'fred', ALL_PERMISSIONS)) + result = helper.permits( + blog, [Everyone, Authenticated, 'fred'], 'doesntevenexistyet' + ) + self.assertEqual(result, True) + self.assertEqual(result.context, community) + self.assertEqual(result.ace, (Allow, 'fred', ALL_PERMISSIONS)) + self.assertEqual(result.acl, community.__acl__) + + result = helper.permits( + blog, [Everyone, Authenticated, 'barney'], 'view' + ) + self.assertEqual(result, True) + self.assertEqual(result.context, blog) + self.assertEqual(result.ace, (Allow, 'barney', MEMBER_PERMS)) + result = helper.permits( + blog, [Everyone, Authenticated, 'barney'], 'administer' + ) + self.assertEqual(result, False) + self.assertEqual(result.context, community) + self.assertEqual(result.ace, (Deny, Everyone, ALL_PERMISSIONS)) + self.assertEqual(result.acl, community.__acl__) + + result = helper.permits( + root, [Everyone, Authenticated, 'someguy'], 'view' + ) + self.assertEqual(result, True) + self.assertEqual(result.context, root) + self.assertEqual(result.ace, (Allow, Authenticated, VIEW)) + result = helper.permits( + blog, [Everyone, Authenticated, 'someguy'], 'view' + ) + self.assertEqual(result, False) + self.assertEqual(result.context, community) + self.assertEqual(result.ace, (Deny, Everyone, ALL_PERMISSIONS)) + self.assertEqual(result.acl, community.__acl__) + + result = helper.permits(root, [Everyone], 'view') + self.assertEqual(result, False) + self.assertEqual(result.context, root) + self.assertEqual(result.ace, '<default deny>') + self.assertEqual(result.acl, root.__acl__) + + context = DummyContext() + result = helper.permits(context, [Everyone], 'view') + self.assertEqual(result, False) + self.assertEqual(result.ace, '<default deny>') + self.assertEqual( + result.acl, '<No ACL found on any object in resource lineage>' + ) + + def test_string_permissions_in_acl(self): + from pyramid.security import ACLHelper + from pyramid.security import Allow + + helper = ACLHelper() + root = DummyContext() + root.__acl__ = [(Allow, 'wilma', 'view_stuff')] + + result = helper.permits(root, ['wilma'], 'view') + # would be True if matching against 'view_stuff' instead of against + # ['view_stuff'] + self.assertEqual(result, False) + + def test_callable_acl(self): + from pyramid.security import ACLHelper + from pyramid.security import Allow + + helper = ACLHelper() + context = DummyContext() + fn = lambda self: [(Allow, 'bob', 'read')] + context.__acl__ = fn.__get__(context, context.__class__) + result = helper.permits(context, ['bob'], 'read') + self.assertTrue(result) + + def test_principals_allowed_by_permission_direct(self): + from pyramid.security import ACLHelper + from pyramid.security import Allow + from pyramid.security import DENY_ALL + + helper = ACLHelper() + context = DummyContext() + acl = [ + (Allow, 'chrism', ('read', 'write')), + DENY_ALL, + (Allow, 'other', 'read'), + ] + context.__acl__ = acl + result = sorted( + helper.principals_allowed_by_permission(context, 'read') + ) + self.assertEqual(result, ['chrism']) + + def test_principals_allowed_by_permission_callable_acl(self): + from pyramid.security import ACLHelper + from pyramid.security import Allow + from pyramid.security import DENY_ALL + + helper = ACLHelper() + context = DummyContext() + acl = lambda: [ + (Allow, 'chrism', ('read', 'write')), + DENY_ALL, + (Allow, 'other', 'read'), + ] + context.__acl__ = acl + result = sorted( + helper.principals_allowed_by_permission(context, 'read') + ) + self.assertEqual(result, ['chrism']) + + def test_principals_allowed_by_permission_string_permission(self): + from pyramid.security import ACLHelper + from pyramid.security import Allow + + helper = ACLHelper() + context = DummyContext() + acl = [(Allow, 'chrism', 'read_it')] + context.__acl__ = acl + result = helper.principals_allowed_by_permission(context, 'read') + # would be ['chrism'] if 'read' were compared against 'read_it' instead + # of against ['read_it'] + self.assertEqual(list(result), []) + + def test_principals_allowed_by_permission(self): + from pyramid.security import ACLHelper + from pyramid.security import Allow + from pyramid.security import Deny + from pyramid.security import DENY_ALL + from pyramid.security import ALL_PERMISSIONS + + helper = ACLHelper() + root = DummyContext(__name__='', __parent__=None) + community = DummyContext(__name__='community', __parent__=root) + blog = DummyContext(__name__='blog', __parent__=community) + root.__acl__ = [ + (Allow, 'chrism', ('read', 'write')), + (Allow, 'other', ('read',)), + (Allow, 'jim', ALL_PERMISSIONS), + ] + community.__acl__ = [ + (Deny, 'flooz', 'read'), + (Allow, 'flooz', 'read'), + (Allow, 'mork', 'read'), + (Deny, 'jim', 'read'), + (Allow, 'someguy', 'manage'), + ] + blog.__acl__ = [(Allow, 'fred', 'read'), DENY_ALL] + + result = sorted(helper.principals_allowed_by_permission(blog, 'read')) + self.assertEqual(result, ['fred']) + result = sorted( + helper.principals_allowed_by_permission(community, 'read') + ) + self.assertEqual(result, ['chrism', 'mork', 'other']) + result = sorted( + helper.principals_allowed_by_permission(community, 'read') + ) + result = sorted(helper.principals_allowed_by_permission(root, 'read')) + self.assertEqual(result, ['chrism', 'jim', 'other']) + + def test_principals_allowed_by_permission_no_acls(self): + from pyramid.security import ACLHelper + + helper = ACLHelper() + context = DummyContext() + result = sorted( + helper.principals_allowed_by_permission(context, 'read') + ) + self.assertEqual(result, []) + + def test_principals_allowed_by_permission_deny_not_permission_in_acl(self): + from pyramid.security import ACLHelper + from pyramid.security import Deny + from pyramid.security import Everyone + + helper = ACLHelper() + context = DummyContext() + acl = [(Deny, Everyone, 'write')] + context.__acl__ = acl + result = sorted( + helper.principals_allowed_by_permission(context, 'read') + ) + self.assertEqual(result, []) + + def test_principals_allowed_by_permission_deny_permission_in_acl(self): + from pyramid.security import ACLHelper + from pyramid.security import Deny + from pyramid.security import Everyone + + helper = ACLHelper() + context = DummyContext() + acl = [(Deny, Everyone, 'read')] + context.__acl__ = acl + result = sorted( + helper.principals_allowed_by_permission(context, 'read') + ) + self.assertEqual(result, []) + + +VIEW = 'view' +EDIT = 'edit' +CREATE = 'create' +DELETE = 'delete' +MODERATE = 'moderate' +ADMINISTER = 'administer' +COMMENT = 'comment' + +GUEST_PERMS = (VIEW, COMMENT) +MEMBER_PERMS = GUEST_PERMS + (EDIT, CREATE, DELETE) +MODERATOR_PERMS = MEMBER_PERMS + (MODERATE,) +ADMINISTRATOR_PERMS = MODERATOR_PERMS + (ADMINISTER,) |
