summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTheron Luhn <theron@luhn.com>2019-03-30 10:08:31 -0700
committerTheron Luhn <theron@luhn.com>2019-03-30 10:08:31 -0700
commit31998bcdd0396316c1a0fdeb50bee59e4b9e14ed (patch)
tree8b3dde7bc075519f823ce2d29cd57b7ebfc763ea
parent282a59b05704c10307a5def54fe3531cfd49dcb7 (diff)
downloadpyramid-31998bcdd0396316c1a0fdeb50bee59e4b9e14ed.tar.gz
pyramid-31998bcdd0396316c1a0fdeb50bee59e4b9e14ed.tar.bz2
pyramid-31998bcdd0396316c1a0fdeb50bee59e4b9e14ed.zip
Implement pyramid.security.ACLHelper
Mostly a lift-and-shift of the code in ACLAuthorizationPolicy.
-rw-r--r--src/pyramid/authorization.py79
-rw-r--r--src/pyramid/security.py145
-rw-r--r--tests/test_security.py275
3 files changed, 415 insertions, 84 deletions
diff --git a/src/pyramid/authorization.py b/src/pyramid/authorization.py
index 6056a8d25..19b96e3d1 100644
--- a/src/pyramid/authorization.py
+++ b/src/pyramid/authorization.py
@@ -2,11 +2,7 @@ from zope.interface import implementer
from pyramid.interfaces import IAuthorizationPolicy
-from pyramid.location import lineage
-
-from pyramid.security import ACLAllowed, ACLDenied, Allow, Deny, Everyone
-
-from pyramid.util import is_nonstr_iter
+from pyramid.security import ACLHelper
@implementer(IAuthorizationPolicy)
@@ -61,80 +57,21 @@ class ACLAuthorizationPolicy(object):
:class:`pyramid.interfaces.IAuthorizationPolicy` interface.
"""
+ def __init__(self):
+ self.helper = ACLHelper()
+
def permits(self, context, principals, permission):
""" Return an instance of
:class:`pyramid.security.ACLAllowed` instance if the policy
permits access, return an instance of
:class:`pyramid.security.ACLDenied` if not."""
-
- acl = '<No ACL found on any object in resource lineage>'
-
- for location in lineage(context):
- try:
- acl = location.__acl__
- except AttributeError:
- continue
-
- if acl and callable(acl):
- acl = acl()
-
- for ace in acl:
- ace_action, ace_principal, ace_permissions = ace
- if ace_principal in principals:
- if not is_nonstr_iter(ace_permissions):
- ace_permissions = [ace_permissions]
- if permission in ace_permissions:
- if ace_action == Allow:
- return ACLAllowed(
- ace, acl, permission, principals, location
- )
- else:
- return ACLDenied(
- ace, acl, permission, principals, location
- )
-
- # default deny (if no ACL in lineage at all, or if none of the
- # principals were mentioned in any ACE we found)
- return ACLDenied(
- '<default deny>', acl, permission, principals, context
- )
+ return self.helper.permits(context, principals, permission)
def principals_allowed_by_permission(self, context, permission):
""" Return the set of principals explicitly granted the
permission named ``permission`` according to the ACL directly
attached to the ``context`` as well as inherited ACLs based on
the :term:`lineage`."""
- allowed = set()
-
- for location in reversed(list(lineage(context))):
- # NB: we're walking *up* the object graph from the root
- try:
- acl = location.__acl__
- except AttributeError:
- continue
-
- allowed_here = set()
- denied_here = set()
-
- if acl and callable(acl):
- acl = acl()
-
- for ace_action, ace_principal, ace_permissions in acl:
- if not is_nonstr_iter(ace_permissions):
- ace_permissions = [ace_permissions]
- if (ace_action == Allow) and (permission in ace_permissions):
- if ace_principal not in denied_here:
- allowed_here.add(ace_principal)
- if (ace_action == Deny) and (permission in ace_permissions):
- denied_here.add(ace_principal)
- if ace_principal == Everyone:
- # clear the entire allowed set, as we've hit a
- # deny of Everyone ala (Deny, Everyone, ALL)
- allowed = set()
- break
- elif ace_principal in allowed:
- allowed.remove(ace_principal)
-
- allowed.update(allowed_here)
-
- return allowed
+ return self.helper.principals_allowed_by_permission(
+ context, permission
+ )
diff --git a/src/pyramid/security.py b/src/pyramid/security.py
index 9088a9746..bfd505a98 100644
--- a/src/pyramid/security.py
+++ b/src/pyramid/security.py
@@ -9,6 +9,10 @@ from pyramid.interfaces import (
IViewClassifier,
)
+from pyramid.location import lineage
+
+from pyramid.util import is_nonstr_iter
+
from pyramid.threadlocal import get_current_registry
Everyone = 'system.Everyone'
@@ -36,22 +40,12 @@ DENY_ALL = (Deny, Everyone, ALL_PERMISSIONS)
NO_PERMISSION_REQUIRED = '__no_permission_required__'
-def _get_registry(request):
- try:
- reg = request.registry
- except AttributeError:
- reg = get_current_registry() # b/c
- return reg
-
-
def _get_security_policy(request):
- registry = _get_registry(request)
- return registry.queryUtility(ISecurityPolicy)
+ return request.registry.queryUtility(ISecurityPolicy)
def _get_authentication_policy(request):
- registry = _get_registry(request)
- return registry.queryUtility(IAuthenticationPolicy)
+ return request.registry.queryUtility(IAuthenticationPolicy)
def remember(request, userid, **kw):
@@ -154,7 +148,7 @@ def view_execution_permitted(context, request, name=''):
An exception is raised if no view is found.
"""
- reg = _get_registry(request)
+ reg = request.registry
provides = [IViewClassifier] + [providedBy(x) for x in (request, context)]
# XXX not sure what to do here about using _find_views or analogue;
# for now let's just keep it as-is
@@ -421,3 +415,128 @@ class LegacySecurityPolicy:
authz = self._get_authz_policy(request)
principals = authn.effective_principals(request)
return authz.permits(context, principals, permission)
+
+
+class ACLHelper:
+ """ A helper for use with constructing a :term:`security policy` which
+ consults an :term:`ACL` object attached to a :term:`context` to determine
+ authorization information about a :term:`principal` or multiple principals.
+ If the context is part of a :term:`lineage`, the context's parents are
+ consulted for ACL information too.
+
+ """
+
+ def permits(self, context, principals, permission):
+ """ Return an instance of :class:`pyramid.security.ACLAllowed` if the
+ ACL allows access a user with the given principals, return an instance
+ of :class:`pyramid.security.ACLDenied` if not.
+
+ When checking if principals are allowed, the security policy consults
+ the ``context`` for an ACL first. If no ACL exists on the context, or
+ one does exist but the ACL does not explicitly allow or deny access for
+ any of the effective principals, consult the context's parent ACL, and
+ so on, until the lineage is exhausted or we determine that the policy
+ permits or denies.
+
+ During this processing, if any :data:`pyramid.security.Deny`
+ ACE is found matching any principal in ``principals``, stop
+ processing by returning an
+ :class:`pyramid.security.ACLDenied` instance (equals
+ ``False``) immediately. If any
+ :data:`pyramid.security.Allow` ACE is found matching any
+ principal, stop processing by returning an
+ :class:`pyramid.security.ACLAllowed` instance (equals
+ ``True``) immediately. If we exhaust the context's
+ :term:`lineage`, and no ACE has explicitly permitted or denied
+ access, return an instance of
+ :class:`pyramid.security.ACLDenied` (equals ``False``).
+
+ """
+ acl = '<No ACL found on any object in resource lineage>'
+
+ for location in lineage(context):
+ try:
+ acl = location.__acl__
+ except AttributeError:
+ continue
+
+ if acl and callable(acl):
+ acl = acl()
+
+ for ace in acl:
+ ace_action, ace_principal, ace_permissions = ace
+ if ace_principal in principals:
+ if not is_nonstr_iter(ace_permissions):
+ ace_permissions = [ace_permissions]
+ if permission in ace_permissions:
+ if ace_action == Allow:
+ return ACLAllowed(
+ ace, acl, permission, principals, location
+ )
+ else:
+ return ACLDenied(
+ ace, acl, permission, principals, location
+ )
+
+ # default deny (if no ACL in lineage at all, or if none of the
+ # principals were mentioned in any ACE we found)
+ return ACLDenied(
+ '<default deny>', acl, permission, principals, context
+ )
+
+ def principals_allowed_by_permission(self, context, permission):
+ """ Return the set of principals explicitly granted the permission
+ named ``permission`` according to the ACL directly attached to the
+ ``context`` as well as inherited ACLs based on the :term:`lineage`.
+
+ When computing principals allowed by a permission, we compute the set
+ of principals that are explicitly granted the ``permission`` in the
+ provided ``context``. We do this by walking 'up' the object graph
+ *from the root* to the context. During this walking process, if we
+ find an explicit :data:`pyramid.security.Allow` ACE for a principal
+ that matches the ``permission``, the principal is included in the allow
+ list. However, if later in the walking process that principal is
+ mentioned in any :data:`pyramid.security.Deny` ACE for the permission,
+ the principal is removed from the allow list. If a
+ :data:`pyramid.security.Deny` to the principal
+ :data:`pyramid.security.Everyone` is encountered during the walking
+ process that matches the ``permission``, the allow list is cleared for
+ all principals encountered in previous ACLs. The walking process ends
+ after we've processed the any ACL directly attached to ``context``; a
+ set of principals is returned.
+
+ """
+ allowed = set()
+
+ for location in reversed(list(lineage(context))):
+ # NB: we're walking *up* the object graph from the root
+ try:
+ acl = location.__acl__
+ except AttributeError:
+ continue
+
+ allowed_here = set()
+ denied_here = set()
+
+ if acl and callable(acl):
+ acl = acl()
+
+ for ace_action, ace_principal, ace_permissions in acl:
+ if not is_nonstr_iter(ace_permissions):
+ ace_permissions = [ace_permissions]
+ if (ace_action == Allow) and (permission in ace_permissions):
+ if ace_principal not in denied_here:
+ allowed_here.add(ace_principal)
+ if (ace_action == Deny) and (permission in ace_permissions):
+ denied_here.add(ace_principal)
+ if ace_principal == Everyone:
+ # clear the entire allowed set, as we've hit a
+ # deny of Everyone ala (Deny, Everyone, ALL)
+ allowed = set()
+ break
+ elif ace_principal in allowed:
+ allowed.remove(ace_principal)
+
+ allowed.update(allowed_here)
+
+ return allowed
diff --git a/tests/test_security.py b/tests/test_security.py
index fae9db76f..b91aa7682 100644
--- a/tests/test_security.py
+++ b/tests/test_security.py
@@ -611,3 +611,278 @@ def _makeRequest():
request.registry = Registry()
request.context = object()
return request
+
+
+class TestACLHelper(unittest.TestCase):
+ def test_no_acl(self):
+ from pyramid.security import ACLHelper
+
+ context = DummyContext()
+ helper = ACLHelper()
+ result = helper.permits(context, ['foo'], 'permission')
+ self.assertEqual(result, False)
+ self.assertEqual(result.ace, '<default deny>')
+ self.assertEqual(
+ result.acl, '<No ACL found on any object in resource lineage>'
+ )
+ self.assertEqual(result.permission, 'permission')
+ self.assertEqual(result.principals, ['foo'])
+ self.assertEqual(result.context, context)
+
+ def test_acl(self):
+ from pyramid.security import ACLHelper
+ from pyramid.security import Deny
+ from pyramid.security import Allow
+ from pyramid.security import Everyone
+ from pyramid.security import Authenticated
+ from pyramid.security import ALL_PERMISSIONS
+ from pyramid.security import DENY_ALL
+
+ helper = ACLHelper()
+ root = DummyContext()
+ community = DummyContext(__name__='community', __parent__=root)
+ blog = DummyContext(__name__='blog', __parent__=community)
+ root.__acl__ = [(Allow, Authenticated, VIEW)]
+ community.__acl__ = [
+ (Allow, 'fred', ALL_PERMISSIONS),
+ (Allow, 'wilma', VIEW),
+ DENY_ALL,
+ ]
+ blog.__acl__ = [
+ (Allow, 'barney', MEMBER_PERMS),
+ (Allow, 'wilma', VIEW),
+ ]
+
+ result = helper.permits(
+ blog, [Everyone, Authenticated, 'wilma'], 'view'
+ )
+ self.assertEqual(result, True)
+ self.assertEqual(result.context, blog)
+ self.assertEqual(result.ace, (Allow, 'wilma', VIEW))
+ self.assertEqual(result.acl, blog.__acl__)
+
+ result = helper.permits(
+ blog, [Everyone, Authenticated, 'wilma'], 'delete'
+ )
+ self.assertEqual(result, False)
+ self.assertEqual(result.context, community)
+ self.assertEqual(result.ace, (Deny, Everyone, ALL_PERMISSIONS))
+ self.assertEqual(result.acl, community.__acl__)
+
+ result = helper.permits(
+ blog, [Everyone, Authenticated, 'fred'], 'view'
+ )
+ self.assertEqual(result, True)
+ self.assertEqual(result.context, community)
+ self.assertEqual(result.ace, (Allow, 'fred', ALL_PERMISSIONS))
+ result = helper.permits(
+ blog, [Everyone, Authenticated, 'fred'], 'doesntevenexistyet'
+ )
+ self.assertEqual(result, True)
+ self.assertEqual(result.context, community)
+ self.assertEqual(result.ace, (Allow, 'fred', ALL_PERMISSIONS))
+ self.assertEqual(result.acl, community.__acl__)
+
+ result = helper.permits(
+ blog, [Everyone, Authenticated, 'barney'], 'view'
+ )
+ self.assertEqual(result, True)
+ self.assertEqual(result.context, blog)
+ self.assertEqual(result.ace, (Allow, 'barney', MEMBER_PERMS))
+ result = helper.permits(
+ blog, [Everyone, Authenticated, 'barney'], 'administer'
+ )
+ self.assertEqual(result, False)
+ self.assertEqual(result.context, community)
+ self.assertEqual(result.ace, (Deny, Everyone, ALL_PERMISSIONS))
+ self.assertEqual(result.acl, community.__acl__)
+
+ result = helper.permits(
+ root, [Everyone, Authenticated, 'someguy'], 'view'
+ )
+ self.assertEqual(result, True)
+ self.assertEqual(result.context, root)
+ self.assertEqual(result.ace, (Allow, Authenticated, VIEW))
+ result = helper.permits(
+ blog, [Everyone, Authenticated, 'someguy'], 'view'
+ )
+ self.assertEqual(result, False)
+ self.assertEqual(result.context, community)
+ self.assertEqual(result.ace, (Deny, Everyone, ALL_PERMISSIONS))
+ self.assertEqual(result.acl, community.__acl__)
+
+ result = helper.permits(root, [Everyone], 'view')
+ self.assertEqual(result, False)
+ self.assertEqual(result.context, root)
+ self.assertEqual(result.ace, '<default deny>')
+ self.assertEqual(result.acl, root.__acl__)
+
+ context = DummyContext()
+ result = helper.permits(context, [Everyone], 'view')
+ self.assertEqual(result, False)
+ self.assertEqual(result.ace, '<default deny>')
+ self.assertEqual(
+ result.acl, '<No ACL found on any object in resource lineage>'
+ )
+
+ def test_string_permissions_in_acl(self):
+ from pyramid.security import ACLHelper
+ from pyramid.security import Allow
+
+ helper = ACLHelper()
+ root = DummyContext()
+ root.__acl__ = [(Allow, 'wilma', 'view_stuff')]
+
+ result = helper.permits(root, ['wilma'], 'view')
+ # would be True if matching against 'view_stuff' instead of against
+ # ['view_stuff']
+ self.assertEqual(result, False)
+
+ def test_callable_acl(self):
+ from pyramid.security import ACLHelper
+ from pyramid.security import Allow
+
+ helper = ACLHelper()
+ context = DummyContext()
+ fn = lambda self: [(Allow, 'bob', 'read')]
+ context.__acl__ = fn.__get__(context, context.__class__)
+ result = helper.permits(context, ['bob'], 'read')
+ self.assertTrue(result)
+
+ def test_principals_allowed_by_permission_direct(self):
+ from pyramid.security import ACLHelper
+ from pyramid.security import Allow
+ from pyramid.security import DENY_ALL
+
+ helper = ACLHelper()
+ context = DummyContext()
+ acl = [
+ (Allow, 'chrism', ('read', 'write')),
+ DENY_ALL,
+ (Allow, 'other', 'read'),
+ ]
+ context.__acl__ = acl
+ result = sorted(
+ helper.principals_allowed_by_permission(context, 'read')
+ )
+ self.assertEqual(result, ['chrism'])
+
+ def test_principals_allowed_by_permission_callable_acl(self):
+ from pyramid.security import ACLHelper
+ from pyramid.security import Allow
+ from pyramid.security import DENY_ALL
+
+ helper = ACLHelper()
+ context = DummyContext()
+ acl = lambda: [
+ (Allow, 'chrism', ('read', 'write')),
+ DENY_ALL,
+ (Allow, 'other', 'read'),
+ ]
+ context.__acl__ = acl
+ result = sorted(
+ helper.principals_allowed_by_permission(context, 'read')
+ )
+ self.assertEqual(result, ['chrism'])
+
+ def test_principals_allowed_by_permission_string_permission(self):
+ from pyramid.security import ACLHelper
+ from pyramid.security import Allow
+
+ helper = ACLHelper()
+ context = DummyContext()
+ acl = [(Allow, 'chrism', 'read_it')]
+ context.__acl__ = acl
+ result = helper.principals_allowed_by_permission(context, 'read')
+ # would be ['chrism'] if 'read' were compared against 'read_it' instead
+ # of against ['read_it']
+ self.assertEqual(list(result), [])
+
+ def test_principals_allowed_by_permission(self):
+ from pyramid.security import ACLHelper
+ from pyramid.security import Allow
+ from pyramid.security import Deny
+ from pyramid.security import DENY_ALL
+ from pyramid.security import ALL_PERMISSIONS
+
+ helper = ACLHelper()
+ root = DummyContext(__name__='', __parent__=None)
+ community = DummyContext(__name__='community', __parent__=root)
+ blog = DummyContext(__name__='blog', __parent__=community)
+ root.__acl__ = [
+ (Allow, 'chrism', ('read', 'write')),
+ (Allow, 'other', ('read',)),
+ (Allow, 'jim', ALL_PERMISSIONS),
+ ]
+ community.__acl__ = [
+ (Deny, 'flooz', 'read'),
+ (Allow, 'flooz', 'read'),
+ (Allow, 'mork', 'read'),
+ (Deny, 'jim', 'read'),
+ (Allow, 'someguy', 'manage'),
+ ]
+ blog.__acl__ = [(Allow, 'fred', 'read'), DENY_ALL]
+
+ result = sorted(helper.principals_allowed_by_permission(blog, 'read'))
+ self.assertEqual(result, ['fred'])
+ result = sorted(
+ helper.principals_allowed_by_permission(community, 'read')
+ )
+ self.assertEqual(result, ['chrism', 'mork', 'other'])
+ result = sorted(
+ helper.principals_allowed_by_permission(community, 'read')
+ )
+ result = sorted(helper.principals_allowed_by_permission(root, 'read'))
+ self.assertEqual(result, ['chrism', 'jim', 'other'])
+
+ def test_principals_allowed_by_permission_no_acls(self):
+ from pyramid.security import ACLHelper
+
+ helper = ACLHelper()
+ context = DummyContext()
+ result = sorted(
+ helper.principals_allowed_by_permission(context, 'read')
+ )
+ self.assertEqual(result, [])
+
+ def test_principals_allowed_by_permission_deny_not_permission_in_acl(self):
+ from pyramid.security import ACLHelper
+ from pyramid.security import Deny
+ from pyramid.security import Everyone
+
+ helper = ACLHelper()
+ context = DummyContext()
+ acl = [(Deny, Everyone, 'write')]
+ context.__acl__ = acl
+ result = sorted(
+ helper.principals_allowed_by_permission(context, 'read')
+ )
+ self.assertEqual(result, [])
+
+ def test_principals_allowed_by_permission_deny_permission_in_acl(self):
+ from pyramid.security import ACLHelper
+ from pyramid.security import Deny
+ from pyramid.security import Everyone
+
+ helper = ACLHelper()
+ context = DummyContext()
+ acl = [(Deny, Everyone, 'read')]
+ context.__acl__ = acl
+ result = sorted(
+ helper.principals_allowed_by_permission(context, 'read')
+ )
+ self.assertEqual(result, [])
+
+
+VIEW = 'view'
+EDIT = 'edit'
+CREATE = 'create'
+DELETE = 'delete'
+MODERATE = 'moderate'
+ADMINISTER = 'administer'
+COMMENT = 'comment'
+
+GUEST_PERMS = (VIEW, COMMENT)
+MEMBER_PERMS = GUEST_PERMS + (EDIT, CREATE, DELETE)
+MODERATOR_PERMS = MEMBER_PERMS + (MODERATE,)
+ADMINISTRATOR_PERMS = MODERATOR_PERMS + (ADMINISTER,)