summaryrefslogtreecommitdiff
path: root/repoze/bfg/secpols.py
diff options
context:
space:
mode:
authorChris McDonough <chrism@agendaless.com>2009-05-27 04:52:51 +0000
committerChris McDonough <chrism@agendaless.com>2009-05-27 04:52:51 +0000
commita1a9fb7128c935848b17c0ce6586991098a17f07 (patch)
tree5160f28be92202033c693caa335f8b9cda3c6379 /repoze/bfg/secpols.py
parent08ead74d05e25f58c83712f6f8651484ddc983d0 (diff)
downloadpyramid-a1a9fb7128c935848b17c0ce6586991098a17f07.tar.gz
pyramid-a1a9fb7128c935848b17c0ce6586991098a17f07.tar.bz2
pyramid-a1a9fb7128c935848b17c0ce6586991098a17f07.zip
Merge authchanges branch to trunk.
Diffstat (limited to 'repoze/bfg/secpols.py')
-rw-r--r--repoze/bfg/secpols.py469
1 files changed, 469 insertions, 0 deletions
diff --git a/repoze/bfg/secpols.py b/repoze/bfg/secpols.py
new file mode 100644
index 000000000..0f0fc7e66
--- /dev/null
+++ b/repoze/bfg/secpols.py
@@ -0,0 +1,469 @@
+from zope.deprecation import deprecated
+from zope.interface import implements
+
+from repoze.bfg.interfaces import ISecurityPolicy
+from repoze.bfg.interfaces import IAuthorizationPolicy
+from repoze.bfg.interfaces import IAuthenticationPolicy
+
+from repoze.bfg.location import lineage
+
+from repoze.bfg.threadlocal import manager
+
+from repoze.bfg.security import Allow
+from repoze.bfg.security import Deny
+from repoze.bfg.security import ACLAllowed
+from repoze.bfg.security import ACLDenied
+from repoze.bfg.security import Everyone
+from repoze.bfg.security import Authenticated
+
+class ACLSecurityPolicy(object):
+ implements(ISecurityPolicy)
+
+ def __init__(self, get_principals):
+ self.get_principals = get_principals
+
+ def permits(self, context, request, permission):
+ """ Return ``ACLAllowed`` if the policy permits access,
+ ``ACLDenied`` if not. """
+ principals = set(self.effective_principals(request))
+
+ for location in lineage(context):
+ try:
+ acl = location.__acl__
+ except AttributeError:
+ continue
+
+ for ace in acl:
+ ace_action, ace_principal, ace_permissions = ace
+ if ace_principal in principals:
+ if not hasattr(ace_permissions, '__iter__'):
+ ace_permissions = [ace_permissions]
+ if permission in ace_permissions:
+ if ace_action == Allow:
+ return ACLAllowed(ace, acl, permission,
+ principals, location)
+ else:
+ return ACLDenied(ace, acl, permission,
+ principals, location)
+
+ # default deny if no ACE matches in the ACL found
+ result = ACLDenied(None, acl, permission, principals, location)
+ return result
+
+ # default deny if no ACL in lineage at all
+ return ACLDenied(None, None, permission, principals, context)
+
+ def authenticated_userid(self, request):
+ principals = self.get_principals(request)
+ if principals:
+ return principals[0]
+
+ def effective_principals(self, request):
+ effective_principals = [Everyone]
+ principal_ids = self.get_principals(request)
+
+ if principal_ids:
+ effective_principals.append(Authenticated)
+ effective_principals.extend(principal_ids)
+
+ return effective_principals
+
+ def principals_allowed_by_permission(self, context, permission):
+ for location in lineage(context):
+ try:
+ acl = location.__acl__
+ except AttributeError:
+ continue
+
+ allowed = {}
+
+ for ace_action, ace_principal, ace_permissions in acl:
+ if ace_action == Allow:
+ if not hasattr(ace_permissions, '__iter__'):
+ ace_permissions = [ace_permissions]
+ if permission in ace_permissions:
+ allowed[ace_principal] = True
+ return sorted(allowed.keys())
+
+ return []
+
+class InheritingACLSecurityPolicy(object):
+ """ A security policy which uses ACLs in the following ways:
+
+ - When checking whether a user is permitted (via the ``permits``
+ method), the security policy consults the ``context`` for an ACL
+ first. If no ACL exists on the context, or one does exist but
+ the ACL does not explicitly allow or deny access for any of the
+ effective principals, consult the context's parent ACL, and so
+ on, until the lineage is exhausted or we determine that the
+ policy permits or denies.
+
+ During this processing, if any ``Deny`` ACE is found matching
+ any effective principal, stop processing by returning an
+ ``ACLDenied`` (equals False) immediately. If any ``Allow`` ACE
+ is found matching any effective principal, stop processing by
+ returning an ``ACLAllowed`` (equals True) immediately. If we
+ exhaust the context's lneage, and no ACE has explicitly
+ permitted or denied access, return an ``ACLDenied``. This
+ differs from the non-inheriting security policy (the
+ ``ACLSecurityPolicy``) by virtue of the fact that it does not
+ stop looking for ACLs in the object lineage after it finds the
+ first one.
+
+ - When computing principals allowed by a permission via the
+ ``principals_allowed_by_permission`` method, we compute the set
+ of principals that are explicitly granted the ``permission``.
+ We do this by walking 'up' the object graph *from the root* to
+ the context. During this walking process, if we find an
+ explicit ``Allow`` ACE for a principal that matches the
+ ``permission``, the principal is included in the allow list.
+ However, if later in the walking process that user is mentioned
+ in any ``Deny`` ACE for the permission, the user is removed from
+ the allow list. If a ``Deny`` to the principal ``Everyone`` is
+ encountered during the walking process that matches the
+ ``permission``, the allow list is cleared for all principals
+ encountered in previous ACLs. The walking process ends after
+ we've processed the any ACL directly attached to ``context``; a
+ list of principals is returned.
+
+ - Other aspects of this policy are the same as those in the
+ ACLSecurityPolicy (e.g. ``effective_principals``,
+ ``authenticated_userid``).
+ """
+ implements(ISecurityPolicy)
+
+ def __init__(self, get_principals):
+ self.get_principals = get_principals
+
+ def permits(self, context, request, permission):
+ """ Return ``ACLAllowed`` if the policy permits access,
+ ``ACLDenied`` if not. """
+ principals = set(self.effective_principals(request))
+
+ for location in lineage(context):
+ try:
+ acl = location.__acl__
+ except AttributeError:
+ continue
+
+ for ace in acl:
+ ace_action, ace_principal, ace_permissions = ace
+ if ace_principal in principals:
+ if not hasattr(ace_permissions, '__iter__'):
+ ace_permissions = [ace_permissions]
+ if permission in ace_permissions:
+ if ace_action == Allow:
+ return ACLAllowed(ace, acl, permission,
+ principals, location)
+ else:
+ return ACLDenied(ace, acl, permission,
+ principals, location)
+
+ # default deny if no ACL in lineage at all
+ return ACLDenied(None, None, permission, principals, context)
+
+ def authenticated_userid(self, request):
+ principals = self.get_principals(request)
+ if principals:
+ return principals[0]
+
+ def effective_principals(self, request):
+ effective_principals = [Everyone]
+ principal_ids = self.get_principals(request)
+
+ if principal_ids:
+ effective_principals.append(Authenticated)
+ effective_principals.extend(principal_ids)
+
+ return effective_principals
+
+ def principals_allowed_by_permission(self, context, permission):
+ allowed = set()
+
+ for location in reversed(list(lineage(context))):
+ # NB: we're walking *up* the object graph from the root
+ try:
+ acl = location.__acl__
+ except AttributeError:
+ continue
+
+ allowed_here = set()
+ denied_here = set()
+
+ for ace_action, ace_principal, ace_permissions in acl:
+ if not hasattr(ace_permissions, '__iter__'):
+ ace_permissions = [ace_permissions]
+ if ace_action == Allow and permission in ace_permissions:
+ if not ace_principal in denied_here:
+ allowed_here.add(ace_principal)
+ if ace_action == Deny and permission in ace_permissions:
+ denied_here.add(ace_principal)
+ if ace_principal == Everyone:
+ # clear the entire allowed set, as we've hit a
+ # deny of Everyone ala (Deny, Everyone, ALL)
+ allowed = set()
+ break
+ elif ace_principal in allowed:
+ allowed.remove(ace_principal)
+
+ allowed.update(allowed_here)
+
+ return allowed
+
+def get_remoteuser(request):
+ user_id = request.environ.get('REMOTE_USER')
+ if user_id:
+ return [user_id]
+ return []
+
+def RemoteUserACLSecurityPolicy():
+ """ A security policy which:
+
+ - examines the request.environ for the REMOTE_USER variable and
+ uses any non-false value as a principal id for this request.
+
+ - uses an ACL-based authorization model which attempts to find the
+ *first* ACL in the context' lineage. It returns ``Allowed`` from
+ its 'permits' method if the single ACL found grants access to the
+ current principal. It returns ``Denied`` if permission was not
+ granted (either explicitly via a deny or implicitly by not finding
+ a matching ACE action). The *first* ACL found in the context's
+ lineage is considered canonical; no searching is done for other
+ security attributes after the first ACL is found in the context'
+ lineage. Use the 'inheriting' variant of this policy to consider
+ more than one ACL in the lineage.
+
+ An ACL is an ordered sequence of ACE tuples, e.g. ``[(Allow,
+ Everyone, 'read'), (Deny, 'george', 'write')]``. ACLs stored on
+ model instance objects as their ``__acl__`` attribute will be used
+ by the security machinery to grant or deny access.
+
+ Enable this security policy by adding the following to your
+ application's ``configure.zcml``:
+
+ .. code-block:: xml
+
+ <utility
+ provides="repoze.bfg.interfaces.ISecurityPolicy"
+ factory="repoze.bfg.security.RemoteUserACLSecurityPolicy"
+ />
+
+ """
+ return ACLSecurityPolicy(get_remoteuser)
+
+def RemoteUserInheritingACLSecurityPolicy():
+ """ A security policy which:
+
+ - examines the request.environ for the REMOTE_USER variable and
+ uses any non-false value as a principal id for this request.
+
+ - Differs from the non-inheriting security policy variants
+ (e.g. ``ACLSecurityPolicy``) by virtue of the fact that it does
+ not stop looking for ACLs in the object lineage after it finds
+ the first one.
+
+ - When checking whether a user is permitted (via the ``permits``
+ method), the security policy consults the ``context`` for an ACL
+ first. If no ACL exists on the context, or one does exist but
+ the ACL does not explicitly allow or deny access for any of the
+ effective principals, consult the context's parent ACL, and so
+ on, until the lineage is exhausted or we determine that the
+ policy permits or denies.
+
+ During this processing, if any ``Deny`` ACE is found matching
+ any effective principal, stop processing by returning an
+ ``ACLDenied`` (equals False) immediately. If any ``Allow`` ACE
+ is found matching any effective principal, stop processing by
+ returning an ``ACLAllowed`` (equals True) immediately. If we
+ exhaust the context's lneage, and no ACE has explicitly
+ permitted or denied access, return an ``ACLDenied``.
+
+ - When computing principals allowed by a permission via the
+ ``principals_allowed_by_permission`` method, we compute the set
+ of principals that are explicitly granted the ``permission``.
+ We do this by walking 'up' the object graph *from the root* to
+ the context. During this walking process, if we find an
+ explicit ``Allow`` ACE for a principal that matches the
+ ``permission``, the principal is included in the allow list.
+ However, if later in the walking process that user is mentioned
+ in any ``Deny`` ACE for the permission, the user is removed from
+ the allow list. If a ``Deny`` to the principal ``Everyone`` is
+ encountered during the walking process that matches the
+ ``permission``, the allow list is cleared for all principals
+ encountered in previous ACLs. The walking process ends after
+ we've processed the any ACL directly attached to ``context``; a
+ list of principals is returned.
+
+ - Other aspects of this policy are the same as those in the
+ ACLSecurityPolicy (e.g. ``effective_principals``,
+ ``authenticated_userid``).
+
+ Enable this security policy by adding the following to your
+ application's ``configure.zcml``:
+
+ .. code-block:: xml
+
+ <utility
+ provides="repoze.bfg.interfaces.ISecurityPolicy"
+ factory="repoze.bfg.security.RemoteUserInheritingACLSecurityPolicy"
+ />
+
+ """
+ return InheritingACLSecurityPolicy(get_remoteuser)
+
+def get_who_principals(request):
+ identity = request.environ.get('repoze.who.identity')
+ if not identity:
+ return []
+ principals = [identity['repoze.who.userid']]
+ principals.extend(identity.get('groups', []))
+ return principals
+
+def WhoACLSecurityPolicy():
+ """
+ A security policy which:
+
+ - examines the request.environ for the ``repoze.who.identity``
+ dictionary. If one is found, the principal ids for the request
+ are composed of ``repoze.who.identity['repoze.who.userid']``
+ plus ``repoze.who.identity.get('groups', [])``.
+
+ - uses an ACL-based authorization model which attempts to find the
+ *first* ACL in the context' lineage. It returns ``Allowed`` from
+ its 'permits' method if the single ACL found grants access to the
+ current principal. It returns ``Denied`` if permission was not
+ granted (either explicitly via a deny or implicitly by not finding
+ a matching ACE action). The *first* ACL found in the context's
+ lineage is considered canonical; no searching is done for other
+ security attributes after the first ACL is found in the context'
+ lineage. Use the 'inheriting' variant of this policy to consider
+ more than one ACL in the lineage.
+
+ An ACL is an ordered sequence of ACE tuples, e.g. ``[(Allow,
+ Everyone, 'read'), (Deny, 'george', 'write')]``. ACLs stored on
+ model instance objects as their ``__acl__`` attribute will be used
+ by the security machinery to grant or deny access.
+
+ Enable this security policy by adding the following to your
+ application's ``configure.zcml``:
+
+ .. code-block:: xml
+
+ <utility
+ provides="repoze.bfg.interfaces.ISecurityPolicy"
+ factory="repoze.bfg.security.WhoACLSecurityPolicy"
+ />
+ """
+ return ACLSecurityPolicy(get_who_principals)
+
+RepozeWhoIdentityACLSecurityPolicy = WhoACLSecurityPolicy
+
+deprecated('RepozeWhoIdentityACLSecurityPolicy',
+ '(repoze.bfg.security.RepozeWhoIdentityACLSecurityPolicy '
+ 'should now be imported as '
+ 'repoze.bfg.security.WhoACLSecurityPolicy)',
+ )
+
+def WhoInheritingACLSecurityPolicy():
+ """ A security policy which:
+
+ - examines the request.environ for the ``repoze.who.identity``
+ dictionary. If one is found, the principal ids for the request
+ are composed of ``repoze.who.identity['repoze.who.userid']``
+ plus ``repoze.who.identity.get('groups', [])``.
+
+ - Differs from the non-inheriting security policy variants
+ (e.g. ``ACLSecurityPolicy``) by virtue of the fact that it does
+ not stop looking for ACLs in the object lineage after it finds
+ the first one.
+
+ - When checking whether a user is permitted (via the ``permits``
+ method), the security policy consults the ``context`` for an ACL
+ first. If no ACL exists on the context, or one does exist but
+ the ACL does not explicitly allow or deny access for any of the
+ effective principals, consult the context's parent ACL, and so
+ on, until the lineage is exhausted or we determine that the
+ policy permits or denies.
+
+ During this processing, if any ``Deny`` ACE is found matching
+ any effective principal, stop processing by returning an
+ ``ACLDenied`` (equals False) immediately. If any ``Allow`` ACE
+ is found matching any effective principal, stop processing by
+ returning an ``ACLAllowed`` (equals True) immediately. If we
+ exhaust the context's lneage, and no ACE has explicitly
+ permitted or denied access, return an ``ACLDenied``.
+
+ - When computing principals allowed by a permission via the
+ ``principals_allowed_by_permission`` method, we compute the set
+ of principals that are explicitly granted the ``permission``.
+ We do this by walking 'up' the object graph *from the root* to
+ the context. During this walking process, if we find an
+ explicit ``Allow`` ACE for a principal that matches the
+ ``permission``, the principal is included in the allow list.
+ However, if later in the walking process that user is mentioned
+ in any ``Deny`` ACE for the permission, the user is removed from
+ the allow list. If a ``Deny`` to the principal ``Everyone`` is
+ encountered during the walking process that matches the
+ ``permission``, the allow list is cleared for all principals
+ encountered in previous ACLs. The walking process ends after
+ we've processed the any ACL directly attached to ``context``; a
+ list of principals is returned.
+
+ - Other aspects of this policy are the same as those in the
+ ACLSecurityPolicy (e.g. ``effective_principals``,
+ ``authenticated_userid``).
+
+ Enable this security policy by adding the following to your
+ application's ``configure.zcml``:
+
+ .. code-block:: xml
+
+ <utility
+ provides="repoze.bfg.interfaces.ISecurityPolicy"
+ factory="repoze.bfg.security.WhoInheritingACLSecurityPolicy"
+ />
+ """
+ return InheritingACLSecurityPolicy(get_who_principals)
+
+
+class SecurityPolicyToAuthorizationPolicyAdapter(object):
+ """ An adapter registered when an old-style ISecurityPolicy
+ utility is configured in ZCML instead of an IAuthorizationPolicy
+ utility """
+ implements(IAuthorizationPolicy)
+ def __init__(self, secpol):
+ self.secpol = secpol
+
+ def permits(self, context, principals, permission):
+ request = manager.get()['request']
+ return self.secpol.permits(context, request, permission)
+
+ def principals_allowed_by_permission(self, context, permission):
+ return self.secpol.principals_allowed_by_permission(context, permission)
+
+class SecurityPolicyToAuthenticationPolicyAdapter(object):
+ implements(IAuthenticationPolicy)
+ def __init__(self, secpol):
+ self.secpol = secpol
+
+ def authenticated_userid(self, context, request):
+ return self.secpol.authenticated_userid(request)
+
+ def effective_principals(self, context, request):
+ return self.secpol.effective_principals(request)
+
+ def remember(self, context, request, principal, **kw):
+ return []
+
+ def forget(self, context, request):
+ return []
+
+def registerBBBAuthn(secpol, registry):
+ # Used when an explicit authentication policy is not defined, and
+ # an an old-style ISecurityPolicy is registered (via ZCML), turn
+ # it into separate authorization and authentication utilities
+ # using adapters
+ authn = SecurityPolicyToAuthenticationPolicyAdapter(secpol)
+ authz = SecurityPolicyToAuthorizationPolicyAdapter(secpol)
+ registry.registerUtility(authn, IAuthenticationPolicy)
+ registry.registerUtility(authz, IAuthorizationPolicy)