From a1a9fb7128c935848b17c0ce6586991098a17f07 Mon Sep 17 00:00:00 2001 From: Chris McDonough Date: Wed, 27 May 2009 04:52:51 +0000 Subject: Merge authchanges branch to trunk. --- repoze/bfg/security.py | 621 ++++++++++++------------------------------------- 1 file changed, 155 insertions(+), 466 deletions(-) (limited to 'repoze/bfg/security.py') diff --git a/repoze/bfg/security.py b/repoze/bfg/security.py index 90916bac2..5f5252ff3 100644 --- a/repoze/bfg/security.py +++ b/repoze/bfg/security.py @@ -1,10 +1,11 @@ +import warnings + +from zope.component import queryMultiAdapter from zope.component import queryUtility -from zope.deprecation import deprecated from zope.interface import implements -from repoze.bfg.location import lineage - -from repoze.bfg.interfaces import ISecurityPolicy +from repoze.bfg.interfaces import IAuthenticationPolicy +from repoze.bfg.interfaces import IAuthorizationPolicy from repoze.bfg.interfaces import IViewPermission from repoze.bfg.interfaces import IViewPermissionFactory @@ -31,458 +32,155 @@ def has_permission(permission, context, request): ``Allowed`` if the permission is granted in this context to the user implied by the request. Return an instance of ``Denied`` if this permission is not granted in this context to this user. This - delegates to the current security policy. Return True - unconditionally if no security policy has been configured in this - application.""" - policy = queryUtility(ISecurityPolicy) - if policy is None: - return Allowed('No security policy in use.') - return policy.permits(context, request, permission) - -def authenticated_userid(request): + function delegates to the current authentication and authorization + policies. Return ``Allowed`` unconditionally if no authentication + policy has been configured in this application.""" + authn_policy = queryUtility(IAuthenticationPolicy) + if authn_policy is None: + return Allowed('No authentication policy in use.') + + authz_policy = queryUtility(IAuthorizationPolicy) + if authz_policy is None: + raise ValueError('Authentication policy registered without ' + 'authorization policy') # should never happen + principals = authn_policy.effective_principals(context, request) + return authz_policy.permits(context, principals, permission) + +def authenticated_userid(*args): """ Return the userid of the currently authenticated user or - ``None`` if there is no security policy in effect or there is no - currently authenticated user""" - policy = queryUtility(ISecurityPolicy) + ``None`` if there is no authentication policy in effect or there + is no currently authenticated user. """ + + largs = len(args) + if largs > 2: + raise TypeError(args) + if largs == 1: + request = args[0] + context = None + warnings.warn( + 'As of BFG 0.9, the "repoze.bfg.security.authenticated_userid" ' + 'API now takes two arguments: "context" and "request". ' + 'It is being called it with a single argument' + '(assumed to be a request). In a future version, the ' + '"authenticated_userid API will stop accepting calls with a ' + 'single argument; please fix the calling code.', + stacklevel=2) + else: + context, request = args + + policy = queryUtility(IAuthenticationPolicy) if policy is None: return None - return policy.authenticated_userid(request) + return policy.authenticated_userid(context, request) -def effective_principals(request): +def effective_principals(*args): """ Return the list of 'effective' principal identifiers for the request. This will include the userid of the currently authenticated user if a user is currently authenticated. If no - security policy is in effect, this will return an empty sequence.""" - policy = queryUtility(ISecurityPolicy) + authentication policy is in effect, this will return an empty + sequence.""" + + largs = len(args) + if largs > 2: + raise TypeError(args) + if largs == 1: + request = args[0] + context = None + warnings.warn( + 'As of BFG 0.9, the "repoze.bfg.security.effective_principals " ' + 'API now takes two arguments: "context" and "request". ' + 'It is being called it with a single argument' + '(assumed to be a request). In a future version, the ' + '"effective_principals API will stop accepting calls with a ' + 'single argument; please fix the calling code.', + stacklevel=2) + else: + context, request = args + + policy = queryUtility(IAuthenticationPolicy) if policy is None: return [] - return policy.effective_principals(request) + return policy.effective_principals(context, request) def principals_allowed_by_permission(context, permission): """ Provided a context (a model object), and a permission (a string or unicode object), return a sequence of principal ids that - possess the permission in the context. If no security policy is - in effect, this will return a sequence with the single value + possess the permission in the context. If no authorization policy + is in effect, this will return a sequence with the single value representing ``Everyone`` (the special principal identifier - representing all principals). Note that even if a security policy - *is* in effect, some security policies may not implement the - required machinery for this function; those will cause a - ``NotImplementedError`` exception to be raised when this function - is invoked.""" - policy = queryUtility(ISecurityPolicy) + representing all principals). + + .. note:: even if an authorization policy *is* in effect, some + (exotic) authorization policies may not implement the required + machinery for this function; those will cause a + ``NotImplementedError`` exception to be raised when this + function is invoked. + """ + policy = queryUtility(IAuthorizationPolicy) if policy is None: return [Everyone] return policy.principals_allowed_by_permission(context, permission) -class ACLSecurityPolicy(object): - implements(ISecurityPolicy) - - def __init__(self, get_principals): - self.get_principals = get_principals - - def permits(self, context, request, permission): - """ Return ``ACLAllowed`` if the policy permits access, - ``ACLDenied`` if not. """ - principals = set(self.effective_principals(request)) - - for location in lineage(context): - try: - acl = location.__acl__ - except AttributeError: - continue - - for ace in acl: - ace_action, ace_principal, ace_permissions = ace - if ace_principal in principals: - if not hasattr(ace_permissions, '__iter__'): - ace_permissions = [ace_permissions] - if permission in ace_permissions: - if ace_action == Allow: - return ACLAllowed(ace, acl, permission, - principals, location) - else: - return ACLDenied(ace, acl, permission, - principals, location) - - # default deny if no ACE matches in the ACL found - result = ACLDenied(None, acl, permission, principals, location) - return result - - # default deny if no ACL in lineage at all - return ACLDenied(None, None, permission, principals, context) - - def authenticated_userid(self, request): - principals = self.get_principals(request) - if principals: - return principals[0] - - def effective_principals(self, request): - effective_principals = [Everyone] - principal_ids = self.get_principals(request) - - if principal_ids: - effective_principals.append(Authenticated) - effective_principals.extend(principal_ids) - - return effective_principals - - def principals_allowed_by_permission(self, context, permission): - for location in lineage(context): - try: - acl = location.__acl__ - except AttributeError: - continue - - allowed = {} - - for ace_action, ace_principal, ace_permissions in acl: - if ace_action == Allow: - if not hasattr(ace_permissions, '__iter__'): - ace_permissions = [ace_permissions] - if permission in ace_permissions: - allowed[ace_principal] = True - return sorted(allowed.keys()) - +def view_execution_permitted(context, request, name=''): + """ If the view specified by ``context`` and ``name`` is protected + by a permission, check the permission associated with the view + using the effective authentication/authorization policies and the + ``request``. Return a boolean result. If no authentication + policy is in effect, or if the view is not protected by a + permission, return True.""" + result = queryMultiAdapter((context, request), IViewPermission, + name=name, default=None) + if result is None: + return Allowed( + 'Allowed: view name %r in context %r (no permission defined)' % + (name, context)) + return result + +def remember(context, request, principal, **kw): + """ Return a sequence of header tuples (e.g. ``[('Set-Cookie', + 'foo=abc')]``) suitable for 'remembering' a set of credentials + implied by the data passed as ``principal`` and ``*kw`` using the + current authentication policy. Common usage might look like so + within the body of a view function (``response`` is assumed to be + an WebOb-style response object computed previously by the view + code):: + + from repoze.bfg.security import forget + headers = remember(context, request, 'chrism', password='123') + response.headerlist.extend(headers) + return response + + If no authentication policy is in use, this function will always + return an empty sequence. If used, the composition and meaning of + ``**kw`` must be agreed upon by the calling code and the effective + authentication policy.""" + policy = queryUtility(IAuthenticationPolicy) + if policy is None: return [] - -class InheritingACLSecurityPolicy(object): - """ A security policy which uses ACLs in the following ways: - - - When checking whether a user is permitted (via the ``permits`` - method), the security policy consults the ``context`` for an ACL - first. If no ACL exists on the context, or one does exist but - the ACL does not explicitly allow or deny access for any of the - effective principals, consult the context's parent ACL, and so - on, until the lineage is exhausted or we determine that the - policy permits or denies. - - During this processing, if any ``Deny`` ACE is found matching - any effective principal, stop processing by returning an - ``ACLDenied`` (equals False) immediately. If any ``Allow`` ACE - is found matching any effective principal, stop processing by - returning an ``ACLAllowed`` (equals True) immediately. If we - exhaust the context's lneage, and no ACE has explicitly - permitted or denied access, return an ``ACLDenied``. This - differs from the non-inheriting security policy (the - ``ACLSecurityPolicy``) by virtue of the fact that it does not - stop looking for ACLs in the object lineage after it finds the - first one. - - - When computing principals allowed by a permission via the - ``principals_allowed_by_permission`` method, we compute the set - of principals that are explicitly granted the ``permission``. - We do this by walking 'up' the object graph *from the root* to - the context. During this walking process, if we find an - explicit ``Allow`` ACE for a principal that matches the - ``permission``, the principal is included in the allow list. - However, if later in the walking process that user is mentioned - in any ``Deny`` ACE for the permission, the user is removed from - the allow list. If a ``Deny`` to the principal ``Everyone`` is - encountered during the walking process that matches the - ``permission``, the allow list is cleared for all principals - encountered in previous ACLs. The walking process ends after - we've processed the any ACL directly attached to ``context``; a - list of principals is returned. - - - Other aspects of this policy are the same as those in the - ACLSecurityPolicy (e.g. ``effective_principals``, - ``authenticated_userid``). - """ - implements(ISecurityPolicy) - - def __init__(self, get_principals): - self.get_principals = get_principals - - def permits(self, context, request, permission): - """ Return ``ACLAllowed`` if the policy permits access, - ``ACLDenied`` if not. """ - principals = set(self.effective_principals(request)) - - for location in lineage(context): - try: - acl = location.__acl__ - except AttributeError: - continue - - for ace in acl: - ace_action, ace_principal, ace_permissions = ace - if ace_principal in principals: - if not hasattr(ace_permissions, '__iter__'): - ace_permissions = [ace_permissions] - if permission in ace_permissions: - if ace_action == Allow: - return ACLAllowed(ace, acl, permission, - principals, location) - else: - return ACLDenied(ace, acl, permission, - principals, location) - - # default deny if no ACL in lineage at all - return ACLDenied(None, None, permission, principals, context) - - def authenticated_userid(self, request): - principals = self.get_principals(request) - if principals: - return principals[0] - - def effective_principals(self, request): - effective_principals = [Everyone] - principal_ids = self.get_principals(request) - - if principal_ids: - effective_principals.append(Authenticated) - effective_principals.extend(principal_ids) - - return effective_principals - - def principals_allowed_by_permission(self, context, permission): - allowed = set() - - for location in reversed(list(lineage(context))): - # NB: we're walking *up* the object graph from the root - try: - acl = location.__acl__ - except AttributeError: - continue - - allowed_here = set() - denied_here = set() - - for ace_action, ace_principal, ace_permissions in acl: - if not hasattr(ace_permissions, '__iter__'): - ace_permissions = [ace_permissions] - if ace_action == Allow and permission in ace_permissions: - if not ace_principal in denied_here: - allowed_here.add(ace_principal) - if ace_action == Deny and permission in ace_permissions: - denied_here.add(ace_principal) - if ace_principal == Everyone: - # clear the entire allowed set, as we've hit a - # deny of Everyone ala (Deny, Everyone, ALL) - allowed = set() - break - elif ace_principal in allowed: - allowed.remove(ace_principal) - - allowed.update(allowed_here) - - return allowed - -def get_remoteuser(request): - user_id = request.environ.get('REMOTE_USER') - if user_id: - return [user_id] - return [] - -def RemoteUserACLSecurityPolicy(): - """ A security policy which: - - - examines the request.environ for the REMOTE_USER variable and - uses any non-false value as a principal id for this request. - - - uses an ACL-based authorization model which attempts to find the - *first* ACL in the context' lineage. It returns ``Allowed`` from - its 'permits' method if the single ACL found grants access to the - current principal. It returns ``Denied`` if permission was not - granted (either explicitly via a deny or implicitly by not finding - a matching ACE action). The *first* ACL found in the context's - lineage is considered canonical; no searching is done for other - security attributes after the first ACL is found in the context' - lineage. Use the 'inheriting' variant of this policy to consider - more than one ACL in the lineage. - - An ACL is an ordered sequence of ACE tuples, e.g. ``[(Allow, - Everyone, 'read'), (Deny, 'george', 'write')]``. ACLs stored on - model instance objects as their ``__acl__`` attribute will be used - by the security machinery to grant or deny access. - - Enable this security policy by adding the following to your - application's ``configure.zcml``: - - .. code-block:: xml - - - - """ - return ACLSecurityPolicy(get_remoteuser) - -def RemoteUserInheritingACLSecurityPolicy(): - """ A security policy which: - - - examines the request.environ for the REMOTE_USER variable and - uses any non-false value as a principal id for this request. - - - Differs from the non-inheriting security policy variants - (e.g. ``ACLSecurityPolicy``) by virtue of the fact that it does - not stop looking for ACLs in the object lineage after it finds - the first one. - - - When checking whether a user is permitted (via the ``permits`` - method), the security policy consults the ``context`` for an ACL - first. If no ACL exists on the context, or one does exist but - the ACL does not explicitly allow or deny access for any of the - effective principals, consult the context's parent ACL, and so - on, until the lineage is exhausted or we determine that the - policy permits or denies. - - During this processing, if any ``Deny`` ACE is found matching - any effective principal, stop processing by returning an - ``ACLDenied`` (equals False) immediately. If any ``Allow`` ACE - is found matching any effective principal, stop processing by - returning an ``ACLAllowed`` (equals True) immediately. If we - exhaust the context's lneage, and no ACE has explicitly - permitted or denied access, return an ``ACLDenied``. - - - When computing principals allowed by a permission via the - ``principals_allowed_by_permission`` method, we compute the set - of principals that are explicitly granted the ``permission``. - We do this by walking 'up' the object graph *from the root* to - the context. During this walking process, if we find an - explicit ``Allow`` ACE for a principal that matches the - ``permission``, the principal is included in the allow list. - However, if later in the walking process that user is mentioned - in any ``Deny`` ACE for the permission, the user is removed from - the allow list. If a ``Deny`` to the principal ``Everyone`` is - encountered during the walking process that matches the - ``permission``, the allow list is cleared for all principals - encountered in previous ACLs. The walking process ends after - we've processed the any ACL directly attached to ``context``; a - list of principals is returned. - - - Other aspects of this policy are the same as those in the - ACLSecurityPolicy (e.g. ``effective_principals``, - ``authenticated_userid``). - - Enable this security policy by adding the following to your - application's ``configure.zcml``: - - .. code-block:: xml - - - - """ - return InheritingACLSecurityPolicy(get_remoteuser) - -def get_who_principals(request): - identity = request.environ.get('repoze.who.identity') - if not identity: + else: + return policy.remember(context, request, principal, **kw) + +def forget(context, request): + """ Return a sequence of header tuples (e.g. ``[('Set-Cookie', + 'foo=abc')]``) suitable for 'forgetting' the set of credentials + possessed by the currently authenticated user. A common usage + might look like so within the body of a view function + (``response`` is assumed to be an WebOb-style response object + computed previously by the view code):: + + from repoze.bfg.security import forget + headers = forget(context, request) + response.headerlist.extend(headers) + return response + + If no authentication policy is in use, this function will always + return an empty sequence.""" + policy = queryUtility(IAuthenticationPolicy) + if policy is None: return [] - principals = [identity['repoze.who.userid']] - principals.extend(identity.get('groups', [])) - return principals - -def WhoACLSecurityPolicy(): - """ - A security policy which: - - - examines the request.environ for the ``repoze.who.identity`` - dictionary. If one is found, the principal ids for the request - are composed of ``repoze.who.identity['repoze.who.userid']`` - plus ``repoze.who.identity.get('groups', [])``. - - - uses an ACL-based authorization model which attempts to find the - *first* ACL in the context' lineage. It returns ``Allowed`` from - its 'permits' method if the single ACL found grants access to the - current principal. It returns ``Denied`` if permission was not - granted (either explicitly via a deny or implicitly by not finding - a matching ACE action). The *first* ACL found in the context's - lineage is considered canonical; no searching is done for other - security attributes after the first ACL is found in the context' - lineage. Use the 'inheriting' variant of this policy to consider - more than one ACL in the lineage. - - An ACL is an ordered sequence of ACE tuples, e.g. ``[(Allow, - Everyone, 'read'), (Deny, 'george', 'write')]``. ACLs stored on - model instance objects as their ``__acl__`` attribute will be used - by the security machinery to grant or deny access. - - Enable this security policy by adding the following to your - application's ``configure.zcml``: - - .. code-block:: xml - - - """ - return ACLSecurityPolicy(get_who_principals) - -RepozeWhoIdentityACLSecurityPolicy = WhoACLSecurityPolicy - -deprecated('RepozeWhoIdentityACLSecurityPolicy', - '(repoze.bfg.security.RepozeWhoIdentityACLSecurityPolicy ' - 'should now be imported as ' - 'repoze.bfg.security.WhoACLSecurityPolicy)', - ) - -def WhoInheritingACLSecurityPolicy(): - """ A security policy which: - - - examines the request.environ for the ``repoze.who.identity`` - dictionary. If one is found, the principal ids for the request - are composed of ``repoze.who.identity['repoze.who.userid']`` - plus ``repoze.who.identity.get('groups', [])``. - - - Differs from the non-inheriting security policy variants - (e.g. ``ACLSecurityPolicy``) by virtue of the fact that it does - not stop looking for ACLs in the object lineage after it finds - the first one. - - - When checking whether a user is permitted (via the ``permits`` - method), the security policy consults the ``context`` for an ACL - first. If no ACL exists on the context, or one does exist but - the ACL does not explicitly allow or deny access for any of the - effective principals, consult the context's parent ACL, and so - on, until the lineage is exhausted or we determine that the - policy permits or denies. - - During this processing, if any ``Deny`` ACE is found matching - any effective principal, stop processing by returning an - ``ACLDenied`` (equals False) immediately. If any ``Allow`` ACE - is found matching any effective principal, stop processing by - returning an ``ACLAllowed`` (equals True) immediately. If we - exhaust the context's lneage, and no ACE has explicitly - permitted or denied access, return an ``ACLDenied``. - - - When computing principals allowed by a permission via the - ``principals_allowed_by_permission`` method, we compute the set - of principals that are explicitly granted the ``permission``. - We do this by walking 'up' the object graph *from the root* to - the context. During this walking process, if we find an - explicit ``Allow`` ACE for a principal that matches the - ``permission``, the principal is included in the allow list. - However, if later in the walking process that user is mentioned - in any ``Deny`` ACE for the permission, the user is removed from - the allow list. If a ``Deny`` to the principal ``Everyone`` is - encountered during the walking process that matches the - ``permission``, the allow list is cleared for all principals - encountered in previous ACLs. The walking process ends after - we've processed the any ACL directly attached to ``context``; a - list of principals is returned. - - - Other aspects of this policy are the same as those in the - ACLSecurityPolicy (e.g. ``effective_principals``, - ``authenticated_userid``). - - Enable this security policy by adding the following to your - application's ``configure.zcml``: - - .. code-block:: xml - - - """ - return InheritingACLSecurityPolicy(get_who_principals) - + else: + return policy.forget(context, request) + class PermitsResult(int): def __new__(cls, s, *args): inst = int.__new__(cls, cls.boolval) @@ -503,17 +201,18 @@ class PermitsResult(int): self.msg) class Denied(PermitsResult): - """ An instance of ``Denied`` is returned when a security policy - or other ``repoze.bfg`` code denies an action unlrelated to an ACL - check. It evaluates equal to all boolean false types. It has an - attribute named ``msg`` describing the circumstances for the deny.""" + """ An instance of ``Denied`` is returned when a security-related + API or other ``repoze.bfg`` code denies an action unlrelated to an + ACL check. It evaluates equal to all boolean false types. It has + an attribute named ``msg`` describing the circumstances for the + deny.""" boolval = 0 class Allowed(PermitsResult): - """ An instance of ``Allowed`` is returned when a security policy - or other ``repoze.bfg`` code allows an action unrelated to an ACL - check. It evaluates equal to all boolean true types. It has an - attribute named ``msg`` describing the circumstances for the + """ An instance of ``Allowed`` is returned when a security-related + API or other ``repoze.bfg`` code allows an action unrelated to an + ACL check. It evaluates equal to all boolean true types. It has + an attribute named ``msg`` describing the circumstances for the allow.""" boolval = 1 @@ -566,35 +265,25 @@ class ACLAllowed(ACLPermitsResult): as the ``msg`` attribute.""" boolval = 1 -class ViewPermission(object): - implements(IViewPermission) - def __init__(self, context, request, permission_name): - self.context = context - self.request = request - self.permission_name = permission_name - - def __call__(self, security_policy, debug_info=None): - return security_policy.permits(self.context, - self.request, - self.permission_name) - - def __repr__(self): - view_name = getattr(self.request, 'view_name', None) - return '' % (id(self), - self.permission_name, - view_name) - class ViewPermissionFactory(object): implements(IViewPermissionFactory) + def __init__(self, permission_name): self.permission_name = permission_name def __call__(self, context, request): - return ViewPermission(context, request, self.permission_name) - + return has_permission(self.permission_name, context, request) + class Unauthorized(Exception): pass +# BBB imports: these must come at the end of the file, as there's a +# circular dependency between secpols and security +from repoze.bfg.secpols import ACLSecurityPolicy +from repoze.bfg.secpols import InheritingACLSecurityPolicy +from repoze.bfg.secpols import RemoteUserACLSecurityPolicy +from repoze.bfg.secpols import RemoteUserInheritingACLSecurityPolicy +from repoze.bfg.secpols import WhoACLSecurityPolicy +from repoze.bfg.secpols import WhoInheritingACLSecurityPolicy +# /BBB imports - - -- cgit v1.2.3